...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/selector/pod/selector.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/selector/pod

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package pod
    17  
    18  import (
    19  	"context"
    20  
    21  	"github.com/pkg/errors"
    22  	"go.uber.org/fx"
    23  	v1 "k8s.io/api/core/v1"
    24  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	"sigs.k8s.io/controller-runtime/pkg/client"
    27  
    28  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    29  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    30  	"github.com/chaos-mesh/chaos-mesh/pkg/log"
    31  	"github.com/chaos-mesh/chaos-mesh/pkg/mock"
    32  	"github.com/chaos-mesh/chaos-mesh/pkg/selector/generic"
    33  	genericannotation "github.com/chaos-mesh/chaos-mesh/pkg/selector/generic/annotation"
    34  	genericfield "github.com/chaos-mesh/chaos-mesh/pkg/selector/generic/field"
    35  	genericlabel "github.com/chaos-mesh/chaos-mesh/pkg/selector/generic/label"
    36  	genericnamespace "github.com/chaos-mesh/chaos-mesh/pkg/selector/generic/namespace"
    37  	"github.com/chaos-mesh/chaos-mesh/pkg/selector/generic/registry"
    38  )
    39  
    40  var ErrNoPodSelected = errors.New("no pod is selected")
    41  
    42  type SelectImpl struct {
    43  	c client.Client
    44  	r client.Reader
    45  
    46  	generic.Option
    47  }
    48  
    49  type Pod struct {
    50  	v1.Pod
    51  }
    52  
    53  func (pod *Pod) Id() string {
    54  	return (types.NamespacedName{
    55  		Name:      pod.Name,
    56  		Namespace: pod.Namespace,
    57  	}).String()
    58  }
    59  
    60  func (impl *SelectImpl) Select(ctx context.Context, ps *v1alpha1.PodSelector) ([]*Pod, error) {
    61  	if ps == nil {
    62  		return []*Pod{}, nil
    63  	}
    64  
    65  	pods, err := SelectAndFilterPods(ctx, impl.c, impl.r, ps, impl.ClusterScoped, impl.TargetNamespace, impl.EnableFilterNamespace)
    66  	if err != nil {
    67  		return nil, err
    68  	}
    69  
    70  	var result []*Pod
    71  	for _, pod := range pods {
    72  		result = append(result, &Pod{
    73  			pod,
    74  		})
    75  	}
    76  
    77  	return result, nil
    78  }
    79  
    80  type Params struct {
    81  	fx.In
    82  
    83  	Client client.Client
    84  	Reader client.Reader `name:"no-cache"`
    85  }
    86  
    87  func New(params Params) *SelectImpl {
    88  	return &SelectImpl{
    89  		params.Client,
    90  		params.Reader,
    91  		generic.Option{
    92  			ClusterScoped:         config.ControllerCfg.ClusterScoped,
    93  			TargetNamespace:       config.ControllerCfg.TargetNamespace,
    94  			EnableFilterNamespace: config.ControllerCfg.EnableFilterNamespace,
    95  		},
    96  	}
    97  }
    98  
    99  // SelectAndFilterPods returns the list of pods that filtered by selector and SelectorMode
   100  // Deprecated: use pod.SelectImpl as instead
   101  func SelectAndFilterPods(ctx context.Context, c client.Client, r client.Reader, spec *v1alpha1.PodSelector, clusterScoped bool, targetNamespace string, enableFilterNamespace bool) ([]v1.Pod, error) {
   102  	if pods := mock.On("MockSelectAndFilterPods"); pods != nil {
   103  		return pods.(func() []v1.Pod)(), nil
   104  	}
   105  	if err := mock.On("MockSelectedAndFilterPodsError"); err != nil {
   106  		return nil, err.(error)
   107  	}
   108  
   109  	selector := spec.Selector
   110  	mode := spec.Mode
   111  	value := spec.Value
   112  
   113  	pods, err := SelectPods(ctx, c, r, selector, clusterScoped, targetNamespace, enableFilterNamespace)
   114  	if err != nil {
   115  		return nil, err
   116  	}
   117  
   118  	if len(pods) == 0 {
   119  		return nil, ErrNoPodSelected
   120  	}
   121  
   122  	filteredPod, err := filterPodsByMode(pods, mode, value)
   123  	if err != nil {
   124  		return nil, err
   125  	}
   126  
   127  	return filteredPod, nil
   128  }
   129  
   130  //revive:disable:flag-parameter
   131  
   132  // SelectPods returns the list of pods that are available for pod chaos action.
   133  // It returns all pods that match the configured label, annotation and namespace selectors.
   134  // If pods are specifically specified by `selector.Pods`, it just returns the selector.Pods.
   135  func SelectPods(ctx context.Context, c client.Client, r client.Reader, selector v1alpha1.PodSelectorSpec, clusterScoped bool, targetNamespace string, enableFilterNamespace bool) ([]v1.Pod, error) {
   136  	// pods are specifically specified
   137  	if len(selector.Pods) > 0 {
   138  		return selectSpecifiedPods(ctx, c, selector, clusterScoped, targetNamespace, enableFilterNamespace)
   139  	}
   140  
   141  	selectorRegistry := newSelectorRegistry(ctx, c, selector)
   142  	selectorChain, err := registry.Parse(selectorRegistry, selector.GenericSelectorSpec, generic.Option{
   143  		ClusterScoped:         clusterScoped,
   144  		TargetNamespace:       targetNamespace,
   145  		EnableFilterNamespace: enableFilterNamespace,
   146  	})
   147  	if err != nil {
   148  		return nil, err
   149  	}
   150  
   151  	return listPods(ctx, c, r, selector, selectorChain, enableFilterNamespace)
   152  }
   153  
   154  func selectSpecifiedPods(ctx context.Context, c client.Client, spec v1alpha1.PodSelectorSpec,
   155  	clusterScoped bool, targetNamespace string, enableFilterNamespace bool) ([]v1.Pod, error) {
   156  	var pods []v1.Pod
   157  	namespaceCheck := make(map[string]bool)
   158  	logger, err := log.NewDefaultZapLogger()
   159  	if err != nil {
   160  		return pods, errors.Wrap(err, "failed to create logger")
   161  	}
   162  	for ns, names := range spec.Pods {
   163  		if !clusterScoped {
   164  			if targetNamespace != ns {
   165  				log.L().WithName("pod-selector").Info("skip namespace because ns is out of scope within namespace scoped mode", "namespace", ns)
   166  				continue
   167  			}
   168  		}
   169  
   170  		if enableFilterNamespace {
   171  			allow, ok := namespaceCheck[ns]
   172  			if !ok {
   173  				allow = genericnamespace.CheckNamespace(ctx, c, ns, logger)
   174  				namespaceCheck[ns] = allow
   175  			}
   176  			if !allow {
   177  				continue
   178  			}
   179  		}
   180  		for _, name := range names {
   181  			var pod v1.Pod
   182  			err := c.Get(ctx, types.NamespacedName{
   183  				Namespace: ns,
   184  				Name:      name,
   185  			}, &pod)
   186  			if err == nil {
   187  				pods = append(pods, pod)
   188  				continue
   189  			}
   190  
   191  			if apierrors.IsNotFound(err) {
   192  				log.L().WithName("pod-selector").Info("pod is not found, skip it", "namespace", ns, "pod name", name)
   193  				continue
   194  			}
   195  
   196  			return nil, err
   197  		}
   198  	}
   199  	return pods, nil
   200  }
   201  
   202  //revive:enable:flag-parameter
   203  
   204  // CheckPodMeetSelector checks if this pod meets the selection criteria.
   205  func CheckPodMeetSelector(ctx context.Context, c client.Client, pod v1.Pod, selector v1alpha1.PodSelectorSpec, clusterScoped bool, targetNamespace string, enableFilterNamespace bool) (bool, error) {
   206  	if len(selector.Pods) > 0 {
   207  		meet := false
   208  		for ns, names := range selector.Pods {
   209  			if pod.Namespace != ns {
   210  				continue
   211  			}
   212  
   213  			for _, name := range names {
   214  				if pod.Name == name {
   215  					meet = true
   216  				}
   217  			}
   218  
   219  			if !meet {
   220  				return false, nil
   221  			}
   222  		}
   223  	}
   224  
   225  	selectorRegistry := newSelectorRegistry(ctx, c, selector)
   226  	selectorChain, err := registry.Parse(selectorRegistry, selector.GenericSelectorSpec, generic.Option{
   227  		ClusterScoped:         clusterScoped,
   228  		TargetNamespace:       targetNamespace,
   229  		EnableFilterNamespace: enableFilterNamespace,
   230  	})
   231  	if err != nil {
   232  		return false, err
   233  	}
   234  
   235  	return selectorChain.Match(&pod), nil
   236  }
   237  
   238  func newSelectorRegistry(ctx context.Context, c client.Client, spec v1alpha1.PodSelectorSpec) registry.Registry {
   239  	return map[string]registry.SelectorFactory{
   240  		genericlabel.Name:      genericlabel.New,
   241  		genericnamespace.Name:  genericnamespace.New,
   242  		genericfield.Name:      genericfield.New,
   243  		genericannotation.Name: genericannotation.New,
   244  		nodeSelectorName: func(selector v1alpha1.GenericSelectorSpec, _ generic.Option) (generic.Selector, error) {
   245  			return newNodeSelector(ctx, c, spec)
   246  		},
   247  		phaseSelectorName: func(selector v1alpha1.GenericSelectorSpec, _ generic.Option) (generic.Selector, error) {
   248  			return newPhaseSelector(spec)
   249  		},
   250  	}
   251  }
   252  
   253  func listPods(ctx context.Context, c client.Client, r client.Reader, spec v1alpha1.PodSelectorSpec,
   254  	selectorChain generic.SelectorChain, enableFilterNamespace bool) ([]v1.Pod, error) {
   255  	var pods []v1.Pod
   256  	namespaceCheck := make(map[string]bool)
   257  	logger, err := log.NewDefaultZapLogger()
   258  	if err != nil {
   259  		return pods, errors.Wrap(err, "failed to create logger")
   260  	}
   261  	if err := selectorChain.ListObjects(c, r,
   262  		func(listFunc generic.ListFunc, opts client.ListOptions) error {
   263  			var podList v1.PodList
   264  			if len(spec.Namespaces) > 0 {
   265  				for _, namespace := range spec.Namespaces {
   266  					if enableFilterNamespace {
   267  						allow, ok := namespaceCheck[namespace]
   268  						if !ok {
   269  							allow = genericnamespace.CheckNamespace(ctx, c, namespace, logger)
   270  							namespaceCheck[namespace] = allow
   271  						}
   272  						if !allow {
   273  							continue
   274  						}
   275  					}
   276  
   277  					opts.Namespace = namespace
   278  					if err := listFunc(ctx, &podList, &opts); err != nil {
   279  						return err
   280  					}
   281  					pods = append(pods, podList.Items...)
   282  				}
   283  			} else {
   284  				// in fact, this will never happen
   285  				if err := listFunc(ctx, &podList, &opts); err != nil {
   286  					return err
   287  				}
   288  				pods = append(pods, podList.Items...)
   289  			}
   290  			return nil
   291  		}); err != nil {
   292  		return nil, err
   293  	}
   294  
   295  	filterPods := make([]v1.Pod, 0, len(pods))
   296  	for _, pod := range pods {
   297  		pod := pod
   298  		if selectorChain.Match(&pod) {
   299  			filterPods = append(filterPods, pod)
   300  		}
   301  	}
   302  	return filterPods, nil
   303  }
   304  
   305  // filterPodsByMode filters pods by mode from pod list
   306  func filterPodsByMode(pods []v1.Pod, mode v1alpha1.SelectorMode, value string) ([]v1.Pod, error) {
   307  	indexes, err := generic.FilterObjectsByMode(mode, value, len(pods))
   308  	if err != nil {
   309  		return nil, err
   310  	}
   311  
   312  	var filteredPods []v1.Pod
   313  
   314  	for _, index := range indexes {
   315  		index := index
   316  		filteredPods = append(filteredPods, pods[index])
   317  	}
   318  	return filteredPods, nil
   319  }
   320