...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package pause
17
18 import (
19 "context"
20 "fmt"
21 "reflect"
22 "strconv"
23
24 "github.com/go-logr/logr"
25 k8sError "k8s.io/apimachinery/pkg/api/errors"
26 k8sTypes "k8s.io/apimachinery/pkg/types"
27 "k8s.io/client-go/util/retry"
28 ctrl "sigs.k8s.io/controller-runtime"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 "github.com/chaos-mesh/chaos-mesh/controllers/config"
33 "github.com/chaos-mesh/chaos-mesh/controllers/schedule/utils"
34 "github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
35 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
36 )
37
38 type Reconciler struct {
39 client.Client
40 Log logr.Logger
41 ActiveLister *utils.ActiveLister
42
43 Recorder recorder.ChaosRecorder
44 }
45
46 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
47 schedule := &v1alpha1.Schedule{}
48 err := r.Get(ctx, req.NamespacedName, schedule)
49 if err != nil {
50 if !k8sError.IsNotFound(err) {
51 r.Log.Error(err, "unable to get chaos")
52 }
53 return ctrl.Result{}, nil
54 }
55
56 if schedule.Spec.Type == v1alpha1.ScheduleTypeWorkflow {
57 if schedule.IsPaused() {
58 r.Recorder.Event(schedule, recorder.NotSupported{
59 Activity: "pausing a workflow schedule",
60 })
61 }
62 return ctrl.Result{}, nil
63 }
64
65 list, err := r.ActiveLister.ListActiveJobs(ctx, schedule)
66 if err != nil {
67 r.Recorder.Event(schedule, recorder.Failed{
68 Activity: "list active jobs",
69 Err: err.Error(),
70 })
71 return ctrl.Result{}, nil
72 }
73
74 items := reflect.ValueOf(list).Elem().FieldByName("Items")
75 for i := 0; i < items.Len(); i++ {
76 item := items.Index(i).Addr().Interface().(v1alpha1.InnerObject)
77 if item.IsPaused() != schedule.IsPaused() {
78 key := k8sTypes.NamespacedName{
79 Namespace: item.GetNamespace(),
80 Name: item.GetName(),
81 }
82 pause := strconv.FormatBool(schedule.IsPaused())
83
84 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
85 r.Log.Info("updating object", "pause", schedule.IsPaused())
86
87 if err := r.Client.Get(ctx, key, item); err != nil {
88 r.Log.Error(err, "unable to get schedule")
89 return err
90 }
91 annotations := item.GetAnnotations()
92 if annotations == nil {
93 annotations = make(map[string]string)
94 }
95 annotations[v1alpha1.PauseAnnotationKey] = pause
96 item.SetAnnotations(annotations)
97
98 return r.Client.Update(ctx, item)
99 })
100 if updateError != nil {
101 r.Log.Error(updateError, "fail to update")
102 r.Recorder.Event(schedule, recorder.Failed{
103 Activity: fmt.Sprintf("set pause to %s for %s", pause, key),
104 Err: updateError.Error(),
105 })
106 return ctrl.Result{}, nil
107 }
108 }
109 }
110
111 return ctrl.Result{}, nil
112 }
113
114 const controllerName = "schedule-pause"
115
116 func Bootstrap(mgr ctrl.Manager, client client.Client, log logr.Logger, lister *utils.ActiveLister, recorderBuilder *recorder.RecorderBuilder) error {
117 if !config.ShouldSpawnController(controllerName) {
118 return nil
119 }
120 return builder.Default(mgr).
121 For(&v1alpha1.Schedule{}).
122 Named(controllerName).
123 Complete(&Reconciler{
124 client,
125 log.WithName(controllerName),
126 lister,
127 recorderBuilder.Build(controllerName),
128 })
129 }
130