1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package records
17
18 import (
19 "context"
20 "reflect"
21 "strings"
22
23 "github.com/go-logr/logr"
24 apierrors "k8s.io/apimachinery/pkg/api/errors"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/client-go/util/retry"
27 ctrl "sigs.k8s.io/controller-runtime"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
32 "github.com/chaos-mesh/chaos-mesh/controllers/config"
33 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
34 "github.com/chaos-mesh/chaos-mesh/pkg/selector"
35 )
36
37
// Reconciler reconciles the per-target records of a chaos object: it selects
// targets on the first pass, applies or recovers the chaos on each record, and
// writes the updated records back into the object's status.
type Reconciler struct {
	// Impl is invoked per record: Apply to inject the chaos, Recover to undo it.
	Impl types.ChaosImpl

	// Object is a prototype of the chaos kind being reconciled. Reconcile
	// deep-copies it and fetches the requested object into the copy; the copy
	// must implement v1alpha1.InnerObjectWithSelector (the unchecked type
	// assertion in Reconcile panics otherwise).
	Object v1alpha1.InnerObject

	// Client is used to get and update chaos objects. client.Reader is embedded
	// alongside it; Reconcile always goes through r.Client explicitly.
	// NOTE(review): the Reader is not referenced in this file — confirm its use.
	client.Client
	client.Reader

	// Recorder emits events for selection failures, apply/recover results and
	// record updates.
	Recorder recorder.ChaosRecorder

	// Selector resolves each of the object's selector specs to concrete targets.
	Selector *selector.Selector

	// Log is the base logger; Reconcile attaches name/namespace/kind (and the
	// record id) to it.
	Log logr.Logger
}
54
// Operation names the action Reconcile decides to take for a single record.
type Operation string

// The possible values of Operation.
const (
	// Apply injects the chaos described by a record.
	Apply = Operation("apply")
	// Recover removes the chaos described by a record.
	Recover = Operation("recover")
	// Nothing leaves the record untouched.
	Nothing = Operation("")
)
62
63
64 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
65 obj := r.Object.DeepCopyObject().(v1alpha1.InnerObjectWithSelector)
66 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
67 if apierrors.IsNotFound(err) {
68 r.Log.Info("chaos not found")
69 } else {
70
71 r.Log.Error(err, "unable to get chaos")
72 }
73 return ctrl.Result{}, nil
74 }
75
76 shouldUpdate := false
77
78 desiredPhase := obj.GetStatus().Experiment.DesiredPhase
79 records := obj.GetStatus().Experiment.Records
80 selectors := obj.GetSelectorSpecs()
81
82 logger := r.Log.WithValues("name", obj.GetName(), "namespace", obj.GetNamespace(), "kind", obj.GetObjectKind().GroupVersionKind().Kind)
83
84 if records == nil {
85 for name, sel := range selectors {
86 targets, err := r.Selector.Select(context.TODO(), sel)
87 if err != nil {
88 logger.Error(err, "fail to select")
89 r.Recorder.Event(obj, recorder.Failed{
90 Activity: "select targets",
91 Err: err.Error(),
92 })
93 return ctrl.Result{}, nil
94 }
95
96 if len(targets) == 0 {
97 logger.Info("no target has been selected")
98 r.Recorder.Event(obj, recorder.Failed{
99 Activity: "select targets",
100 Err: "no target has been selected",
101 })
102 return ctrl.Result{}, nil
103 }
104
105 for _, target := range targets {
106 records = append(records, &v1alpha1.Record{
107 Id: target.Id(),
108 SelectorKey: name,
109 Phase: v1alpha1.NotInjected,
110 })
111 shouldUpdate = true
112 }
113 }
114
115 }
116
117 needRetry := false
118 for index, record := range records {
119 var err error
120 idLogger := logger.WithValues("id", records[index].Id)
121 idLogger.Info("iterating record", "record", record, "desiredPhase", desiredPhase)
122
123
124
125
126
127
128 originalPhase := record.Phase
129 operation := Nothing
130 if desiredPhase == v1alpha1.RunningPhase && originalPhase != v1alpha1.Injected {
131
132
133
134 if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
135 operation = Apply
136 } else {
137 operation = Recover
138 }
139 }
140 if desiredPhase == v1alpha1.StoppedPhase && originalPhase != v1alpha1.NotInjected {
141
142
143
144 if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
145 operation = Apply
146 } else {
147 operation = Recover
148 }
149 }
150
151 if operation == Apply {
152 idLogger.Info("apply chaos")
153 record.Phase, err = r.Impl.Apply(context.TODO(), index, records, obj)
154 if record.Phase != originalPhase {
155 shouldUpdate = true
156 }
157 if err != nil {
158
159
160 idLogger.Error(err, "fail to apply chaos")
161 applyFailedEvent := newRecordEvent(v1alpha1.TypeFailed, v1alpha1.Apply, err.Error())
162 if len(records[index].Events) >= config.ControllerCfg.MaxEvents {
163 records[index].Events = records[index].Events[1:]
164 }
165 records[index].Events = append(records[index].Events, *applyFailedEvent)
166 r.Recorder.Event(obj, recorder.Failed{
167 Activity: "apply chaos",
168 Err: err.Error(),
169 })
170 needRetry = true
171
172 shouldUpdate = true
173 continue
174 }
175
176 if record.Phase == v1alpha1.Injected {
177 records[index].InjectedCount++
178 applySucceedEvent := newRecordEvent(v1alpha1.TypeSucceeded, v1alpha1.Apply, "")
179 if len(records[index].Events) >= config.ControllerCfg.MaxEvents {
180 records[index].Events = records[index].Events[1:]
181 }
182 records[index].Events = append(records[index].Events, *applySucceedEvent)
183 r.Recorder.Event(obj, recorder.Applied{
184 Id: records[index].Id,
185 })
186 }
187 } else if operation == Recover {
188 idLogger.Info("recover chaos")
189 record.Phase, err = r.Impl.Recover(context.TODO(), index, records, obj)
190 if record.Phase != originalPhase {
191 shouldUpdate = true
192 }
193 if err != nil {
194
195
196 idLogger.Error(err, "fail to recover chaos")
197 recoverFailedEvent := newRecordEvent(v1alpha1.TypeFailed, v1alpha1.Recover, err.Error())
198 if len(records[index].Events) >= config.ControllerCfg.MaxEvents {
199 records[index].Events = records[index].Events[1:]
200 }
201 records[index].Events = append(records[index].Events, *recoverFailedEvent)
202 r.Recorder.Event(obj, recorder.Failed{
203 Activity: "recover chaos",
204 Err: err.Error(),
205 })
206 needRetry = true
207
208 shouldUpdate = true
209 continue
210 }
211
212 if record.Phase == v1alpha1.NotInjected {
213 records[index].RecoveredCount++
214 recoverSucceedEvent := newRecordEvent(v1alpha1.TypeSucceeded, v1alpha1.Recover, "")
215 if len(records[index].Events) >= config.ControllerCfg.MaxEvents {
216 records[index].Events = records[index].Events[1:]
217 }
218 records[index].Events = append(records[index].Events, *recoverSucceedEvent)
219 r.Recorder.Event(obj, recorder.Recovered{
220 Id: records[index].Id,
221 })
222 }
223 }
224 }
225
226
227 var customStatus reflect.Value
228 if objWithStatus, ok := obj.(v1alpha1.InnerObjectWithCustomStatus); ok {
229 customStatus = reflect.Indirect(reflect.ValueOf(objWithStatus.GetCustomStatus()))
230 }
231 if shouldUpdate {
232 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
233 logger.Info("updating records", "records", records)
234 obj := r.Object.DeepCopyObject().(v1alpha1.InnerObjectWithSelector)
235
236 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
237 logger.Error(err, "unable to get chaos")
238 return err
239 }
240
241 obj.GetStatus().Experiment.Records = records
242 if objWithStatus, ok := obj.(v1alpha1.InnerObjectWithCustomStatus); ok {
243 ptrToCustomStatus := objWithStatus.GetCustomStatus()
244
245 reflect.Indirect(reflect.ValueOf(ptrToCustomStatus)).Set(reflect.Indirect(customStatus))
246 }
247 return r.Client.Update(context.TODO(), obj)
248 })
249 if updateError != nil {
250 logger.Error(updateError, "fail to update")
251 r.Recorder.Event(obj, recorder.Failed{
252 Activity: "update records",
253 Err: updateError.Error(),
254 })
255 return ctrl.Result{Requeue: true}, nil
256 }
257
258 r.Recorder.Event(obj, recorder.Updated{
259 Field: "records",
260 })
261 }
262 return ctrl.Result{Requeue: needRetry}, nil
263 }
264
265 func newRecordEvent(eventType v1alpha1.RecordEventType, eventStage v1alpha1.RecordEventOperation, msg string) *v1alpha1.RecordEvent {
266 return v1alpha1.NewRecordEvent(eventType, eventStage, msg, metav1.Now())
267 }
268