1
2
3
4
5
6
7
8
9
10
11
12
13
14 package main
15
16 import (
17 "flag"
18 "net/http"
19 _ "net/http/pprof"
20 "os"
21 "time"
22
23 "golang.org/x/time/rate"
24
25 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
26 apiWebhook "github.com/chaos-mesh/chaos-mesh/api/webhook"
27 ccfg "github.com/chaos-mesh/chaos-mesh/controllers/config"
28 "github.com/chaos-mesh/chaos-mesh/controllers/metrics"
29 "github.com/chaos-mesh/chaos-mesh/controllers/podiochaos"
30 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos"
31 grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
32 "github.com/chaos-mesh/chaos-mesh/pkg/router"
33 "github.com/chaos-mesh/chaos-mesh/pkg/version"
34 "github.com/chaos-mesh/chaos-mesh/pkg/webhook/config"
35 "github.com/chaos-mesh/chaos-mesh/pkg/webhook/config/watcher"
36
37 _ "github.com/chaos-mesh/chaos-mesh/controllers/awschaos/detachvolume"
38 _ "github.com/chaos-mesh/chaos-mesh/controllers/awschaos/ec2restart"
39 _ "github.com/chaos-mesh/chaos-mesh/controllers/awschaos/ec2stop"
40 _ "github.com/chaos-mesh/chaos-mesh/controllers/dnschaos"
41 _ "github.com/chaos-mesh/chaos-mesh/controllers/httpchaos"
42 _ "github.com/chaos-mesh/chaos-mesh/controllers/iochaos"
43 _ "github.com/chaos-mesh/chaos-mesh/controllers/jvmchaos"
44 _ "github.com/chaos-mesh/chaos-mesh/controllers/kernelchaos"
45 _ "github.com/chaos-mesh/chaos-mesh/controllers/networkchaos/partition"
46 _ "github.com/chaos-mesh/chaos-mesh/controllers/networkchaos/trafficcontrol"
47 _ "github.com/chaos-mesh/chaos-mesh/controllers/podchaos/containerkill"
48 _ "github.com/chaos-mesh/chaos-mesh/controllers/podchaos/podfailure"
49 _ "github.com/chaos-mesh/chaos-mesh/controllers/podchaos/podkill"
50 _ "github.com/chaos-mesh/chaos-mesh/controllers/stresschaos"
51 _ "github.com/chaos-mesh/chaos-mesh/controllers/timechaos"
52
53 "k8s.io/apimachinery/pkg/runtime"
54 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
55 authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
56 _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
57 "k8s.io/client-go/rest"
58 "k8s.io/client-go/util/workqueue"
59
60 ctrl "sigs.k8s.io/controller-runtime"
61 "sigs.k8s.io/controller-runtime/pkg/log/zap"
62
63 controllermetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
64 "sigs.k8s.io/controller-runtime/pkg/webhook"
65
66 )
67
68 var (
69 scheme = runtime.NewScheme()
70 setupLog = ctrl.Log.WithName("setup")
71 )
72
73 var (
74 printVersion bool
75 restConfigQPS, restConfigBurst int
76 )
77
78 func init() {
79 _ = clientgoscheme.AddToScheme(scheme)
80
81 _ = v1alpha1.AddToScheme(scheme)
82
83 }
84
85 func parseFlags() {
86 flag.BoolVar(&printVersion, "version", false, "print version information and exit")
87 flag.IntVar(&restConfigQPS, "rest-config-qps", 30, "QPS of rest config.")
88 flag.IntVar(&restConfigBurst, "rest-config-burst", 50, "burst of rest config.")
89 flag.Parse()
90 }
91
92 func main() {
93 parseFlags()
94 version.PrintVersionInfo("Controller manager")
95 if printVersion {
96 os.Exit(0)
97 }
98
99
100 grpcUtils.RPCTimeout = ccfg.ControllerCfg.RPCTimeout
101
102 ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
103
104 options := ctrl.Options{
105 Scheme: scheme,
106 MetricsBindAddress: ccfg.ControllerCfg.MetricsAddr,
107 LeaderElection: ccfg.ControllerCfg.EnableLeaderElection,
108 Port: 9443,
109 }
110
111 if ccfg.ControllerCfg.ClusterScoped {
112 setupLog.Info("Chaos controller manager is running in cluster scoped mode.")
113
114 } else {
115 setupLog.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", ccfg.ControllerCfg.TargetNamespace)
116 options.Namespace = ccfg.ControllerCfg.TargetNamespace
117 }
118
119 cfg := ctrl.GetConfigOrDie()
120 setRestConfig(cfg)
121 mgr, err := ctrl.NewManager(cfg, options)
122 if err != nil {
123 setupLog.Error(err, "unable to start manager")
124 os.Exit(1)
125 }
126
127 authCli, err := authorizationv1.NewForConfig(cfg)
128 if err != nil {
129 setupLog.Error(err, "unable to get authorization client")
130 os.Exit(1)
131 }
132
133 err = router.SetupWithManagerAndConfigs(mgr, ccfg.ControllerCfg)
134 if err != nil {
135 setupLog.Error(err, "fail to setup with manager")
136 os.Exit(1)
137 }
138
139
140
141 v1alpha1.RegisterPodIoHandler(&podiochaos.Handler{
142 Client: mgr.GetClient(),
143 Log: ctrl.Log.WithName("handler").WithName("PodIOChaos"),
144 })
145 if err = (&v1alpha1.PodIoChaos{}).SetupWebhookWithManager(mgr); err != nil {
146 setupLog.Error(err, "unable to create webhook", "webhook", "PodIOChaos")
147 os.Exit(1)
148 }
149
150
151
152 v1alpha1.RegisterRawPodNetworkHandler(&podnetworkchaos.Handler{
153 Client: mgr.GetClient(),
154 Reader: mgr.GetAPIReader(),
155 Log: ctrl.Log.WithName("handler").WithName("PodNetworkChaos"),
156 AllowHostNetworkTesting: ccfg.ControllerCfg.AllowHostNetworkTesting,
157 })
158 if err = (&v1alpha1.PodNetworkChaos{}).SetupWebhookWithManager(mgr); err != nil {
159 setupLog.Error(err, "unable to create webhook", "webhook", "PodNetworkChaos")
160 os.Exit(1)
161 }
162
163
164 metricsCollector := metrics.NewChaosCollector(mgr.GetCache(), controllermetrics.Registry)
165
166 setupLog.Info("Setting up webhook server")
167 hookServer := mgr.GetWebhookServer()
168 hookServer.CertDir = ccfg.ControllerCfg.CertsDir
169 conf := config.NewConfigWatcherConf()
170
171 stopCh := ctrl.SetupSignalHandler()
172
173 if ccfg.ControllerCfg.PprofAddr != "0" {
174 go func() {
175 if err := http.ListenAndServe(ccfg.ControllerCfg.PprofAddr, nil); err != nil {
176 setupLog.Error(err, "unable to start pprof server")
177 os.Exit(1)
178 }
179 }()
180 }
181
182 if err = ccfg.ControllerCfg.WatcherConfig.Verify(); err != nil {
183 setupLog.Error(err, "invalid environment configuration")
184 os.Exit(1)
185 }
186 configWatcher, err := watcher.New(*ccfg.ControllerCfg.WatcherConfig, metricsCollector)
187 if err != nil {
188 setupLog.Error(err, "unable to create config watcher")
189 os.Exit(1)
190 }
191
192 go watchConfig(configWatcher, conf, stopCh)
193 hookServer.Register("/inject-v1-pod", &webhook.Admission{
194 Handler: &apiWebhook.PodInjector{
195 Config: conf,
196 ControllerCfg: ccfg.ControllerCfg,
197 Metrics: metricsCollector,
198 }},
199 )
200
201 hookServer.Register("/validate-auth", &webhook.Admission{
202 Handler: apiWebhook.NewAuthValidator(ccfg.ControllerCfg.SecurityMode, mgr.GetClient(), mgr.GetAPIReader(), authCli,
203 ccfg.ControllerCfg.ClusterScoped, ccfg.ControllerCfg.TargetNamespace, ccfg.ControllerCfg.EnableFilterNamespace),
204 },
205 )
206
207
208
209 setupLog.Info("Starting manager")
210 if err := mgr.Start(stopCh); err != nil {
211 setupLog.Error(err, "unable to start manager")
212 os.Exit(1)
213 }
214 }
215
216 func setRestConfig(c *rest.Config) {
217 if restConfigQPS > 0 {
218 c.QPS = float32(restConfigQPS)
219 }
220 if restConfigBurst > 0 {
221 c.Burst = restConfigBurst
222 }
223 }
224
225 func setupWatchQueue(stopCh <-chan struct{}, configWatcher *watcher.K8sConfigMapWatcher) workqueue.Interface {
226
227
228 sigChan := make(chan interface{}, 10)
229
230 queue := workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(0.5), 1)})
231
232 go func() {
233 for {
234 select {
235 case <-stopCh:
236 queue.ShutDown()
237 return
238 case <-sigChan:
239 queue.AddRateLimited(struct{}{})
240 }
241 }
242 }()
243
244 go func() {
245 for {
246 setupLog.Info("Launching watcher for ConfigMaps")
247 if err := configWatcher.Watch(sigChan, stopCh); err != nil {
248 switch err {
249 case watcher.ErrWatchChannelClosed:
250
251 setupLog.Info("watcher channel has closed, restart watcher")
252 default:
253 setupLog.Error(err, "unable to watch new ConfigMaps")
254 os.Exit(1)
255 }
256 }
257
258 select {
259 case <-stopCh:
260 close(sigChan)
261 return
262 default:
263
264 time.Sleep(2 * time.Second)
265 }
266 }
267 }()
268
269 return queue
270 }
271
272 func watchConfig(configWatcher *watcher.K8sConfigMapWatcher, cfg *config.Config, stopCh <-chan struct{}) {
273 queue := setupWatchQueue(stopCh, configWatcher)
274
275 for {
276 item, shutdown := queue.Get()
277 if shutdown {
278 break
279 }
280 func() {
281 defer queue.Done(item)
282
283 setupLog.Info("Triggering ConfigMap reconciliation")
284 updatedInjectionConfigs, err := configWatcher.GetInjectionConfigs()
285 if err != nil {
286 setupLog.Error(err, "unable to get ConfigMaps")
287 return
288 }
289
290 setupLog.Info("Updating server with newly loaded configurations",
291 "original configs count", len(cfg.Injections), "updated configs count", len(updatedInjectionConfigs))
292 cfg.ReplaceInjectionConfigs(updatedInjectionConfigs)
293 setupLog.Info("Configuration replaced")
294 }()
295 }
296 }
297