1
2
3
4
5
6
7
8
9
10
11
12
13
14 package watcher
15
16 import (
17 "errors"
18 "fmt"
19 "html/template"
20 "io/ioutil"
21 "os"
22 "strings"
23
24 "github.com/ghodss/yaml"
25
26 "github.com/chaos-mesh/chaos-mesh/controllers/metrics"
27 "github.com/chaos-mesh/chaos-mesh/pkg/webhook/config"
28
29 ctrl "sigs.k8s.io/controller-runtime"
30
31 apierrs "k8s.io/apimachinery/pkg/api/errors"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/labels"
34 "k8s.io/apimachinery/pkg/watch"
35 "k8s.io/client-go/kubernetes"
36 k8sv1 "k8s.io/client-go/kubernetes/typed/core/v1"
37 ctrlconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
38 )
39
40 var log = ctrl.Log.WithName("inject-webhook")
41 var restClusterConfig = ctrlconfig.GetConfig
42 var kubernetesNewForConfig = kubernetes.NewForConfig
43
const (
	// serviceAccountNamespaceFilePath is the file New reads to infer the
	// template namespace when none was configured.
	serviceAccountNamespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

	// templateItemKey is the ConfigMap data key under which GetTemplates
	// looks up the content of a template.
	templateItemKey = "data"
)
48
49
// ErrWatchChannelClosed is returned by Watch when one of its watch result
// channels has been closed.
var ErrWatchChannelClosed = errors.New("watcher channel has closed")
51
52
53 type K8sConfigMapWatcher struct {
54 Config
55 client k8sv1.CoreV1Interface
56 metrics *metrics.ChaosCollector
57 }
58
59
60 func New(cfg Config, metrics *metrics.ChaosCollector) (*K8sConfigMapWatcher, error) {
61 c := K8sConfigMapWatcher{Config: cfg, metrics: metrics}
62 if strings.TrimSpace(c.TemplateNamespace) == "" {
63
64
65 nsBytes, err := ioutil.ReadFile(serviceAccountNamespaceFilePath)
66 if err != nil {
67 if os.IsNotExist(err) {
68 return nil, fmt.Errorf("%s: maybe you should specify ----template-namespace if you are running outside of kubernetes", err.Error())
69 }
70 return nil, err
71 }
72 ns := strings.TrimSpace(string(nsBytes))
73 if ns != "" {
74 c.TemplateNamespace = ns
75 log.Info("Inferred ConfigMap",
76 "template namespace", c.TemplateNamespace, "filepath", serviceAccountNamespaceFilePath)
77 } else {
78 return nil, errors.New("can not found namespace. maybe you should specify --template-namespace if you are running outside of kubernetes")
79 }
80 }
81
82 log.Info("Creating Kubernetes client to talk to the api-server")
83 k8sConfig, err := restClusterConfig()
84 if err != nil {
85 return nil, err
86 }
87
88 clientset, err := kubernetesNewForConfig(k8sConfig)
89 if err != nil {
90 return nil, err
91 }
92
93 c.client = clientset.CoreV1()
94 if err = validate(&c); err != nil {
95 return nil, fmt.Errorf("validation failed for K8sConfigMapWatcher: %s", err.Error())
96 }
97 log.Info("Created ConfigMap watcher",
98 "apiserver", k8sConfig.Host, "template namespaces", c.TemplateNamespace,
99 "template labels", c.TemplateLabels, "config labels", c.ConfigLabels)
100 return &c, nil
101 }
102
103 func validate(c *K8sConfigMapWatcher) error {
104 if c == nil {
105 return errors.New("configmap watcher was nil")
106 }
107 if c.TemplateNamespace == "" {
108 return errors.New("namespace is empty")
109 }
110 if c.TemplateLabels == nil {
111 return errors.New("template labels was an uninitialized map")
112 }
113 if c.ConfigLabels == nil {
114 return errors.New("config labels was an uninitialized map")
115 }
116 if c.client == nil {
117 return errors.New("k8s client was not setup properly")
118 }
119 return nil
120 }
121
122
123 func (c *K8sConfigMapWatcher) Watch(notifyMe chan<- interface{}, stopCh <-chan struct{}) error {
124 log.Info("Watching for ConfigMaps for changes",
125 "template namespace", c.TemplateNamespace, "labels", c.ConfigLabels)
126 templateWatcher, err := c.client.ConfigMaps(c.TemplateNamespace).Watch(metav1.ListOptions{
127 LabelSelector: mapStringStringToLabelSelector(c.TemplateLabels),
128 })
129 if err != nil {
130 return fmt.Errorf("unable to create template watcher (possible serviceaccount RBAC/ACL failure?): %s", err.Error())
131 }
132
133 targetNamespace := ""
134 if !c.Config.ClusterScoped {
135 targetNamespace = c.TargetNamespace
136 }
137
138 configWatcher, err := c.client.ConfigMaps(targetNamespace).Watch(metav1.ListOptions{
139 LabelSelector: mapStringStringToLabelSelector(c.ConfigLabels),
140 })
141 if err != nil {
142 return fmt.Errorf("unable to create config watcher (possible serviceaccount RBAC/ACL failure?): %s", err.Error())
143 }
144 defer func() {
145 configWatcher.Stop()
146 templateWatcher.Stop()
147 }()
148 for {
149 select {
150 case e, ok := <-templateWatcher.ResultChan():
151
152
153 if !ok {
154 log.V(5).Info("channel has closed, will restart watcher")
155 return ErrWatchChannelClosed
156 }
157 if e.Type == watch.Error {
158 return apierrs.FromObject(e.Object)
159 }
160 log.V(3).Info("type", e.Type, "kind", e.Object.GetObjectKind())
161 switch e.Type {
162 case watch.Added:
163 fallthrough
164 case watch.Modified:
165 fallthrough
166 case watch.Deleted:
167
168 log.V(3).Info("Signalling event received from watch channel",
169 "type", e.Type, "kind", e.Object.GetObjectKind())
170 notifyMe <- struct{}{}
171 default:
172 log.Error(nil, "got unsupported event! skipping", "type", e.Type, "kind", e.Object.GetObjectKind())
173 }
174 case e, ok := <-configWatcher.ResultChan():
175
176
177 if !ok {
178 log.V(5).Info("channel has closed, will restart watcher")
179 return ErrWatchChannelClosed
180 }
181 if e.Type == watch.Error {
182 return apierrs.FromObject(e.Object)
183 }
184 log.V(3).Info("type", e.Type, "kind", e.Object.GetObjectKind())
185 switch e.Type {
186 case watch.Added:
187 fallthrough
188 case watch.Modified:
189 fallthrough
190 case watch.Deleted:
191
192 log.V(3).Info("Signalling event received from watch channel",
193 "type", e.Type, "kind", e.Object.GetObjectKind())
194 notifyMe <- struct{}{}
195 default:
196 log.Error(nil, "got unsupported event! skipping", "type", e.Type, "kind", e.Object.GetObjectKind())
197 }
198
199 case <-stopCh:
200 log.V(2).Info("Stopping configmap watcher, context indicated we are done")
201
202 return nil
203 }
204 }
205 }
206
207 func mapStringStringToLabelSelector(m map[string]string) string {
208
209 return labels.Set(m).String()
210 }
211
212
213 func (c *K8sConfigMapWatcher) GetInjectionConfigs() (map[string][]*config.InjectionConfig, error) {
214 templates, err := c.GetTemplates()
215 if err != nil {
216 return nil, err
217 }
218
219 configs, err := c.GetConfigs()
220 if err != nil {
221 return nil, err
222 }
223 if len(templates) == 0 || len(configs) == 0 {
224 log.Info("cannot get injection configs")
225 return nil, nil
226 }
227
228 injectionConfigs := make(map[string][]*config.InjectionConfig)
229 if c.metrics != nil {
230 c.metrics.InjectionConfigs.Reset()
231 }
232 for _, conf := range configs {
233 temp, ok := templates[conf.Template]
234 if !ok {
235 log.Error(errors.New("cannot find the specified template"), "",
236 "template", conf.Template, "namespace", conf.Namespace, "config", conf.Name)
237 if c.metrics != nil {
238 c.metrics.TemplateNotExist.WithLabelValues(conf.Namespace, conf.Template).Inc()
239 }
240 continue
241 }
242 yamlTemp, err := template.New("").Parse(temp)
243 if err != nil {
244 log.Error(err, "failed to parse template",
245 "template", conf.Template, "config", conf.Name)
246 continue
247 }
248
249 result, err := renderTemplateWithArgs(yamlTemp, conf.Arguments)
250 if err != nil {
251 log.Error(err, "failed to render template",
252 "template", conf.Template, "config", conf.Name)
253 continue
254 }
255
256 var injectConfig config.InjectionConfig
257 if err := yaml.Unmarshal(result, &injectConfig); err != nil {
258 log.Error(err, "failed to unmarshal injection config", "injection config", string(result))
259 continue
260 }
261
262 injectConfig.Selector = conf.Selector
263 injectConfig.Name = conf.Name
264 if _, ok := injectionConfigs[conf.Namespace]; !ok {
265 injectionConfigs[conf.Namespace] = make([]*config.InjectionConfig, 0)
266 }
267 injectionConfigs[conf.Namespace] = append(injectionConfigs[conf.Namespace], &injectConfig)
268 if c.metrics != nil {
269 c.metrics.InjectionConfigs.WithLabelValues(conf.Namespace, conf.Template).Inc()
270 }
271 }
272
273 return injectionConfigs, nil
274 }
275
276
277 func (c *K8sConfigMapWatcher) GetTemplates() (map[string]string, error) {
278 log.Info("Fetching Template Configs...")
279 templateList, err := c.client.ConfigMaps(c.TemplateNamespace).List(metav1.ListOptions{
280 LabelSelector: mapStringStringToLabelSelector(c.TemplateLabels),
281 })
282 if err != nil {
283 return nil, err
284 }
285
286 log.Info("Fetched templates", "templates count", len(templateList.Items))
287 templates := make(map[string]string, len(templateList.Items))
288 for _, temp := range templateList.Items {
289 templates[temp.Name] = temp.Data[templateItemKey]
290 }
291 if c.metrics != nil {
292 c.metrics.SidecarTemplates.Set(float64(len(templates)))
293 }
294 return templates, nil
295 }
296
297
298 func (c *K8sConfigMapWatcher) GetConfigs() ([]*config.TemplateArgs, error) {
299 log.Info("Fetching Configs...")
300
301 configList, err := c.client.ConfigMaps("").List(metav1.ListOptions{
302 LabelSelector: mapStringStringToLabelSelector(c.ConfigLabels),
303 })
304 if err != nil {
305 return nil, err
306 }
307
308 log.Info("Fetched configs", "configs count", len(configList.Items))
309 if c.metrics != nil {
310 c.metrics.ConfigTemplates.Reset()
311 }
312 configSet := make(map[string]map[string]struct{}, 0)
313 result := make([]*config.TemplateArgs, 0)
314 for _, item := range configList.Items {
315 for _, payload := range item.Data {
316 conf, err := config.LoadTemplateArgs(strings.NewReader(payload))
317 if err != nil {
318 log.Error(err, "failed to load template args", "payload", payload)
319 if c.metrics != nil {
320 c.metrics.TemplateLoadError.Inc()
321 }
322 continue
323 }
324 conf.Namespace = item.Namespace
325 if _, ok := configSet[conf.Namespace]; !ok {
326 configSet[conf.Namespace] = make(map[string]struct{})
327 }
328 if _, ok := configSet[conf.Namespace][conf.Name]; ok {
329 log.Error(errors.New("duplicate config name"), "",
330 "namespace", conf.Namespace, "name", conf.Name)
331 if c.metrics != nil {
332 c.metrics.ConfigNameDuplicate.WithLabelValues(conf.Namespace, conf.Name).Inc()
333 }
334 continue
335 }
336 configSet[conf.Namespace][conf.Name] = struct{}{}
337 if c.metrics != nil {
338 c.metrics.ConfigTemplates.WithLabelValues(conf.Namespace, conf.Template).Inc()
339 }
340 result = append(result, conf)
341 }
342 }
343 return result, nil
344 }
345