...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/selector/pod/selector.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/selector/pod

     1  // Copyright 2019 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package pod
    15  
    16  import (
    17  	"context"
    18  	"crypto/rand"
    19  	"errors"
    20  	"fmt"
    21  	"math"
    22  	"math/big"
    23  	"strconv"
    24  	"strings"
    25  
    26  	"go.uber.org/fx"
    27  	v1 "k8s.io/api/core/v1"
    28  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/fields"
    31  	"k8s.io/apimachinery/pkg/labels"
    32  	"k8s.io/apimachinery/pkg/selection"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	ctrl "sigs.k8s.io/controller-runtime"
    35  	"sigs.k8s.io/controller-runtime/pkg/client"
    36  
    37  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    38  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    39  	"github.com/chaos-mesh/chaos-mesh/pkg/label"
    40  	"github.com/chaos-mesh/chaos-mesh/pkg/mock"
    41  )
    42  
// log is the package-level logger, named "selector".
var log = ctrl.Log.WithName("selector")

// injectAnnotationKey is the namespace annotation read by IsAllowedNamespaces;
// a namespace is allowed only when this annotation has the value "enabled".
const injectAnnotationKey = "chaos-mesh.org/inject"
    46  
// Option carries the scope settings that SelectImpl passes to
// SelectAndFilterPods on every selection.
type Option struct {
	// ClusterScoped, when false, restricts selection to TargetNamespace.
	ClusterScoped         bool
	// TargetNamespace is the only namespace pods may be selected from when
	// ClusterScoped is false.
	TargetNamespace       string
	// EnableFilterNamespace, when true, drops pods whose namespace is not
	// allowed according to IsAllowedNamespaces.
	EnableFilterNamespace bool
}
    52  
// SelectImpl selects pods for a v1alpha1.PodSelector using the embedded
// Option as its scope settings.
type SelectImpl struct {
	// c is used to get and list pods, nodes and namespaces.
	c client.Client
	// r is used instead of c to list pods when field selectors are set
	// (see SelectPods); it may be nil.
	r client.Reader

	Option
}
    59  
// Pod wraps a v1.Pod so that selection results can expose an Id method.
type Pod struct {
	v1.Pod
}
    63  
    64  func (pod *Pod) Id() string {
    65  	return (types.NamespacedName{
    66  		Name:      pod.Name,
    67  		Namespace: pod.Namespace,
    68  	}).String()
    69  }
    70  
    71  func (impl *SelectImpl) Select(ctx context.Context, ps *v1alpha1.PodSelector) ([]*Pod, error) {
    72  	if ps == nil {
    73  		return []*Pod{}, nil
    74  	}
    75  
    76  	pods, err := SelectAndFilterPods(ctx, impl.c, impl.r, ps, impl.ClusterScoped, impl.TargetNamespace, impl.EnableFilterNamespace)
    77  	if err != nil {
    78  		return nil, err
    79  	}
    80  
    81  	var result []*Pod
    82  	for _, pod := range pods {
    83  		result = append(result, &Pod{
    84  			pod,
    85  		})
    86  	}
    87  
    88  	return result, nil
    89  }
    90  
// Params is the fx parameter struct consumed by New.
type Params struct {
	fx.In

	// Client becomes SelectImpl.c.
	Client client.Client
	// Reader is the fx value named "no-cache" and becomes SelectImpl.r.
	// NOTE(review): presumably a reader that bypasses the client cache — confirm
	// against the fx provider.
	Reader client.Reader `name:"no-cache"`
}
    97  
    98  func New(params Params) *SelectImpl {
    99  	return &SelectImpl{
   100  		params.Client,
   101  		params.Reader,
   102  		Option{
   103  			config.ControllerCfg.ClusterScoped,
   104  			config.ControllerCfg.TargetNamespace,
   105  			config.ControllerCfg.EnableFilterNamespace,
   106  		},
   107  	}
   108  }
   109  
   110  // SelectAndFilterPods returns the list of pods that filtered by selector and PodMode
   111  func SelectAndFilterPods(ctx context.Context, c client.Client, r client.Reader, spec *v1alpha1.PodSelector, clusterScoped bool, targetNamespace string, enableFilterNamespace bool) ([]v1.Pod, error) {
   112  	if pods := mock.On("MockSelectAndFilterPods"); pods != nil {
   113  		return pods.(func() []v1.Pod)(), nil
   114  	}
   115  	if err := mock.On("MockSelectedAndFilterPodsError"); err != nil {
   116  		return nil, err.(error)
   117  	}
   118  
   119  	selector := spec.Selector
   120  	mode := spec.Mode
   121  	value := spec.Value
   122  
   123  	pods, err := SelectPods(ctx, c, r, selector, clusterScoped, targetNamespace, enableFilterNamespace)
   124  	if err != nil {
   125  		return nil, err
   126  	}
   127  
   128  	if len(pods) == 0 {
   129  		err = errors.New("no pod is selected")
   130  		return nil, err
   131  	}
   132  
   133  	filteredPod, err := filterPodsByMode(pods, mode, value)
   134  	if err != nil {
   135  		return nil, err
   136  	}
   137  
   138  	return filteredPod, nil
   139  }
   140  
   141  //revive:disable:flag-parameter
   142  
   143  // SelectPods returns the list of pods that are available for pod chaos action.
   144  // It returns all pods that match the configured label, annotation and namespace selectors.
   145  // If pods are specifically specified by `selector.Pods`, it just returns the selector.Pods.
   146  func SelectPods(ctx context.Context, c client.Client, r client.Reader, selector v1alpha1.PodSelectorSpec, clusterScoped bool, targetNamespace string, enableFilterNamespace bool) ([]v1.Pod, error) {
   147  	// TODO: refactor: make different selectors to replace if-else logics
   148  	var pods []v1.Pod
   149  
   150  	// pods are specifically specified
   151  	if len(selector.Pods) > 0 {
   152  		for ns, names := range selector.Pods {
   153  			if !clusterScoped {
   154  				if targetNamespace != ns {
   155  					log.Info("skip namespace because ns is out of scope within namespace scoped mode", "namespace", ns)
   156  					continue
   157  				}
   158  			}
   159  			for _, name := range names {
   160  				var pod v1.Pod
   161  				err := c.Get(ctx, types.NamespacedName{
   162  					Namespace: ns,
   163  					Name:      name,
   164  				}, &pod)
   165  				if err == nil {
   166  					pods = append(pods, pod)
   167  					continue
   168  				}
   169  
   170  				if apierrors.IsNotFound(err) {
   171  					log.Error(err, "Pod is not found", "namespace", ns, "pod name", name)
   172  					continue
   173  				}
   174  
   175  				return nil, err
   176  			}
   177  		}
   178  
   179  		return pods, nil
   180  	}
   181  
   182  	if !clusterScoped {
   183  		if len(selector.Namespaces) > 1 {
   184  			return nil, fmt.Errorf("could NOT use more than 1 namespace selector within namespace scoped mode")
   185  		} else if len(selector.Namespaces) == 1 {
   186  			if selector.Namespaces[0] != targetNamespace {
   187  				return nil, fmt.Errorf("could NOT list pods from out of scoped namespace: %s", selector.Namespaces[0])
   188  			}
   189  		}
   190  	}
   191  
   192  	var listOptions = client.ListOptions{}
   193  	if !clusterScoped {
   194  		listOptions.Namespace = targetNamespace
   195  	}
   196  	if len(selector.LabelSelectors) > 0 || len(selector.ExpressionSelectors) > 0 {
   197  		metav1Ls := &metav1.LabelSelector{
   198  			MatchLabels:      selector.LabelSelectors,
   199  			MatchExpressions: selector.ExpressionSelectors,
   200  		}
   201  		ls, err := metav1.LabelSelectorAsSelector(metav1Ls)
   202  		if err != nil {
   203  			return nil, err
   204  		}
   205  		listOptions.LabelSelector = ls
   206  	}
   207  
   208  	listFunc := c.List
   209  
   210  	if len(selector.FieldSelectors) > 0 {
   211  		listOptions.FieldSelector = fields.SelectorFromSet(selector.FieldSelectors)
   212  
   213  		// Since FieldSelectors need to implement index creation, Reader.List is used to get the pod list.
   214  		// Otherwise, just call Client.List directly, which can be obtained through cache.
   215  		if r != nil {
   216  			listFunc = r.List
   217  		}
   218  	}
   219  
   220  	var podList v1.PodList
   221  	if len(selector.Namespaces) > 0 {
   222  		for _, namespace := range selector.Namespaces {
   223  			listOptions.Namespace = namespace
   224  
   225  			if err := listFunc(ctx, &podList, &listOptions); err != nil {
   226  				return nil, err
   227  			}
   228  
   229  			pods = append(pods, podList.Items...)
   230  		}
   231  	} else {
   232  		if err := listFunc(ctx, &podList, &listOptions); err != nil {
   233  			return nil, err
   234  		}
   235  
   236  		pods = append(pods, podList.Items...)
   237  	}
   238  
   239  	var (
   240  		nodes           []v1.Node
   241  		nodeList        v1.NodeList
   242  		nodeListOptions = client.ListOptions{}
   243  	)
   244  	// if both setting Nodes and NodeSelectors, the node list will be combined.
   245  	if len(selector.Nodes) > 0 || len(selector.NodeSelectors) > 0 {
   246  		if len(selector.Nodes) > 0 {
   247  			for _, nodename := range selector.Nodes {
   248  				var node v1.Node
   249  				if err := c.Get(ctx, types.NamespacedName{Name: nodename}, &node); err != nil {
   250  					return nil, err
   251  				}
   252  				nodes = append(nodes, node)
   253  			}
   254  		}
   255  		if len(selector.NodeSelectors) > 0 {
   256  			nodeListOptions.LabelSelector = labels.SelectorFromSet(selector.NodeSelectors)
   257  			if err := c.List(ctx, &nodeList, &nodeListOptions); err != nil {
   258  				return nil, err
   259  			}
   260  			nodes = append(nodes, nodeList.Items...)
   261  		}
   262  		pods = filterPodByNode(pods, nodes)
   263  	}
   264  	if enableFilterNamespace {
   265  		pods = filterByNamespaces(ctx, c, pods)
   266  	}
   267  
   268  	namespaceSelector, err := parseSelector(strings.Join(selector.Namespaces, ","))
   269  	if err != nil {
   270  		return nil, err
   271  	}
   272  	pods, err = filterByNamespaceSelector(pods, namespaceSelector)
   273  	if err != nil {
   274  		return nil, err
   275  	}
   276  
   277  	annotationsSelector, err := parseSelector(label.Label(selector.AnnotationSelectors).String())
   278  	if err != nil {
   279  		return nil, err
   280  	}
   281  	pods = filterByAnnotations(pods, annotationsSelector)
   282  
   283  	phaseSelector, err := parseSelector(strings.Join(selector.PodPhaseSelectors, ","))
   284  	if err != nil {
   285  		return nil, err
   286  	}
   287  	pods, err = filterByPhaseSelector(pods, phaseSelector)
   288  	if err != nil {
   289  		return nil, err
   290  	}
   291  
   292  	return pods, nil
   293  }
   294  
   295  //revive:enable:flag-parameter
   296  
   297  // GetService get k8s service by service name
   298  func GetService(ctx context.Context, c client.Client, namespace, controllerNamespace string, serviceName string) (*v1.Service, error) {
   299  	// use the environment value if namespace is empty
   300  	if len(namespace) == 0 {
   301  		namespace = controllerNamespace
   302  	}
   303  
   304  	service := &v1.Service{}
   305  	err := c.Get(ctx, client.ObjectKey{
   306  		Namespace: namespace,
   307  		Name:      serviceName,
   308  	}, service)
   309  	if err != nil {
   310  		return nil, err
   311  	}
   312  
   313  	return service, nil
   314  }
   315  
   316  // CheckPodMeetSelector checks if this pod meets the selection criteria.
   317  // TODO: support to check fieldsSelector
   318  func CheckPodMeetSelector(pod v1.Pod, selector v1alpha1.PodSelectorSpec) (bool, error) {
   319  	if len(selector.Pods) > 0 {
   320  		meet := false
   321  		for ns, names := range selector.Pods {
   322  			if pod.Namespace != ns {
   323  				continue
   324  			}
   325  
   326  			for _, name := range names {
   327  				if pod.Name == name {
   328  					meet = true
   329  				}
   330  			}
   331  
   332  			if !meet {
   333  				return false, nil
   334  			}
   335  		}
   336  	}
   337  
   338  	// check pod labels.
   339  	if pod.Labels == nil {
   340  		pod.Labels = make(map[string]string)
   341  	}
   342  
   343  	if selector.LabelSelectors == nil {
   344  		selector.LabelSelectors = make(map[string]string)
   345  	}
   346  
   347  	if len(selector.LabelSelectors) > 0 || len(selector.ExpressionSelectors) > 0 {
   348  		metav1Ls := &metav1.LabelSelector{
   349  			MatchLabels:      selector.LabelSelectors,
   350  			MatchExpressions: selector.ExpressionSelectors,
   351  		}
   352  		ls, err := metav1.LabelSelectorAsSelector(metav1Ls)
   353  		if err != nil {
   354  			return false, err
   355  		}
   356  		podLabels := labels.Set(pod.Labels)
   357  		if len(pod.Labels) == 0 || !ls.Matches(podLabels) {
   358  			return false, nil
   359  		}
   360  	}
   361  
   362  	pods := []v1.Pod{pod}
   363  
   364  	namespaceSelector, err := parseSelector(strings.Join(selector.Namespaces, ","))
   365  	if err != nil {
   366  		return false, err
   367  	}
   368  
   369  	pods, err = filterByNamespaceSelector(pods, namespaceSelector)
   370  	if err != nil {
   371  		return false, err
   372  	}
   373  
   374  	annotationsSelector, err := parseSelector(label.Label(selector.AnnotationSelectors).String())
   375  	if err != nil {
   376  		return false, err
   377  	}
   378  
   379  	pods = filterByAnnotations(pods, annotationsSelector)
   380  
   381  	phaseSelector, err := parseSelector(strings.Join(selector.PodPhaseSelectors, ","))
   382  	if err != nil {
   383  		return false, err
   384  	}
   385  	pods, err = filterByPhaseSelector(pods, phaseSelector)
   386  	if err != nil {
   387  		return false, err
   388  	}
   389  
   390  	if len(pods) > 0 {
   391  		return true, nil
   392  	}
   393  
   394  	return false, nil
   395  }
   396  
   397  func filterPodByNode(pods []v1.Pod, nodes []v1.Node) []v1.Pod {
   398  	if len(nodes) == 0 {
   399  		return nil
   400  	}
   401  	var filteredList []v1.Pod
   402  	for _, pod := range pods {
   403  		for _, node := range nodes {
   404  			if pod.Spec.NodeName == node.Name {
   405  				filteredList = append(filteredList, pod)
   406  			}
   407  		}
   408  	}
   409  	return filteredList
   410  }
   411  
   412  // filterPodsByMode filters pods by mode from pod list
   413  func filterPodsByMode(pods []v1.Pod, mode v1alpha1.PodMode, value string) ([]v1.Pod, error) {
   414  	if len(pods) == 0 {
   415  		return nil, errors.New("cannot generate pods from empty list")
   416  	}
   417  
   418  	switch mode {
   419  	case v1alpha1.OnePodMode:
   420  		index := getRandomNumber(len(pods))
   421  		pod := pods[index]
   422  
   423  		return []v1.Pod{pod}, nil
   424  	case v1alpha1.AllPodMode:
   425  		return pods, nil
   426  	case v1alpha1.FixedPodMode:
   427  		num, err := strconv.Atoi(value)
   428  		if err != nil {
   429  			return nil, err
   430  		}
   431  
   432  		if len(pods) < num {
   433  			num = len(pods)
   434  		}
   435  
   436  		if num <= 0 {
   437  			return nil, errors.New("cannot select any pod as value below or equal 0")
   438  		}
   439  
   440  		return getFixedSubListFromPodList(pods, num), nil
   441  	case v1alpha1.FixedPercentPodMode:
   442  		percentage, err := strconv.Atoi(value)
   443  		if err != nil {
   444  			return nil, err
   445  		}
   446  
   447  		if percentage == 0 {
   448  			return nil, errors.New("cannot select any pod as value below or equal 0")
   449  		}
   450  
   451  		if percentage < 0 || percentage > 100 {
   452  			return nil, fmt.Errorf("fixed percentage value of %d is invalid, Must be (0,100]", percentage)
   453  		}
   454  
   455  		num := int(math.Floor(float64(len(pods)) * float64(percentage) / 100))
   456  
   457  		return getFixedSubListFromPodList(pods, num), nil
   458  	case v1alpha1.RandomMaxPercentPodMode:
   459  		maxPercentage, err := strconv.Atoi(value)
   460  		if err != nil {
   461  			return nil, err
   462  		}
   463  
   464  		if maxPercentage == 0 {
   465  			return nil, errors.New("cannot select any pod as value below or equal 0")
   466  		}
   467  
   468  		if maxPercentage < 0 || maxPercentage > 100 {
   469  			return nil, fmt.Errorf("fixed percentage value of %d is invalid, Must be [0-100]", maxPercentage)
   470  		}
   471  
   472  		percentage := getRandomNumber(maxPercentage + 1) // + 1 because Intn works with half open interval [0,n) and we want [0,n]
   473  		num := int(math.Floor(float64(len(pods)) * float64(percentage) / 100))
   474  
   475  		return getFixedSubListFromPodList(pods, num), nil
   476  	default:
   477  		return nil, fmt.Errorf("mode %s not supported", mode)
   478  	}
   479  }
   480  
   481  // filterByAnnotations filters a list of pods by a given annotation selector.
   482  func filterByAnnotations(pods []v1.Pod, annotations labels.Selector) []v1.Pod {
   483  	// empty filter returns original list
   484  	if annotations.Empty() {
   485  		return pods
   486  	}
   487  
   488  	var filteredList []v1.Pod
   489  
   490  	for _, pod := range pods {
   491  		// convert the pod's annotations to an equivalent label selector
   492  		selector := labels.Set(pod.Annotations)
   493  
   494  		// include pod if its annotations match the selector
   495  		if annotations.Matches(selector) {
   496  			filteredList = append(filteredList, pod)
   497  		}
   498  	}
   499  
   500  	return filteredList
   501  }
   502  
   503  // filterByPhaseSet filters a list of pods by a given PodPhase selector.
   504  func filterByPhaseSelector(pods []v1.Pod, phases labels.Selector) ([]v1.Pod, error) {
   505  	if phases.Empty() {
   506  		return pods, nil
   507  	}
   508  
   509  	reqs, _ := phases.Requirements()
   510  	var (
   511  		reqIncl []labels.Requirement
   512  		reqExcl []labels.Requirement
   513  
   514  		filteredList []v1.Pod
   515  	)
   516  
   517  	for _, req := range reqs {
   518  		switch req.Operator() {
   519  		case selection.Exists:
   520  			reqIncl = append(reqIncl, req)
   521  		case selection.DoesNotExist:
   522  			reqExcl = append(reqExcl, req)
   523  		default:
   524  			return nil, fmt.Errorf("unsupported operator: %s", req.Operator())
   525  		}
   526  	}
   527  
   528  	for _, pod := range pods {
   529  		included := len(reqIncl) == 0
   530  		selector := labels.Set{string(pod.Status.Phase): ""}
   531  
   532  		// include pod if one including requirement matches
   533  		for _, req := range reqIncl {
   534  			if req.Matches(selector) {
   535  				included = true
   536  				break
   537  			}
   538  		}
   539  
   540  		// exclude pod if it is filtered out by at least one excluding requirement
   541  		for _, req := range reqExcl {
   542  			if !req.Matches(selector) {
   543  				included = false
   544  				break
   545  			}
   546  		}
   547  
   548  		if included {
   549  			filteredList = append(filteredList, pod)
   550  		}
   551  	}
   552  
   553  	return filteredList, nil
   554  }
   555  
   556  func filterByNamespaces(ctx context.Context, c client.Client, pods []v1.Pod) []v1.Pod {
   557  	var filteredList []v1.Pod
   558  
   559  	for _, pod := range pods {
   560  		ok, err := IsAllowedNamespaces(ctx, c, pod.Namespace)
   561  		if err != nil {
   562  			log.Error(err, "fail to check whether this namespace is allowed", "namespace", pod.Namespace)
   563  			continue
   564  		}
   565  
   566  		if ok {
   567  			filteredList = append(filteredList, pod)
   568  		} else {
   569  			log.Info("namespace is not enabled for chaos-mesh", "namespace", pod.Namespace)
   570  		}
   571  	}
   572  	return filteredList
   573  }
   574  
   575  func IsAllowedNamespaces(ctx context.Context, c client.Client, namespace string) (bool, error) {
   576  	ns := &v1.Namespace{}
   577  
   578  	err := c.Get(ctx, types.NamespacedName{Name: namespace}, ns)
   579  	if err != nil {
   580  		return false, err
   581  	}
   582  
   583  	if ns.Annotations[injectAnnotationKey] == "enabled" {
   584  		return true, nil
   585  	}
   586  
   587  	return false, nil
   588  }
   589  
   590  // filterByNamespaceSelector filters a list of pods by a given namespace selector.
   591  func filterByNamespaceSelector(pods []v1.Pod, namespaces labels.Selector) ([]v1.Pod, error) {
   592  	// empty filter returns original list
   593  	if namespaces.Empty() {
   594  		return pods, nil
   595  	}
   596  
   597  	// split requirements into including and excluding groups
   598  	reqs, _ := namespaces.Requirements()
   599  
   600  	var (
   601  		reqIncl []labels.Requirement
   602  		reqExcl []labels.Requirement
   603  
   604  		filteredList []v1.Pod
   605  	)
   606  
   607  	for _, req := range reqs {
   608  		switch req.Operator() {
   609  		case selection.Exists:
   610  			reqIncl = append(reqIncl, req)
   611  		case selection.DoesNotExist:
   612  			reqExcl = append(reqExcl, req)
   613  		default:
   614  			return nil, fmt.Errorf("unsupported operator: %s", req.Operator())
   615  		}
   616  	}
   617  
   618  	for _, pod := range pods {
   619  		// if there aren't any including requirements, we're in by default
   620  		included := len(reqIncl) == 0
   621  
   622  		// convert the pod's namespace to an equivalent label selector
   623  		selector := labels.Set{pod.Namespace: ""}
   624  
   625  		// include pod if one including requirement matches
   626  		for _, req := range reqIncl {
   627  			if req.Matches(selector) {
   628  				included = true
   629  				break
   630  			}
   631  		}
   632  
   633  		// exclude pod if it is filtered out by at least one excluding requirement
   634  		for _, req := range reqExcl {
   635  			if !req.Matches(selector) {
   636  				included = false
   637  				break
   638  			}
   639  		}
   640  
   641  		if included {
   642  			filteredList = append(filteredList, pod)
   643  		}
   644  	}
   645  
   646  	return filteredList, nil
   647  }
   648  
   649  func parseSelector(str string) (labels.Selector, error) {
   650  	selector, err := labels.Parse(str)
   651  	if err != nil {
   652  		return nil, err
   653  	}
   654  	return selector, nil
   655  }
   656  
   657  func getFixedSubListFromPodList(pods []v1.Pod, num int) []v1.Pod {
   658  	indexes := RandomFixedIndexes(0, uint(len(pods)), uint(num))
   659  
   660  	var filteredPods []v1.Pod
   661  
   662  	for _, index := range indexes {
   663  		index := index
   664  		filteredPods = append(filteredPods, pods[index])
   665  	}
   666  
   667  	return filteredPods
   668  }
   669  
   670  // RandomFixedIndexes returns the `count` random indexes between `start` and `end`.
   671  // [start, end)
   672  func RandomFixedIndexes(start, end, count uint) []uint {
   673  	var indexes []uint
   674  	m := make(map[uint]uint, count)
   675  
   676  	if end < start {
   677  		return indexes
   678  	}
   679  
   680  	if count > end-start {
   681  		for i := start; i < end; i++ {
   682  			indexes = append(indexes, i)
   683  		}
   684  
   685  		return indexes
   686  	}
   687  
   688  	for i := 0; i < int(count); {
   689  		index := uint(getRandomNumber(int(end-start))) + start
   690  
   691  		_, exist := m[index]
   692  		if exist {
   693  			continue
   694  		}
   695  
   696  		m[index] = index
   697  		indexes = append(indexes, index)
   698  		i++
   699  	}
   700  
   701  	return indexes
   702  }
   703  
   704  func getRandomNumber(max int) uint64 {
   705  	num, _ := rand.Int(rand.Reader, big.NewInt(int64(max)))
   706  	return num.Uint64()
   707  }
   708