1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package cron
17
18 import (
19 "context"
20 "reflect"
21 "time"
22
23 "github.com/go-logr/logr"
24 corev1 "k8s.io/api/core/v1"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apiserver/pkg/storage/names"
27 "k8s.io/client-go/util/retry"
28 ctrl "sigs.k8s.io/controller-runtime"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 "github.com/chaos-mesh/chaos-mesh/controllers/config"
33 "github.com/chaos-mesh/chaos-mesh/controllers/schedule/utils"
34 "github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
35 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
36 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
37 "github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers"
38 )
39
40 type Reconciler struct {
41 client.Client
42 Log logr.Logger
43 ActiveLister *utils.ActiveLister
44
45 Recorder recorder.ChaosRecorder
46 }
47
// t is a shared true value. Reconcile takes its address for the Controller and
// BlockOwnerDeletion fields of the owner references it builds, so it must
// never be reassigned.
var t = true
49
50 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
51 schedule := &v1alpha1.Schedule{}
52 err := r.Get(ctx, req.NamespacedName, schedule)
53 if err != nil {
54 r.Log.Error(err, "unable to get chaos")
55 return ctrl.Result{}, nil
56 }
57
58 if schedule.IsPaused() {
59 r.Log.Info("not starting chaos as it is paused")
60 return ctrl.Result{}, nil
61 }
62
63 now := time.Now()
64 shouldSpawn := false
65 r.Log.Info("calculate schedule time", "schedule", schedule.Spec.Schedule, "lastScheduleTime", schedule.Status.LastScheduleTime, "now", now)
66 missedRun, nextRun, err := getRecentUnmetScheduleTime(schedule, now)
67 if err != nil {
68 r.Recorder.Event(schedule, recorder.Failed{
69 Activity: "get run time",
70 Err: err.Error(),
71 })
72 return ctrl.Result{}, nil
73 }
74 if missedRun == nil {
75 r.Log.Info("requeue later to reconcile the schedule", "requeue-after", nextRun.Sub(now))
76 return ctrl.Result{RequeueAfter: nextRun.Sub(now)}, nil
77 }
78
79 if schedule.Spec.StartingDeadlineSeconds != nil {
80 if missedRun.Add(time.Second * time.Duration(*schedule.Spec.StartingDeadlineSeconds)).Before(now) {
81 r.Recorder.Event(schedule, recorder.MissedSchedule{
82 MissedRun: *missedRun,
83 })
84 return ctrl.Result{}, nil
85 }
86 }
87
88 r.Log.Info("schedule to spawn new chaos", "missedRun", missedRun, "nextRun", nextRun)
89 shouldSpawn = true
90
91 if shouldSpawn && schedule.Spec.ConcurrencyPolicy.IsForbid() {
92 list, err := r.ActiveLister.ListActiveJobs(ctx, schedule)
93 if err != nil {
94 r.Recorder.Event(schedule, recorder.Failed{
95 Activity: "list active jobs",
96 Err: err.Error(),
97 })
98 return ctrl.Result{}, nil
99 }
100
101 items := reflect.ValueOf(list).Elem().FieldByName("Items")
102 for i := 0; i < items.Len(); i++ {
103 if schedule.Spec.Type != v1alpha1.ScheduleTypeWorkflow {
104 item := items.Index(i).Addr().Interface().(v1alpha1.InnerObject)
105 if !controller.IsChaosFinished(item, now) {
106 shouldSpawn = false
107 r.Recorder.Event(schedule, recorder.ScheduleForbid{
108 RunningName: item.GetName(),
109 })
110 r.Log.Info("forbid to spawn new chaos", "running", item.GetName())
111 break
112 }
113 } else {
114 workflow := items.Index(i).Addr().Interface().(*v1alpha1.Workflow)
115 if !controllers.WorkflowConditionEqualsTo(workflow.Status, v1alpha1.WorkflowConditionAccomplished, corev1.ConditionTrue) {
116 shouldSpawn = false
117 r.Recorder.Event(schedule, recorder.ScheduleForbid{
118 RunningName: workflow.GetObjectMeta().Name,
119 })
120 r.Log.Info("forbid to spawn new workflow", "running", workflow.GetName())
121 break
122 }
123 }
124 }
125 }
126
127 if shouldSpawn {
128 newObj, err := schedule.Spec.ScheduleItem.SpawnNewObject(schedule.Spec.Type)
129 if err != nil {
130 r.Recorder.Event(schedule, recorder.Failed{
131 Activity: "generate new object",
132 Err: err.Error(),
133 })
134 return ctrl.Result{}, nil
135 }
136
137 newObj.SetOwnerReferences([]metav1.OwnerReference{
138 {
139 APIVersion: schedule.APIVersion,
140 Kind: schedule.Kind,
141 Name: schedule.Name,
142 UID: schedule.UID,
143 Controller: &t,
144 BlockOwnerDeletion: &t,
145 },
146 })
147 newObj.SetLabels(map[string]string{
148 v1alpha1.LabelManagedBy: schedule.Name,
149 })
150 newObj.SetNamespace(schedule.Namespace)
151 newObj.SetName(names.SimpleNameGenerator.GenerateName(schedule.Name + "-"))
152
153 err = r.Create(ctx, newObj)
154 if err != nil {
155 r.Recorder.Event(schedule, recorder.Failed{
156 Activity: "create new object",
157 Err: err.Error(),
158 })
159 r.Log.Error(err, "fail to create new object", "obj", newObj)
160 return ctrl.Result{}, nil
161 }
162 r.Recorder.Event(schedule, recorder.ScheduleSpawn{
163 Name: newObj.GetName(),
164 })
165 r.Log.Info("create new object", "namespace", newObj.GetNamespace(), "name", newObj.GetName())
166
167 lastScheduleTime := now
168 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
169 r.Log.Info("updating lastScheduleTime", "time", lastScheduleTime)
170 schedule = schedule.DeepCopyObject().(*v1alpha1.Schedule)
171
172 if err := r.Client.Get(ctx, req.NamespacedName, schedule); err != nil {
173 r.Log.Error(err, "unable to get schedule")
174 return err
175 }
176
177 schedule.Status.LastScheduleTime.Time = lastScheduleTime
178 return r.Client.Update(ctx, schedule)
179 })
180 if updateError != nil {
181 r.Log.Error(updateError, "fail to update")
182 r.Recorder.Event(schedule, recorder.Failed{
183 Activity: "update lastScheduleTime",
184 Err: updateError.Error(),
185 })
186 return ctrl.Result{}, nil
187 }
188
189 r.Recorder.Event(schedule, recorder.Updated{
190 Field: "lastScheduleTime",
191 })
192 }
193
194 return ctrl.Result{}, nil
195 }
196
// controllerName identifies this controller. Bootstrap passes it to
// config.ShouldSpawnController and uses it to name the controller, its logger
// and its event recorder.
const controllerName = "schedule-cron"
198
199 func Bootstrap(mgr ctrl.Manager, client client.Client, log logr.Logger, lister *utils.ActiveLister, recorderBuilder *recorder.RecorderBuilder) error {
200 if !config.ShouldSpawnController(controllerName) {
201 return nil
202 }
203
204 return builder.Default(mgr).
205 For(&v1alpha1.Schedule{}).
206 Named(controllerName).
207 Complete(&Reconciler{
208 client,
209 log.WithName(controllerName),
210 lister,
211 recorderBuilder.Build(controllerName),
212 })
213 }
214