1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package partition
17
18 import (
19 "context"
20 "strings"
21
22 "github.com/go-logr/logr"
23 "github.com/pkg/errors"
24 v1 "k8s.io/api/core/v1"
25 k8sError "k8s.io/apimachinery/pkg/api/errors"
26 "k8s.io/apimachinery/pkg/types"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/podnetworkchaosmanager"
31 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
32 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
33 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/iptable"
34 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/netutils"
35 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
36 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
37 )
38
39 var _ impltypes.ChaosImpl = (*Impl)(nil)
40
// Suffixes handed to the ipset builders as the ipSetPostFix argument.
const (
	// sourceIPSetPostFix is used when a ".Target" record builds the ipset
	// holding the "." (source) records.
	sourceIPSetPostFix = "src"
	// targetIPSetPostFix is used when a "." record builds the ipset holding
	// the ".Target" records.
	targetIPSetPostFix = "tgt"
)
45
46 type Impl struct {
47 client.Client
48
49 builder *podnetworkchaosmanager.Builder
50
51 Log logr.Logger
52 }
53
54 const (
55 waitForApplySync v1alpha1.Phase = "Not Injected/Wait"
56 waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
57 )
58
59 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
60 impl.Log.Info("partition Apply", "chaos", obj)
61 networkchaos, ok := obj.(*v1alpha1.NetworkChaos)
62 if !ok {
63 err := errors.New("chaos is not NetworkChaos")
64 impl.Log.Error(err, "chaos is not NetworkChaos", "chaos", obj)
65 return v1alpha1.NotInjected, err
66 }
67 if networkchaos.Status.Instances == nil {
68 networkchaos.Status.Instances = make(map[string]int64)
69 }
70
71 record := records[index]
72 phase := record.Phase
73
74 if phase == waitForApplySync {
75 podnetworkchaos := &v1alpha1.PodNetworkChaos{}
76 namespacedName, err := controller.ParseNamespacedName(record.Id)
77 if err != nil {
78 return waitForApplySync, err
79 }
80 err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
81 if err != nil {
82 if k8sError.IsNotFound(err) {
83 return v1alpha1.NotInjected, nil
84 }
85
86 if k8sError.IsForbidden(err) {
87 if strings.Contains(err.Error(), "because it is being terminated") {
88 return v1alpha1.NotInjected, nil
89 }
90 }
91
92 return waitForApplySync, err
93 }
94
95 if podnetworkchaos.Status.FailedMessage != "" {
96 return waitForApplySync, errors.New(podnetworkchaos.Status.FailedMessage)
97 }
98
99 if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
100 return v1alpha1.Injected, nil
101 }
102
103 return waitForApplySync, nil
104 }
105
106 var pod v1.Pod
107 namespacedName, err := controller.ParseNamespacedName(record.Id)
108 if err != nil {
109 return v1alpha1.NotInjected, err
110 }
111 err = impl.Client.Get(ctx, namespacedName, &pod)
112 if err != nil {
113
114 return v1alpha1.NotInjected, err
115 }
116
117 source := networkchaos.Namespace + "/" + networkchaos.Name
118 m := func() *podnetworkchaosmanager.PodNetworkManager {
119 shouldInit := true
120
121 if record.SelectorKey == ".Target" {
122 for _, r := range records {
123 if r.Id == record.Id {
124
125
126 shouldInit = false
127 }
128 }
129 }
130
131 if shouldInit {
132 return impl.builder.WithInit(source, types.NamespacedName{
133 Namespace: pod.Namespace,
134 Name: pod.Name,
135 })
136 }
137
138 return impl.builder.Build(source, types.NamespacedName{
139 Namespace: pod.Namespace,
140 Name: pod.Name,
141 })
142 }()
143
144 if record.SelectorKey == "." {
145 shouldCommit := false
146
147 if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
148 var targets []*v1alpha1.Record
149 for _, record := range records {
150 if record.SelectorKey == ".Target" {
151 targets = append(targets, record)
152 }
153 }
154
155 err := impl.SetDrop(ctx, m, targets, networkchaos, targetIPSetPostFix, v1alpha1.Output, networkchaos.Spec.Device)
156 if err != nil {
157 return v1alpha1.NotInjected, err
158 }
159
160 shouldCommit = true
161 }
162
163 if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
164 var targets []*v1alpha1.Record
165 for _, record := range records {
166 if record.SelectorKey == ".Target" {
167 targets = append(targets, record)
168 }
169 }
170
171 err := impl.SetDrop(ctx, m, targets, networkchaos, targetIPSetPostFix, v1alpha1.Input, networkchaos.Spec.Device)
172 if err != nil {
173 return v1alpha1.NotInjected, err
174 }
175
176 shouldCommit = true
177 }
178
179 if shouldCommit {
180 generationNumber, err := m.Commit(ctx, networkchaos)
181 if err != nil {
182 return v1alpha1.NotInjected, err
183 }
184
185
186 networkchaos.Status.Instances[record.Id] = generationNumber
187 return waitForApplySync, nil
188 }
189
190 return v1alpha1.Injected, nil
191 } else if record.SelectorKey == ".Target" {
192 shouldCommit := false
193
194 if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
195 var targets []*v1alpha1.Record
196 for _, record := range records {
197 if record.SelectorKey == "." {
198 targets = append(targets, record)
199 }
200 }
201
202 err := impl.SetDrop(ctx, m, targets, networkchaos, sourceIPSetPostFix, v1alpha1.Output, networkchaos.Spec.TargetDevice)
203 if err != nil {
204 return v1alpha1.NotInjected, err
205 }
206
207 shouldCommit = true
208 }
209
210 if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
211 var targets []*v1alpha1.Record
212 for _, record := range records {
213 if record.SelectorKey == "." {
214 targets = append(targets, record)
215 }
216 }
217
218 err := impl.SetDrop(ctx, m, targets, networkchaos, sourceIPSetPostFix, v1alpha1.Input, networkchaos.Spec.TargetDevice)
219 if err != nil {
220 return v1alpha1.NotInjected, err
221 }
222
223 shouldCommit = true
224 }
225
226 if shouldCommit {
227 generationNumber, err := m.Commit(ctx, networkchaos)
228 if err != nil {
229 return v1alpha1.NotInjected, err
230 }
231
232
233 networkchaos.Status.Instances[record.Id] = generationNumber
234 return waitForApplySync, nil
235 }
236
237 return v1alpha1.Injected, nil
238 } else {
239 impl.Log.Info("unknown selector key", "record", record)
240 return v1alpha1.NotInjected, nil
241 }
242 }
243
244 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
245 networkchaos, ok := obj.(*v1alpha1.NetworkChaos)
246 if !ok {
247 err := errors.New("chaos is not NetworkChaos")
248 impl.Log.Error(err, "chaos is not NetworkChaos", "chaos", obj)
249 return v1alpha1.Injected, err
250 }
251 if networkchaos.Status.Instances == nil {
252 networkchaos.Status.Instances = make(map[string]int64)
253 }
254
255 record := records[index]
256 phase := record.Phase
257
258 if phase == waitForRecoverSync {
259 podnetworkchaos := &v1alpha1.PodNetworkChaos{}
260 namespacedName, err := controller.ParseNamespacedName(record.Id)
261 if err != nil {
262
263 return waitForApplySync, err
264 }
265 err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
266 if err != nil {
267
268 if k8sError.IsNotFound(err) {
269 return v1alpha1.NotInjected, nil
270 }
271 return waitForRecoverSync, err
272 }
273
274 if podnetworkchaos.Status.FailedMessage != "" {
275 return waitForRecoverSync, errors.New(podnetworkchaos.Status.FailedMessage)
276 }
277
278 if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
279 return v1alpha1.NotInjected, nil
280 }
281
282 return waitForRecoverSync, nil
283 }
284
285 var pod v1.Pod
286 namespacedName, err := controller.ParseNamespacedName(record.Id)
287 if err != nil {
288
289 return v1alpha1.NotInjected, err
290 }
291 err = impl.Client.Get(ctx, namespacedName, &pod)
292 if err != nil {
293
294 if k8sError.IsNotFound(err) {
295 return v1alpha1.NotInjected, nil
296 }
297 return v1alpha1.Injected, err
298 }
299
300 source := networkchaos.Namespace + "/" + networkchaos.Name
301 m := impl.builder.WithInit(source, types.NamespacedName{
302 Namespace: pod.Namespace,
303 Name: pod.Name,
304 })
305 generationNumber, err := m.Commit(ctx, networkchaos)
306 if err != nil {
307 if err == podnetworkchaosmanager.ErrPodNotFound || err == podnetworkchaosmanager.ErrPodNotRunning {
308 return v1alpha1.NotInjected, nil
309 }
310
311 if k8sError.IsForbidden(err) {
312 if strings.Contains(err.Error(), "because it is being terminated") {
313 return v1alpha1.NotInjected, nil
314 }
315 }
316 return v1alpha1.Injected, err
317 }
318
319
320 networkchaos.Status.Instances[record.Id] = generationNumber
321 return waitForRecoverSync, nil
322 }
323
324 func (impl *Impl) SetDrop(ctx context.Context, m *podnetworkchaosmanager.PodNetworkManager, targets []*v1alpha1.Record, networkchaos *v1alpha1.NetworkChaos, ipSetPostFix string, chainDirection v1alpha1.ChainDirection, device string) error {
325 externalCidrs, err := netutils.ResolveCidrs(networkchaos.Spec.ExternalTargets)
326 if err != nil {
327 return err
328 }
329
330 pbChainDirection := pb.Chain_OUTPUT
331 if chainDirection == v1alpha1.Input {
332 pbChainDirection = pb.Chain_INPUT
333 }
334 if len(targets)+len(externalCidrs) == 0 {
335 impl.Log.Info("apply traffic control", "sources", m.Source)
336 m.T.Append(v1alpha1.RawIptables{
337 Name: iptable.GenerateName(pbChainDirection, networkchaos),
338 Direction: chainDirection,
339 IPSets: nil,
340 RawRuleSource: v1alpha1.RawRuleSource{
341 Source: m.Source,
342 },
343 Device: device,
344 })
345 return nil
346 }
347
348 targetPods := []v1.Pod{}
349 for _, record := range targets {
350 var pod v1.Pod
351 namespacedName, err := controller.ParseNamespacedName(record.Id)
352 if err != nil {
353
354 return err
355 }
356 err = impl.Client.Get(ctx, namespacedName, &pod)
357 if err != nil {
358
359 return err
360 }
361 targetPods = append(targetPods, pod)
362 }
363 dstIPSets := ipset.BuildIPSets(targetPods, externalCidrs, networkchaos, ipSetPostFix, m.Source)
364 dstSetIPSet := ipset.BuildSetIPSet(dstIPSets, networkchaos, ipSetPostFix, m.Source)
365
366 for _, ipSet := range dstIPSets {
367 m.T.Append(ipSet)
368 }
369
370 m.T.Append(dstSetIPSet)
371
372 m.T.Append(v1alpha1.RawIptables{
373 Name: iptable.GenerateName(pbChainDirection, networkchaos),
374 Direction: chainDirection,
375 IPSets: []string{dstSetIPSet.Name},
376 RawRuleSource: v1alpha1.RawRuleSource{
377 Source: m.Source,
378 },
379 Device: device,
380 })
381
382 return nil
383 }
384
385 func NewImpl(c client.Client, b *podnetworkchaosmanager.Builder, log logr.Logger) *Impl {
386 return &Impl{
387 Client: c,
388 builder: b,
389 Log: log.WithName("partition"),
390 }
391 }
392