1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package desiredphase
17
18 import (
19 "context"
20 "time"
21
22 "github.com/go-logr/logr"
23 apierrors "k8s.io/apimachinery/pkg/api/errors"
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/client-go/util/retry"
26 ctrl "sigs.k8s.io/controller-runtime"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
31 )
32
33
34 type Reconciler struct {
35
36 Object v1alpha1.InnerObject
37
38
39 client.Client
40
41 Recorder recorder.ChaosRecorder
42 Log logr.Logger
43 }
44
45
46 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
47 obj := r.Object.DeepCopyObject().(v1alpha1.InnerObject)
48
49 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
50 if apierrors.IsNotFound(err) {
51 r.Log.Info("chaos not found")
52 } else {
53
54 r.Log.Error(err, "unable to get chaos")
55 }
56 return ctrl.Result{}, nil
57 }
58
59 reconcileInfo := &reconcileInfo{
60 obj: obj,
61 Reconciler: r,
62 shouldUpdate: false,
63 }
64 return reconcileInfo.Reconcile(req)
65 }
66
67 type reconcileInfo struct {
68 obj v1alpha1.InnerObject
69
70 *Reconciler
71 shouldUpdate bool
72 requeueAfter time.Duration
73 }
74
75 func (info *reconcileInfo) GetCreationTimestamp() metav1.Time {
76 return info.obj.GetCreationTimestamp()
77 }
78
79 func (info *reconcileInfo) CalcDesiredPhase() (v1alpha1.DesiredPhase, []recorder.ChaosEvent) {
80 events := []recorder.ChaosEvent{}
81
82
83 if info.obj.IsDeleted() {
84 if info.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.StoppedPhase {
85 events = append(events, recorder.Deleted{})
86 }
87 return v1alpha1.StoppedPhase, events
88 }
89
90 if info.obj.IsOneShot() {
91
92
93 return v1alpha1.RunningPhase, events
94 }
95
96
97 now := time.Now()
98
99 durationExceeded, untilStop, err := info.obj.DurationExceeded(now)
100 if err != nil {
101 info.Log.Error(err, "failed to parse duration")
102 }
103 if durationExceeded {
104 if info.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.StoppedPhase {
105 events = append(events, recorder.TimeUp{})
106 }
107 return v1alpha1.StoppedPhase, events
108 }
109
110 info.requeueAfter = untilStop
111
112
113 if info.obj.IsPaused() {
114 if info.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.StoppedPhase {
115 events = append(events, recorder.Paused{})
116 }
117 return v1alpha1.StoppedPhase, events
118 }
119
120 if info.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.RunningPhase {
121 events = append(events, recorder.Started{})
122 }
123 return v1alpha1.RunningPhase, events
124 }
125
126 func (info *reconcileInfo) Reconcile(req ctrl.Request) (ctrl.Result, error) {
127 desiredPhase, events := info.CalcDesiredPhase()
128
129 info.Log.Info("modify desiredPhase", "desiredPhase", desiredPhase)
130 if info.obj.GetStatus().Experiment.DesiredPhase != desiredPhase {
131 for _, ev := range events {
132 info.Recorder.Event(info.obj, ev)
133 }
134
135 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
136 obj := info.Object.DeepCopyObject().(v1alpha1.InnerObject)
137
138 if err := info.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
139 info.Log.Error(err, "unable to get chaos")
140 return err
141 }
142
143 if obj.GetStatus().Experiment.DesiredPhase != desiredPhase {
144 obj.GetStatus().Experiment.DesiredPhase = desiredPhase
145 info.Log.Info("update object", "namespace", obj.GetNamespace(), "name", obj.GetName())
146 return info.Client.Update(context.TODO(), obj)
147 }
148
149 return nil
150 })
151 if updateError != nil {
152 info.Log.Error(updateError, "fail to update")
153 info.Recorder.Event(info.obj, recorder.Failed{
154 Activity: "update desiredphase",
155 Err: updateError.Error(),
156 })
157 return ctrl.Result{}, nil
158 }
159
160 info.Recorder.Event(info.obj, recorder.Updated{
161 Field: "desiredPhase",
162 })
163 }
164 return ctrl.Result{RequeueAfter: info.requeueAfter}, nil
165 }
166