1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package records
17
18 import (
19 "context"
20 "reflect"
21 "strings"
22
23 "github.com/go-logr/logr"
24 apierrors "k8s.io/apimachinery/pkg/api/errors"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/client-go/util/retry"
27 ctrl "sigs.k8s.io/controller-runtime"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
32 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
33 "github.com/chaos-mesh/chaos-mesh/pkg/selector"
34 )
35
36
37 type Reconciler struct {
38 Impl types.ChaosImpl
39
40
41 Object v1alpha1.InnerObject
42
43
44 client.Client
45 client.Reader
46
47 Recorder recorder.ChaosRecorder
48
49 Selector *selector.Selector
50
51 Log logr.Logger
52 }
53
// Operation names the action Reconcile decides to take on a single record.
type Operation string

// The set of operations Reconcile can choose for a record.
const (
	// Apply injects the chaos into the record's target.
	Apply = Operation("apply")
	// Recover removes the chaos from the record's target.
	Recover = Operation("recover")
	// Nothing means the record needs no action.
	Nothing = Operation("")
)
61
62
63 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
64 obj := r.Object.DeepCopyObject().(v1alpha1.InnerObjectWithSelector)
65 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
66 if apierrors.IsNotFound(err) {
67 r.Log.Info("chaos not found")
68 } else {
69
70 r.Log.Error(err, "unable to get chaos")
71 }
72 return ctrl.Result{}, nil
73 }
74
75 shouldUpdate := false
76
77 desiredPhase := obj.GetStatus().Experiment.DesiredPhase
78 records := obj.GetStatus().Experiment.Records
79 selectors := obj.GetSelectorSpecs()
80
81 logger := r.Log.WithValues("name", obj.GetName(), "namespace", obj.GetNamespace(), "kind", obj.GetObjectKind().GroupVersionKind().Kind)
82
83 if records == nil {
84 for name, sel := range selectors {
85 targets, err := r.Selector.Select(context.TODO(), sel)
86 if err != nil {
87 logger.Error(err, "fail to select")
88 r.Recorder.Event(obj, recorder.Failed{
89 Activity: "select targets",
90 Err: err.Error(),
91 })
92 return ctrl.Result{}, nil
93 }
94
95 if len(targets) == 0 {
96 logger.Info("no target has been selected")
97 r.Recorder.Event(obj, recorder.Failed{
98 Activity: "select targets",
99 Err: "no target has been selected",
100 })
101 return ctrl.Result{}, nil
102 }
103
104 for _, target := range targets {
105 records = append(records, &v1alpha1.Record{
106 Id: target.Id(),
107 SelectorKey: name,
108 Phase: v1alpha1.NotInjected,
109 })
110 shouldUpdate = true
111 }
112 }
113
114 }
115
116 needRetry := false
117 for index, record := range records {
118 var err error
119 idLogger := logger.WithValues("id", records[index].Id)
120 idLogger.Info("iterating record", "record", record, "desiredPhase", desiredPhase)
121
122
123
124
125
126
127 originalPhase := record.Phase
128 operation := Nothing
129 if desiredPhase == v1alpha1.RunningPhase && originalPhase != v1alpha1.Injected {
130
131
132
133 if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
134 operation = Apply
135 } else {
136 operation = Recover
137 }
138 }
139 if desiredPhase == v1alpha1.StoppedPhase && originalPhase != v1alpha1.NotInjected {
140
141
142
143 if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
144 operation = Apply
145 } else {
146 operation = Recover
147 }
148 }
149
150 if operation == Apply {
151 idLogger.Info("apply chaos")
152 record.Phase, err = r.Impl.Apply(context.TODO(), index, records, obj)
153 if record.Phase != originalPhase {
154 shouldUpdate = true
155 }
156 if err != nil {
157
158
159 idLogger.Error(err, "fail to apply chaos")
160 applyFailedEvent := newRecordEvent(v1alpha1.TypeFailed, v1alpha1.Apply, err.Error())
161 records[index].Events = append(records[index].Events, *applyFailedEvent)
162 r.Recorder.Event(obj, recorder.Failed{
163 Activity: "apply chaos",
164 Err: err.Error(),
165 })
166 needRetry = true
167
168 shouldUpdate = true
169 continue
170 }
171
172 if record.Phase == v1alpha1.Injected {
173 records[index].InjectedCount++
174 applySucceedEvent := newRecordEvent(v1alpha1.TypeSucceeded, v1alpha1.Apply, "")
175 records[index].Events = append(records[index].Events, *applySucceedEvent)
176 r.Recorder.Event(obj, recorder.Applied{
177 Id: records[index].Id,
178 })
179 }
180 } else if operation == Recover {
181 idLogger.Info("recover chaos")
182 record.Phase, err = r.Impl.Recover(context.TODO(), index, records, obj)
183 if record.Phase != originalPhase {
184 shouldUpdate = true
185 }
186 if err != nil {
187
188
189 idLogger.Error(err, "fail to recover chaos")
190 recoverFailedEvent := newRecordEvent(v1alpha1.TypeFailed, v1alpha1.Recover, err.Error())
191 records[index].Events = append(records[index].Events, *recoverFailedEvent)
192 r.Recorder.Event(obj, recorder.Failed{
193 Activity: "recover chaos",
194 Err: err.Error(),
195 })
196 needRetry = true
197
198 shouldUpdate = true
199 continue
200 }
201
202 if record.Phase == v1alpha1.NotInjected {
203 records[index].RecoveredCount++
204 recoverSucceedEvent := newRecordEvent(v1alpha1.TypeSucceeded, v1alpha1.Recover, "")
205 records[index].Events = append(records[index].Events, *recoverSucceedEvent)
206 r.Recorder.Event(obj, recorder.Recovered{
207 Id: records[index].Id,
208 })
209 }
210 }
211 }
212
213
214 var customStatus reflect.Value
215 if objWithStatus, ok := obj.(v1alpha1.InnerObjectWithCustomStatus); ok {
216 customStatus = reflect.Indirect(reflect.ValueOf(objWithStatus.GetCustomStatus()))
217 }
218 if shouldUpdate {
219 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
220 logger.Info("updating records", "records", records)
221 obj := r.Object.DeepCopyObject().(v1alpha1.InnerObjectWithSelector)
222
223 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
224 logger.Error(err, "unable to get chaos")
225 return err
226 }
227
228 obj.GetStatus().Experiment.Records = records
229 if objWithStatus, ok := obj.(v1alpha1.InnerObjectWithCustomStatus); ok {
230 ptrToCustomStatus := objWithStatus.GetCustomStatus()
231
232 reflect.Indirect(reflect.ValueOf(ptrToCustomStatus)).Set(reflect.Indirect(customStatus))
233 }
234 return r.Client.Update(context.TODO(), obj)
235 })
236 if updateError != nil {
237 logger.Error(updateError, "fail to update")
238 r.Recorder.Event(obj, recorder.Failed{
239 Activity: "update records",
240 Err: updateError.Error(),
241 })
242 return ctrl.Result{Requeue: true}, nil
243 }
244
245 r.Recorder.Event(obj, recorder.Updated{
246 Field: "records",
247 })
248 }
249 return ctrl.Result{Requeue: needRetry}, nil
250 }
251
252 func newRecordEvent(eventType v1alpha1.RecordEventType, eventStage v1alpha1.RecordEventOperation, msg string) *v1alpha1.RecordEvent {
253 return v1alpha1.NewRecordEvent(eventType, eventStage, msg, metav1.Now())
254 }
255