1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package trafficcontrol
17
18 import (
19 "context"
20 "errors"
21 "fmt"
22 "strings"
23
24 "github.com/go-logr/logr"
25 v1 "k8s.io/api/core/v1"
26 k8sError "k8s.io/apimachinery/pkg/api/errors"
27 "k8s.io/apimachinery/pkg/types"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/podnetworkchaosmanager"
32 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
33 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
34 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/netutils"
35 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
36 )
37
38 var _ impltypes.ChaosImpl = (*Impl)(nil)
39
40 const (
41 targetIPSetPostFix = "tgt"
42 sourceIPSetPostFix = "src"
43 )
44
45 const (
46 waitForApplySync v1alpha1.Phase = "Not Injected/Wait"
47 waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
48 )
49
50 type Impl struct {
51 client.Client
52
53 builder *podnetworkchaosmanager.Builder
54
55 Log logr.Logger
56 }
57
58 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
59
60
61 impl.Log.Info("traffic control Apply", "namespace", obj.GetNamespace(), "name", obj.GetName())
62 networkchaos := obj.(*v1alpha1.NetworkChaos)
63 if networkchaos.Status.Instances == nil {
64 networkchaos.Status.Instances = make(map[string]int64)
65 }
66 if networkchaos.Status.Instances == nil {
67 networkchaos.Status.Instances = make(map[string]int64)
68 }
69
70 record := records[index]
71 phase := record.Phase
72
73 if phase == waitForApplySync {
74 podnetworkchaos := &v1alpha1.PodNetworkChaos{}
75 namespacedName, err := controller.ParseNamespacedName(record.Id)
76 if err != nil {
77 return waitForApplySync, err
78 }
79 err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
80 if err != nil {
81 if k8sError.IsNotFound(err) {
82 return v1alpha1.NotInjected, nil
83 }
84
85 if k8sError.IsForbidden(err) {
86 if strings.Contains(err.Error(), "because it is being terminated") {
87 return v1alpha1.NotInjected, nil
88 }
89 }
90
91 return waitForApplySync, err
92 }
93
94 if podnetworkchaos.Status.FailedMessage != "" {
95 return waitForApplySync, errors.New(podnetworkchaos.Status.FailedMessage)
96 }
97
98 if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
99 return v1alpha1.Injected, nil
100 }
101
102 return waitForApplySync, nil
103 }
104
105 var pod v1.Pod
106 namespacedName, err := controller.ParseNamespacedName(record.Id)
107 if err != nil {
108 return v1alpha1.NotInjected, err
109 }
110 err = impl.Client.Get(ctx, namespacedName, &pod)
111 if err != nil {
112
113 return v1alpha1.NotInjected, err
114 }
115
116 source := networkchaos.Namespace + "/" + networkchaos.Name
117 m := impl.builder.WithInit(source, types.NamespacedName{
118 Namespace: pod.Namespace,
119 Name: pod.Name,
120 })
121
122 if record.SelectorKey == "." {
123 if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
124 var targets []*v1alpha1.Record
125 for _, record := range records {
126 if record.SelectorKey == ".Target" {
127 targets = append(targets, record)
128 }
129 }
130
131 err := impl.ApplyTc(ctx, m, targets, networkchaos, targetIPSetPostFix, networkchaos.Spec.Device)
132 if err != nil {
133 return v1alpha1.NotInjected, err
134 }
135
136 generationNumber, err := m.Commit(ctx, networkchaos)
137 if err != nil {
138 return v1alpha1.NotInjected, err
139 }
140
141
142 networkchaos.Status.Instances[record.Id] = generationNumber
143 return waitForApplySync, nil
144 }
145
146 return v1alpha1.Injected, nil
147 } else if record.SelectorKey == ".Target" {
148 if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
149 var targets []*v1alpha1.Record
150 for _, record := range records {
151 if record.SelectorKey == "." {
152 targets = append(targets, record)
153 }
154 }
155
156 err := impl.ApplyTc(ctx, m, targets, networkchaos, sourceIPSetPostFix, networkchaos.Spec.TargetDevice)
157 if err != nil {
158 return v1alpha1.NotInjected, err
159 }
160
161 generationNumber, err := m.Commit(ctx, networkchaos)
162 if err != nil {
163 return v1alpha1.NotInjected, err
164 }
165
166
167 networkchaos.Status.Instances[record.Id] = generationNumber
168 return waitForApplySync, nil
169 }
170
171 return v1alpha1.Injected, nil
172 } else {
173 impl.Log.Info("unknown selector key", "record", record)
174 return v1alpha1.NotInjected, nil
175 }
176 }
177
178 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
179
180
181 networkchaos := obj.(*v1alpha1.NetworkChaos)
182 if networkchaos.Status.Instances == nil {
183 networkchaos.Status.Instances = make(map[string]int64)
184 }
185 if networkchaos.Status.Instances == nil {
186 networkchaos.Status.Instances = make(map[string]int64)
187 }
188
189 record := records[index]
190 phase := record.Phase
191
192 if phase == waitForRecoverSync {
193 podnetworkchaos := &v1alpha1.PodNetworkChaos{}
194 namespacedName, err := controller.ParseNamespacedName(record.Id)
195 if err != nil {
196
197 return waitForRecoverSync, err
198 }
199 err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
200 if err != nil {
201
202 if k8sError.IsNotFound(err) {
203 return v1alpha1.NotInjected, nil
204 }
205 return waitForRecoverSync, err
206 }
207
208 if podnetworkchaos.Status.FailedMessage != "" {
209 return waitForRecoverSync, errors.New(podnetworkchaos.Status.FailedMessage)
210 }
211
212 if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
213 return v1alpha1.NotInjected, nil
214 }
215
216 return waitForRecoverSync, nil
217 }
218
219 var pod v1.Pod
220 namespacedName, err := controller.ParseNamespacedName(record.Id)
221 if err != nil {
222
223 return v1alpha1.Injected, err
224 }
225 err = impl.Client.Get(ctx, namespacedName, &pod)
226 if err != nil {
227
228 if k8sError.IsNotFound(err) {
229 return v1alpha1.NotInjected, nil
230 }
231
232 if k8sError.IsForbidden(err) {
233 if strings.Contains(err.Error(), "because it is being terminated") {
234 return v1alpha1.NotInjected, nil
235 }
236 }
237 return v1alpha1.Injected, err
238 }
239
240 source := networkchaos.Namespace + "/" + networkchaos.Name
241
242 m := impl.builder.WithInit(source, types.NamespacedName{
243 Namespace: pod.Namespace,
244 Name: pod.Name,
245 })
246 generationNumber, err := m.Commit(ctx, networkchaos)
247 if err != nil {
248 if err == podnetworkchaosmanager.ErrPodNotFound || err == podnetworkchaosmanager.ErrPodNotRunning {
249 return v1alpha1.NotInjected, nil
250 }
251 return v1alpha1.Injected, err
252 }
253
254
255 networkchaos.Status.Instances[record.Id] = generationNumber
256 return waitForRecoverSync, nil
257 }
258
259 func (impl *Impl) ApplyTc(ctx context.Context, m *podnetworkchaosmanager.PodNetworkManager, targets []*v1alpha1.Record, networkchaos *v1alpha1.NetworkChaos, ipSetPostFix string, device string) error {
260 spec := networkchaos.Spec
261 tcType := v1alpha1.Bandwidth
262 switch spec.Action {
263 case v1alpha1.NetemAction, v1alpha1.DelayAction, v1alpha1.DuplicateAction, v1alpha1.CorruptAction, v1alpha1.LossAction:
264 tcType = v1alpha1.Netem
265 case v1alpha1.BandwidthAction:
266 tcType = v1alpha1.Bandwidth
267 default:
268 return fmt.Errorf("unknown action %s", spec.Action)
269 }
270
271 externalCidrs, err := netutils.ResolveCidrs(networkchaos.Spec.ExternalTargets)
272 if err != nil {
273 return err
274 }
275
276 if len(targets)+len(externalCidrs) == 0 {
277 impl.Log.Info("apply traffic control", "sources", m.Source)
278 m.T.Append(v1alpha1.RawTrafficControl{
279 Type: tcType,
280 TcParameter: spec.TcParameter,
281 Source: m.Source,
282 Device: device,
283 })
284 return nil
285 }
286
287 targetPods := []v1.Pod{}
288 for _, record := range targets {
289 var pod v1.Pod
290 namespacedName, err := controller.ParseNamespacedName(record.Id)
291 if err != nil {
292
293 return err
294 }
295 err = impl.Client.Get(ctx, namespacedName, &pod)
296 if err != nil {
297
298 return err
299 }
300 targetPods = append(targetPods, pod)
301 }
302 dstIpset := ipset.BuildIPSet(targetPods, externalCidrs, networkchaos, string(tcType[0:2])+ipSetPostFix, m.Source)
303 impl.Log.Info("apply traffic control with filter", "sources", m.Source, "ipset", dstIpset)
304
305 m.T.Append(dstIpset)
306 m.T.Append(v1alpha1.RawTrafficControl{
307 Type: tcType,
308 TcParameter: spec.TcParameter,
309 Source: m.Source,
310 IPSet: dstIpset.Name,
311 Device: device,
312 })
313
314 return nil
315 }
316
317 func NewImpl(c client.Client, b *podnetworkchaosmanager.Builder, log logr.Logger) *Impl {
318 return &Impl{
319 Client: c,
320 builder: b,
321 Log: log.WithName("trafficcontrol"),
322 }
323 }
324