1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package podiochaos
17
18 import (
19 "context"
20 "encoding/json"
21 "strings"
22
23 "github.com/go-logr/logr"
24 "github.com/pkg/errors"
25 v1 "k8s.io/api/core/v1"
26 apierrors "k8s.io/apimachinery/pkg/api/errors"
27 "k8s.io/apimachinery/pkg/types"
28 "k8s.io/client-go/tools/record"
29 "k8s.io/client-go/util/retry"
30 ctrl "sigs.k8s.io/controller-runtime"
31 "sigs.k8s.io/controller-runtime/pkg/client"
32
33 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
34 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
35 "github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
36 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
37 )
38
39
// Reconciler reconciles PodIOChaos objects by applying their IO chaos actions
// to the pod with the same namespace and name through the chaos daemon.
type Reconciler struct {
	// Client is used to read PodIOChaos objects and pods and to write
	// PodIOChaos status.
	client.Client
	// Recorder emits Kubernetes events describing reconciliation failures.
	Recorder record.EventRecorder

	// Log is the logger used while reconciling.
	Log logr.Logger
	// ChaosDaemonClientBuilder builds a client for the chaos daemon that
	// serves the target pod.
	ChaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
}
47
48 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
49 obj := &v1alpha1.PodIOChaos{}
50
51 if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil {
52 if apierrors.IsNotFound(err) {
53 r.Log.Info("chaos not found")
54 } else {
55
56 r.Log.Error(err, "unable to get chaos")
57 }
58 return ctrl.Result{}, nil
59 }
60
61 if obj.ObjectMeta.Generation <= obj.Status.ObservedGeneration && obj.Status.FailedMessage == "" {
62 r.Log.Info("the target pod has been up to date", "pod", obj.Namespace+"/"+obj.Name)
63 return ctrl.Result{}, nil
64 }
65
66 r.Log.Info("updating io chaos", "pod", obj.Namespace+"/"+obj.Name, "spec", obj.Spec)
67
68 pod := &v1.Pod{}
69
70 err := r.Client.Get(ctx, types.NamespacedName{
71 Name: obj.Name,
72 Namespace: obj.Namespace,
73 }, pod)
74 if err != nil {
75 r.Log.Error(err, "fail to find pod")
76 return ctrl.Result{}, nil
77 }
78
79 failedMessage := ""
80 observedGeneration := obj.ObjectMeta.Generation
81 pid := obj.Status.Pid
82 startTime := obj.Status.StartTime
83 defer func() {
84 if err != nil {
85 failedMessage = err.Error()
86 }
87
88 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
89 obj := &v1alpha1.PodIOChaos{}
90
91 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
92 r.Log.Error(err, "unable to get chaos")
93 return err
94 }
95
96 obj.Status.FailedMessage = failedMessage
97 obj.Status.ObservedGeneration = observedGeneration
98 obj.Status.Pid = pid
99 obj.Status.StartTime = startTime
100
101 return r.Client.Status().Update(context.TODO(), obj)
102 })
103
104 if updateError != nil {
105 r.Log.Error(updateError, "fail to update")
106 r.Recorder.Eventf(obj, "Normal", "Failed", "Failed to update status: %s", updateError.Error())
107 }
108 }()
109
110 pbClient, err := r.ChaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
111 Namespace: obj.Namespace,
112 Name: obj.Name,
113 })
114 if err != nil {
115 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
116 return ctrl.Result{Requeue: true}, nil
117 }
118 defer pbClient.Close()
119
120 if len(pod.Status.ContainerStatuses) == 0 {
121 err = errors.Wrapf(utils.ErrContainerNotFound, "pod %s/%s has empty container status", pod.Namespace, pod.Name)
122 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
123 return ctrl.Result{}, nil
124 }
125
126 containerID := pod.Status.ContainerStatuses[0].ContainerID
127 if obj.Spec.Container != nil &&
128 len(strings.TrimSpace(*obj.Spec.Container)) != 0 {
129 containerID = ""
130 for _, container := range pod.Status.ContainerStatuses {
131 if container.Name == *obj.Spec.Container {
132 containerID = container.ContainerID
133 break
134 }
135 }
136 if len(containerID) == 0 {
137 err = errors.Errorf("cannot find container with name %s", *obj.Spec.Container)
138 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
139 return ctrl.Result{}, nil
140 }
141 }
142
143 actions, err := json.Marshal(obj.Spec.Actions)
144 if err != nil {
145 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
146 return ctrl.Result{Requeue: true}, nil
147 }
148 input := string(actions)
149 r.Log.Info("input with", "config", input)
150
151 res, err := pbClient.ApplyIOChaos(ctx, &pb.ApplyIOChaosRequest{
152 Actions: input,
153 Volume: obj.Spec.VolumeMountPath,
154 ContainerId: containerID,
155
156 Instance: obj.Status.Pid,
157 StartTime: obj.Status.StartTime,
158 EnterNS: true,
159 })
160 if err != nil {
161 err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
162 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
163 return ctrl.Result{Requeue: true}, nil
164 }
165
166 startTime = res.StartTime
167 pid = res.Instance
168
169 return ctrl.Result{}, nil
170 }
171