...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/webhook/config/watcher/watcher.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/webhook/config/watcher

     1  // Copyright 2019 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package watcher
    15  
    16  import (
    17  	"errors"
    18  	"fmt"
    19  	"html/template"
    20  	"io/ioutil"
    21  	"os"
    22  	"strings"
    23  
    24  	"github.com/ghodss/yaml"
    25  
    26  	"github.com/chaos-mesh/chaos-mesh/controllers/metrics"
    27  	"github.com/chaos-mesh/chaos-mesh/pkg/webhook/config"
    28  
    29  	ctrl "sigs.k8s.io/controller-runtime"
    30  
    31  	apierrs "k8s.io/apimachinery/pkg/api/errors"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/labels"
    34  	"k8s.io/apimachinery/pkg/watch"
    35  	"k8s.io/client-go/kubernetes"
    36  	k8sv1 "k8s.io/client-go/kubernetes/typed/core/v1"
    37  	ctrlconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
    38  )
    39  
    40  var log = ctrl.Log.WithName("inject-webhook")
    41  var restClusterConfig = ctrlconfig.GetConfig
    42  var kubernetesNewForConfig = kubernetes.NewForConfig
    43  
    44  const (
    45  	serviceAccountNamespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
    46  	templateItemKey                 = "data"
    47  )
    48  
    49  // ErrWatchChannelClosed should restart watcher
    50  var ErrWatchChannelClosed = errors.New("watcher channel has closed")
    51  
    52  // K8sConfigMapWatcher is a struct that connects to the API and collects, parses, and emits sidecar configurations
    53  type K8sConfigMapWatcher struct {
    54  	Config
    55  	client  k8sv1.CoreV1Interface
    56  	metrics *metrics.ChaosCollector
    57  }
    58  
    59  // New creates a new K8sConfigMapWatcher
    60  func New(cfg Config, metrics *metrics.ChaosCollector) (*K8sConfigMapWatcher, error) {
    61  	c := K8sConfigMapWatcher{Config: cfg, metrics: metrics}
    62  	if strings.TrimSpace(c.TemplateNamespace) == "" {
    63  		// ENHANCEMENT: support downward API/env vars instead? https://github.com/kubernetes/kubernetes/blob/release-1.0/docs/user-guide/downward-api.md
    64  		// load from file on disk for serviceaccount: /var/run/secrets/kubernetes.io/serviceaccount/namespace
    65  		nsBytes, err := ioutil.ReadFile(serviceAccountNamespaceFilePath)
    66  		if err != nil {
    67  			if os.IsNotExist(err) {
    68  				return nil, fmt.Errorf("%s: maybe you should specify ----template-namespace if you are running outside of kubernetes", err.Error())
    69  			}
    70  			return nil, err
    71  		}
    72  		ns := strings.TrimSpace(string(nsBytes))
    73  		if ns != "" {
    74  			c.TemplateNamespace = ns
    75  			log.Info("Inferred ConfigMap",
    76  				"template namespace", c.TemplateNamespace, "filepath", serviceAccountNamespaceFilePath)
    77  		} else {
    78  			return nil, errors.New("can not found namespace. maybe you should specify --template-namespace if you are running outside of kubernetes")
    79  		}
    80  	}
    81  
    82  	log.Info("Creating Kubernetes client to talk to the api-server")
    83  	k8sConfig, err := restClusterConfig()
    84  	if err != nil {
    85  		return nil, err
    86  	}
    87  
    88  	clientset, err := kubernetesNewForConfig(k8sConfig)
    89  	if err != nil {
    90  		return nil, err
    91  	}
    92  
    93  	c.client = clientset.CoreV1()
    94  	if err = validate(&c); err != nil {
    95  		return nil, fmt.Errorf("validation failed for K8sConfigMapWatcher: %s", err.Error())
    96  	}
    97  	log.Info("Created ConfigMap watcher",
    98  		"apiserver", k8sConfig.Host, "template namespaces", c.TemplateNamespace,
    99  		"template labels", c.TemplateLabels, "config labels", c.ConfigLabels)
   100  	return &c, nil
   101  }
   102  
   103  func validate(c *K8sConfigMapWatcher) error {
   104  	if c == nil {
   105  		return errors.New("configmap watcher was nil")
   106  	}
   107  	if c.TemplateNamespace == "" {
   108  		return errors.New("namespace is empty")
   109  	}
   110  	if c.TemplateLabels == nil {
   111  		return errors.New("template labels was an uninitialized map")
   112  	}
   113  	if c.ConfigLabels == nil {
   114  		return errors.New("config labels was an uninitialized map")
   115  	}
   116  	if c.client == nil {
   117  		return errors.New("k8s client was not setup properly")
   118  	}
   119  	return nil
   120  }
   121  
   122  // Watch watches for events impacting watched ConfigMaps and emits their events across a channel
   123  func (c *K8sConfigMapWatcher) Watch(notifyMe chan<- interface{}, stopCh <-chan struct{}) error {
   124  	log.Info("Watching for ConfigMaps for changes",
   125  		"template namespace", c.TemplateNamespace, "labels", c.ConfigLabels)
   126  	templateWatcher, err := c.client.ConfigMaps(c.TemplateNamespace).Watch(metav1.ListOptions{
   127  		LabelSelector: mapStringStringToLabelSelector(c.TemplateLabels),
   128  	})
   129  	if err != nil {
   130  		return fmt.Errorf("unable to create template watcher (possible serviceaccount RBAC/ACL failure?): %s", err.Error())
   131  	}
   132  
   133  	targetNamespace := ""
   134  	if !c.Config.ClusterScoped {
   135  		targetNamespace = c.TargetNamespace
   136  	}
   137  
   138  	configWatcher, err := c.client.ConfigMaps(targetNamespace).Watch(metav1.ListOptions{
   139  		LabelSelector: mapStringStringToLabelSelector(c.ConfigLabels),
   140  	})
   141  	if err != nil {
   142  		return fmt.Errorf("unable to create config watcher (possible serviceaccount RBAC/ACL failure?): %s", err.Error())
   143  	}
   144  	defer func() {
   145  		configWatcher.Stop()
   146  		templateWatcher.Stop()
   147  	}()
   148  	for {
   149  		select {
   150  		case e, ok := <-templateWatcher.ResultChan():
   151  			// channel may closed caused by HTTP timeout, should restart watcher
   152  			// detail at https://github.com/kubernetes/client-go/issues/334
   153  			if !ok {
   154  				log.V(5).Info("channel has closed, will restart watcher")
   155  				return ErrWatchChannelClosed
   156  			}
   157  			if e.Type == watch.Error {
   158  				return apierrs.FromObject(e.Object)
   159  			}
   160  			log.V(3).Info("type", e.Type, "kind", e.Object.GetObjectKind())
   161  			switch e.Type {
   162  			case watch.Added:
   163  				fallthrough
   164  			case watch.Modified:
   165  				fallthrough
   166  			case watch.Deleted:
   167  				// signal reconciliation of all InjectionConfigs
   168  				log.V(3).Info("Signalling event received from watch channel",
   169  					"type", e.Type, "kind", e.Object.GetObjectKind())
   170  				notifyMe <- struct{}{}
   171  			default:
   172  				log.Error(nil, "got unsupported event! skipping", "type", e.Type, "kind", e.Object.GetObjectKind())
   173  			}
   174  		case e, ok := <-configWatcher.ResultChan():
   175  			// channel may closed caused by HTTP timeout, should restart watcher
   176  			// detail at https://github.com/kubernetes/client-go/issues/334
   177  			if !ok {
   178  				log.V(5).Info("channel has closed, will restart watcher")
   179  				return ErrWatchChannelClosed
   180  			}
   181  			if e.Type == watch.Error {
   182  				return apierrs.FromObject(e.Object)
   183  			}
   184  			log.V(3).Info("type", e.Type, "kind", e.Object.GetObjectKind())
   185  			switch e.Type {
   186  			case watch.Added:
   187  				fallthrough
   188  			case watch.Modified:
   189  				fallthrough
   190  			case watch.Deleted:
   191  				// signal reconciliation of all InjectionConfigs
   192  				log.V(3).Info("Signalling event received from watch channel",
   193  					"type", e.Type, "kind", e.Object.GetObjectKind())
   194  				notifyMe <- struct{}{}
   195  			default:
   196  				log.Error(nil, "got unsupported event! skipping", "type", e.Type, "kind", e.Object.GetObjectKind())
   197  			}
   198  			// events! yay!
   199  		case <-stopCh:
   200  			log.V(2).Info("Stopping configmap watcher, context indicated we are done")
   201  			// clean up, we cancelled the context, so stop the watch
   202  			return nil
   203  		}
   204  	}
   205  }
   206  
   207  func mapStringStringToLabelSelector(m map[string]string) string {
   208  	// https://github.com/kubernetes/apimachinery/issues/47
   209  	return labels.Set(m).String()
   210  }
   211  
   212  // GetInjectionConfigs fetches all matching ConfigMaps
   213  func (c *K8sConfigMapWatcher) GetInjectionConfigs() (map[string][]*config.InjectionConfig, error) {
   214  	templates, err := c.GetTemplates()
   215  	if err != nil {
   216  		return nil, err
   217  	}
   218  
   219  	configs, err := c.GetConfigs()
   220  	if err != nil {
   221  		return nil, err
   222  	}
   223  	if len(templates) == 0 || len(configs) == 0 {
   224  		log.Info("cannot get injection configs")
   225  		return nil, nil
   226  	}
   227  
   228  	injectionConfigs := make(map[string][]*config.InjectionConfig)
   229  	if c.metrics != nil {
   230  		c.metrics.InjectionConfigs.Reset()
   231  	}
   232  	for _, conf := range configs {
   233  		temp, ok := templates[conf.Template]
   234  		if !ok {
   235  			log.Error(errors.New("cannot find the specified template"), "",
   236  				"template", conf.Template, "namespace", conf.Namespace, "config", conf.Name)
   237  			if c.metrics != nil {
   238  				c.metrics.TemplateNotExist.WithLabelValues(conf.Namespace, conf.Template).Inc()
   239  			}
   240  			continue
   241  		}
   242  		yamlTemp, err := template.New("").Parse(temp)
   243  		if err != nil {
   244  			log.Error(err, "failed to parse template",
   245  				"template", conf.Template, "config", conf.Name)
   246  			continue
   247  		}
   248  
   249  		result, err := renderTemplateWithArgs(yamlTemp, conf.Arguments)
   250  		if err != nil {
   251  			log.Error(err, "failed to render template",
   252  				"template", conf.Template, "config", conf.Name)
   253  			continue
   254  		}
   255  
   256  		var injectConfig config.InjectionConfig
   257  		if err := yaml.Unmarshal(result, &injectConfig); err != nil {
   258  			log.Error(err, "failed to unmarshal injection config", "injection config", string(result))
   259  			continue
   260  		}
   261  
   262  		injectConfig.Selector = conf.Selector
   263  		injectConfig.Name = conf.Name
   264  		if _, ok := injectionConfigs[conf.Namespace]; !ok {
   265  			injectionConfigs[conf.Namespace] = make([]*config.InjectionConfig, 0)
   266  		}
   267  		injectionConfigs[conf.Namespace] = append(injectionConfigs[conf.Namespace], &injectConfig)
   268  		if c.metrics != nil {
   269  			c.metrics.InjectionConfigs.WithLabelValues(conf.Namespace, conf.Template).Inc()
   270  		}
   271  	}
   272  
   273  	return injectionConfigs, nil
   274  }
   275  
   276  // GetTemplates returns a map of common templates
   277  func (c *K8sConfigMapWatcher) GetTemplates() (map[string]string, error) {
   278  	log.Info("Fetching Template Configs...")
   279  	templateList, err := c.client.ConfigMaps(c.TemplateNamespace).List(metav1.ListOptions{
   280  		LabelSelector: mapStringStringToLabelSelector(c.TemplateLabels),
   281  	})
   282  	if err != nil {
   283  		return nil, err
   284  	}
   285  
   286  	log.Info("Fetched templates", "templates count", len(templateList.Items))
   287  	templates := make(map[string]string, len(templateList.Items))
   288  	for _, temp := range templateList.Items {
   289  		templates[temp.Name] = temp.Data[templateItemKey]
   290  	}
   291  	if c.metrics != nil {
   292  		c.metrics.SidecarTemplates.Set(float64(len(templates)))
   293  	}
   294  	return templates, nil
   295  }
   296  
   297  // GetConfigs returns the list of template args config
   298  func (c *K8sConfigMapWatcher) GetConfigs() ([]*config.TemplateArgs, error) {
   299  	log.Info("Fetching Configs...")
   300  	// List all the configs with the required label selector
   301  	configList, err := c.client.ConfigMaps("").List(metav1.ListOptions{
   302  		LabelSelector: mapStringStringToLabelSelector(c.ConfigLabels),
   303  	})
   304  	if err != nil {
   305  		return nil, err
   306  	}
   307  
   308  	log.Info("Fetched configs", "configs count", len(configList.Items))
   309  	if c.metrics != nil {
   310  		c.metrics.ConfigTemplates.Reset()
   311  	}
   312  	configSet := make(map[string]map[string]struct{}, 0)
   313  	result := make([]*config.TemplateArgs, 0)
   314  	for _, item := range configList.Items {
   315  		for _, payload := range item.Data {
   316  			conf, err := config.LoadTemplateArgs(strings.NewReader(payload))
   317  			if err != nil {
   318  				log.Error(err, "failed to load template args", "payload", payload)
   319  				if c.metrics != nil {
   320  					c.metrics.TemplateLoadError.Inc()
   321  				}
   322  				continue
   323  			}
   324  			conf.Namespace = item.Namespace
   325  			if _, ok := configSet[conf.Namespace]; !ok {
   326  				configSet[conf.Namespace] = make(map[string]struct{})
   327  			}
   328  			if _, ok := configSet[conf.Namespace][conf.Name]; ok {
   329  				log.Error(errors.New("duplicate config name"), "",
   330  					"namespace", conf.Namespace, "name", conf.Name)
   331  				if c.metrics != nil {
   332  					c.metrics.ConfigNameDuplicate.WithLabelValues(conf.Namespace, conf.Name).Inc()
   333  				}
   334  				continue
   335  			}
   336  			configSet[conf.Namespace][conf.Name] = struct{}{}
   337  			if c.metrics != nil {
   338  				c.metrics.ConfigTemplates.WithLabelValues(conf.Namespace, conf.Template).Inc()
   339  			}
   340  			result = append(result, conf)
   341  		}
   342  	}
   343  	return result, nil
   344  }
   345