...
1
2
3
4
5
6
7
8
9
10
11
12
13
14 package desiredphase
15
16 import (
17 "context"
18 "time"
19
20 "github.com/go-logr/logr"
21 apierrors "k8s.io/apimachinery/pkg/api/errors"
22 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23 "k8s.io/client-go/util/retry"
24 ctrl "sigs.k8s.io/controller-runtime"
25 "sigs.k8s.io/controller-runtime/pkg/client"
26
27 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
28 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
29 )
30
31
32 type Reconciler struct {
33
34 Object v1alpha1.InnerObject
35
36
37 client.Client
38
39 Recorder recorder.ChaosRecorder
40 Log logr.Logger
41 }
42
43
44 func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
45 obj := r.Object.DeepCopyObject().(v1alpha1.InnerObject)
46
47 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
48 if apierrors.IsNotFound(err) {
49 r.Log.Info("chaos not found")
50 } else {
51
52 r.Log.Error(err, "unable to get chaos")
53 }
54 return ctrl.Result{}, nil
55 }
56
57 ctx := &reconcileContext{
58 obj: obj,
59 Reconciler: r,
60 shouldUpdate: false,
61 }
62 return ctx.Reconcile(req)
63 }
64
65 type reconcileContext struct {
66 obj v1alpha1.InnerObject
67
68 *Reconciler
69 shouldUpdate bool
70 requeueAfter time.Duration
71 }
72
73 func (ctx *reconcileContext) GetCreationTimestamp() metav1.Time {
74 return ctx.obj.GetObjectMeta().CreationTimestamp
75 }
76
77 func (ctx *reconcileContext) CalcDesiredPhase() (v1alpha1.DesiredPhase, []recorder.ChaosEvent) {
78 events := []recorder.ChaosEvent{}
79
80
81 if ctx.obj.IsDeleted() {
82 if ctx.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.StoppedPhase {
83 events = append(events, recorder.Deleted{})
84 }
85 return v1alpha1.StoppedPhase, events
86 }
87
88 if ctx.obj.IsOneShot() {
89
90
91 return v1alpha1.RunningPhase, events
92 }
93
94
95 now := time.Now()
96
97 durationExceeded, untilStop, err := ctx.obj.DurationExceeded(now)
98 if err != nil {
99 ctx.Log.Error(err, "failed to parse duration")
100 }
101 if durationExceeded {
102 if ctx.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.StoppedPhase {
103 events = append(events, recorder.TimeUp{})
104 }
105 return v1alpha1.StoppedPhase, events
106 }
107
108 ctx.requeueAfter = untilStop
109
110
111 if ctx.obj.IsPaused() {
112 if ctx.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.StoppedPhase {
113 events = append(events, recorder.Paused{})
114 }
115 return v1alpha1.StoppedPhase, events
116 }
117
118 if ctx.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.RunningPhase {
119 events = append(events, recorder.Started{})
120 }
121 return v1alpha1.RunningPhase, events
122 }
123
124 func (ctx *reconcileContext) Reconcile(req ctrl.Request) (ctrl.Result, error) {
125 desiredPhase, events := ctx.CalcDesiredPhase()
126
127 ctx.Log.Info("modify desiredPhase", "desiredPhase", desiredPhase)
128 if ctx.obj.GetStatus().Experiment.DesiredPhase != desiredPhase {
129 for _, ev := range events {
130 ctx.Recorder.Event(ctx.obj, ev)
131 }
132
133 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
134 obj := ctx.Object.DeepCopyObject().(v1alpha1.InnerObject)
135
136 if err := ctx.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
137 ctx.Log.Error(err, "unable to get chaos")
138 return err
139 }
140
141 if obj.GetStatus().Experiment.DesiredPhase != desiredPhase {
142 obj.GetStatus().Experiment.DesiredPhase = desiredPhase
143 ctx.Log.Info("update object", "namespace", obj.GetObjectMeta().GetNamespace(), "name", obj.GetObjectMeta().GetName())
144 return ctx.Client.Update(context.TODO(), obj)
145 }
146
147 return nil
148 })
149 if updateError != nil {
150 ctx.Log.Error(updateError, "fail to update")
151 ctx.Recorder.Event(ctx.obj, recorder.Failed{
152 Activity: "update desiredphase",
153 Err: updateError.Error(),
154 })
155 return ctrl.Result{}, nil
156 }
157
158 ctx.Recorder.Event(ctx.obj, recorder.Updated{
159 Field: "desiredPhase",
160 })
161 }
162 return ctrl.Result{RequeueAfter: ctx.requeueAfter}, nil
163 }
164