1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package pod
17
18 import (
19 "context"
20
21 "github.com/pkg/errors"
22 "go.uber.org/fx"
23 v1 "k8s.io/api/core/v1"
24 apierrors "k8s.io/apimachinery/pkg/api/errors"
25 "k8s.io/apimachinery/pkg/types"
26 "sigs.k8s.io/controller-runtime/pkg/client"
27
28 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
29 "github.com/chaos-mesh/chaos-mesh/controllers/config"
30 "github.com/chaos-mesh/chaos-mesh/pkg/log"
31 "github.com/chaos-mesh/chaos-mesh/pkg/mock"
32 "github.com/chaos-mesh/chaos-mesh/pkg/selector/generic"
33 genericannotation "github.com/chaos-mesh/chaos-mesh/pkg/selector/generic/annotation"
34 genericfield "github.com/chaos-mesh/chaos-mesh/pkg/selector/generic/field"
35 genericlabel "github.com/chaos-mesh/chaos-mesh/pkg/selector/generic/label"
36 genericnamespace "github.com/chaos-mesh/chaos-mesh/pkg/selector/generic/namespace"
37 "github.com/chaos-mesh/chaos-mesh/pkg/selector/generic/registry"
38 )
39
40 var ErrNoPodSelected = errors.New("no pod is selected")
41
42 type SelectImpl struct {
43 c client.Client
44 r client.Reader
45
46 generic.Option
47 }
48
49 type Pod struct {
50 v1.Pod
51 }
52
53 func (pod *Pod) Id() string {
54 return (types.NamespacedName{
55 Name: pod.Name,
56 Namespace: pod.Namespace,
57 }).String()
58 }
59
60 func (impl *SelectImpl) Select(ctx context.Context, ps *v1alpha1.PodSelector) ([]*Pod, error) {
61 if ps == nil {
62 return []*Pod{}, nil
63 }
64
65 pods, err := SelectAndFilterPods(ctx, impl.c, impl.r, ps, impl.ClusterScoped, impl.TargetNamespace, impl.EnableFilterNamespace)
66 if err != nil {
67 return nil, err
68 }
69
70 var result []*Pod
71 for _, pod := range pods {
72 result = append(result, &Pod{
73 pod,
74 })
75 }
76
77 return result, nil
78 }
79
80 type Params struct {
81 fx.In
82
83 Client client.Client
84 Reader client.Reader `name:"no-cache"`
85 }
86
87 func New(params Params) *SelectImpl {
88 return &SelectImpl{
89 params.Client,
90 params.Reader,
91 generic.Option{
92 ClusterScoped: config.ControllerCfg.ClusterScoped,
93 TargetNamespace: config.ControllerCfg.TargetNamespace,
94 EnableFilterNamespace: config.ControllerCfg.EnableFilterNamespace,
95 },
96 }
97 }
98
99
100
101 func SelectAndFilterPods(ctx context.Context, c client.Client, r client.Reader, spec *v1alpha1.PodSelector, clusterScoped bool, targetNamespace string, enableFilterNamespace bool) ([]v1.Pod, error) {
102 if pods := mock.On("MockSelectAndFilterPods"); pods != nil {
103 return pods.(func() []v1.Pod)(), nil
104 }
105 if err := mock.On("MockSelectedAndFilterPodsError"); err != nil {
106 return nil, err.(error)
107 }
108
109 selector := spec.Selector
110 mode := spec.Mode
111 value := spec.Value
112
113 pods, err := SelectPods(ctx, c, r, selector, clusterScoped, targetNamespace, enableFilterNamespace)
114 if err != nil {
115 return nil, err
116 }
117
118 if len(pods) == 0 {
119 return nil, ErrNoPodSelected
120 }
121
122 filteredPod, err := filterPodsByMode(pods, mode, value)
123 if err != nil {
124 return nil, err
125 }
126
127 return filteredPod, nil
128 }
129
130
131
132
133
134
135 func SelectPods(ctx context.Context, c client.Client, r client.Reader, selector v1alpha1.PodSelectorSpec, clusterScoped bool, targetNamespace string, enableFilterNamespace bool) ([]v1.Pod, error) {
136
137 if len(selector.Pods) > 0 {
138 return selectSpecifiedPods(ctx, c, selector, clusterScoped, targetNamespace, enableFilterNamespace)
139 }
140
141 selectorRegistry := newSelectorRegistry(ctx, c, selector)
142 selectorChain, err := registry.Parse(selectorRegistry, selector.GenericSelectorSpec, generic.Option{
143 ClusterScoped: clusterScoped,
144 TargetNamespace: targetNamespace,
145 EnableFilterNamespace: enableFilterNamespace,
146 })
147 if err != nil {
148 return nil, err
149 }
150
151 return listPods(ctx, c, r, selector, selectorChain, enableFilterNamespace)
152 }
153
154 func selectSpecifiedPods(ctx context.Context, c client.Client, spec v1alpha1.PodSelectorSpec,
155 clusterScoped bool, targetNamespace string, enableFilterNamespace bool) ([]v1.Pod, error) {
156 var pods []v1.Pod
157 namespaceCheck := make(map[string]bool)
158 logger, err := log.NewDefaultZapLogger()
159 if err != nil {
160 return pods, errors.Wrap(err, "failed to create logger")
161 }
162 for ns, names := range spec.Pods {
163 if !clusterScoped {
164 if targetNamespace != ns {
165 log.L().WithName("pod-selector").Info("skip namespace because ns is out of scope within namespace scoped mode", "namespace", ns)
166 continue
167 }
168 }
169
170 if enableFilterNamespace {
171 allow, ok := namespaceCheck[ns]
172 if !ok {
173 allow = genericnamespace.CheckNamespace(ctx, c, ns, logger)
174 namespaceCheck[ns] = allow
175 }
176 if !allow {
177 continue
178 }
179 }
180 for _, name := range names {
181 var pod v1.Pod
182 err := c.Get(ctx, types.NamespacedName{
183 Namespace: ns,
184 Name: name,
185 }, &pod)
186 if err == nil {
187 pods = append(pods, pod)
188 continue
189 }
190
191 if apierrors.IsNotFound(err) {
192 log.L().WithName("pod-selector").Info("pod is not found, skip it", "namespace", ns, "pod name", name)
193 continue
194 }
195
196 return nil, err
197 }
198 }
199 return pods, nil
200 }
201
202
203
204
205 func CheckPodMeetSelector(ctx context.Context, c client.Client, pod v1.Pod, selector v1alpha1.PodSelectorSpec, clusterScoped bool, targetNamespace string, enableFilterNamespace bool) (bool, error) {
206 if len(selector.Pods) > 0 {
207 meet := false
208 for ns, names := range selector.Pods {
209 if pod.Namespace != ns {
210 continue
211 }
212
213 for _, name := range names {
214 if pod.Name == name {
215 meet = true
216 }
217 }
218
219 if !meet {
220 return false, nil
221 }
222 }
223 }
224
225 selectorRegistry := newSelectorRegistry(ctx, c, selector)
226 selectorChain, err := registry.Parse(selectorRegistry, selector.GenericSelectorSpec, generic.Option{
227 ClusterScoped: clusterScoped,
228 TargetNamespace: targetNamespace,
229 EnableFilterNamespace: enableFilterNamespace,
230 })
231 if err != nil {
232 return false, err
233 }
234
235 return selectorChain.Match(&pod), nil
236 }
237
238 func newSelectorRegistry(ctx context.Context, c client.Client, spec v1alpha1.PodSelectorSpec) registry.Registry {
239 return map[string]registry.SelectorFactory{
240 genericlabel.Name: genericlabel.New,
241 genericnamespace.Name: genericnamespace.New,
242 genericfield.Name: genericfield.New,
243 genericannotation.Name: genericannotation.New,
244 nodeSelectorName: func(selector v1alpha1.GenericSelectorSpec, _ generic.Option) (generic.Selector, error) {
245 return newNodeSelector(ctx, c, spec)
246 },
247 phaseSelectorName: func(selector v1alpha1.GenericSelectorSpec, _ generic.Option) (generic.Selector, error) {
248 return newPhaseSelector(spec)
249 },
250 }
251 }
252
253 func listPods(ctx context.Context, c client.Client, r client.Reader, spec v1alpha1.PodSelectorSpec,
254 selectorChain generic.SelectorChain, enableFilterNamespace bool) ([]v1.Pod, error) {
255 var pods []v1.Pod
256 namespaceCheck := make(map[string]bool)
257 logger, err := log.NewDefaultZapLogger()
258 if err != nil {
259 return pods, errors.Wrap(err, "failed to create logger")
260 }
261 if err := selectorChain.ListObjects(c, r,
262 func(listFunc generic.ListFunc, opts client.ListOptions) error {
263 var podList v1.PodList
264 if len(spec.Namespaces) > 0 {
265 for _, namespace := range spec.Namespaces {
266 if enableFilterNamespace {
267 allow, ok := namespaceCheck[namespace]
268 if !ok {
269 allow = genericnamespace.CheckNamespace(ctx, c, namespace, logger)
270 namespaceCheck[namespace] = allow
271 }
272 if !allow {
273 continue
274 }
275 }
276
277 opts.Namespace = namespace
278 if err := listFunc(ctx, &podList, &opts); err != nil {
279 return err
280 }
281 pods = append(pods, podList.Items...)
282 }
283 } else {
284
285 if err := listFunc(ctx, &podList, &opts); err != nil {
286 return err
287 }
288 pods = append(pods, podList.Items...)
289 }
290 return nil
291 }); err != nil {
292 return nil, err
293 }
294
295 filterPods := make([]v1.Pod, 0, len(pods))
296 for _, pod := range pods {
297 pod := pod
298 if selectorChain.Match(&pod) {
299 filterPods = append(filterPods, pod)
300 }
301 }
302 return filterPods, nil
303 }
304
305
306 func filterPodsByMode(pods []v1.Pod, mode v1alpha1.SelectorMode, value string) ([]v1.Pod, error) {
307 indexes, err := generic.FilterObjectsByMode(mode, value, len(pods))
308 if err != nil {
309 return nil, err
310 }
311
312 var filteredPods []v1.Pod
313
314 for _, index := range indexes {
315 index := index
316 filteredPods = append(filteredPods, pods[index])
317 }
318 return filteredPods, nil
319 }
320