1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package collector
17
18 import (
19 "context"
20 "os"
21
22 v1 "k8s.io/api/core/v1"
23 "k8s.io/apimachinery/pkg/runtime"
24 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
25 ctrl "sigs.k8s.io/controller-runtime"
26 "sigs.k8s.io/controller-runtime/pkg/client"
27
28 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
29 "github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
30 config "github.com/chaos-mesh/chaos-mesh/pkg/config/dashboard"
31 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
32 )
33
34 var (
35 scheme = runtime.NewScheme()
36 log = ctrl.Log.WithName("collector")
37 )
38
39 func init() {
40 _ = clientgoscheme.AddToScheme(scheme)
41
42 _ = v1alpha1.AddToScheme(scheme)
43 }
44
45
46 type Server struct {
47 Manager ctrl.Manager
48 }
49
50
51 func NewServer(
52 conf *config.ChaosDashboardConfig,
53 experimentArchive core.ExperimentStore,
54 scheduleArchive core.ScheduleStore,
55 event core.EventStore,
56 workflowStore core.WorkflowStore,
57 ) (*Server, client.Client, client.Reader, *runtime.Scheme) {
58 s := &Server{}
59
60
61 options := ctrl.Options{
62 Scheme: scheme,
63 MetricsBindAddress: conf.MetricAddress,
64 LeaderElection: conf.EnableLeaderElection,
65 Port: 9443,
66 }
67 if conf.ClusterScoped {
68 log.Info("Chaos controller manager is running in cluster scoped mode.")
69 } else {
70 log.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", conf.TargetNamespace)
71 options.Namespace = conf.TargetNamespace
72 }
73
74 var err error
75
76 cfg := ctrl.GetConfigOrDie()
77 s.Manager, err = ctrl.NewManager(cfg, options)
78 if err != nil {
79 log.Error(err, "unable to start collector")
80 os.Exit(1)
81 }
82
83 if conf.SecurityMode {
84 clientpool.K8sClients, err = clientpool.NewClientPool(cfg, scheme, 100)
85 if err != nil {
86
87 log.Error(err, "fail to create client pool")
88 os.Exit(1)
89 }
90 } else {
91 clientpool.K8sClients, err = clientpool.NewLocalClient(cfg, scheme)
92 if err != nil {
93 log.Error(err, "fail to create client pool")
94 os.Exit(1)
95 }
96 }
97
98 for kind, chaosKind := range v1alpha1.AllKinds() {
99 if err = (&ChaosCollector{
100 Client: s.Manager.GetClient(),
101 Log: ctrl.Log.WithName("collector").WithName(kind),
102 archive: experimentArchive,
103 event: event,
104 }).Setup(s.Manager, chaosKind.SpawnObject()); err != nil {
105 log.Error(err, "unable to create collector", "collector", kind)
106 os.Exit(1)
107 }
108 }
109
110 if err = (&ScheduleCollector{
111 Client: s.Manager.GetClient(),
112 Log: ctrl.Log.WithName("schedule-collector").WithName(v1alpha1.KindSchedule),
113 archive: scheduleArchive,
114 }).Setup(s.Manager, &v1alpha1.Schedule{}); err != nil {
115 log.Error(err, "unable to create collector", "collector", v1alpha1.KindSchedule)
116 os.Exit(1)
117 }
118
119 if err = (&EventCollector{
120 Client: s.Manager.GetClient(),
121 Log: ctrl.Log.WithName("event-collector").WithName("Event"),
122 event: event,
123 }).Setup(s.Manager, &v1.Event{}); err != nil {
124 log.Error(err, "unable to create collector", "collector", v1alpha1.KindSchedule)
125 os.Exit(1)
126 }
127
128 if err = (&WorkflowCollector{
129 kubeClient: s.Manager.GetClient(),
130 Log: ctrl.Log.WithName("workflow-collector").WithName(v1alpha1.KindWorkflow),
131 store: workflowStore,
132 }).Setup(s.Manager, &v1alpha1.Workflow{}); err != nil {
133 log.Error(err, "unable to create collector", "collector", v1alpha1.KindWorkflow)
134 os.Exit(1)
135 }
136
137 return s, s.Manager.GetClient(), s.Manager.GetAPIReader(), s.Manager.GetScheme()
138 }
139
140
141 func Register(ctx context.Context, s *Server) {
142 go func() {
143 log.Info("Starting collector")
144 if err := s.Manager.Start(ctx); err != nil {
145 log.Error(err, "could not start collector")
146 os.Exit(1)
147 }
148 }()
149 }
150