1
2
3
4
5
6
7
8
9
10
11
12
13
14 package twophase
15
16 import (
17 "context"
18 "strings"
19 "time"
20
21 "github.com/pkg/errors"
22 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23 "k8s.io/apimachinery/pkg/types"
24 "k8s.io/client-go/util/retry"
25 ctrl "sigs.k8s.io/controller-runtime"
26
27 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
28 )
29
// iterMax bounds the schedule fast-forward loops (see resume and
// IterateNextTime) so a pathological scheduler cannot spin forever.
const iterMax = 1e4

// chaosStateMachine drives the phase transitions of a single chaos experiment.
type chaosStateMachine struct {
	// Chaos is the experiment object whose status is read and mutated.
	Chaos v1alpha1.InnerSchedulerObject
	// Req is the reconcile request identifying the object.
	Req ctrl.Request
	// Reconciler supplies Apply, Recover, Update, Object, Client and Log.
	*Reconciler
}
37
38 func unexpected(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
39 currentPhase := m.Chaos.GetStatus().Experiment.Phase
40
41 return true, errors.Errorf("turn from %s into %s is unexpected", currentPhase, targetPhase)
42 }
43
44 func noop(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
45 updated := false
46 currentPhase := m.Chaos.GetStatus().Experiment.Phase
47
48 if currentPhase != targetPhase {
49 m.Chaos.GetStatus().Experiment.Phase = targetPhase
50 updated = true
51 }
52 return updated, nil
53 }
54
55 func apply(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, startTime time.Time) (bool, error) {
56 duration, err := m.Chaos.GetDuration()
57 if err != nil {
58 m.Log.Error(err, "failed to get chaos duration")
59 return false, err
60 }
61 if duration == nil {
62 zero := time.Duration(0)
63 duration = &zero
64 }
65
66 currentPhase := m.Chaos.GetStatus().Experiment.Phase
67 status := m.Chaos.GetStatus()
68
69 m.Log.Info("applying", "current phase", currentPhase, "target phase", targetPhase)
70 err = m.Apply(ctx, m.Req, m.Chaos)
71 if err != nil {
72 m.Log.Error(err, "fail to apply")
73
74 status.Experiment.Phase = v1alpha1.ExperimentPhaseFailed
75 status.FailedMessage = err.Error()
76
77 return true, err
78 }
79
80 status.FailedMessage = emptyString
81 status.Experiment.Phase = targetPhase
82
83 nextStart, nextRecover, err := m.IterateNextTime(startTime, *duration)
84 if err != nil {
85 m.Log.Error(err, "failed to get the next start time and recover time")
86 return true, err
87 }
88
89 m.Chaos.SetNextStart(*nextStart)
90 m.Chaos.SetNextRecover(*nextRecover)
91
92 status.Experiment.StartTime = &metav1.Time{Time: startTime}
93 status.Experiment.EndTime = nil
94 status.Experiment.Duration = duration.String()
95
96 return true, nil
97 }
98
99 func recover(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
100 duration, err := m.Chaos.GetDuration()
101 if err != nil {
102 m.Log.Error(err, "failed to get chaos duration")
103 return false, err
104 }
105 if duration == nil {
106 zero := time.Duration(0)
107 duration = &zero
108 }
109
110 currentPhase := m.Chaos.GetStatus().Experiment.Phase
111 status := m.Chaos.GetStatus()
112
113 m.Log.Info("recovering", "current phase", currentPhase, "target phase", targetPhase)
114 if err := m.Recover(ctx, m.Req, m.Chaos); err != nil {
115 status.FailedMessage = err.Error()
116
117 m.Log.Error(err, "fail to recover")
118 return true, err
119 }
120
121 status.Experiment.Phase = targetPhase
122 status.Experiment.EndTime = &metav1.Time{
123 Time: now,
124 }
125
126 if status.Experiment.StartTime != nil {
127 status.Experiment.Duration = now.Sub(status.Experiment.StartTime.Time).String()
128 }
129
130
131 if !now.Before(m.Chaos.GetNextRecover()) {
132 m.Chaos.SetNextRecover(m.Chaos.GetNextStart().Add(*duration))
133 }
134
135 return true, nil
136 }
137
// resume transitions a paused experiment back into action. Starting from the
// recorded StartTime (or now, if the experiment never started), it fast-
// forwards the schedule until it finds the window containing now, then either
// re-applies the chaos (inside an active window) or waits (next window is in
// the future). The bool result reports whether the status was modified.
func resume(ctx context.Context, m *chaosStateMachine, _ v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
	startTime := now
	duration, err := m.Chaos.GetDuration()
	if err != nil {
		m.Log.Error(err, "failed to get chaos duration")
		return false, err
	}
	if duration == nil {
		// A missing duration is treated as zero length.
		zero := time.Duration(0)
		duration = &zero
	}
	status := m.Chaos.GetStatus()

	nextStart := m.Chaos.GetNextStart()
	nextRecover := m.Chaos.GetNextRecover()
	var lastStart time.Time
	if status.Experiment.StartTime == nil {
		// Never started: begin the schedule from now.
		nextStart = now
		lastStart = now
	} else {
		lastStart = status.Experiment.StartTime.Time
	}

	// Persist the fast-forwarded schedule on every exit path. NOTE(review):
	// this deliberately runs after the apply() calls below and so overwrites
	// whatever apply stored via SetNextStart/SetNextRecover with the values
	// computed by this loop — presumably intentional; confirm.
	defer func() {
		m.Chaos.SetNextStart(nextStart)
		m.Chaos.SetNextRecover(nextRecover)
	}()

	counter := 0
	for {
		// Recovery is still pending and precedes the next start: now lies
		// inside an active window, so re-apply using that window's start.
		if nextRecover.After(now) && nextRecover.Before(nextStart) {
			startTime = lastStart

			return apply(ctx, m, v1alpha1.ExperimentPhaseRunning, startTime)
		}

		// The next window has not opened yet: just wait.
		if nextStart.After(now) {
			return noop(ctx, m, v1alpha1.ExperimentPhaseWaiting, now)
		}

		lastStart = nextStart
		start, recover, err := m.IterateNextTime(nextStart, *duration)
		if err != nil {
			m.Log.Error(err, "failed to get the next start time and recover time")

			return false, err
		}

		nextStart = *start
		nextRecover = *recover

		counter++
		if counter > iterMax {
			// Catching up took too many iterations; abandon the fast-forward
			// and restart the schedule from now instead.
			startTime = now
			start, recover, err = m.IterateNextTime(startTime, *duration)
			if err != nil {
				m.Log.Error(err, "failed to get the next start time and recover time")
				return false, err
			}
			nextStart = *start
			nextRecover = *recover

			return apply(ctx, m, v1alpha1.ExperimentPhaseRunning, startTime)
		}
	}
}
207
208
209
210 func (m *chaosStateMachine) run(ctx context.Context, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
211 currentPhase := m.Chaos.GetStatus().Experiment.Phase
212 m.Log.Info("change phase", "current phase", currentPhase, "target phase", targetPhase)
213
214 if phaseTransitionMap[currentPhase] == nil {
215 err := errors.Errorf("unexpected current phase '%s'", currentPhase)
216 return false, err
217 }
218
219 if phaseTransitionMap[currentPhase][targetPhase] == nil {
220 err := errors.Errorf("unexpected target phase '%s'", targetPhase)
221 return false, err
222 }
223
224 return phaseTransitionMap[currentPhase][targetPhase](ctx, m, targetPhase, now)
225 }
226
// Into drives the experiment toward targetPhase and persists the resulting
// status. If the direct Update hits an optimistic-concurrency conflict, it
// re-fetches the object and retries, overlaying this machine's computed
// status onto the fresh copy. The error from the state-machine step itself
// is returned even when the update succeeds.
func (m *chaosStateMachine) Into(ctx context.Context, targetPhase v1alpha1.ExperimentPhase, now time.Time) error {
	updated, err := m.run(ctx, targetPhase, now)
	if err != nil {
		m.Log.Error(err, "error while executing state machine")
	}

	if updated {
		// First try a plain update of the (possibly stale) in-memory object.
		updateError := m.Update(ctx, m.Chaos)
		if updateError == nil {
			return err
		}

		// NOTE(review): conflict is detected by matching the apiserver's error
		// message text; presumably apierrors.IsConflict would be the canonical
		// check — confirm before changing.
		if !strings.Contains(updateError.Error(), "the object has been modified; please apply your changes to the latest version and try again") {
			return updateError
		}

		m.Log.Error(updateError, "fail to update, and will retry on conflict")

		// Without object metadata or status there is nothing to retry with.
		if m.Chaos.GetChaos() == nil || m.Chaos.GetStatus() == nil {
			return updateError
		}

		namespacedName := types.NamespacedName{
			Namespace: m.Chaos.GetChaos().Namespace,
			Name: m.Chaos.GetChaos().Name,
		}

		updateError = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
			// Fetch the latest version of the object from the API server...
			_chaos := m.Object()
			if err := m.Client.Get(ctx, namespacedName, _chaos); err != nil {
				m.Log.Error(err, "unable to get chaos")
				return err
			}
			chaos := _chaos.(v1alpha1.InnerSchedulerObject)

			// ...overlay the status this state machine computed...
			status := chaos.GetStatus()
			status.FailedMessage = m.Chaos.GetStatus().FailedMessage
			status.Scheduler = m.Chaos.GetStatus().Scheduler
			status.Experiment = m.Chaos.GetStatus().Experiment

			// ...and attempt the update again.
			return m.Update(ctx, chaos)
		})

		if updateError != nil {
			return updateError
		}
	}

	return err
}
282
283 func (m *chaosStateMachine) IterateNextTime(startTime time.Time, duration time.Duration) (*time.Time, *time.Time, error) {
284 scheduler := m.Chaos.GetScheduler()
285 if scheduler == nil {
286 return nil, nil, errors.Errorf("misdefined scheduler")
287 }
288 m.Log.Info("iterate nextStart and nextRecover", "startTime", startTime, "duration", duration, "scheduler", scheduler)
289 nextStart, err := nextTime(*scheduler, startTime)
290
291 if err != nil {
292 m.Log.Error(err, "failed to get the next start time")
293 return nil, nil, err
294 }
295 nextRecover := startTime.Add(duration)
296
297 counter := 0
298
299
300 for nextRecover.After(*nextStart) {
301 nextStart, err = nextTime(*scheduler, *nextStart)
302 if err != nil {
303 m.Log.Error(err, "failed to get the next start time")
304 return nil, nil, err
305 }
306
307 counter++
308 if counter > iterMax {
309 err = errors.Errorf("the number of iterations exceeded with nextRecover(%s) nextStart(%s)", nextRecover, nextStart)
310 return nil, nil, err
311 }
312 }
313
314 return nextStart, &nextRecover, nil
315 }
316
// phaseTransitionMap maps (current phase, target phase) to the handler that
// performs the transition: apply injects chaos, recover undoes it, resume
// fast-forwards a paused schedule, noop only rewrites the recorded phase, and
// unexpected rejects the transition with an error.
var phaseTransitionMap = map[v1alpha1.ExperimentPhase]map[v1alpha1.ExperimentPhase]func(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error){
	// A fresh experiment may start running or idle in any non-failed phase.
	v1alpha1.ExperimentPhaseUninitialized: {
		v1alpha1.ExperimentPhaseUninitialized: noop,
		v1alpha1.ExperimentPhaseRunning: apply,
		v1alpha1.ExperimentPhaseWaiting: noop,
		v1alpha1.ExperimentPhasePaused: noop,
		v1alpha1.ExperimentPhaseFailed: unexpected,
		v1alpha1.ExperimentPhaseFinished: noop,
	},
	// A running experiment must be recovered before any terminal/idle phase.
	v1alpha1.ExperimentPhaseRunning: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning: noop,
		v1alpha1.ExperimentPhaseWaiting: recover,
		v1alpha1.ExperimentPhasePaused: recover,
		v1alpha1.ExperimentPhaseFailed: unexpected,
		v1alpha1.ExperimentPhaseFinished: recover,
	},
	// A waiting experiment can be (re)applied when its window opens.
	v1alpha1.ExperimentPhaseWaiting: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning: apply,
		v1alpha1.ExperimentPhaseWaiting: noop,
		v1alpha1.ExperimentPhasePaused: noop,
		v1alpha1.ExperimentPhaseFailed: unexpected,
		v1alpha1.ExperimentPhaseFinished: noop,
	},
	// A paused experiment resumes via schedule fast-forward, not plain apply.
	v1alpha1.ExperimentPhasePaused: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning: resume,
		v1alpha1.ExperimentPhaseWaiting: resume,
		v1alpha1.ExperimentPhasePaused: noop,
		v1alpha1.ExperimentPhaseFailed: unexpected,
		v1alpha1.ExperimentPhaseFinished: noop,
	},
	// A failed experiment may be retried (apply) or cleaned up (recover).
	v1alpha1.ExperimentPhaseFailed: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning: apply,
		v1alpha1.ExperimentPhaseWaiting: noop,
		v1alpha1.ExperimentPhasePaused: noop,
		v1alpha1.ExperimentPhaseFailed: noop,
		v1alpha1.ExperimentPhaseFinished: recover,
	},
	// Finished is terminal: only the self-transition is allowed.
	v1alpha1.ExperimentPhaseFinished: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning: unexpected,
		v1alpha1.ExperimentPhaseWaiting: unexpected,
		v1alpha1.ExperimentPhasePaused: unexpected,
		v1alpha1.ExperimentPhaseFailed: unexpected,
		v1alpha1.ExperimentPhaseFinished: noop,
	},
}
367