1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package podnetworkchaos
17
18 import (
19 "context"
20 "fmt"
21
22 "github.com/go-logr/logr"
23 "github.com/pkg/errors"
24 corev1 "k8s.io/api/core/v1"
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 "k8s.io/apimachinery/pkg/types"
27 "k8s.io/client-go/util/retry"
28 ctrl "sigs.k8s.io/controller-runtime"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
33 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/iptable"
34 tcpkg "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/tc"
35 "github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
36 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
37 chaosdaemonclient "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/client"
38 pbutils "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/netem"
39 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
40 "github.com/chaos-mesh/chaos-mesh/pkg/netem"
41 )
42
// invalidNetemSpecMsg is the error text mergeNetem returns when a netem
// TcParameter sets none of delay, loss, duplicate or corrupt.
const invalidNetemSpecMsg = "invalid spec for netem action, at least one is required from delay, loss, duplicate, corrupt"
46
47
// Reconciler reconciles PodNetworkChaos objects: it reads the object and the
// pod that shares its name and namespace, then pushes the object's ipsets,
// iptables chains and traffic-control rules through a chaos daemon client.
type Reconciler struct {
	// Client is the embedded Kubernetes client used to fetch PodNetworkChaos
	// objects and pods and to update PodNetworkChaos status.
	client.Client

	// Recorder emits the Failed and Updated events for the reconciled object.
	Recorder recorder.ChaosRecorder

	// Log is the logger used by all reconcile steps.
	Log logr.Logger

	// AllowHostNetworkTesting, when false, makes Reconcile refuse to touch
	// pods whose spec has HostNetwork set.
	AllowHostNetworkTesting bool

	// ChaosDaemonClientBuilder builds the chaos daemon client for the target pod.
	ChaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
}
56
57 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
58 obj := &v1alpha1.PodNetworkChaos{}
59
60 if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil {
61 if apierrors.IsNotFound(err) {
62 r.Log.Info("chaos not found")
63 } else {
64
65 r.Log.Error(err, "unable to get chaos")
66 }
67 return ctrl.Result{}, nil
68 }
69
70 if obj.ObjectMeta.Generation <= obj.Status.ObservedGeneration && obj.Status.FailedMessage == "" {
71 r.Log.Info("the target pod has been up to date", "pod", obj.Namespace+"/"+obj.Name)
72 return ctrl.Result{}, nil
73 }
74
75 r.Log.Info("updating podnetworkchaos", "pod", obj.Namespace+"/"+obj.Name, "spec", obj.Spec)
76
77 pod := &corev1.Pod{}
78
79 err := r.Client.Get(ctx, types.NamespacedName{
80 Name: obj.Name,
81 Namespace: obj.Namespace,
82 }, pod)
83 if err != nil {
84 r.Log.Error(err, "fail to find pod")
85 return ctrl.Result{}, nil
86 }
87
88 failedMessage := ""
89 observedGeneration := obj.ObjectMeta.Generation
90 defer func() {
91 if err != nil {
92 failedMessage = err.Error()
93 }
94
95 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
96 obj := &v1alpha1.PodNetworkChaos{}
97
98 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
99 r.Log.Error(err, "unable to get chaos")
100 return err
101 }
102
103 obj.Status.FailedMessage = failedMessage
104 obj.Status.ObservedGeneration = observedGeneration
105
106 return r.Client.Status().Update(context.TODO(), obj)
107 })
108
109 if updateError != nil {
110 r.Log.Error(updateError, "fail to update")
111 r.Recorder.Event(obj, recorder.Failed{
112 Activity: "update status",
113 Err: updateError.Error(),
114 })
115 }
116
117 r.Recorder.Event(obj, recorder.Updated{
118 Field: "ObservedGeneration and FailedMessage",
119 })
120 }()
121
122 if !r.AllowHostNetworkTesting {
123 if pod.Spec.HostNetwork {
124 err = errors.Errorf("It's dangerous to inject network chaos on a pod(%s/%s) with `hostNetwork`", pod.Namespace, pod.Name)
125 r.Log.Error(err, "fail to inject network chaos")
126 r.Recorder.Event(obj, recorder.Failed{
127 Activity: "inject network chaos",
128 Err: err.Error(),
129 })
130 return ctrl.Result{}, nil
131 }
132 }
133
134 pbClient, err := r.ChaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
135 Name: obj.Name,
136 Namespace: obj.Namespace,
137 })
138 if err != nil {
139 r.Recorder.Event(obj, recorder.Failed{
140 Activity: "create chaos daemon client",
141 Err: err.Error(),
142 })
143 return ctrl.Result{Requeue: true}, nil
144 }
145 defer pbClient.Close()
146
147 err = r.SetIPSets(ctx, pod, obj, pbClient)
148 if err != nil {
149 r.Log.Error(err, "fail to set ipsets")
150 r.Recorder.Event(obj, recorder.Failed{
151 Activity: "set ipsets",
152 Err: err.Error(),
153 })
154 return ctrl.Result{Requeue: true}, nil
155 }
156
157 err = r.SetIptables(ctx, pod, obj, pbClient)
158 if err != nil {
159 r.Log.Error(err, "fail to set iptables")
160 r.Recorder.Event(obj, recorder.Failed{
161 Activity: "set iptables",
162 Err: err.Error(),
163 })
164 return ctrl.Result{Requeue: true}, nil
165 }
166
167 err = r.SetTcs(ctx, pod, obj, pbClient)
168 if err != nil {
169 r.Recorder.Event(obj, recorder.Failed{
170 Activity: "set tc",
171 Err: err.Error(),
172 })
173 return ctrl.Result{Requeue: true}, nil
174 }
175
176 return ctrl.Result{}, nil
177 }
178
179
180 func (r *Reconciler) SetIPSets(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
181 ipsets := []*pb.IPSet{}
182 for _, ipset := range chaos.Spec.IPSets {
183 ipsets = append(ipsets, &pb.IPSet{
184 Name: ipset.Name,
185 Cidrs: ipset.Cidrs,
186 })
187 }
188 return ipset.FlushIPSets(ctx, chaosdaemonClient, pod, ipsets)
189 }
190
191
192 func (r *Reconciler) SetIptables(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
193 chains := []*pb.Chain{}
194 for _, chain := range chaos.Spec.Iptables {
195 var direction pb.Chain_Direction
196 if chain.Direction == v1alpha1.Input {
197 direction = pb.Chain_INPUT
198 } else if chain.Direction == v1alpha1.Output {
199 direction = pb.Chain_OUTPUT
200 } else {
201 err := fmt.Errorf("unknown direction %s", string(chain.Direction))
202 r.Log.Error(err, "unknown direction")
203 return err
204 }
205 chains = append(chains, &pb.Chain{
206 Name: chain.Name,
207 Ipsets: chain.IPSets,
208 Direction: direction,
209 Target: "DROP",
210 Device: chain.Device,
211 })
212 }
213 return iptable.SetIptablesChains(ctx, chaosdaemonClient, pod, chains)
214 }
215
216
217 func (r *Reconciler) SetTcs(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
218 tcs := []*pb.Tc{}
219 for _, tc := range chaos.Spec.TrafficControls {
220 if tc.Type == v1alpha1.Bandwidth {
221 tbf, err := netem.FromBandwidth(tc.Bandwidth)
222 if err != nil {
223 return err
224 }
225 tcs = append(tcs, &pb.Tc{
226 Type: pb.Tc_BANDWIDTH,
227 Tbf: tbf,
228 Ipset: tc.IPSet,
229 Device: tc.Device,
230 })
231 } else if tc.Type == v1alpha1.Netem {
232 netem, err := mergeNetem(tc.TcParameter)
233 if err != nil {
234 return err
235 }
236 tcs = append(tcs, &pb.Tc{
237 Type: pb.Tc_NETEM,
238 Netem: netem,
239 Ipset: tc.IPSet,
240 Device: tc.Device,
241 })
242 } else {
243 return fmt.Errorf("unknown tc type")
244 }
245 }
246
247 r.Log.Info("setting tcs", "tcs", tcs)
248 return tcpkg.SetTcs(ctx, chaosdaemonClient, pod, tcs)
249 }
250
251
// NetemSpec is the interface for values that can be converted into a
// *pb.Netem.
type NetemSpec interface {
	// ToNetem returns the *pb.Netem form of the value, or an error.
	ToNetem() (*pb.Netem, error)
}
255
256
257 func mergeNetem(spec v1alpha1.TcParameter) (*pb.Netem, error) {
258
259
260
261
262
263
264
265 var emSpecs []*pb.Netem
266 if spec.Delay != nil {
267 em, err := netem.FromDelay(spec.Delay)
268 if err != nil {
269 return nil, err
270 }
271 emSpecs = append(emSpecs, em)
272 }
273 if spec.Loss != nil {
274 em, err := netem.FromLoss(spec.Loss)
275 if err != nil {
276 return nil, err
277 }
278 emSpecs = append(emSpecs, em)
279 }
280 if spec.Duplicate != nil {
281 em, err := netem.FromDuplicate(spec.Duplicate)
282 if err != nil {
283 return nil, err
284 }
285 emSpecs = append(emSpecs, em)
286 }
287 if spec.Corrupt != nil {
288 em, err := netem.FromCorrupt(spec.Corrupt)
289 if err != nil {
290 return nil, err
291 }
292 emSpecs = append(emSpecs, em)
293 }
294 if len(emSpecs) == 0 {
295 return nil, errors.New(invalidNetemSpecMsg)
296 }
297
298 merged := &pb.Netem{}
299 for _, em := range emSpecs {
300 merged = pbutils.MergeNetem(merged, em)
301 }
302 return merged, nil
303 }
304