1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package podnetworkchaos
17
18 import (
19 "context"
20
21 "github.com/go-logr/logr"
22 "github.com/pkg/errors"
23 corev1 "k8s.io/api/core/v1"
24 apierrors "k8s.io/apimachinery/pkg/api/errors"
25 "k8s.io/apimachinery/pkg/types"
26 "k8s.io/client-go/util/retry"
27 ctrl "sigs.k8s.io/controller-runtime"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
32 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/iptable"
33 tcpkg "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/tc"
34 "github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
35 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
36 chaosdaemonclient "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/client"
37 pbutils "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/netem"
38 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
39 "github.com/chaos-mesh/chaos-mesh/pkg/netem"
40 )
41
const (
	// invalidNetemSpecMsg is the error text mergeNetem returns when a netem
	// traffic control sets none of the parameters it can apply. It must list
	// every parameter mergeNetem accepts (delay, loss, duplicate, corrupt and
	// rate) so users are pointed at all valid options.
	invalidNetemSpecMsg = "invalid spec for netem action, at least one is required from delay, loss, duplicate, corrupt, rate"
)
45
46
// Reconciler reconciles PodNetworkChaos objects. It pushes the ipsets,
// iptables chains and tc rules described in each object's spec to the
// matching pod through a chaos daemon client, and records the outcome in
// the object's status.
type Reconciler struct {
	// Client reads PodNetworkChaos and Pod objects and writes status updates.
	client.Client
	// Recorder emits events describing reconcile successes and failures.
	Recorder recorder.ChaosRecorder

	Log logr.Logger
	// AllowHostNetworkTesting, when false, makes Reconcile refuse to inject
	// network chaos into pods that run with hostNetwork enabled.
	AllowHostNetworkTesting bool
	// ChaosDaemonClientBuilder builds a chaos daemon client for a given pod.
	ChaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
}
55
56 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
57 obj := &v1alpha1.PodNetworkChaos{}
58
59 if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil {
60 if apierrors.IsNotFound(err) {
61 r.Log.Info("chaos not found")
62 } else {
63
64 r.Log.Error(err, "unable to get chaos")
65 }
66 return ctrl.Result{}, nil
67 }
68
69 if obj.ObjectMeta.Generation <= obj.Status.ObservedGeneration && obj.Status.FailedMessage == "" {
70 r.Log.Info("the target pod has been up to date", "pod", obj.Namespace+"/"+obj.Name)
71 return ctrl.Result{}, nil
72 }
73
74 r.Log.Info("updating podnetworkchaos", "pod", obj.Namespace+"/"+obj.Name, "spec", obj.Spec)
75
76 pod := &corev1.Pod{}
77
78 err := r.Client.Get(ctx, types.NamespacedName{
79 Name: obj.Name,
80 Namespace: obj.Namespace,
81 }, pod)
82 if err != nil {
83 r.Log.Error(err, "fail to find pod")
84 return ctrl.Result{}, nil
85 }
86
87 failedMessage := ""
88 observedGeneration := obj.ObjectMeta.Generation
89 defer func() {
90 if err != nil {
91 failedMessage = err.Error()
92 }
93
94 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
95 obj := &v1alpha1.PodNetworkChaos{}
96
97 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
98 r.Log.Error(err, "unable to get chaos")
99 return err
100 }
101
102 obj.Status.FailedMessage = failedMessage
103 obj.Status.ObservedGeneration = observedGeneration
104
105 return r.Client.Status().Update(context.TODO(), obj)
106 })
107
108 if updateError != nil {
109 r.Log.Error(updateError, "fail to update")
110 r.Recorder.Event(obj, recorder.Failed{
111 Activity: "update status",
112 Err: updateError.Error(),
113 })
114 }
115
116 r.Recorder.Event(obj, recorder.Updated{
117 Field: "ObservedGeneration and FailedMessage",
118 })
119 }()
120
121 if !r.AllowHostNetworkTesting {
122 if pod.Spec.HostNetwork {
123 err = errors.Errorf("It's dangerous to inject network chaos on a pod(%s/%s) with `hostNetwork`", pod.Namespace, pod.Name)
124 r.Log.Error(err, "fail to inject network chaos")
125 r.Recorder.Event(obj, recorder.Failed{
126 Activity: "inject network chaos",
127 Err: err.Error(),
128 })
129 return ctrl.Result{}, nil
130 }
131 }
132
133 pbClient, err := r.ChaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
134 Name: obj.Name,
135 Namespace: obj.Namespace,
136 })
137 if err != nil {
138 r.Recorder.Event(obj, recorder.Failed{
139 Activity: "create chaos daemon client",
140 Err: err.Error(),
141 })
142 return ctrl.Result{Requeue: true}, nil
143 }
144 defer pbClient.Close()
145
146 err = r.SetIPSets(ctx, pod, obj, pbClient)
147 if err != nil {
148 err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
149 r.Log.Error(err, "fail to set ipsets")
150 r.Recorder.Event(obj, recorder.Failed{
151 Activity: "set ipsets",
152 Err: err.Error(),
153 })
154 return ctrl.Result{Requeue: true}, nil
155 }
156
157 err = r.SetIptables(ctx, pod, obj, pbClient)
158 if err != nil {
159 err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
160 r.Log.Error(err, "fail to set iptables")
161 r.Recorder.Event(obj, recorder.Failed{
162 Activity: "set iptables",
163 Err: err.Error(),
164 })
165 return ctrl.Result{Requeue: true}, nil
166 }
167
168 err = r.SetTcs(ctx, pod, obj, pbClient)
169 if err != nil {
170 err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
171 r.Recorder.Event(obj, recorder.Failed{
172 Activity: "set tc",
173 Err: err.Error(),
174 })
175 return ctrl.Result{Requeue: true}, nil
176 }
177
178 return ctrl.Result{}, nil
179 }
180
181
182 func (r *Reconciler) SetIPSets(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
183 ipsets := []*pb.IPSet{}
184 for _, ipset := range chaos.Spec.IPSets {
185 cidrAndPorts := []*pb.CidrAndPort{}
186 for _, cidrAndPort := range ipset.CidrAndPorts {
187 cidrAndPorts = append(cidrAndPorts, &pb.CidrAndPort{
188 Cidr: cidrAndPort.Cidr,
189 Port: uint32(cidrAndPort.Port),
190 })
191 }
192 ipsets = append(ipsets, &pb.IPSet{
193 Name: ipset.Name,
194 Type: string(ipset.IPSetType),
195 Cidrs: ipset.Cidrs,
196 CidrAndPorts: cidrAndPorts,
197 SetNames: ipset.SetNames,
198 })
199 }
200 return ipset.FlushIPSets(ctx, chaosdaemonClient, pod, ipsets)
201 }
202
203
204 func (r *Reconciler) SetIptables(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
205 chains := []*pb.Chain{}
206 for _, chain := range chaos.Spec.Iptables {
207 var direction pb.Chain_Direction
208 if chain.Direction == v1alpha1.Input {
209 direction = pb.Chain_INPUT
210 } else if chain.Direction == v1alpha1.Output {
211 direction = pb.Chain_OUTPUT
212 } else {
213 err := errors.Errorf("unknown direction %s", string(chain.Direction))
214 r.Log.Error(err, "unknown direction")
215 return err
216 }
217 chains = append(chains, &pb.Chain{
218 Name: chain.Name,
219 Ipsets: chain.IPSets,
220 Direction: direction,
221 Target: "DROP",
222 Device: chain.Device,
223 })
224 }
225 return iptable.SetIptablesChains(ctx, chaosdaemonClient, pod, chains)
226 }
227
228
229 func (r *Reconciler) SetTcs(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
230 tcs := []*pb.Tc{}
231 for _, tc := range chaos.Spec.TrafficControls {
232 if tc.Type == v1alpha1.Bandwidth {
233 tbf, err := netem.FromBandwidth(tc.Bandwidth)
234 if err != nil {
235 return err
236 }
237 tcs = append(tcs, &pb.Tc{
238 Type: pb.Tc_BANDWIDTH,
239 Tbf: tbf,
240 Ipset: tc.IPSet,
241 Device: tc.Device,
242 })
243 } else if tc.Type == v1alpha1.Netem {
244 netem, err := mergeNetem(tc.TcParameter)
245 if err != nil {
246 return err
247 }
248 tcs = append(tcs, &pb.Tc{
249 Type: pb.Tc_NETEM,
250 Netem: netem,
251 Ipset: tc.IPSet,
252 Device: tc.Device,
253 })
254 } else {
255 return errors.New("unknown tc type")
256 }
257 }
258
259 r.Log.Info("setting tcs", "tcs", tcs)
260 return tcpkg.SetTcs(ctx, chaosdaemonClient, pod, tcs)
261 }
262
263
// NetemSpec is implemented by types that can convert themselves into a
// chaos-daemon netem protobuf message.
//
// NOTE(review): nothing in the visible code references NetemSpec; mergeNetem
// calls the netem.FromX helpers directly. Confirm no other package relies on
// it before removing it.
type NetemSpec interface {
	// ToNetem returns the netem representation of the receiver, or an error
	// if the receiver cannot be converted.
	ToNetem() (*pb.Netem, error)
}
267
268
269 func mergeNetem(spec v1alpha1.TcParameter) (*pb.Netem, error) {
270
271
272
273
274
275
276
277 var emSpecs []*pb.Netem
278 if spec.Delay != nil {
279 em, err := netem.FromDelay(spec.Delay)
280 if err != nil {
281 return nil, err
282 }
283 emSpecs = append(emSpecs, em)
284 }
285 if spec.Loss != nil {
286 em, err := netem.FromLoss(spec.Loss)
287 if err != nil {
288 return nil, err
289 }
290 emSpecs = append(emSpecs, em)
291 }
292 if spec.Duplicate != nil {
293 em, err := netem.FromDuplicate(spec.Duplicate)
294 if err != nil {
295 return nil, err
296 }
297 emSpecs = append(emSpecs, em)
298 }
299 if spec.Corrupt != nil {
300 em, err := netem.FromCorrupt(spec.Corrupt)
301 if err != nil {
302 return nil, err
303 }
304 emSpecs = append(emSpecs, em)
305 }
306 if spec.Rate != nil {
307 em, err := netem.FromRate(spec.Rate)
308 if err != nil {
309 return nil, err
310 }
311 emSpecs = append(emSpecs, em)
312 }
313 if len(emSpecs) == 0 {
314 return nil, errors.New(invalidNetemSpecMsg)
315 }
316
317 merged := &pb.Netem{}
318 for _, em := range emSpecs {
319 merged = pbutils.MergeNetem(merged, em)
320 }
321 return merged, nil
322 }
323