...
1
2
3
4
5
6
7
8
9
10
11
12
13
14 package collector
15
16 import (
17 "os"
18
19 "k8s.io/apimachinery/pkg/runtime"
20 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
21 ctrl "sigs.k8s.io/controller-runtime"
22 "sigs.k8s.io/controller-runtime/pkg/client"
23
24 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
25 "github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
26 "github.com/chaos-mesh/chaos-mesh/pkg/config"
27 "github.com/chaos-mesh/chaos-mesh/pkg/core"
28 )
29
var (
	// scheme is the runtime scheme handed to the collector's manager and to
	// the client pool; init registers the client-go and chaos-mesh v1alpha1
	// types into it.
	scheme = runtime.NewScheme()
	// log is the package-level logger, named "collector".
	log = ctrl.Log.WithName("collector")
)
34
35 func init() {
36 _ = clientgoscheme.AddToScheme(scheme)
37
38 _ = v1alpha1.AddToScheme(scheme)
39 }
40
41
// Server wraps the controller-runtime manager that hosts the chaos collectors.
type Server struct {
	// Manager is created by NewServer and started by Register.
	Manager ctrl.Manager
}
45
46
47 func NewServer(
48 conf *config.ChaosDashboardConfig,
49 archive core.ExperimentStore,
50 event core.EventStore,
51 ) (*Server, client.Client, client.Reader, *runtime.Scheme) {
52 s := &Server{}
53
54
55 options := ctrl.Options{
56 Scheme: scheme,
57 MetricsBindAddress: conf.MetricAddress,
58 LeaderElection: conf.EnableLeaderElection,
59 Port: 9443,
60 }
61 if conf.ClusterScoped {
62 log.Info("Chaos controller manager is running in cluster scoped mode.")
63 } else {
64 log.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", conf.TargetNamespace)
65 options.Namespace = conf.TargetNamespace
66 }
67
68 var err error
69
70 cfg := ctrl.GetConfigOrDie()
71 s.Manager, err = ctrl.NewManager(cfg, options)
72 if err != nil {
73 log.Error(err, "unable to start collector")
74 os.Exit(1)
75 }
76
77 if conf.SecurityMode {
78 clientpool.K8sClients, err = clientpool.NewClientPool(cfg, scheme, 100)
79 if err != nil {
80
81 log.Error(err, "fail to create client pool")
82 os.Exit(1)
83 }
84 } else {
85 clientpool.K8sClients, err = clientpool.NewLocalClient(cfg, scheme)
86 if err != nil {
87 log.Error(err, "fail to create client pool")
88 os.Exit(1)
89 }
90 }
91
92 for kind, chaosKind := range v1alpha1.AllKinds() {
93 if err = (&ChaosCollector{
94 Client: s.Manager.GetClient(),
95 Log: ctrl.Log.WithName("collector").WithName(kind),
96 archive: archive,
97 event: event,
98 }).Setup(s.Manager, chaosKind.Chaos); err != nil {
99 log.Error(err, "unable to create collector", "collector", kind)
100 os.Exit(1)
101 }
102 }
103
104 return s, s.Manager.GetClient(), s.Manager.GetAPIReader(), s.Manager.GetScheme()
105 }
106
107
108 func Register(s *Server, controllerRuntimeStopCh <-chan struct{}) {
109 go func() {
110 log.Info("Starting collector")
111 if err := s.Manager.Start(controllerRuntimeStopCh); err != nil {
112 log.Error(err, "could not start collector")
113 os.Exit(1)
114 }
115 }()
116 }
117