...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/selector/pod/selector.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/selector/pod

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package pod
    17  
    18  import (
    19  	"context"
    20  	"crypto/rand"
    21  	"errors"
    22  	"fmt"
    23  	"math"
    24  	"math/big"
    25  	"strconv"
    26  	"strings"
    27  
    28  	"go.uber.org/fx"
    29  	v1 "k8s.io/api/core/v1"
    30  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/fields"
    33  	"k8s.io/apimachinery/pkg/labels"
    34  	"k8s.io/apimachinery/pkg/selection"
    35  	"k8s.io/apimachinery/pkg/types"
    36  	ctrl "sigs.k8s.io/controller-runtime"
    37  	"sigs.k8s.io/controller-runtime/pkg/client"
    38  
    39  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    40  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    41  	"github.com/chaos-mesh/chaos-mesh/pkg/label"
    42  	"github.com/chaos-mesh/chaos-mesh/pkg/mock"
    43  )
    44  
    // log is the package-level logger; it is derived from the controller-runtime
    // root logger (ctrl.Log) under the name "selector".
    45  var log = ctrl.Log.WithName("selector")
    46  
    // injectAnnotationKey is the namespace annotation consulted by
    // IsAllowedNamespaces: a namespace is treated as allowed only when this
    // annotation has the value "enabled".
    47  const injectAnnotationKey = "chaos-mesh.org/inject"
    48  
    // Option carries the settings that scope pod selection. New fills it from
    // config.ControllerCfg and Select forwards the three fields to
    // SelectAndFilterPods as clusterScoped, targetNamespace and
    // enableFilterNamespace.
    //
    // NOTE: New builds this struct with a positional literal, so the field order
    // must not change.
    49  type Option struct {
    50  	ClusterScoped         bool
    51  	TargetNamespace       string
    52  	EnableFilterNamespace bool
    53  }
    54  
    // SelectImpl resolves a v1alpha1.PodSelector into concrete pods (see Select).
    // c and r are the client and the reader handed to SelectAndFilterPods; New
    // fills r from the dependency tagged name:"no-cache". The embedded Option
    // supplies the scoping settings.
    55  type SelectImpl struct {
    56  	c client.Client
    57  	r client.Reader
    58  
    59  	Option
    60  }
    61  
    // Pod wraps a core v1.Pod by embedding it, so that selector-specific methods
    // such as Id can be attached to the selected object.
    62  type Pod struct {
    63  	v1.Pod
    64  }
    65  
    66  func (pod *Pod) Id() string {
    67  	return (types.NamespacedName{
    68  		Name:      pod.Name,
    69  		Namespace: pod.Namespace,
    70  	}).String()
    71  }
    72  
    73  func (impl *SelectImpl) Select(ctx context.Context, ps *v1alpha1.PodSelector) ([]*Pod, error) {
    74  	if ps == nil {
    75  		return []*Pod{}, nil
    76  	}
    77  
    78  	pods, err := SelectAndFilterPods(ctx, impl.c, impl.r, ps, impl.ClusterScoped, impl.TargetNamespace, impl.EnableFilterNamespace)
    79  	if err != nil {
    80  		return nil, err
    81  	}
    82  
    83  	var result []*Pod
    84  	for _, pod := range pods {
    85  		result = append(result, &Pod{
    86  			pod,
    87  		})
    88  	}
    89  
    90  	return result, nil
    91  }
    92  
    // Params lists the dependencies New needs. It embeds fx.In so the fields are
    // supplied by fx; Reader is requested under the name "no-cache" via its
    // struct tag.
    93  type Params struct {
    94  	fx.In
    95  
    96  	Client client.Client
    97  	Reader client.Reader `name:"no-cache"`
    98  }
    99  
   100  func New(params Params) *SelectImpl {
   101  	return &SelectImpl{
   102  		params.Client,
   103  		params.Reader,
   104  		Option{
   105  			config.ControllerCfg.ClusterScoped,
   106  			config.ControllerCfg.TargetNamespace,
   107  			config.ControllerCfg.EnableFilterNamespace,
   108  		},
   109  	}
   110  }
   111  
   112  // SelectAndFilterPods returns the list of pods that filtered by selector and SelectorMode
   113  func SelectAndFilterPods(ctx context.Context, c client.Client, r client.Reader, spec *v1alpha1.PodSelector, clusterScoped bool, targetNamespace string, enableFilterNamespace bool) ([]v1.Pod, error) {
   114  	if pods := mock.On("MockSelectAndFilterPods"); pods != nil {
   115  		return pods.(func() []v1.Pod)(), nil
   116  	}
   117  	if err := mock.On("MockSelectedAndFilterPodsError"); err != nil {
   118  		return nil, err.(error)
   119  	}
   120  
   121  	selector := spec.Selector
   122  	mode := spec.Mode
   123  	value := spec.Value
   124  
   125  	pods, err := SelectPods(ctx, c, r, selector, clusterScoped, targetNamespace, enableFilterNamespace)
   126  	if err != nil {
   127  		return nil, err
   128  	}
   129  
   130  	if len(pods) == 0 {
   131  		err = errors.New("no pod is selected")
   132  		return nil, err
   133  	}
   134  
   135  	filteredPod, err := filterPodsByMode(pods, mode, value)
   136  	if err != nil {
   137  		return nil, err
   138  	}
   139  
   140  	return filteredPod, nil
   141  }
   142  
   143  //revive:disable:flag-parameter
   144  
   145  // SelectPods returns the list of pods that are available for pod chaos action.
   146  // It returns all pods that match the configured label, annotation and namespace selectors.
   147  // If pods are specifically specified by `selector.Pods`, it just returns the selector.Pods.
   148  func SelectPods(ctx context.Context, c client.Client, r client.Reader, selector v1alpha1.PodSelectorSpec, clusterScoped bool, targetNamespace string, enableFilterNamespace bool) ([]v1.Pod, error) {
   149  	// TODO: refactor: make different selectors to replace if-else logics
   150  	var pods []v1.Pod
   151  
   152  	// pods are specifically specified
   153  	if len(selector.Pods) > 0 {
   154  		for ns, names := range selector.Pods {
   155  			if !clusterScoped {
   156  				if targetNamespace != ns {
   157  					log.Info("skip namespace because ns is out of scope within namespace scoped mode", "namespace", ns)
   158  					continue
   159  				}
   160  			}
   161  			for _, name := range names {
   162  				var pod v1.Pod
   163  				err := c.Get(ctx, types.NamespacedName{
   164  					Namespace: ns,
   165  					Name:      name,
   166  				}, &pod)
   167  				if err == nil {
   168  					pods = append(pods, pod)
   169  					continue
   170  				}
   171  
   172  				if apierrors.IsNotFound(err) {
   173  					log.Error(err, "Pod is not found", "namespace", ns, "pod name", name)
   174  					continue
   175  				}
   176  
   177  				return nil, err
   178  			}
   179  		}
   180  
   181  		return pods, nil
   182  	}
   183  
   184  	if !clusterScoped {
   185  		if len(selector.Namespaces) > 1 {
   186  			return nil, fmt.Errorf("could NOT use more than 1 namespace selector within namespace scoped mode")
   187  		} else if len(selector.Namespaces) == 1 {
   188  			if selector.Namespaces[0] != targetNamespace {
   189  				return nil, fmt.Errorf("could NOT list pods from out of scoped namespace: %s", selector.Namespaces[0])
   190  			}
   191  		}
   192  	}
   193  
   194  	var listOptions = client.ListOptions{}
   195  	if !clusterScoped {
   196  		listOptions.Namespace = targetNamespace
   197  	}
   198  	if len(selector.LabelSelectors) > 0 || len(selector.ExpressionSelectors) > 0 {
   199  		metav1Ls := &metav1.LabelSelector{
   200  			MatchLabels:      selector.LabelSelectors,
   201  			MatchExpressions: selector.ExpressionSelectors,
   202  		}
   203  		ls, err := metav1.LabelSelectorAsSelector(metav1Ls)
   204  		if err != nil {
   205  			return nil, err
   206  		}
   207  		listOptions.LabelSelector = ls
   208  	}
   209  
   210  	listFunc := c.List
   211  
   212  	if len(selector.FieldSelectors) > 0 {
   213  		listOptions.FieldSelector = fields.SelectorFromSet(selector.FieldSelectors)
   214  
   215  		// Since FieldSelectors need to implement index creation, Reader.List is used to get the pod list.
   216  		// Otherwise, just call Client.List directly, which can be obtained through cache.
   217  		if r != nil {
   218  			listFunc = r.List
   219  		}
   220  	}
   221  
   222  	var podList v1.PodList
   223  	if len(selector.Namespaces) > 0 {
   224  		for _, namespace := range selector.Namespaces {
   225  			listOptions.Namespace = namespace
   226  
   227  			if err := listFunc(ctx, &podList, &listOptions); err != nil {
   228  				return nil, err
   229  			}
   230  
   231  			pods = append(pods, podList.Items...)
   232  		}
   233  	} else {
   234  		if err := listFunc(ctx, &podList, &listOptions); err != nil {
   235  			return nil, err
   236  		}
   237  
   238  		pods = append(pods, podList.Items...)
   239  	}
   240  
   241  	var (
   242  		nodes           []v1.Node
   243  		nodeList        v1.NodeList
   244  		nodeListOptions = client.ListOptions{}
   245  	)
   246  	// if both setting Nodes and NodeSelectors, the node list will be combined.
   247  	if len(selector.Nodes) > 0 || len(selector.NodeSelectors) > 0 {
   248  		if len(selector.Nodes) > 0 {
   249  			for _, nodename := range selector.Nodes {
   250  				var node v1.Node
   251  				if err := c.Get(ctx, types.NamespacedName{Name: nodename}, &node); err != nil {
   252  					return nil, err
   253  				}
   254  				nodes = append(nodes, node)
   255  			}
   256  		}
   257  		if len(selector.NodeSelectors) > 0 {
   258  			nodeListOptions.LabelSelector = labels.SelectorFromSet(selector.NodeSelectors)
   259  			if err := c.List(ctx, &nodeList, &nodeListOptions); err != nil {
   260  				return nil, err
   261  			}
   262  			nodes = append(nodes, nodeList.Items...)
   263  		}
   264  		pods = filterPodByNode(pods, nodes)
   265  	}
   266  	if enableFilterNamespace {
   267  		pods = filterByNamespaces(ctx, c, pods)
   268  	}
   269  
   270  	namespaceSelector, err := parseSelector(strings.Join(selector.Namespaces, ","))
   271  	if err != nil {
   272  		return nil, err
   273  	}
   274  	pods, err = filterByNamespaceSelector(pods, namespaceSelector)
   275  	if err != nil {
   276  		return nil, err
   277  	}
   278  
   279  	annotationsSelector, err := parseSelector(label.Label(selector.AnnotationSelectors).String())
   280  	if err != nil {
   281  		return nil, err
   282  	}
   283  	pods = filterByAnnotations(pods, annotationsSelector)
   284  
   285  	phaseSelector, err := parseSelector(strings.Join(selector.PodPhaseSelectors, ","))
   286  	if err != nil {
   287  		return nil, err
   288  	}
   289  	pods, err = filterByPhaseSelector(pods, phaseSelector)
   290  	if err != nil {
   291  		return nil, err
   292  	}
   293  
   294  	return pods, nil
   295  }
   296  
   297  //revive:enable:flag-parameter
   298  
   299  // GetService get k8s service by service name
   300  func GetService(ctx context.Context, c client.Client, namespace, controllerNamespace string, serviceName string) (*v1.Service, error) {
   301  	// use the environment value if namespace is empty
   302  	if len(namespace) == 0 {
   303  		namespace = controllerNamespace
   304  	}
   305  
   306  	service := &v1.Service{}
   307  	err := c.Get(ctx, client.ObjectKey{
   308  		Namespace: namespace,
   309  		Name:      serviceName,
   310  	}, service)
   311  	if err != nil {
   312  		return nil, err
   313  	}
   314  
   315  	return service, nil
   316  }
   317  
   318  // CheckPodMeetSelector checks if this pod meets the selection criteria.
   319  // TODO: support to check fieldsSelector
   320  func CheckPodMeetSelector(pod v1.Pod, selector v1alpha1.PodSelectorSpec) (bool, error) {
   321  	if len(selector.Pods) > 0 {
   322  		meet := false
   323  		for ns, names := range selector.Pods {
   324  			if pod.Namespace != ns {
   325  				continue
   326  			}
   327  
   328  			for _, name := range names {
   329  				if pod.Name == name {
   330  					meet = true
   331  				}
   332  			}
   333  
   334  			if !meet {
   335  				return false, nil
   336  			}
   337  		}
   338  	}
   339  
   340  	// check pod labels.
   341  	if pod.Labels == nil {
   342  		pod.Labels = make(map[string]string)
   343  	}
   344  
   345  	if selector.LabelSelectors == nil {
   346  		selector.LabelSelectors = make(map[string]string)
   347  	}
   348  
   349  	if len(selector.LabelSelectors) > 0 || len(selector.ExpressionSelectors) > 0 {
   350  		metav1Ls := &metav1.LabelSelector{
   351  			MatchLabels:      selector.LabelSelectors,
   352  			MatchExpressions: selector.ExpressionSelectors,
   353  		}
   354  		ls, err := metav1.LabelSelectorAsSelector(metav1Ls)
   355  		if err != nil {
   356  			return false, err
   357  		}
   358  		podLabels := labels.Set(pod.Labels)
   359  		if len(pod.Labels) == 0 || !ls.Matches(podLabels) {
   360  			return false, nil
   361  		}
   362  	}
   363  
   364  	pods := []v1.Pod{pod}
   365  
   366  	namespaceSelector, err := parseSelector(strings.Join(selector.Namespaces, ","))
   367  	if err != nil {
   368  		return false, err
   369  	}
   370  
   371  	pods, err = filterByNamespaceSelector(pods, namespaceSelector)
   372  	if err != nil {
   373  		return false, err
   374  	}
   375  
   376  	annotationsSelector, err := parseSelector(label.Label(selector.AnnotationSelectors).String())
   377  	if err != nil {
   378  		return false, err
   379  	}
   380  
   381  	pods = filterByAnnotations(pods, annotationsSelector)
   382  
   383  	phaseSelector, err := parseSelector(strings.Join(selector.PodPhaseSelectors, ","))
   384  	if err != nil {
   385  		return false, err
   386  	}
   387  	pods, err = filterByPhaseSelector(pods, phaseSelector)
   388  	if err != nil {
   389  		return false, err
   390  	}
   391  
   392  	if len(pods) > 0 {
   393  		return true, nil
   394  	}
   395  
   396  	return false, nil
   397  }
   398  
   399  func filterPodByNode(pods []v1.Pod, nodes []v1.Node) []v1.Pod {
   400  	if len(nodes) == 0 {
   401  		return nil
   402  	}
   403  	var filteredList []v1.Pod
   404  	for _, pod := range pods {
   405  		for _, node := range nodes {
   406  			if pod.Spec.NodeName == node.Name {
   407  				filteredList = append(filteredList, pod)
   408  			}
   409  		}
   410  	}
   411  	return filteredList
   412  }
   413  
   414  // filterPodsByMode filters pods by mode from pod list
   415  func filterPodsByMode(pods []v1.Pod, mode v1alpha1.SelectorMode, value string) ([]v1.Pod, error) {
   416  	if len(pods) == 0 {
   417  		return nil, errors.New("cannot generate pods from empty list")
   418  	}
   419  
   420  	switch mode {
   421  	case v1alpha1.OneMode:
   422  		index := getRandomNumber(len(pods))
   423  		pod := pods[index]
   424  
   425  		return []v1.Pod{pod}, nil
   426  	case v1alpha1.AllMode:
   427  		return pods, nil
   428  	case v1alpha1.FixedMode:
   429  		num, err := strconv.Atoi(value)
   430  		if err != nil {
   431  			return nil, err
   432  		}
   433  
   434  		if len(pods) < num {
   435  			num = len(pods)
   436  		}
   437  
   438  		if num <= 0 {
   439  			return nil, errors.New("cannot select any pod as value below or equal 0")
   440  		}
   441  
   442  		return getFixedSubListFromPodList(pods, num), nil
   443  	case v1alpha1.FixedPercentMode:
   444  		percentage, err := strconv.Atoi(value)
   445  		if err != nil {
   446  			return nil, err
   447  		}
   448  
   449  		if percentage == 0 {
   450  			return nil, errors.New("cannot select any pod as value below or equal 0")
   451  		}
   452  
   453  		if percentage < 0 || percentage > 100 {
   454  			return nil, fmt.Errorf("fixed percentage value of %d is invalid, Must be (0,100]", percentage)
   455  		}
   456  
   457  		num := int(math.Floor(float64(len(pods)) * float64(percentage) / 100))
   458  
   459  		return getFixedSubListFromPodList(pods, num), nil
   460  	case v1alpha1.RandomMaxPercentMode:
   461  		maxPercentage, err := strconv.Atoi(value)
   462  		if err != nil {
   463  			return nil, err
   464  		}
   465  
   466  		if maxPercentage == 0 {
   467  			return nil, errors.New("cannot select any pod as value below or equal 0")
   468  		}
   469  
   470  		if maxPercentage < 0 || maxPercentage > 100 {
   471  			return nil, fmt.Errorf("fixed percentage value of %d is invalid, Must be [0-100]", maxPercentage)
   472  		}
   473  
   474  		percentage := getRandomNumber(maxPercentage + 1) // + 1 because Intn works with half open interval [0,n) and we want [0,n]
   475  		num := int(math.Floor(float64(len(pods)) * float64(percentage) / 100))
   476  
   477  		return getFixedSubListFromPodList(pods, num), nil
   478  	default:
   479  		return nil, fmt.Errorf("mode %s not supported", mode)
   480  	}
   481  }
   482  
   483  // filterByAnnotations filters a list of pods by a given annotation selector.
   484  func filterByAnnotations(pods []v1.Pod, annotations labels.Selector) []v1.Pod {
   485  	// empty filter returns original list
   486  	if annotations.Empty() {
   487  		return pods
   488  	}
   489  
   490  	var filteredList []v1.Pod
   491  
   492  	for _, pod := range pods {
   493  		// convert the pod's annotations to an equivalent label selector
   494  		selector := labels.Set(pod.Annotations)
   495  
   496  		// include pod if its annotations match the selector
   497  		if annotations.Matches(selector) {
   498  			filteredList = append(filteredList, pod)
   499  		}
   500  	}
   501  
   502  	return filteredList
   503  }
   504  
   505  // filterByPhaseSet filters a list of pods by a given PodPhase selector.
   506  func filterByPhaseSelector(pods []v1.Pod, phases labels.Selector) ([]v1.Pod, error) {
   507  	if phases.Empty() {
   508  		return pods, nil
   509  	}
   510  
   511  	reqs, _ := phases.Requirements()
   512  	var (
   513  		reqIncl []labels.Requirement
   514  		reqExcl []labels.Requirement
   515  
   516  		filteredList []v1.Pod
   517  	)
   518  
   519  	for _, req := range reqs {
   520  		switch req.Operator() {
   521  		case selection.Exists:
   522  			reqIncl = append(reqIncl, req)
   523  		case selection.DoesNotExist:
   524  			reqExcl = append(reqExcl, req)
   525  		default:
   526  			return nil, fmt.Errorf("unsupported operator: %s", req.Operator())
   527  		}
   528  	}
   529  
   530  	for _, pod := range pods {
   531  		included := len(reqIncl) == 0
   532  		selector := labels.Set{string(pod.Status.Phase): ""}
   533  
   534  		// include pod if one including requirement matches
   535  		for _, req := range reqIncl {
   536  			if req.Matches(selector) {
   537  				included = true
   538  				break
   539  			}
   540  		}
   541  
   542  		// exclude pod if it is filtered out by at least one excluding requirement
   543  		for _, req := range reqExcl {
   544  			if !req.Matches(selector) {
   545  				included = false
   546  				break
   547  			}
   548  		}
   549  
   550  		if included {
   551  			filteredList = append(filteredList, pod)
   552  		}
   553  	}
   554  
   555  	return filteredList, nil
   556  }
   557  
   558  func filterByNamespaces(ctx context.Context, c client.Client, pods []v1.Pod) []v1.Pod {
   559  	var filteredList []v1.Pod
   560  
   561  	for _, pod := range pods {
   562  		ok, err := IsAllowedNamespaces(ctx, c, pod.Namespace)
   563  		if err != nil {
   564  			log.Error(err, "fail to check whether this namespace is allowed", "namespace", pod.Namespace)
   565  			continue
   566  		}
   567  
   568  		if ok {
   569  			filteredList = append(filteredList, pod)
   570  		} else {
   571  			log.Info("namespace is not enabled for chaos-mesh", "namespace", pod.Namespace)
   572  		}
   573  	}
   574  	return filteredList
   575  }
   576  
   577  func IsAllowedNamespaces(ctx context.Context, c client.Client, namespace string) (bool, error) {
   578  	ns := &v1.Namespace{}
   579  
   580  	err := c.Get(ctx, types.NamespacedName{Name: namespace}, ns)
   581  	if err != nil {
   582  		return false, err
   583  	}
   584  
   585  	if ns.Annotations[injectAnnotationKey] == "enabled" {
   586  		return true, nil
   587  	}
   588  
   589  	return false, nil
   590  }
   591  
   592  // filterByNamespaceSelector filters a list of pods by a given namespace selector.
   593  func filterByNamespaceSelector(pods []v1.Pod, namespaces labels.Selector) ([]v1.Pod, error) {
   594  	// empty filter returns original list
   595  	if namespaces.Empty() {
   596  		return pods, nil
   597  	}
   598  
   599  	// split requirements into including and excluding groups
   600  	reqs, _ := namespaces.Requirements()
   601  
   602  	var (
   603  		reqIncl []labels.Requirement
   604  		reqExcl []labels.Requirement
   605  
   606  		filteredList []v1.Pod
   607  	)
   608  
   609  	for _, req := range reqs {
   610  		switch req.Operator() {
   611  		case selection.Exists:
   612  			reqIncl = append(reqIncl, req)
   613  		case selection.DoesNotExist:
   614  			reqExcl = append(reqExcl, req)
   615  		default:
   616  			return nil, fmt.Errorf("unsupported operator: %s", req.Operator())
   617  		}
   618  	}
   619  
   620  	for _, pod := range pods {
   621  		// if there aren't any including requirements, we're in by default
   622  		included := len(reqIncl) == 0
   623  
   624  		// convert the pod's namespace to an equivalent label selector
   625  		selector := labels.Set{pod.Namespace: ""}
   626  
   627  		// include pod if one including requirement matches
   628  		for _, req := range reqIncl {
   629  			if req.Matches(selector) {
   630  				included = true
   631  				break
   632  			}
   633  		}
   634  
   635  		// exclude pod if it is filtered out by at least one excluding requirement
   636  		for _, req := range reqExcl {
   637  			if !req.Matches(selector) {
   638  				included = false
   639  				break
   640  			}
   641  		}
   642  
   643  		if included {
   644  			filteredList = append(filteredList, pod)
   645  		}
   646  	}
   647  
   648  	return filteredList, nil
   649  }
   650  
   651  func parseSelector(str string) (labels.Selector, error) {
   652  	selector, err := labels.Parse(str)
   653  	if err != nil {
   654  		return nil, err
   655  	}
   656  	return selector, nil
   657  }
   658  
   659  func getFixedSubListFromPodList(pods []v1.Pod, num int) []v1.Pod {
   660  	indexes := RandomFixedIndexes(0, uint(len(pods)), uint(num))
   661  
   662  	var filteredPods []v1.Pod
   663  
   664  	for _, index := range indexes {
   665  		index := index
   666  		filteredPods = append(filteredPods, pods[index])
   667  	}
   668  
   669  	return filteredPods
   670  }
   671  
   672  // RandomFixedIndexes returns the `count` random indexes between `start` and `end`.
   673  // [start, end)
   674  func RandomFixedIndexes(start, end, count uint) []uint {
   675  	var indexes []uint
   676  	m := make(map[uint]uint, count)
   677  
   678  	if end < start {
   679  		return indexes
   680  	}
   681  
   682  	if count > end-start {
   683  		for i := start; i < end; i++ {
   684  			indexes = append(indexes, i)
   685  		}
   686  
   687  		return indexes
   688  	}
   689  
   690  	for i := 0; i < int(count); {
   691  		index := uint(getRandomNumber(int(end-start))) + start
   692  
   693  		_, exist := m[index]
   694  		if exist {
   695  			continue
   696  		}
   697  
   698  		m[index] = index
   699  		indexes = append(indexes, index)
   700  		i++
   701  	}
   702  
   703  	return indexes
   704  }
   705  
   // getRandomNumber returns a random integer in the half open interval [0, max),
   // drawn from crypto/rand.
   //
   // max must be positive; rand.Int panics otherwise. If the random source cannot
   // be read, the function panics with an error wrapping the cause, because the
   // signature has no way to report it and continuing would dereference a nil
   // result.
   func getRandomNumber(max int) uint64 {
   	num, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
   	if err != nil {
   		panic(fmt.Errorf("getRandomNumber: reading from crypto/rand: %w", err))
   	}
   	return num.Uint64()
   }
   710