1
2
3
4
5
6
7
8
9
10
11
12
13
14 package common
15
16 import (
17 "context"
18 "reflect"
19 "strings"
20
21 "github.com/go-logr/logr"
22 apierrors "k8s.io/apimachinery/pkg/api/errors"
23 "k8s.io/client-go/util/retry"
24 ctrl "sigs.k8s.io/controller-runtime"
25 "sigs.k8s.io/controller-runtime/pkg/client"
26
27 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
28 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
29 "github.com/chaos-mesh/chaos-mesh/pkg/selector"
30 )
31
32 type InnerObjectWithCustomStatus interface {
33 v1alpha1.InnerObject
34
35 GetCustomStatus() interface{}
36 }
37
38 type InnerObjectWithSelector interface {
39 v1alpha1.InnerObject
40
41 GetSelectorSpecs() map[string]interface{}
42 }
43
44 type ChaosImpl interface {
45 Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error)
46 Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error)
47 }
48
49
50 type Reconciler struct {
51 Impl ChaosImpl
52
53
54 Object InnerObjectWithSelector
55
56
57 client.Client
58 client.Reader
59
60 Recorder recorder.ChaosRecorder
61
62 Selector *selector.Selector
63
64 Log logr.Logger
65 }
66
67 type Operation string
68
69 const (
70 Apply Operation = "apply"
71 Recover Operation = "recover"
72 Nothing Operation = ""
73 )
74
75
76 func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
77 obj := r.Object.DeepCopyObject().(InnerObjectWithSelector)
78
79 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
80 if apierrors.IsNotFound(err) {
81 r.Log.Info("chaos not found")
82 } else {
83
84 r.Log.Error(err, "unable to get chaos")
85 }
86 return ctrl.Result{}, nil
87 }
88
89 shouldUpdate := false
90
91 desiredPhase := obj.GetStatus().Experiment.DesiredPhase
92 records := obj.GetStatus().Experiment.Records
93 selectors := obj.GetSelectorSpecs()
94
95 if records == nil {
96 for name, sel := range selectors {
97 targets, err := r.Selector.Select(context.TODO(), sel)
98 if err != nil {
99 r.Log.Error(err, "fail to select")
100 r.Recorder.Event(obj, recorder.Failed{
101 Activity: "select targets",
102 Err: err.Error(),
103 })
104 return ctrl.Result{}, nil
105 }
106
107 for _, target := range targets {
108 records = append(records, &v1alpha1.Record{
109 Id: target.Id(),
110 SelectorKey: name,
111 Phase: v1alpha1.NotInjected,
112 })
113 shouldUpdate = true
114 }
115 }
116
117 }
118
119 if len(records) == 0 {
120 r.Log.Info("no record has been selected")
121 r.Recorder.Event(obj, recorder.Failed{
122 Activity: "select targets",
123 Err: "no record has been selected",
124 })
125 return ctrl.Result{}, nil
126 }
127
128 needRetry := false
129 for index, record := range records {
130 var err error
131 r.Log.Info("iterating record", "record", record, "desiredPhase", desiredPhase)
132
133
134
135
136
137
138 originalPhase := record.Phase
139 operation := Nothing
140 if desiredPhase == v1alpha1.RunningPhase && originalPhase != v1alpha1.Injected {
141
142
143
144 if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
145 operation = Apply
146 } else {
147 operation = Recover
148 }
149 }
150 if desiredPhase == v1alpha1.StoppedPhase && originalPhase != v1alpha1.NotInjected {
151
152
153
154 if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
155 operation = Apply
156 } else {
157 operation = Recover
158 }
159 }
160
161 if operation == Apply {
162 r.Log.Info("apply chaos", "id", records[index].Id)
163 record.Phase, err = r.Impl.Apply(context.TODO(), index, records, obj)
164 if record.Phase != originalPhase {
165 shouldUpdate = true
166 }
167 if err != nil {
168
169
170 r.Log.Error(err, "fail to apply chaos")
171 r.Recorder.Event(obj, recorder.Failed{
172 Activity: "apply chaos",
173 Err: err.Error(),
174 })
175 needRetry = true
176 continue
177 }
178
179 if record.Phase == v1alpha1.Injected {
180 r.Recorder.Event(obj, recorder.Applied{
181 Id: records[index].Id,
182 })
183 }
184 } else if operation == Recover {
185 r.Log.Info("recover chaos", "id", records[index].Id)
186 record.Phase, err = r.Impl.Recover(context.TODO(), index, records, obj)
187 if record.Phase != originalPhase {
188 shouldUpdate = true
189 }
190 if err != nil {
191
192
193 r.Log.Error(err, "fail to recover chaos")
194 r.Recorder.Event(obj, recorder.Failed{
195 Activity: "recover chaos",
196 Err: err.Error(),
197 })
198 needRetry = true
199 continue
200 }
201
202 if record.Phase == v1alpha1.NotInjected {
203 r.Recorder.Event(obj, recorder.Recovered{
204 Id: records[index].Id,
205 })
206 }
207 }
208 }
209
210
211 var customStatus reflect.Value
212 if objWithStatus, ok := obj.(InnerObjectWithCustomStatus); ok {
213 customStatus = reflect.Indirect(reflect.ValueOf(objWithStatus.GetCustomStatus()))
214 }
215 if shouldUpdate {
216 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
217 r.Log.Info("updating records", "records", records)
218 obj := r.Object.DeepCopyObject().(InnerObjectWithSelector)
219
220 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
221 r.Log.Error(err, "unable to get chaos")
222 return err
223 }
224
225 obj.GetStatus().Experiment.Records = records
226 if objWithStatus, ok := obj.(InnerObjectWithCustomStatus); ok {
227 ptrToCustomStatus := objWithStatus.GetCustomStatus()
228
229 reflect.Indirect(reflect.ValueOf(ptrToCustomStatus)).Set(reflect.Indirect(customStatus))
230 }
231 return r.Client.Update(context.TODO(), obj)
232 })
233 if updateError != nil {
234 r.Log.Error(updateError, "fail to update")
235 r.Recorder.Event(obj, recorder.Failed{
236 Activity: "update records",
237 Err: updateError.Error(),
238 })
239 return ctrl.Result{Requeue: true}, nil
240 }
241
242 r.Recorder.Event(obj, recorder.Updated{
243 Field: "records",
244 })
245 }
246 return ctrl.Result{Requeue: needRetry}, nil
247 }
248