...
1
2
3
4
5
6
7
8
9
10
11
12
13
14 package pause
15
16 import (
17 "context"
18 "fmt"
19 "reflect"
20 "strconv"
21
22 "github.com/go-logr/logr"
23 k8sError "k8s.io/apimachinery/pkg/api/errors"
24 k8sTypes "k8s.io/apimachinery/pkg/types"
25 "k8s.io/client-go/util/retry"
26 ctrl "sigs.k8s.io/controller-runtime"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 "github.com/chaos-mesh/chaos-mesh/controllers/schedule/utils"
31 "github.com/chaos-mesh/chaos-mesh/controllers/types"
32 "github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
33 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
34 )
35
36 type Reconciler struct {
37 client.Client
38 Log logr.Logger
39 ActiveLister *utils.ActiveLister
40
41 Recorder recorder.ChaosRecorder
42 }
43
44 func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
45 ctx := context.Background()
46
47 schedule := &v1alpha1.Schedule{}
48 err := r.Get(ctx, req.NamespacedName, schedule)
49 if err != nil {
50 if !k8sError.IsNotFound(err) {
51 r.Log.Error(err, "unable to get chaos")
52 }
53 return ctrl.Result{}, nil
54 }
55
56 if schedule.Spec.Type == v1alpha1.ScheduleTypeWorkflow {
57 if schedule.IsPaused() {
58 r.Recorder.Event(schedule, recorder.NotSupported{
59 Activity: "pausing a workflow schedule",
60 })
61 }
62 return ctrl.Result{}, nil
63 }
64
65 list, err := r.ActiveLister.ListActiveJobs(ctx, schedule)
66 if err != nil {
67 r.Recorder.Event(schedule, recorder.Failed{
68 Activity: "list active jobs",
69 Err: err.Error(),
70 })
71 return ctrl.Result{}, nil
72 }
73
74 items := reflect.ValueOf(list).Elem().FieldByName("Items")
75 for i := 0; i < items.Len(); i++ {
76 item := items.Index(i).Addr().Interface().(v1alpha1.InnerObject)
77 if item.IsPaused() != schedule.IsPaused() {
78 key := k8sTypes.NamespacedName{
79 Namespace: item.GetObjectMeta().GetNamespace(),
80 Name: item.GetObjectMeta().GetName(),
81 }
82 pause := strconv.FormatBool(schedule.IsPaused())
83
84 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
85 r.Log.Info("updating object", "pause", schedule.IsPaused())
86
87 if err := r.Client.Get(ctx, key, item); err != nil {
88 r.Log.Error(err, "unable to get schedule")
89 return err
90 }
91 if item.GetObjectMeta().Annotations == nil {
92 item.GetObjectMeta().Annotations = make(map[string]string)
93 }
94 item.GetObjectMeta().Annotations[v1alpha1.PauseAnnotationKey] = pause
95
96 return r.Client.Update(ctx, item)
97 })
98 if updateError != nil {
99 r.Log.Error(updateError, "fail to update")
100 r.Recorder.Event(schedule, recorder.Failed{
101 Activity: fmt.Sprintf("set pause to %s for %s", pause, key),
102 Err: updateError.Error(),
103 })
104 return ctrl.Result{}, nil
105 }
106 }
107 }
108
109 return ctrl.Result{}, nil
110 }
111
112 func NewController(mgr ctrl.Manager, client client.Client, log logr.Logger, lister *utils.ActiveLister, recorderBuilder *recorder.RecorderBuilder) (types.Controller, error) {
113 builder.Default(mgr).
114 For(&v1alpha1.Schedule{}).
115 Named("schedule-pause").
116 Complete(&Reconciler{
117 client,
118 log.WithName("schedule-pause"),
119 lister,
120 recorderBuilder.Build("schedule-pause"),
121 })
122 return "schedule-pause", nil
123 }
124