1
2
3
4
5
6
7
8
9
10
11
12
13
14 package partition
15
16 import (
17 "context"
18 "errors"
19 "strings"
20
21 "github.com/go-logr/logr"
22 v1 "k8s.io/api/core/v1"
23 k8sError "k8s.io/apimachinery/pkg/api/errors"
24 "k8s.io/apimachinery/pkg/types"
25 "sigs.k8s.io/controller-runtime/pkg/client"
26
27 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
28 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/podnetworkchaosmanager"
29 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
30 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/iptable"
31 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/netutils"
32 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
33 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
34 )
35
// sourceIPSetPostFix is the ipset postfix Apply passes to SetDrop when it
// handles a ".Target" record, i.e. when the set is built from the "." records.
const sourceIPSetPostFix = "src"

// targetIPSetPostFix is the ipset postfix Apply passes to SetDrop when it
// handles a "." record, i.e. when the set is built from the ".Target" records.
const targetIPSetPostFix = "tgt"
40
41 type Impl struct {
42 client.Client
43
44 builder *podnetworkchaosmanager.Builder
45
46 Log logr.Logger
47 }
48
49 const (
50 waitForApplySync v1alpha1.Phase = "Not Injected/Wait"
51 waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
52 )
53
54 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
55 impl.Log.Info("partition Apply", "chaos", obj)
56 networkchaos, ok := obj.(*v1alpha1.NetworkChaos)
57 if !ok {
58 err := errors.New("chaos is not NetworkChaos")
59 impl.Log.Error(err, "chaos is not NetworkChaos", "chaos", obj)
60 return v1alpha1.NotInjected, err
61 }
62 if networkchaos.Status.Instances == nil {
63 networkchaos.Status.Instances = make(map[string]int64)
64 }
65
66 record := records[index]
67 phase := record.Phase
68
69 if phase == waitForApplySync {
70 podnetworkchaos := &v1alpha1.PodNetworkChaos{}
71 err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), podnetworkchaos)
72 if err != nil {
73 if k8sError.IsNotFound(err) {
74 return v1alpha1.NotInjected, nil
75 }
76
77 if k8sError.IsForbidden(err) {
78 if strings.Contains(err.Error(), "because it is being terminated") {
79 return v1alpha1.NotInjected, nil
80 }
81 }
82
83 return waitForApplySync, err
84 }
85
86 if podnetworkchaos.Status.FailedMessage != "" {
87 return waitForApplySync, errors.New(podnetworkchaos.Status.FailedMessage)
88 }
89
90 if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
91 return v1alpha1.Injected, nil
92 }
93
94 return waitForApplySync, nil
95 }
96
97 var pod v1.Pod
98 err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
99 if err != nil {
100
101 return v1alpha1.NotInjected, err
102 }
103
104 source := networkchaos.Namespace + "/" + networkchaos.Name
105 m := func() *podnetworkchaosmanager.PodNetworkManager {
106 shouldInit := true
107
108 if record.SelectorKey == ".Target" {
109 for _, r := range records {
110 if r.Id == record.Id {
111
112
113 shouldInit = false
114 }
115 }
116 }
117
118 if shouldInit {
119 return impl.builder.WithInit(source, types.NamespacedName{
120 Namespace: pod.Namespace,
121 Name: pod.Name,
122 })
123 }
124
125 return impl.builder.Build(source, types.NamespacedName{
126 Namespace: pod.Namespace,
127 Name: pod.Name,
128 })
129 }()
130
131 if record.SelectorKey == "." {
132 shouldCommit := false
133
134 if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
135 var targets []*v1alpha1.Record
136 for _, record := range records {
137 if record.SelectorKey == ".Target" {
138 targets = append(targets, record)
139 }
140 }
141
142 err := impl.SetDrop(ctx, m, targets, networkchaos, targetIPSetPostFix, v1alpha1.Output)
143 if err != nil {
144 return v1alpha1.NotInjected, err
145 }
146
147 shouldCommit = true
148 }
149
150 if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
151 var targets []*v1alpha1.Record
152 for _, record := range records {
153 if record.SelectorKey == ".Target" {
154 targets = append(targets, record)
155 }
156 }
157
158 err := impl.SetDrop(ctx, m, targets, networkchaos, targetIPSetPostFix, v1alpha1.Input)
159 if err != nil {
160 return v1alpha1.NotInjected, err
161 }
162
163 shouldCommit = true
164 }
165
166 if shouldCommit {
167 generationNumber, err := m.Commit(ctx, networkchaos)
168 if err != nil {
169 return v1alpha1.NotInjected, err
170 }
171
172
173 networkchaos.Status.Instances[record.Id] = generationNumber
174 return waitForApplySync, nil
175 }
176
177 return v1alpha1.Injected, nil
178 } else if record.SelectorKey == ".Target" {
179 shouldCommit := false
180
181 if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
182 var targets []*v1alpha1.Record
183 for _, record := range records {
184 if record.SelectorKey == "." {
185 targets = append(targets, record)
186 }
187 }
188
189 err := impl.SetDrop(ctx, m, targets, networkchaos, sourceIPSetPostFix, v1alpha1.Output)
190 if err != nil {
191 return v1alpha1.NotInjected, err
192 }
193
194 shouldCommit = true
195 }
196
197 if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
198 var targets []*v1alpha1.Record
199 for _, record := range records {
200 if record.SelectorKey == "." {
201 targets = append(targets, record)
202 }
203 }
204
205 err := impl.SetDrop(ctx, m, targets, networkchaos, sourceIPSetPostFix, v1alpha1.Input)
206 if err != nil {
207 return v1alpha1.NotInjected, err
208 }
209
210 shouldCommit = true
211 }
212
213 if shouldCommit {
214 generationNumber, err := m.Commit(ctx, networkchaos)
215 if err != nil {
216 return v1alpha1.NotInjected, err
217 }
218
219
220 networkchaos.Status.Instances[record.Id] = generationNumber
221 return waitForApplySync, nil
222 }
223
224 return v1alpha1.Injected, nil
225 } else {
226 impl.Log.Info("unknown selector key", "record", record)
227 return v1alpha1.NotInjected, nil
228 }
229 }
230
231 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
232 networkchaos, ok := obj.(*v1alpha1.NetworkChaos)
233 if !ok {
234 err := errors.New("chaos is not NetworkChaos")
235 impl.Log.Error(err, "chaos is not NetworkChaos", "chaos", obj)
236 return v1alpha1.Injected, err
237 }
238 if networkchaos.Status.Instances == nil {
239 networkchaos.Status.Instances = make(map[string]int64)
240 }
241
242 record := records[index]
243 phase := record.Phase
244
245 if phase == waitForRecoverSync {
246 podnetworkchaos := &v1alpha1.PodNetworkChaos{}
247 err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), podnetworkchaos)
248 if err != nil {
249
250 if k8sError.IsNotFound(err) {
251 return v1alpha1.NotInjected, nil
252 }
253 return waitForRecoverSync, err
254 }
255
256 if podnetworkchaos.Status.FailedMessage != "" {
257 return waitForRecoverSync, errors.New(podnetworkchaos.Status.FailedMessage)
258 }
259
260 if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
261 return v1alpha1.NotInjected, nil
262 }
263
264 return waitForRecoverSync, nil
265 }
266
267 var pod v1.Pod
268 err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
269 if err != nil {
270
271 if k8sError.IsNotFound(err) {
272 return v1alpha1.NotInjected, nil
273 }
274 return v1alpha1.Injected, err
275 }
276
277 source := networkchaos.Namespace + "/" + networkchaos.Name
278 m := impl.builder.WithInit(source, types.NamespacedName{
279 Namespace: pod.Namespace,
280 Name: pod.Name,
281 })
282 generationNumber, err := m.Commit(ctx, networkchaos)
283 if err != nil {
284 if err == podnetworkchaosmanager.ErrPodNotFound || err == podnetworkchaosmanager.ErrPodNotRunning {
285 return v1alpha1.NotInjected, nil
286 }
287
288 if k8sError.IsForbidden(err) {
289 if strings.Contains(err.Error(), "because it is being terminated") {
290 return v1alpha1.NotInjected, nil
291 }
292 }
293 return v1alpha1.Injected, err
294 }
295
296
297 networkchaos.Status.Instances[record.Id] = generationNumber
298 return waitForRecoverSync, nil
299 }
300
301 func (impl *Impl) SetDrop(ctx context.Context, m *podnetworkchaosmanager.PodNetworkManager, targets []*v1alpha1.Record, networkchaos *v1alpha1.NetworkChaos, ipSetPostFix string, chainDirection v1alpha1.ChainDirection) error {
302 externalCidrs, err := netutils.ResolveCidrs(networkchaos.Spec.ExternalTargets)
303 if err != nil {
304 return err
305 }
306
307 pbChainDirection := pb.Chain_OUTPUT
308 if chainDirection == v1alpha1.Input {
309 pbChainDirection = pb.Chain_INPUT
310 }
311 if len(targets)+len(externalCidrs) == 0 {
312 impl.Log.Info("apply traffic control", "sources", m.Source)
313 m.T.Append(v1alpha1.RawIptables{
314 Name: iptable.GenerateName(pbChainDirection, networkchaos),
315 Direction: chainDirection,
316 IPSets: nil,
317 RawRuleSource: v1alpha1.RawRuleSource{
318 Source: m.Source,
319 },
320 })
321 return nil
322 }
323
324 targetPods := []v1.Pod{}
325 for _, record := range targets {
326 var pod v1.Pod
327 err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
328 if err != nil {
329
330 return err
331 }
332 targetPods = append(targetPods, pod)
333 }
334 dstIpset := ipset.BuildIPSet(targetPods, externalCidrs, networkchaos, ipSetPostFix, m.Source)
335 m.T.Append(dstIpset)
336 m.T.Append(v1alpha1.RawIptables{
337 Name: iptable.GenerateName(pbChainDirection, networkchaos),
338 Direction: chainDirection,
339 IPSets: []string{dstIpset.Name},
340 RawRuleSource: v1alpha1.RawRuleSource{
341 Source: m.Source,
342 },
343 })
344
345 return nil
346 }
347
348 func NewImpl(c client.Client, b *podnetworkchaosmanager.Builder, log logr.Logger) *Impl {
349 return &Impl{
350 Client: c,
351 builder: b,
352 Log: log.WithName("partition"),
353 }
354 }
355