-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.go
157 lines (135 loc) · 4.17 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"github.com/cilium/ebpf/rlimit"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"github.com/kubescape/kapprofiler/pkg/collector"
"github.com/kubescape/kapprofiler/pkg/controller"
"github.com/kubescape/kapprofiler/pkg/eventsink"
"github.com/kubescape/kapprofiler/pkg/tracing"
)
// Process-wide state populated once by serviceInitNChecks and read by main.
var (
	// NodeName holds the value of the NODE_NAME environment variable.
	NodeName string

	// k8sConfig is the Kubernetes client configuration returned by
	// checkKubernetesConnection.
	k8sConfig *rest.Config
)
// checkKubernetesConnection resolves a Kubernetes client configuration and
// verifies that the API server answers requests made with it.
//
// The kubeconfig at clientcmd.RecommendedHomeFile is tried first; if it cannot
// be loaded, the in-cluster configuration is used instead. Reachability is
// probed by listing pods in the "default" namespace.
//
// It returns the working configuration, or a nil configuration and an error
// describing which step failed. Errors are wrapped and returned rather than
// logged here, so the caller reports each failure exactly once.
func checkKubernetesConnection() (*rest.Config, error) {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		// Keep the kubeconfig error: if the in-cluster fallback fails as well,
		// both causes are needed to diagnose the problem.
		kubeconfigErr := err
		config, err = rest.InClusterConfig()
		if err != nil {
			return nil, fmt.Errorf("loading Kubernetes config (kubeconfig: %v; in-cluster: %w)", kubeconfigErr, err)
		}
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("creating Kubernetes client: %w", err)
	}

	// A single list request is enough to prove the API server is reachable
	// with this configuration; the result itself is not needed.
	if _, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{}); err != nil {
		return nil, fmt.Errorf("communicating with Kubernetes API server: %w", err)
	}
	return config, nil
}
// serviceInitNChecks runs the start-up prerequisites in order and stores their
// results in the package-level variables:
//
//  1. removes the memlock rlimit (eBPF needs it),
//  2. checks the Kubernetes connection and saves the config in k8sConfig,
//  3. reads NODE_NAME from the environment and saves it in NodeName.
//
// The first step that fails aborts the sequence and its error is returned.
func serviceInitNChecks() error {
	if rlimitErr := rlimit.RemoveMemlock(); rlimitErr != nil {
		return rlimitErr
	}

	cfg, connErr := checkKubernetesConnection()
	if connErr != nil {
		return connErr
	}
	k8sConfig = cfg

	name := os.Getenv("NODE_NAME")
	if name == "" {
		return fmt.Errorf("NODE_NAME environment variable not set")
	}
	NodeName = name

	return nil
}
// main wires the service together and blocks until a shutdown signal arrives.
//
// Start-up order: initialization checks, event sink, tracer, collector
// manager, tracer start, AppProfile controller. Each started component
// registers a deferred stop, so on SIGINT/SIGTERM they are stopped in reverse
// order when main returns.
//
// Configuration read from the environment:
//   - OPEN_IGNORE_MOUNTS: "true" sets IgnoreMounts.
//   - OPEN_IGNORE_PREFIXES: comma-separated list passed as IgnorePrefixes.
//   - STORE_NAMESPACE: passed as StoreNamespace (empty when unset).
//
// NOTE: the log.Fatalf failure paths exit the process immediately and
// therefore skip any stops that were already deferred.
func main() {
	// Initialize the service
	if err := serviceInitNChecks(); err != nil {
		log.Fatalf("Failed to initialize service: %v\n", err)
	}

	// Create the event sink
	eventSink, err := eventsink.NewEventSink("", true)
	if err != nil {
		log.Fatalf("Failed to create event sink: %v\n", err)
	}

	// Start the event sink
	if err := eventSink.Start(); err != nil {
		log.Fatalf("Failed to start event sink: %v\n", err)
	}
	defer eventSink.Stop()

	// Create the tracer
	tracer := tracing.NewTracer(NodeName, k8sConfig, []tracing.EventSink{eventSink}, false)

	// Collector manager settings taken from the environment.
	ignoreMounts := os.Getenv("OPEN_IGNORE_MOUNTS") == "true"

	// Kept as an empty (non-nil) slice when unset, as before.
	ignorePrefixes := []string{}
	if prefixes := os.Getenv("OPEN_IGNORE_PREFIXES"); prefixes != "" {
		ignorePrefixes = strings.Split(prefixes, ",")
	}

	// Getenv already yields "" when the variable is unset.
	storeNamespace := os.Getenv("STORE_NAMESPACE")

	// Start the collector manager
	collectorManagerConfig := &collector.CollectorManagerConfig{
		EventSink:      eventSink,
		Tracer:         tracer,
		Interval:       60, // 60 seconds for now, TODO: make it configurable
		FinalizeTime:   80, // setting 0 disables finalization
		FinalizeJitter: 10, // setting 0 disables finalization jitter
		K8sConfig:      k8sConfig,
		RecordStrategy: collector.RecordStrategyOnlyIfNotExists,
		NodeName:       NodeName,
		IgnoreMounts:   ignoreMounts,
		IgnorePrefixes: ignorePrefixes,
		StoreNamespace: storeNamespace,
		OnError: func(err error) {
			log.Fatalf("Error in collector manager watcher: %v", err)
		},
	}
	cm, err := collector.StartCollectorManager(collectorManagerConfig)
	if err != nil {
		log.Fatalf("Failed to start collector manager: %v\n", err)
	}
	defer cm.StopCollectorManager()

	// Start the service
	if err := tracer.Start(); err != nil {
		log.Fatalf("Failed to start service: %v\n", err)
	}
	defer tracer.Stop()

	// Start AppProfile controller
	appProfileController := controller.NewController(k8sConfig, storeNamespace, func(err error) {
		log.Fatalf("AppProfile controller failed: %v\n", err)
	})
	appProfileController.StartController()
	defer appProfileController.StopController()

	// Wait for shutdown signal
	shutdown := make(chan os.Signal, 1)
	signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
	<-shutdown
	log.Println("Shutting down...")

	// Return normally instead of calling os.Exit(0): os.Exit terminates the
	// process without running deferred functions, which would skip every stop
	// registered above. Falling off the end of main still exits with status 0.
}