1
2
3
4
5
6
7
8
9
10
11
12
13
14 package trafficcontrol
15
16 import (
17 "context"
18 "errors"
19 "fmt"
20 "strings"
21
22 "github.com/go-logr/logr"
23 v1 "k8s.io/api/core/v1"
24 k8sError "k8s.io/apimachinery/pkg/api/errors"
25 "k8s.io/apimachinery/pkg/types"
26 "sigs.k8s.io/controller-runtime/pkg/client"
27
28 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
29 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/podnetworkchaosmanager"
30 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
31 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/netutils"
32 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
33 )
34
35 const (
36 networkTcActionMsg = "network traffic control action duration %s"
37 networkChaosSourceMsg = "This is a source pod."
38 networkChaosTargetMsg = "This is a target pod."
39
40 targetIPSetPostFix = "tgt"
41 sourceIPSetPostFix = "src"
42 )
43
44 const (
45 waitForApplySync v1alpha1.Phase = "Not Injected/Wait"
46 waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
47 )
48
49 type Impl struct {
50 client.Client
51
52 builder *podnetworkchaosmanager.Builder
53
54 Log logr.Logger
55 }
56
57 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
58
59
60 impl.Log.Info("traffic control Apply", "namespace", obj.GetObjectMeta().Namespace, "name", obj.GetObjectMeta().Name)
61 networkchaos := obj.(*v1alpha1.NetworkChaos)
62 if networkchaos.Status.Instances == nil {
63 networkchaos.Status.Instances = make(map[string]int64)
64 }
65 if networkchaos.Status.Instances == nil {
66 networkchaos.Status.Instances = make(map[string]int64)
67 }
68
69 record := records[index]
70 phase := record.Phase
71
72 if phase == waitForApplySync {
73 podnetworkchaos := &v1alpha1.PodNetworkChaos{}
74 err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), podnetworkchaos)
75 if err != nil {
76 if k8sError.IsNotFound(err) {
77 return v1alpha1.NotInjected, nil
78 }
79
80 if k8sError.IsForbidden(err) {
81 if strings.Contains(err.Error(), "because it is being terminated") {
82 return v1alpha1.NotInjected, nil
83 }
84 }
85
86 return waitForApplySync, err
87 }
88
89 if podnetworkchaos.Status.FailedMessage != "" {
90 return waitForApplySync, errors.New(podnetworkchaos.Status.FailedMessage)
91 }
92
93 if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
94 return v1alpha1.Injected, nil
95 }
96
97 return waitForApplySync, nil
98 }
99
100 var pod v1.Pod
101 err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
102 if err != nil {
103
104 return v1alpha1.NotInjected, err
105 }
106
107 source := networkchaos.Namespace + "/" + networkchaos.Name
108 m := impl.builder.WithInit(source, types.NamespacedName{
109 Namespace: pod.Namespace,
110 Name: pod.Name,
111 })
112
113 if record.SelectorKey == "." {
114 if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
115 var targets []*v1alpha1.Record
116 for _, record := range records {
117 if record.SelectorKey == ".Target" {
118 targets = append(targets, record)
119 }
120 }
121
122 err := impl.ApplyTc(ctx, m, targets, networkchaos, targetIPSetPostFix)
123 if err != nil {
124 return v1alpha1.NotInjected, err
125 }
126
127 generationNumber, err := m.Commit(ctx, networkchaos)
128 if err != nil {
129 return v1alpha1.NotInjected, err
130 }
131
132
133 networkchaos.Status.Instances[record.Id] = generationNumber
134 return waitForApplySync, nil
135 }
136
137 return v1alpha1.Injected, nil
138 } else if record.SelectorKey == ".Target" {
139 if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
140 var targets []*v1alpha1.Record
141 for _, record := range records {
142 if record.SelectorKey == "." {
143 targets = append(targets, record)
144 }
145 }
146
147 err := impl.ApplyTc(ctx, m, targets, networkchaos, sourceIPSetPostFix)
148 if err != nil {
149 return v1alpha1.NotInjected, err
150 }
151
152 generationNumber, err := m.Commit(ctx, networkchaos)
153 if err != nil {
154 return v1alpha1.NotInjected, err
155 }
156
157
158 networkchaos.Status.Instances[record.Id] = generationNumber
159 return waitForApplySync, nil
160 }
161
162 return v1alpha1.Injected, nil
163 } else {
164 impl.Log.Info("unknown selector key", "record", record)
165 return v1alpha1.NotInjected, nil
166 }
167 }
168
169 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
170
171
172 networkchaos := obj.(*v1alpha1.NetworkChaos)
173 if networkchaos.Status.Instances == nil {
174 networkchaos.Status.Instances = make(map[string]int64)
175 }
176 if networkchaos.Status.Instances == nil {
177 networkchaos.Status.Instances = make(map[string]int64)
178 }
179
180 record := records[index]
181 phase := record.Phase
182
183 if phase == waitForRecoverSync {
184 podnetworkchaos := &v1alpha1.PodNetworkChaos{}
185 err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), podnetworkchaos)
186 if err != nil {
187
188 if k8sError.IsNotFound(err) {
189 return v1alpha1.NotInjected, nil
190 }
191 return waitForRecoverSync, err
192 }
193
194 if podnetworkchaos.Status.FailedMessage != "" {
195 return waitForRecoverSync, errors.New(podnetworkchaos.Status.FailedMessage)
196 }
197
198 if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
199 return v1alpha1.NotInjected, nil
200 }
201
202 return waitForRecoverSync, nil
203 }
204
205 var pod v1.Pod
206 err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
207 if err != nil {
208
209 if k8sError.IsNotFound(err) {
210 return v1alpha1.NotInjected, nil
211 }
212
213 if k8sError.IsForbidden(err) {
214 if strings.Contains(err.Error(), "because it is being terminated") {
215 return v1alpha1.NotInjected, nil
216 }
217 }
218 return v1alpha1.Injected, err
219 }
220
221 source := networkchaos.Namespace + "/" + networkchaos.Name
222
223 m := impl.builder.WithInit(source, types.NamespacedName{
224 Namespace: pod.Namespace,
225 Name: pod.Name,
226 })
227 generationNumber, err := m.Commit(ctx, networkchaos)
228 if err != nil {
229 if err == podnetworkchaosmanager.ErrPodNotFound || err == podnetworkchaosmanager.ErrPodNotRunning {
230 return v1alpha1.NotInjected, nil
231 }
232 return v1alpha1.Injected, err
233 }
234
235
236 networkchaos.Status.Instances[record.Id] = generationNumber
237 return waitForRecoverSync, nil
238 }
239
240 func (impl *Impl) ApplyTc(ctx context.Context, m *podnetworkchaosmanager.PodNetworkManager, targets []*v1alpha1.Record, networkchaos *v1alpha1.NetworkChaos, ipSetPostFix string) error {
241 spec := networkchaos.Spec
242 tcType := v1alpha1.Bandwidth
243 switch spec.Action {
244 case v1alpha1.NetemAction, v1alpha1.DelayAction, v1alpha1.DuplicateAction, v1alpha1.CorruptAction, v1alpha1.LossAction:
245 tcType = v1alpha1.Netem
246 case v1alpha1.BandwidthAction:
247 tcType = v1alpha1.Bandwidth
248 default:
249 return fmt.Errorf("unknown action %s", spec.Action)
250 }
251
252 externalCidrs, err := netutils.ResolveCidrs(networkchaos.Spec.ExternalTargets)
253 if err != nil {
254 return err
255 }
256
257 if len(targets)+len(externalCidrs) == 0 {
258 impl.Log.Info("apply traffic control", "sources", m.Source)
259 m.T.Append(v1alpha1.RawTrafficControl{
260 Type: tcType,
261 TcParameter: spec.TcParameter,
262 Source: m.Source,
263 })
264 return nil
265 }
266
267 targetPods := []v1.Pod{}
268 for _, record := range targets {
269 var pod v1.Pod
270 err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
271 if err != nil {
272
273 return err
274 }
275 targetPods = append(targetPods, pod)
276 }
277 dstIpset := ipset.BuildIPSet(targetPods, externalCidrs, networkchaos, string(tcType[0:2])+ipSetPostFix, m.Source)
278 impl.Log.Info("apply traffic control with filter", "sources", m.Source, "ipset", dstIpset)
279
280 m.T.Append(dstIpset)
281 m.T.Append(v1alpha1.RawTrafficControl{
282 Type: tcType,
283 TcParameter: spec.TcParameter,
284 Source: m.Source,
285 IPSet: dstIpset.Name,
286 })
287
288 return nil
289 }
290
291 func NewImpl(c client.Client, b *podnetworkchaosmanager.Builder, log logr.Logger) *Impl {
292 return &Impl{
293 Client: c,
294 builder: b,
295 Log: log.WithName("trafficcontrol"),
296 }
297 }
298