1
2
3
4
5
6
7
8
9
10
11
12
13
14 package main
15
16 import (
17 "flag"
18 "net/http"
19 _ "net/http/pprof"
20 "os"
21 "time"
22
23 "github.com/go-logr/logr"
24 "go.uber.org/fx"
25 "golang.org/x/time/rate"
26 authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
27 _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
28 "k8s.io/client-go/util/workqueue"
29 ctrl "sigs.k8s.io/controller-runtime"
30 "sigs.k8s.io/controller-runtime/pkg/log/zap"
31 controllermetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
32 "sigs.k8s.io/controller-runtime/pkg/webhook"
33
34 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
35 apiWebhook "github.com/chaos-mesh/chaos-mesh/api/webhook"
36 "github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager/provider"
37 "github.com/chaos-mesh/chaos-mesh/controllers"
38 ccfg "github.com/chaos-mesh/chaos-mesh/controllers/config"
39 "github.com/chaos-mesh/chaos-mesh/controllers/metrics"
40 "github.com/chaos-mesh/chaos-mesh/controllers/types"
41 grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
42 "github.com/chaos-mesh/chaos-mesh/pkg/selector"
43 "github.com/chaos-mesh/chaos-mesh/pkg/version"
44 "github.com/chaos-mesh/chaos-mesh/pkg/webhook/config"
45 "github.com/chaos-mesh/chaos-mesh/pkg/webhook/config/watcher"
46 )
47
var (
	// printVersion is bound to the -version command-line flag by parseFlags;
	// when it is true, main exits right after printing the version information.
	printVersion bool
	// setupLog is the controller-runtime logger named "setup". It is used for
	// the start-up and ConfigMap-watcher messages emitted from this file.
	setupLog = ctrl.Log.WithName("setup")
)
52
53 func parseFlags() {
54 flag.BoolVar(&printVersion, "version", false, "print version information and exit")
55 flag.Parse()
56 }
57
58 func main() {
59 parseFlags()
60 version.PrintVersionInfo("Controller manager")
61 if printVersion {
62 os.Exit(0)
63 }
64
65
66 grpcUtils.RPCTimeout = ccfg.ControllerCfg.RPCTimeout
67 ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
68
69 app := fx.New(
70 fx.Options(
71 provider.Module,
72 controllers.Module,
73 selector.Module,
74 types.ChaosObjects,
75 ),
76 fx.Invoke(Run),
77 )
78
79 app.Run()
80 }
81
// RunParams lists the dependencies of Run. Because it embeds fx.In, fx fills
// each field from the application's dependency graph when it invokes Run.
type RunParams struct {
	fx.In

	// Mgr is the controller-runtime manager on which Run registers webhooks
	// and which it finally starts.
	Mgr ctrl.Manager
	// Logger is injected but not read by Run, which logs through setupLog.
	Logger logr.Logger
	// AuthCli is handed to the /validate-auth admission handler.
	AuthCli *authorizationv1.AuthorizationV1Client

	// Controllers is collected from the fx value group "controller". Run does
	// not read it. NOTE(review): presumably requesting it here is what makes
	// fx construct the controllers — confirm before removing.
	Controllers []types.Controller `group:"controller"`
	// Objs is collected from the fx value group "objs"; Run registers a
	// webhook for the Object of every entry.
	Objs []types.Object `group:"objs"`
}
92
93 func Run(params RunParams) error {
94 mgr := params.Mgr
95 authCli := params.AuthCli
96
97 var err error
98 for _, obj := range params.Objs {
99 err = ctrl.NewWebhookManagedBy(mgr).
100 For(obj.Object).
101 Complete()
102 if err != nil {
103 return err
104 }
105 }
106
107
108 err = ctrl.NewWebhookManagedBy(mgr).
109 For(&v1alpha1.Schedule{}).
110 Complete()
111 if err != nil {
112 return err
113 }
114
115
116 err = ctrl.NewWebhookManagedBy(mgr).
117 For(&v1alpha1.Workflow{}).
118 Complete()
119 if err != nil {
120 return err
121 }
122
123
124 metricsCollector := metrics.NewChaosCollector(mgr.GetCache(), controllermetrics.Registry)
125
126 setupLog.Info("Setting up webhook server")
127 hookServer := mgr.GetWebhookServer()
128 hookServer.CertDir = ccfg.ControllerCfg.CertsDir
129 conf := config.NewConfigWatcherConf()
130
131 stopCh := ctrl.SetupSignalHandler()
132
133 if ccfg.ControllerCfg.PprofAddr != "0" {
134 go func() {
135 if err := http.ListenAndServe(ccfg.ControllerCfg.PprofAddr, nil); err != nil {
136 setupLog.Error(err, "unable to start pprof server")
137 os.Exit(1)
138 }
139 }()
140 }
141
142 if err = ccfg.ControllerCfg.WatcherConfig.Verify(); err != nil {
143 setupLog.Error(err, "invalid environment configuration")
144 os.Exit(1)
145 }
146 configWatcher, err := watcher.New(*ccfg.ControllerCfg.WatcherConfig, metricsCollector)
147 if err != nil {
148 setupLog.Error(err, "unable to create config watcher")
149 os.Exit(1)
150 }
151
152 go watchConfig(configWatcher, conf, stopCh)
153 hookServer.Register("/inject-v1-pod", &webhook.Admission{
154 Handler: &apiWebhook.PodInjector{
155 Config: conf,
156 ControllerCfg: ccfg.ControllerCfg,
157 Metrics: metricsCollector,
158 }},
159 )
160 hookServer.Register("/validate-auth", &webhook.Admission{
161 Handler: apiWebhook.NewAuthValidator(ccfg.ControllerCfg.SecurityMode, authCli,
162 ccfg.ControllerCfg.ClusterScoped, ccfg.ControllerCfg.TargetNamespace, ccfg.ControllerCfg.EnableFilterNamespace),
163 },
164 )
165
166 setupLog.Info("Starting manager")
167 if err := mgr.Start(stopCh); err != nil {
168 setupLog.Error(err, "unable to start manager")
169 os.Exit(1)
170 }
171
172 return nil
173 }
174
175 func setupWatchQueue(stopCh <-chan struct{}, configWatcher *watcher.K8sConfigMapWatcher) workqueue.Interface {
176
177
178 sigChan := make(chan interface{}, 10)
179
180 queue := workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(0.5), 1)})
181
182 go func() {
183 for {
184 select {
185 case <-stopCh:
186 queue.ShutDown()
187 return
188 case <-sigChan:
189 queue.AddRateLimited(struct{}{})
190 }
191 }
192 }()
193
194 go func() {
195 for {
196 setupLog.Info("Launching watcher for ConfigMaps")
197 if err := configWatcher.Watch(sigChan, stopCh); err != nil {
198 switch err {
199 case watcher.ErrWatchChannelClosed:
200
201 setupLog.Info("watcher channel has closed, restart watcher")
202 default:
203 setupLog.Error(err, "unable to watch new ConfigMaps")
204 os.Exit(1)
205 }
206 }
207
208 select {
209 case <-stopCh:
210 close(sigChan)
211 return
212 default:
213
214 time.Sleep(2 * time.Second)
215 }
216 }
217 }()
218
219 return queue
220 }
221
222 func watchConfig(configWatcher *watcher.K8sConfigMapWatcher, cfg *config.Config, stopCh <-chan struct{}) {
223 queue := setupWatchQueue(stopCh, configWatcher)
224
225 for {
226 item, shutdown := queue.Get()
227 if shutdown {
228 break
229 }
230 func() {
231 defer queue.Done(item)
232
233 setupLog.Info("Triggering ConfigMap reconciliation")
234 updatedInjectionConfigs, err := configWatcher.GetInjectionConfigs()
235 if err != nil {
236 setupLog.Error(err, "unable to get ConfigMaps")
237 return
238 }
239
240 setupLog.Info("Updating server with newly loaded configurations",
241 "original configs count", len(cfg.Injections), "updated configs count", len(updatedInjectionConfigs))
242 cfg.ReplaceInjectionConfigs(updatedInjectionConfigs)
243 setupLog.Info("Configuration replaced")
244 }()
245 }
246 }
247