1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package podnetworkchaos
17
18 import (
19 "context"
20 "fmt"
21
22 "github.com/go-logr/logr"
23 "github.com/pkg/errors"
24 corev1 "k8s.io/api/core/v1"
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 "k8s.io/apimachinery/pkg/types"
27 "k8s.io/client-go/util/retry"
28 ctrl "sigs.k8s.io/controller-runtime"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
33 "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/iptable"
34 tcpkg "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/tc"
35 "github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
36 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
37 pbutils "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/netem"
38 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
39 "github.com/chaos-mesh/chaos-mesh/pkg/netem"
40 )
41
// invalidNetemSpecMsg is the error text mergeNetem returns when a netem
// traffic control sets none of delay, loss, duplicate or corrupt.
const invalidNetemSpecMsg = "invalid spec for netem action, at least one is required from delay, loss, duplicate, corrupt"
45
46
// Reconciler reconciles PodNetworkChaos objects. For each object it applies
// the ipsets, iptables chains and traffic controls in the spec to the pod
// with the same namespace and name, then records the outcome in the
// object's status.
type Reconciler struct {
	// Client reads PodNetworkChaos and Pod objects and writes the
	// PodNetworkChaos status.
	client.Client
	// Recorder emits events for failed steps and status updates.
	Recorder recorder.ChaosRecorder

	// Log receives all reconcile log messages.
	Log logr.Logger
	// AllowHostNetworkTesting permits injecting network chaos into pods with
	// hostNetwork enabled; when false, such pods are rejected.
	AllowHostNetworkTesting bool
	// ChaosDaemonClientBuilder is handed to the ipset, iptable and tc helpers.
	// NOTE(review): presumably used there to reach the chaos daemon serving
	// the target pod; confirm in those packages.
	ChaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
}
55
56 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
57 obj := &v1alpha1.PodNetworkChaos{}
58
59 if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil {
60 if apierrors.IsNotFound(err) {
61 r.Log.Info("chaos not found")
62 } else {
63
64 r.Log.Error(err, "unable to get chaos")
65 }
66 return ctrl.Result{}, nil
67 }
68
69 if obj.ObjectMeta.Generation <= obj.Status.ObservedGeneration && obj.Status.FailedMessage == "" {
70 r.Log.Info("the target pod has been up to date", "pod", obj.Namespace+"/"+obj.Name)
71 return ctrl.Result{}, nil
72 }
73
74 r.Log.Info("updating podnetworkchaos", "pod", obj.Namespace+"/"+obj.Name, "spec", obj.Spec)
75
76 pod := &corev1.Pod{}
77
78 err := r.Client.Get(ctx, types.NamespacedName{
79 Name: obj.Name,
80 Namespace: obj.Namespace,
81 }, pod)
82 if err != nil {
83 r.Log.Error(err, "fail to find pod")
84 return ctrl.Result{}, nil
85 }
86
87 failedMessage := ""
88 observedGeneration := obj.ObjectMeta.Generation
89 defer func() {
90 if err != nil {
91 failedMessage = err.Error()
92 }
93
94 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
95 obj := &v1alpha1.PodNetworkChaos{}
96
97 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
98 r.Log.Error(err, "unable to get chaos")
99 return err
100 }
101
102 obj.Status.FailedMessage = failedMessage
103 obj.Status.ObservedGeneration = observedGeneration
104
105 return r.Client.Status().Update(context.TODO(), obj)
106 })
107
108 if updateError != nil {
109 r.Log.Error(updateError, "fail to update")
110 r.Recorder.Event(obj, recorder.Failed{
111 Activity: "update status",
112 Err: updateError.Error(),
113 })
114 }
115
116 r.Recorder.Event(obj, recorder.Updated{
117 Field: "ObservedGeneration and FailedMessage",
118 })
119 }()
120
121 if !r.AllowHostNetworkTesting {
122 if pod.Spec.HostNetwork {
123 err = errors.Errorf("It's dangerous to inject network chaos on a pod(%s/%s) with `hostNetwork`", pod.Namespace, pod.Name)
124 r.Log.Error(err, "fail to inject network chaos")
125 r.Recorder.Event(obj, recorder.Failed{
126 Activity: "inject network chaos",
127 Err: err.Error(),
128 })
129 return ctrl.Result{}, nil
130 }
131 }
132
133 err = r.SetIPSets(ctx, pod, obj)
134 if err != nil {
135 r.Log.Error(err, "fail to set ipsets")
136 r.Recorder.Event(obj, recorder.Failed{
137 Activity: "set ipsets",
138 Err: err.Error(),
139 })
140 return ctrl.Result{Requeue: true}, nil
141 }
142
143 err = r.SetIptables(ctx, pod, obj)
144 if err != nil {
145 r.Log.Error(err, "fail to set iptables")
146 r.Recorder.Event(obj, recorder.Failed{
147 Activity: "set iptables",
148 Err: err.Error(),
149 })
150 return ctrl.Result{Requeue: true}, nil
151 }
152
153 err = r.SetTcs(ctx, pod, obj)
154 if err != nil {
155 r.Recorder.Event(obj, recorder.Failed{
156 Activity: "set tc",
157 Err: err.Error(),
158 })
159 return ctrl.Result{Requeue: true}, nil
160 }
161
162 return ctrl.Result{}, nil
163 }
164
165
166 func (r *Reconciler) SetIPSets(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos) error {
167 ipsets := []*pb.IPSet{}
168 for _, ipset := range chaos.Spec.IPSets {
169 ipsets = append(ipsets, &pb.IPSet{
170 Name: ipset.Name,
171 Cidrs: ipset.Cidrs,
172 })
173 }
174 return ipset.FlushIPSets(ctx, r.ChaosDaemonClientBuilder, pod, ipsets)
175 }
176
177
178 func (r *Reconciler) SetIptables(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos) error {
179 chains := []*pb.Chain{}
180 for _, chain := range chaos.Spec.Iptables {
181 var direction pb.Chain_Direction
182 if chain.Direction == v1alpha1.Input {
183 direction = pb.Chain_INPUT
184 } else if chain.Direction == v1alpha1.Output {
185 direction = pb.Chain_OUTPUT
186 } else {
187 err := fmt.Errorf("unknown direction %s", string(chain.Direction))
188 r.Log.Error(err, "unknown direction")
189 return err
190 }
191 chains = append(chains, &pb.Chain{
192 Name: chain.Name,
193 Ipsets: chain.IPSets,
194 Direction: direction,
195 Target: "DROP",
196 Device: chain.Device,
197 })
198 }
199 return iptable.SetIptablesChains(ctx, r.ChaosDaemonClientBuilder, pod, chains)
200 }
201
202
203 func (r *Reconciler) SetTcs(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos) error {
204 tcs := []*pb.Tc{}
205 for _, tc := range chaos.Spec.TrafficControls {
206 if tc.Type == v1alpha1.Bandwidth {
207 tbf, err := netem.FromBandwidth(tc.Bandwidth)
208 if err != nil {
209 return err
210 }
211 tcs = append(tcs, &pb.Tc{
212 Type: pb.Tc_BANDWIDTH,
213 Tbf: tbf,
214 Ipset: tc.IPSet,
215 Device: tc.Device,
216 })
217 } else if tc.Type == v1alpha1.Netem {
218 netem, err := mergeNetem(tc.TcParameter)
219 if err != nil {
220 return err
221 }
222 tcs = append(tcs, &pb.Tc{
223 Type: pb.Tc_NETEM,
224 Netem: netem,
225 Ipset: tc.IPSet,
226 Device: tc.Device,
227 })
228 } else {
229 return fmt.Errorf("unknown tc type")
230 }
231 }
232
233 r.Log.Info("setting tcs", "tcs", tcs)
234 return tcpkg.SetTcs(ctx, r.ChaosDaemonClientBuilder, pod, tcs)
235 }
236
237
// NetemSpec describes a value that can convert itself into a single
// *pb.Netem configuration.
//
// NOTE(review): not referenced in this part of the file; confirm it is used
// elsewhere before changing or removing it.
type NetemSpec interface {
	// ToNetem returns the netem configuration for the receiver, or an error
	// if it cannot be converted.
	ToNetem() (*pb.Netem, error)
}
241
242
243 func mergeNetem(spec v1alpha1.TcParameter) (*pb.Netem, error) {
244
245
246
247
248
249
250
251 var emSpecs []*pb.Netem
252 if spec.Delay != nil {
253 em, err := netem.FromDelay(spec.Delay)
254 if err != nil {
255 return nil, err
256 }
257 emSpecs = append(emSpecs, em)
258 }
259 if spec.Loss != nil {
260 em, err := netem.FromLoss(spec.Loss)
261 if err != nil {
262 return nil, err
263 }
264 emSpecs = append(emSpecs, em)
265 }
266 if spec.Duplicate != nil {
267 em, err := netem.FromDuplicate(spec.Duplicate)
268 if err != nil {
269 return nil, err
270 }
271 emSpecs = append(emSpecs, em)
272 }
273 if spec.Corrupt != nil {
274 em, err := netem.FromCorrupt(spec.Corrupt)
275 if err != nil {
276 return nil, err
277 }
278 emSpecs = append(emSpecs, em)
279 }
280 if len(emSpecs) == 0 {
281 return nil, errors.New(invalidNetemSpecMsg)
282 }
283
284 merged := &pb.Netem{}
285 for _, em := range emSpecs {
286 merged = pbutils.MergeNetem(merged, em)
287 }
288 return merged, nil
289 }
290