1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package trafficcontrol
17
18 import (
19 "context"
20 "strings"
21
22 "github.com/go-logr/logr"
23 "github.com/pkg/errors"
24 v1 "k8s.io/api/core/v1"
25 k8sError "k8s.io/apimachinery/pkg/api/errors"
26 "k8s.io/apimachinery/pkg/types"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/podnetworkchaosmanager"
31 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
32 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
33 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
34 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/netutils"
35 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
36 )
37
38 var _ impltypes.ChaosImpl = (*Impl)(nil)
39
// Postfixes handed to ApplyTc as ipSetPostFix; ApplyTc prepends the first two
// characters of the traffic-control type before passing the result to the
// ipset builders.
const (
	// sourceIPSetPostFix is used by Apply when handling a ".Target" record.
	sourceIPSetPostFix = "src"
	// targetIPSetPostFix is used by Apply when handling a "." record.
	targetIPSetPostFix = "tgt"
)
44
45 const (
46 waitForApplySync v1alpha1.Phase = "Not Injected/Wait"
47 waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
48 )
49
50 type Impl struct {
51 client.Client
52
53 builder *podnetworkchaosmanager.Builder
54
55 Log logr.Logger
56 }
57
58 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
59
60
61 impl.Log.Info("traffic control Apply", "namespace", obj.GetNamespace(), "name", obj.GetName())
62 networkchaos := obj.(*v1alpha1.NetworkChaos)
63 if networkchaos.Status.Instances == nil {
64 networkchaos.Status.Instances = make(map[string]int64)
65 }
66
67 record := records[index]
68 phase := record.Phase
69
70 if phase == waitForApplySync {
71 podnetworkchaos := &v1alpha1.PodNetworkChaos{}
72 namespacedName, err := controller.ParseNamespacedName(record.Id)
73 if err != nil {
74 return waitForApplySync, err
75 }
76 err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
77 if err != nil {
78 if k8sError.IsNotFound(err) {
79 return v1alpha1.NotInjected, nil
80 }
81
82 if k8sError.IsForbidden(err) {
83 if strings.Contains(err.Error(), "because it is being terminated") {
84 return v1alpha1.NotInjected, nil
85 }
86 }
87
88 return waitForApplySync, err
89 }
90
91 if podnetworkchaos.Status.FailedMessage != "" {
92 return waitForApplySync, errors.New(podnetworkchaos.Status.FailedMessage)
93 }
94
95 if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
96 return v1alpha1.Injected, nil
97 }
98
99 return waitForApplySync, nil
100 }
101
102 var pod v1.Pod
103 namespacedName, err := controller.ParseNamespacedName(record.Id)
104 if err != nil {
105 return v1alpha1.NotInjected, err
106 }
107 err = impl.Client.Get(ctx, namespacedName, &pod)
108 if err != nil {
109
110 return v1alpha1.NotInjected, err
111 }
112
113 source := networkchaos.Namespace + "/" + networkchaos.Name
114 m := impl.builder.WithInit(source, types.NamespacedName{
115 Namespace: pod.Namespace,
116 Name: pod.Name,
117 })
118
119 if record.SelectorKey == "." {
120 if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
121 var targets []*v1alpha1.Record
122 for _, record := range records {
123 if record.SelectorKey == ".Target" {
124 targets = append(targets, record)
125 }
126 }
127
128 err := impl.ApplyTc(ctx, m, targets, networkchaos, targetIPSetPostFix, networkchaos.Spec.Device)
129 if err != nil {
130 return v1alpha1.NotInjected, err
131 }
132
133 generationNumber, err := m.Commit(ctx, networkchaos)
134 if err != nil {
135 return v1alpha1.NotInjected, err
136 }
137
138
139 networkchaos.Status.Instances[record.Id] = generationNumber
140 return waitForApplySync, nil
141 }
142
143 return v1alpha1.Injected, nil
144 } else if record.SelectorKey == ".Target" {
145 if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
146 var targets []*v1alpha1.Record
147 for _, record := range records {
148 if record.SelectorKey == "." {
149 targets = append(targets, record)
150 }
151 }
152
153 err := impl.ApplyTc(ctx, m, targets, networkchaos, sourceIPSetPostFix, networkchaos.Spec.TargetDevice)
154 if err != nil {
155 return v1alpha1.NotInjected, err
156 }
157
158 generationNumber, err := m.Commit(ctx, networkchaos)
159 if err != nil {
160 return v1alpha1.NotInjected, err
161 }
162
163
164 networkchaos.Status.Instances[record.Id] = generationNumber
165 return waitForApplySync, nil
166 }
167
168 return v1alpha1.Injected, nil
169 } else {
170 impl.Log.Info("unknown selector key", "record", record)
171 return v1alpha1.NotInjected, nil
172 }
173 }
174
175 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
176
177
178 networkchaos := obj.(*v1alpha1.NetworkChaos)
179 if networkchaos.Status.Instances == nil {
180 networkchaos.Status.Instances = make(map[string]int64)
181 }
182
183 record := records[index]
184 phase := record.Phase
185
186 if phase == waitForRecoverSync {
187 podnetworkchaos := &v1alpha1.PodNetworkChaos{}
188 namespacedName, err := controller.ParseNamespacedName(record.Id)
189 if err != nil {
190
191 return waitForRecoverSync, err
192 }
193 err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
194 if err != nil {
195
196 if k8sError.IsNotFound(err) {
197 return v1alpha1.NotInjected, nil
198 }
199 return waitForRecoverSync, err
200 }
201
202 if podnetworkchaos.Status.FailedMessage != "" {
203 return waitForRecoverSync, errors.New(podnetworkchaos.Status.FailedMessage)
204 }
205
206 if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
207 return v1alpha1.NotInjected, nil
208 }
209
210 return waitForRecoverSync, nil
211 }
212
213 var pod v1.Pod
214 namespacedName, err := controller.ParseNamespacedName(record.Id)
215 if err != nil {
216
217 return v1alpha1.Injected, err
218 }
219 err = impl.Client.Get(ctx, namespacedName, &pod)
220 if err != nil {
221
222 if k8sError.IsNotFound(err) {
223 return v1alpha1.NotInjected, nil
224 }
225
226 if k8sError.IsForbidden(err) {
227 if strings.Contains(err.Error(), "because it is being terminated") {
228 return v1alpha1.NotInjected, nil
229 }
230 }
231 return v1alpha1.Injected, err
232 }
233
234 source := networkchaos.Namespace + "/" + networkchaos.Name
235
236 m := impl.builder.WithInit(source, types.NamespacedName{
237 Namespace: pod.Namespace,
238 Name: pod.Name,
239 })
240 generationNumber, err := m.Commit(ctx, networkchaos)
241 if err != nil {
242 if err == podnetworkchaosmanager.ErrPodNotFound || err == podnetworkchaosmanager.ErrPodNotRunning {
243 return v1alpha1.NotInjected, nil
244 }
245 return v1alpha1.Injected, err
246 }
247
248
249 networkchaos.Status.Instances[record.Id] = generationNumber
250 return waitForRecoverSync, nil
251 }
252
253 func (impl *Impl) ApplyTc(ctx context.Context, m *podnetworkchaosmanager.PodNetworkManager, targets []*v1alpha1.Record, networkchaos *v1alpha1.NetworkChaos, ipSetPostFix string, device string) error {
254 spec := networkchaos.Spec
255 tcType := v1alpha1.Bandwidth
256 switch spec.Action {
257 case v1alpha1.NetemAction, v1alpha1.DelayAction, v1alpha1.DuplicateAction, v1alpha1.CorruptAction, v1alpha1.LossAction:
258 tcType = v1alpha1.Netem
259 case v1alpha1.BandwidthAction:
260 tcType = v1alpha1.Bandwidth
261 default:
262 return errors.Wrapf(utils.ErrUnknownAction, "action: %s", spec.Action)
263 }
264
265 externalCidrs, err := netutils.ResolveCidrs(networkchaos.Spec.ExternalTargets)
266 if err != nil {
267 return err
268 }
269
270 if len(targets)+len(externalCidrs) == 0 {
271 impl.Log.Info("apply traffic control", "sources", m.Source)
272 m.T.Append(v1alpha1.RawTrafficControl{
273 Type: tcType,
274 TcParameter: spec.TcParameter,
275 Source: m.Source,
276 Device: device,
277 })
278 return nil
279 }
280
281 targetPods := []v1.Pod{}
282 for _, record := range targets {
283 var pod v1.Pod
284 namespacedName, err := controller.ParseNamespacedName(record.Id)
285 if err != nil {
286
287 return err
288 }
289 err = impl.Client.Get(ctx, namespacedName, &pod)
290 if err != nil {
291
292 return err
293 }
294 targetPods = append(targetPods, pod)
295 }
296 ipSetWithTcPostFix := string(tcType[0:2]) + ipSetPostFix
297 dstIPSets := ipset.BuildIPSets(targetPods, externalCidrs, networkchaos, ipSetWithTcPostFix, m.Source)
298 dstSetIPSet := ipset.BuildSetIPSet(dstIPSets, networkchaos, ipSetWithTcPostFix, m.Source)
299 impl.Log.Info("apply traffic control with filter", "sources", m.Source, "setIpset", dstSetIPSet, "ipSets", dstIPSets)
300
301 for _, ipSet := range dstIPSets {
302 m.T.Append(ipSet)
303 }
304
305 m.T.Append(dstSetIPSet)
306
307 m.T.Append(v1alpha1.RawTrafficControl{
308 Type: tcType,
309 TcParameter: spec.TcParameter,
310 Source: m.Source,
311 IPSet: dstSetIPSet.Name,
312 Device: device,
313 })
314
315 return nil
316 }
317
318 func NewImpl(c client.Client, b *podnetworkchaosmanager.Builder, log logr.Logger) *Impl {
319 return &Impl{
320 Client: c,
321 builder: b,
322 Log: log.WithName("trafficcontrol"),
323 }
324 }
325