...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/webhook/config/watcher/watcher.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/webhook/config/watcher

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package watcher
    17  
    18  import (
    19  	"context"
    20  	"errors"
    21  	"fmt"
    22  	"html/template"
    23  	"io/ioutil"
    24  	"os"
    25  	"strings"
    26  
    27  	"github.com/ghodss/yaml"
    28  
    29  	"github.com/chaos-mesh/chaos-mesh/pkg/metrics"
    30  	"github.com/chaos-mesh/chaos-mesh/pkg/webhook/config"
    31  
    32  	ctrl "sigs.k8s.io/controller-runtime"
    33  
    34  	apierrs "k8s.io/apimachinery/pkg/api/errors"
    35  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    36  	"k8s.io/apimachinery/pkg/labels"
    37  	"k8s.io/apimachinery/pkg/watch"
    38  	"k8s.io/client-go/kubernetes"
    39  	k8sv1 "k8s.io/client-go/kubernetes/typed/core/v1"
    40  	ctrlconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
    41  )
    42  
    43  var log = ctrl.Log.WithName("inject-webhook")
    44  var restClusterConfig = ctrlconfig.GetConfig
    45  var kubernetesNewForConfig = kubernetes.NewForConfig
    46  
const (
	// serviceAccountNamespaceFilePath is the file New reads to infer the
	// template namespace when none is configured.
	serviceAccountNamespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
	// templateItemKey is the key in a template ConfigMap's Data whose value is
	// used as the template text by GetTemplates.
	templateItemKey                 = "data"
)
    51  
// ErrWatchChannelClosed is returned by Watch when one of the underlying watch
// result channels has been closed. Callers should restart the watcher.
var ErrWatchChannelClosed = errors.New("watcher channel has closed")
    54  
// K8sConfigMapWatcher is a struct that connects to the API and collects, parses, and emits sidecar configurations
type K8sConfigMapWatcher struct {
	// Config carries the watcher settings (template namespace, label sets,
	// target namespace and cluster scope) read by the methods of this type.
	Config
	// client is the CoreV1 client used to list and watch ConfigMaps; set by New.
	client  k8sv1.CoreV1Interface
	// metrics is optional; every use is guarded by a nil check.
	metrics *metrics.ChaosControllerManagerMetricsCollector
}
    61  
    62  // New creates a new K8sConfigMapWatcher
    63  func New(cfg Config, metrics *metrics.ChaosControllerManagerMetricsCollector) (*K8sConfigMapWatcher, error) {
    64  	c := K8sConfigMapWatcher{Config: cfg, metrics: metrics}
    65  	if strings.TrimSpace(c.TemplateNamespace) == "" {
    66  		// ENHANCEMENT: support downward API/env vars instead? https://github.com/kubernetes/kubernetes/blob/release-1.0/docs/user-guide/downward-api.md
    67  		// load from file on disk for serviceaccount: /var/run/secrets/kubernetes.io/serviceaccount/namespace
    68  		nsBytes, err := ioutil.ReadFile(serviceAccountNamespaceFilePath)
    69  		if err != nil {
    70  			if os.IsNotExist(err) {
    71  				return nil, fmt.Errorf("%s: maybe you should specify ----template-namespace if you are running outside of kubernetes", err.Error())
    72  			}
    73  			return nil, err
    74  		}
    75  		ns := strings.TrimSpace(string(nsBytes))
    76  		if ns != "" {
    77  			c.TemplateNamespace = ns
    78  			log.Info("Inferred ConfigMap",
    79  				"template namespace", c.TemplateNamespace, "filepath", serviceAccountNamespaceFilePath)
    80  		} else {
    81  			return nil, errors.New("can not found namespace. maybe you should specify --template-namespace if you are running outside of kubernetes")
    82  		}
    83  	}
    84  
    85  	log.Info("Creating Kubernetes client to talk to the api-server")
    86  	k8sConfig, err := restClusterConfig()
    87  	if err != nil {
    88  		return nil, err
    89  	}
    90  
    91  	clientset, err := kubernetesNewForConfig(k8sConfig)
    92  	if err != nil {
    93  		return nil, err
    94  	}
    95  
    96  	c.client = clientset.CoreV1()
    97  	if err = validate(&c); err != nil {
    98  		return nil, fmt.Errorf("validation failed for K8sConfigMapWatcher: %s", err.Error())
    99  	}
   100  	log.Info("Created ConfigMap watcher",
   101  		"apiserver", k8sConfig.Host, "template namespaces", c.TemplateNamespace,
   102  		"template labels", c.TemplateLabels, "config labels", c.ConfigLabels)
   103  	return &c, nil
   104  }
   105  
   106  func validate(c *K8sConfigMapWatcher) error {
   107  	if c == nil {
   108  		return errors.New("configmap watcher was nil")
   109  	}
   110  	if c.TemplateNamespace == "" {
   111  		return errors.New("namespace is empty")
   112  	}
   113  	if c.TemplateLabels == nil {
   114  		return errors.New("template labels was an uninitialized map")
   115  	}
   116  	if c.ConfigLabels == nil {
   117  		return errors.New("config labels was an uninitialized map")
   118  	}
   119  	if c.client == nil {
   120  		return errors.New("k8s client was not setup properly")
   121  	}
   122  	return nil
   123  }
   124  
   125  // Watch watches for events impacting watched ConfigMaps and emits their events across a channel
   126  func (c *K8sConfigMapWatcher) Watch(notifyMe chan<- interface{}, stopCh <-chan struct{}) error {
   127  	log.Info("Watching for ConfigMaps for changes",
   128  		"template namespace", c.TemplateNamespace, "labels", c.ConfigLabels)
   129  	templateWatcher, err := c.client.ConfigMaps(c.TemplateNamespace).Watch(
   130  		// FIXME: get context from parameter
   131  		context.TODO(),
   132  		metav1.ListOptions{
   133  			LabelSelector: mapStringStringToLabelSelector(c.TemplateLabels),
   134  		})
   135  	if err != nil {
   136  		return fmt.Errorf("unable to create template watcher (possible serviceaccount RBAC/ACL failure?): %s", err.Error())
   137  	}
   138  
   139  	targetNamespace := ""
   140  	if !c.Config.ClusterScoped {
   141  		targetNamespace = c.TargetNamespace
   142  	}
   143  
   144  	configWatcher, err := c.client.ConfigMaps(targetNamespace).Watch(
   145  		// FIXME: get context from parameter
   146  		context.TODO(),
   147  		metav1.ListOptions{
   148  			LabelSelector: mapStringStringToLabelSelector(c.ConfigLabels),
   149  		})
   150  	if err != nil {
   151  		return fmt.Errorf("unable to create config watcher (possible serviceaccount RBAC/ACL failure?): %s", err.Error())
   152  	}
   153  	defer func() {
   154  		configWatcher.Stop()
   155  		templateWatcher.Stop()
   156  	}()
   157  	for {
   158  		select {
   159  		case e, ok := <-templateWatcher.ResultChan():
   160  			// channel may closed caused by HTTP timeout, should restart watcher
   161  			// detail at https://github.com/kubernetes/client-go/issues/334
   162  			if !ok {
   163  				log.V(5).Info("channel has closed, will restart watcher")
   164  				return ErrWatchChannelClosed
   165  			}
   166  			if e.Type == watch.Error {
   167  				return apierrs.FromObject(e.Object)
   168  			}
   169  			log.V(3).Info("type", e.Type, "kind", e.Object.GetObjectKind())
   170  			switch e.Type {
   171  			case watch.Added:
   172  				fallthrough
   173  			case watch.Modified:
   174  				fallthrough
   175  			case watch.Deleted:
   176  				// signal reconciliation of all InjectionConfigs
   177  				log.V(3).Info("Signalling event received from watch channel",
   178  					"type", e.Type, "kind", e.Object.GetObjectKind())
   179  				notifyMe <- struct{}{}
   180  			default:
   181  				log.Error(nil, "got unsupported event! skipping", "type", e.Type, "kind", e.Object.GetObjectKind())
   182  			}
   183  		case e, ok := <-configWatcher.ResultChan():
   184  			// channel may closed caused by HTTP timeout, should restart watcher
   185  			// detail at https://github.com/kubernetes/client-go/issues/334
   186  			if !ok {
   187  				log.V(5).Info("channel has closed, will restart watcher")
   188  				return ErrWatchChannelClosed
   189  			}
   190  			if e.Type == watch.Error {
   191  				return apierrs.FromObject(e.Object)
   192  			}
   193  			log.V(3).Info("type", e.Type, "kind", e.Object.GetObjectKind())
   194  			switch e.Type {
   195  			case watch.Added:
   196  				fallthrough
   197  			case watch.Modified:
   198  				fallthrough
   199  			case watch.Deleted:
   200  				// signal reconciliation of all InjectionConfigs
   201  				log.V(3).Info("Signalling event received from watch channel",
   202  					"type", e.Type, "kind", e.Object.GetObjectKind())
   203  				notifyMe <- struct{}{}
   204  			default:
   205  				log.Error(nil, "got unsupported event! skipping", "type", e.Type, "kind", e.Object.GetObjectKind())
   206  			}
   207  			// events! yay!
   208  		case <-stopCh:
   209  			log.V(2).Info("Stopping configmap watcher, context indicated we are done")
   210  			// clean up, we cancelled the context, so stop the watch
   211  			return nil
   212  		}
   213  	}
   214  }
   215  
   216  func mapStringStringToLabelSelector(m map[string]string) string {
   217  	// https://github.com/kubernetes/apimachinery/issues/47
   218  	return labels.Set(m).String()
   219  }
   220  
   221  // GetInjectionConfigs fetches all matching ConfigMaps
   222  func (c *K8sConfigMapWatcher) GetInjectionConfigs() (map[string][]*config.InjectionConfig, error) {
   223  	templates, err := c.GetTemplates()
   224  	if err != nil {
   225  		return nil, err
   226  	}
   227  
   228  	configs, err := c.GetConfigs()
   229  	if err != nil {
   230  		return nil, err
   231  	}
   232  	if len(templates) == 0 || len(configs) == 0 {
   233  		log.Info("cannot get injection configs")
   234  		return nil, nil
   235  	}
   236  
   237  	injectionConfigs := make(map[string][]*config.InjectionConfig)
   238  	if c.metrics != nil {
   239  		c.metrics.InjectionConfigs.Reset()
   240  	}
   241  	for _, conf := range configs {
   242  		temp, ok := templates[conf.Template]
   243  		if !ok {
   244  			log.Error(errors.New("cannot find the specified template"), "",
   245  				"template", conf.Template, "namespace", conf.Namespace, "config", conf.Name)
   246  			if c.metrics != nil {
   247  				c.metrics.TemplateNotExist.WithLabelValues(conf.Namespace, conf.Template).Inc()
   248  			}
   249  			continue
   250  		}
   251  		yamlTemp, err := template.New("").Parse(temp)
   252  		if err != nil {
   253  			log.Error(err, "failed to parse template",
   254  				"template", conf.Template, "config", conf.Name)
   255  			continue
   256  		}
   257  
   258  		result, err := renderTemplateWithArgs(yamlTemp, conf.Arguments)
   259  		if err != nil {
   260  			log.Error(err, "failed to render template",
   261  				"template", conf.Template, "config", conf.Name)
   262  			continue
   263  		}
   264  
   265  		var injectConfig config.InjectionConfig
   266  		if err := yaml.Unmarshal(result, &injectConfig); err != nil {
   267  			log.Error(err, "failed to unmarshal injection config", "injection config", string(result))
   268  			continue
   269  		}
   270  
   271  		injectConfig.Selector = conf.Selector
   272  		injectConfig.Name = conf.Name
   273  		if _, ok := injectionConfigs[conf.Namespace]; !ok {
   274  			injectionConfigs[conf.Namespace] = make([]*config.InjectionConfig, 0)
   275  		}
   276  		injectionConfigs[conf.Namespace] = append(injectionConfigs[conf.Namespace], &injectConfig)
   277  		if c.metrics != nil {
   278  			c.metrics.InjectionConfigs.WithLabelValues(conf.Namespace, conf.Template).Inc()
   279  		}
   280  	}
   281  
   282  	return injectionConfigs, nil
   283  }
   284  
   285  // GetTemplates returns a map of common templates
   286  func (c *K8sConfigMapWatcher) GetTemplates() (map[string]string, error) {
   287  	log.Info("Fetching Template Configs...")
   288  	templateList, err := c.client.ConfigMaps(c.TemplateNamespace).List(
   289  		// FIXME: get context from parameter
   290  		context.TODO(),
   291  		metav1.ListOptions{
   292  			LabelSelector: mapStringStringToLabelSelector(c.TemplateLabels),
   293  		})
   294  	if err != nil {
   295  		return nil, err
   296  	}
   297  
   298  	log.Info("Fetched templates", "templates count", len(templateList.Items))
   299  	templates := make(map[string]string, len(templateList.Items))
   300  	for _, temp := range templateList.Items {
   301  		templates[temp.Name] = temp.Data[templateItemKey]
   302  	}
   303  	if c.metrics != nil {
   304  		c.metrics.SidecarTemplates.Set(float64(len(templates)))
   305  	}
   306  	return templates, nil
   307  }
   308  
   309  // GetConfigs returns the list of template args config
   310  func (c *K8sConfigMapWatcher) GetConfigs() ([]*config.TemplateArgs, error) {
   311  	log.Info("Fetching Configs...")
   312  	// List all the configs with the required label selector
   313  	configList, err := c.client.ConfigMaps("").List(
   314  		// FIXME: get context from parmeter
   315  		context.TODO(),
   316  		metav1.ListOptions{
   317  			LabelSelector: mapStringStringToLabelSelector(c.ConfigLabels),
   318  		})
   319  	if err != nil {
   320  		return nil, err
   321  	}
   322  
   323  	log.Info("Fetched configs", "configs count", len(configList.Items))
   324  	if c.metrics != nil {
   325  		c.metrics.ConfigTemplates.Reset()
   326  	}
   327  	configSet := make(map[string]map[string]struct{})
   328  	result := make([]*config.TemplateArgs, 0)
   329  	for _, item := range configList.Items {
   330  		for _, payload := range item.Data {
   331  			conf, err := config.LoadTemplateArgs(strings.NewReader(payload))
   332  			if err != nil {
   333  				log.Error(err, "failed to load template args", "payload", payload)
   334  				if c.metrics != nil {
   335  					c.metrics.TemplateLoadError.Inc()
   336  				}
   337  				continue
   338  			}
   339  			conf.Namespace = item.Namespace
   340  			if _, ok := configSet[conf.Namespace]; !ok {
   341  				configSet[conf.Namespace] = make(map[string]struct{})
   342  			}
   343  			if _, ok := configSet[conf.Namespace][conf.Name]; ok {
   344  				log.Error(errors.New("duplicate config name"), "",
   345  					"namespace", conf.Namespace, "name", conf.Name)
   346  				if c.metrics != nil {
   347  					c.metrics.ConfigNameDuplicate.WithLabelValues(conf.Namespace, conf.Name).Inc()
   348  				}
   349  				continue
   350  			}
   351  			configSet[conf.Namespace][conf.Name] = struct{}{}
   352  			if c.metrics != nil {
   353  				c.metrics.ConfigTemplates.WithLabelValues(conf.Namespace, conf.Template).Inc()
   354  			}
   355  			result = append(result, conf)
   356  		}
   357  	}
   358  	return result, nil
   359  }
   360