1
2
3
4
5
6
7
8
9
10
11
12
13
14 package twophase
15
16 import (
17 "context"
18 "reflect"
19 "time"
20
21 "github.com/pkg/errors"
22 k8sError "k8s.io/apimachinery/pkg/api/errors"
23 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24 "k8s.io/apimachinery/pkg/types"
25 "k8s.io/client-go/util/retry"
26 ctrl "sigs.k8s.io/controller-runtime"
27
28 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
29 )
30
// iterMax caps the number of schedule-advancing iterations performed by the
// loops in resume and IterateNextTime before they stop iterating.
const iterMax = 1e4
32
// chaosStateMachine couples one chaos object with the reconcile request that
// triggered the work and the Reconciler that carries out side effects. The
// transition handlers in this file (noop, apply, recover, resume, unexpected)
// all operate on a *chaosStateMachine.
type chaosStateMachine struct {
	// Chaos is the object whose status, schedule and finalizers are read and
	// mutated by the transition handlers.
	Chaos v1alpha1.InnerSchedulerObject
	// Req is forwarded unchanged to the Apply and Recover calls.
	Req ctrl.Request
	// Reconciler is embedded; presumably it is what provides the Log, Apply,
	// Recover, Update, Object and Client members used through m in this file.
	*Reconciler
}
38
39 func unexpected(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
40 currentPhase := m.Chaos.GetStatus().Experiment.Phase
41
42 return true, errors.Errorf("turn from %s into %s is unexpected", currentPhase, targetPhase)
43 }
44
45 func noop(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
46 updated := false
47 currentPhase := m.Chaos.GetStatus().Experiment.Phase
48
49 if currentPhase != targetPhase {
50 m.Chaos.GetStatus().Experiment.Phase = targetPhase
51 updated = true
52 }
53 return updated, nil
54 }
55
56 func apply(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, startTime time.Time) (bool, error) {
57 duration, err := m.Chaos.GetDuration()
58 if err != nil {
59 m.Log.Error(err, "failed to get chaos duration")
60 return false, err
61 }
62 if duration == nil {
63 zero := time.Duration(0)
64 duration = &zero
65 }
66
67 currentPhase := m.Chaos.GetStatus().Experiment.Phase
68 status := m.Chaos.GetStatus()
69
70 m.Log.Info("applying", "current phase", currentPhase, "target phase", targetPhase)
71 err = m.Apply(ctx, m.Req, m.Chaos)
72 if err != nil {
73 m.Log.Error(err, "fail to apply")
74
75 status.Experiment.Phase = v1alpha1.ExperimentPhaseFailed
76 status.FailedMessage = err.Error()
77
78 return true, err
79 }
80
81 status.FailedMessage = emptyString
82 status.Experiment.Phase = targetPhase
83
84 nextStart, nextRecover, err := m.IterateNextTime(startTime, *duration)
85 if err != nil {
86 m.Log.Error(err, "failed to get the next start time and recover time")
87 return true, err
88 }
89
90 m.Chaos.SetNextStart(*nextStart)
91 m.Chaos.SetNextRecover(*nextRecover)
92
93 status.Experiment.StartTime = &metav1.Time{Time: startTime}
94 status.Experiment.EndTime = nil
95 status.Experiment.Duration = duration.String()
96
97 return true, nil
98 }
99
100 func recover(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
101 duration, err := m.Chaos.GetDuration()
102 if err != nil {
103 m.Log.Error(err, "failed to get chaos duration")
104 return false, err
105 }
106 if duration == nil {
107 zero := time.Duration(0)
108 duration = &zero
109 }
110
111 currentPhase := m.Chaos.GetStatus().Experiment.Phase
112 status := m.Chaos.GetStatus()
113
114 m.Log.Info("recovering", "current phase", currentPhase, "target phase", targetPhase)
115 if err := m.Recover(ctx, m.Req, m.Chaos); err != nil {
116 status.FailedMessage = err.Error()
117
118 m.Log.Error(err, "fail to recover")
119 return true, err
120 }
121
122 status.Experiment.Phase = targetPhase
123 status.Experiment.EndTime = &metav1.Time{
124 Time: now,
125 }
126
127 if status.Experiment.StartTime != nil {
128 status.Experiment.Duration = now.Sub(status.Experiment.StartTime.Time).String()
129 }
130
131
132 if !now.Before(m.Chaos.GetNextRecover()) {
133 m.Chaos.SetNextRecover(m.Chaos.GetNextStart().Add(*duration))
134 }
135
136 return true, nil
137 }
138
139 func resume(ctx context.Context, m *chaosStateMachine, _ v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
140 startTime := now
141 duration, err := m.Chaos.GetDuration()
142 if err != nil {
143 m.Log.Error(err, "failed to get chaos duration")
144 return false, err
145 }
146 if duration == nil {
147 zero := time.Duration(0)
148 duration = &zero
149 }
150 status := m.Chaos.GetStatus()
151
152 nextStart := m.Chaos.GetNextStart()
153 nextRecover := m.Chaos.GetNextRecover()
154 var lastStart time.Time
155 if status.Experiment.StartTime == nil {
156
157 nextStart = now
158 lastStart = now
159 } else {
160 lastStart = status.Experiment.StartTime.Time
161 }
162
163 defer func() {
164 m.Chaos.SetNextStart(nextStart)
165 m.Chaos.SetNextRecover(nextRecover)
166 }()
167
168 counter := 0
169 for {
170 if nextRecover.After(now) && nextRecover.Before(nextStart) {
171 startTime = lastStart
172
173 return apply(ctx, m, v1alpha1.ExperimentPhaseRunning, startTime)
174 }
175
176 if nextStart.After(now) {
177 return noop(ctx, m, v1alpha1.ExperimentPhaseWaiting, now)
178 }
179
180 lastStart = nextStart
181 start, recover, err := m.IterateNextTime(nextStart, *duration)
182 if err != nil {
183 m.Log.Error(err, "failed to get the next start time and recover time")
184
185 return false, err
186 }
187
188 nextStart = *start
189 nextRecover = *recover
190
191 counter++
192 if counter > iterMax {
193
194
195 startTime = now
196 start, recover, err = m.IterateNextTime(startTime, *duration)
197 if err != nil {
198 m.Log.Error(err, "failed to get the next start time and recover time")
199 return false, err
200 }
201 nextStart = *start
202 nextRecover = *recover
203
204 return apply(ctx, m, v1alpha1.ExperimentPhaseRunning, startTime)
205 }
206 }
207 }
208
209
210
211 func (m *chaosStateMachine) run(ctx context.Context, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
212 currentPhase := m.Chaos.GetStatus().Experiment.Phase
213 m.Log.Info("change phase", "current phase", currentPhase, "target phase", targetPhase)
214
215 if phaseTransitionMap[currentPhase] == nil {
216 err := errors.Errorf("unexpected current phase '%s'", currentPhase)
217 return false, err
218 }
219
220 if phaseTransitionMap[currentPhase][targetPhase] == nil {
221 err := errors.Errorf("unexpected target phase '%s'", targetPhase)
222 return false, err
223 }
224
225 return phaseTransitionMap[currentPhase][targetPhase](ctx, m, targetPhase, now)
226 }
227
228 func (m *chaosStateMachine) Into(ctx context.Context, targetPhase v1alpha1.ExperimentPhase, now time.Time) error {
229 originFinalizers := m.Chaos.DeepCopyObject().(v1alpha1.InnerSchedulerObject).GetMeta().GetFinalizers()
230 updated, err := m.run(ctx, targetPhase, now)
231 if err != nil {
232 m.Log.Error(err, "error while executing state machine")
233 }
234
235 if updated {
236
237 updateError := m.Update(ctx, m.Chaos)
238 if updateError == nil {
239 return err
240 }
241
242 if !k8sError.IsConflict(updateError) {
243 return updateError
244 }
245
246 m.Log.Info("fail to update, and will retry on conflict")
247
248
249 if m.Chaos.GetChaos() == nil || m.Chaos.GetStatus() == nil {
250 return updateError
251 }
252
253 namespacedName := types.NamespacedName{
254 Namespace: m.Chaos.GetChaos().Namespace,
255 Name: m.Chaos.GetChaos().Name,
256 }
257
258 updateError = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
259
260 _chaos := m.Object()
261 if err := m.Client.Get(ctx, namespacedName, _chaos); err != nil {
262 m.Log.Error(err, "unable to get chaos")
263 return err
264 }
265 chaos := _chaos.(v1alpha1.InnerSchedulerObject)
266
267
268 status := chaos.GetStatus()
269 status.FailedMessage = m.Chaos.GetStatus().FailedMessage
270 status.Scheduler = m.Chaos.GetStatus().Scheduler
271 status.Experiment = m.Chaos.GetStatus().Experiment
272
273 newFinalizers := m.Chaos.GetMeta().GetFinalizers()
274 if !reflect.DeepEqual(originFinalizers, newFinalizers) {
275 chaos.GetMeta().SetFinalizers(newFinalizers)
276 }
277
278
279 return m.Update(ctx, chaos)
280 })
281
282 if updateError != nil {
283 return updateError
284 }
285 }
286
287 return err
288 }
289
290 func (m *chaosStateMachine) IterateNextTime(startTime time.Time, duration time.Duration) (*time.Time, *time.Time, error) {
291 scheduler := m.Chaos.GetScheduler()
292 if scheduler == nil {
293 return nil, nil, errors.Errorf("misdefined scheduler")
294 }
295 m.Log.Info("iterate nextStart and nextRecover", "startTime", startTime, "duration", duration, "scheduler", scheduler)
296 nextStart, err := nextTime(*scheduler, startTime)
297
298 if err != nil {
299 m.Log.Error(err, "failed to get the next start time")
300 return nil, nil, err
301 }
302 nextRecover := startTime.Add(duration)
303
304 counter := 0
305
306
307 for nextRecover.After(*nextStart) {
308 nextStart, err = nextTime(*scheduler, *nextStart)
309 if err != nil {
310 m.Log.Error(err, "failed to get the next start time")
311 return nil, nil, err
312 }
313
314 counter++
315 if counter > iterMax {
316 err = errors.Errorf("the number of iterations exceeded with nextRecover(%s) nextStart(%s)", nextRecover, nextStart)
317 return nil, nil, err
318 }
319 }
320
321 return nextStart, &nextRecover, nil
322 }
323
// phaseTransitionMap is the transition table of the state machine. The outer
// key is the current experiment phase, the inner key is the requested target
// phase, and the value is the handler that run invokes for that move:
//
//   - noop only records the target phase;
//   - apply and recover call m.Apply and m.Recover respectively;
//   - resume chooses between running and waiting from the stored schedule;
//   - unexpected rejects the move with an error.
//
// A current or target phase without an entry makes run return an error.
var phaseTransitionMap = map[v1alpha1.ExperimentPhase]map[v1alpha1.ExperimentPhase]func(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error){
	// Transitions out of the uninitialized phase.
	v1alpha1.ExperimentPhaseUninitialized: {
		v1alpha1.ExperimentPhaseUninitialized: noop,
		v1alpha1.ExperimentPhaseRunning:       apply,
		v1alpha1.ExperimentPhaseWaiting:       noop,
		v1alpha1.ExperimentPhasePaused:        noop,
		v1alpha1.ExperimentPhaseFailed:        unexpected,
		v1alpha1.ExperimentPhaseFinished:      noop,
	},
	// Transitions out of the running phase: every move to another valid phase
	// goes through recover.
	v1alpha1.ExperimentPhaseRunning: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning:       noop,
		v1alpha1.ExperimentPhaseWaiting:       recover,
		v1alpha1.ExperimentPhasePaused:        recover,
		v1alpha1.ExperimentPhaseFailed:        unexpected,
		v1alpha1.ExperimentPhaseFinished:      recover,
	},
	// Transitions out of the waiting phase.
	v1alpha1.ExperimentPhaseWaiting: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning:       apply,
		v1alpha1.ExperimentPhaseWaiting:       noop,
		v1alpha1.ExperimentPhasePaused:        noop,
		v1alpha1.ExperimentPhaseFailed:        unexpected,
		v1alpha1.ExperimentPhaseFinished:      noop,
	},
	// Transitions out of the paused phase: both running and waiting targets
	// are handled by resume, which picks the actual phase itself.
	v1alpha1.ExperimentPhasePaused: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning:       resume,
		v1alpha1.ExperimentPhaseWaiting:       resume,
		v1alpha1.ExperimentPhasePaused:        noop,
		v1alpha1.ExperimentPhaseFailed:        unexpected,
		v1alpha1.ExperimentPhaseFinished:      noop,
	},
	// Transitions out of the failed phase: running re-applies the chaos,
	// finishing goes through recover.
	v1alpha1.ExperimentPhaseFailed: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning:       apply,
		v1alpha1.ExperimentPhaseWaiting:       noop,
		v1alpha1.ExperimentPhasePaused:        noop,
		v1alpha1.ExperimentPhaseFailed:        noop,
		v1alpha1.ExperimentPhaseFinished:      recover,
	},
	// Transitions out of the finished phase: only staying finished is allowed.
	v1alpha1.ExperimentPhaseFinished: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning:       unexpected,
		v1alpha1.ExperimentPhaseWaiting:       unexpected,
		v1alpha1.ExperimentPhasePaused:        unexpected,
		v1alpha1.ExperimentPhaseFailed:        unexpected,
		v1alpha1.ExperimentPhaseFinished:      noop,
	},
}
374