1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package collector
17
18 import (
19 "context"
20 "net"
21 "os"
22 "strconv"
23
24 "github.com/go-logr/logr"
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/runtime"
27 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
28 "k8s.io/client-go/rest"
29 ctrl "sigs.k8s.io/controller-runtime"
30 "sigs.k8s.io/controller-runtime/pkg/cache"
31 "sigs.k8s.io/controller-runtime/pkg/client"
32 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
33 "sigs.k8s.io/controller-runtime/pkg/webhook"
34
35 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
36 "github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
37 config "github.com/chaos-mesh/chaos-mesh/pkg/config"
38 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
39 )
40
41 var (
42 scheme = runtime.NewScheme()
43 )
44
45 func init() {
46 _ = clientgoscheme.AddToScheme(scheme)
47
48 _ = v1alpha1.AddToScheme(scheme)
49 }
50
51
52 type Server struct {
53 Manager ctrl.Manager
54 logger logr.Logger
55 }
56
57
58 func NewServer(
59 conf *config.ChaosDashboardConfig,
60 experimentArchive core.ExperimentStore,
61 scheduleArchive core.ScheduleStore,
62 event core.EventStore,
63 workflowStore core.WorkflowStore,
64 logger logr.Logger,
65 ) (*Server, client.Client, client.Reader, *runtime.Scheme) {
66 s := &Server{logger: logger}
67
68
69 options := ctrl.Options{
70 Scheme: scheme,
71 Metrics: metricsserver.Options{
72 BindAddress: net.JoinHostPort(conf.MetricHost, strconv.Itoa(conf.MetricPort)),
73 },
74 LeaderElection: conf.EnableLeaderElection,
75 WebhookServer: webhook.NewServer(webhook.Options{
76 Port: 9443,
77 }),
78 }
79 if conf.ClusterScoped {
80 logger.Info("Chaos controller manager is running in cluster scoped mode.")
81 } else {
82 logger.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", conf.TargetNamespace)
83 options.NewCache = func(cfg *rest.Config, opts cache.Options) (cache.Cache, error) {
84 opts.DefaultNamespaces = map[string]cache.Config{
85 conf.TargetNamespace: {},
86 }
87 return cache.New(cfg, opts)
88 }
89 }
90
91 var err error
92
93 cfg := ctrl.GetConfigOrDie()
94
95 if conf.QPS > 0 {
96 cfg.QPS = conf.QPS
97 cfg.Burst = conf.Burst
98 }
99
100 s.Manager, err = ctrl.NewManager(cfg, options)
101 if err != nil {
102 logger.Error(err, "unable to start collector")
103 os.Exit(1)
104 }
105
106 if conf.SecurityMode {
107 clientpool.K8sClients, err = clientpool.NewClientPool(cfg, scheme, 100)
108 if err != nil {
109
110 logger.Error(err, "fail to create client pool")
111 os.Exit(1)
112 }
113 } else {
114 clientpool.K8sClients, err = clientpool.NewLocalClient(cfg, scheme)
115 if err != nil {
116 logger.Error(err, "fail to create client pool")
117 os.Exit(1)
118 }
119 }
120
121 for kind, chaosKind := range v1alpha1.AllKinds() {
122 if err = (&ChaosCollector{
123 Client: s.Manager.GetClient(),
124 Log: logger.WithName(kind),
125 archive: experimentArchive,
126 event: event,
127 }).Setup(s.Manager, chaosKind.SpawnObject()); err != nil {
128 logger.Error(err, "unable to create collector", "collector", kind)
129 os.Exit(1)
130 }
131 }
132
133 if err = (&ScheduleCollector{
134 Client: s.Manager.GetClient(),
135 Log: logger.WithName("schedule-collector").WithName(v1alpha1.KindSchedule),
136 archive: scheduleArchive,
137 }).Setup(s.Manager, &v1alpha1.Schedule{}); err != nil {
138 logger.Error(err, "unable to create collector", "collector", v1alpha1.KindSchedule)
139 os.Exit(1)
140 }
141
142 if err = (&EventCollector{
143 Client: s.Manager.GetClient(),
144 Log: logger.WithName("event-collector").WithName("Event"),
145 event: event,
146 }).Setup(s.Manager, &v1.Event{}); err != nil {
147 logger.Error(err, "unable to create collector", "collector", v1alpha1.KindSchedule)
148 os.Exit(1)
149 }
150
151 if err = (&WorkflowCollector{
152 kubeClient: s.Manager.GetClient(),
153 Log: logger.WithName("workflow-collector").WithName(v1alpha1.KindWorkflow),
154 store: workflowStore,
155 }).Setup(s.Manager, &v1alpha1.Workflow{}); err != nil {
156 logger.Error(err, "unable to create collector", "collector", v1alpha1.KindWorkflow)
157 os.Exit(1)
158 }
159
160 return s, s.Manager.GetClient(), s.Manager.GetAPIReader(), s.Manager.GetScheme()
161 }
162
163
164 func Register(ctx context.Context, s *Server) {
165 go func() {
166 s.logger.Info("Starting collector")
167 if err := s.Manager.Start(ctx); err != nil {
168 s.logger.Error(err, "could not start collector")
169 os.Exit(1)
170 }
171 }()
172 }
173