1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package active
17
18 import (
19 "context"
20 "reflect"
21 "sort"
22
23 "github.com/go-logr/logr"
24 "go.uber.org/fx"
25 v1 "k8s.io/api/core/v1"
26 k8sError "k8s.io/apimachinery/pkg/api/errors"
27 "k8s.io/apimachinery/pkg/runtime"
28 "k8s.io/client-go/tools/reference"
29 "k8s.io/client-go/util/retry"
30 ctrl "sigs.k8s.io/controller-runtime"
31 "sigs.k8s.io/controller-runtime/pkg/client"
32
33 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
34 "github.com/chaos-mesh/chaos-mesh/controllers/config"
35 "github.com/chaos-mesh/chaos-mesh/controllers/schedule/utils"
36 "github.com/chaos-mesh/chaos-mesh/controllers/types"
37 "github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
38 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
39 )
40
41 type Reconciler struct {
42 scheme *runtime.Scheme
43
44 client.Client
45 Log logr.Logger
46
47 ActiveLister *utils.ActiveLister
48
49 Recorder recorder.ChaosRecorder
50 }
51
52 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
53 schedule := &v1alpha1.Schedule{}
54 err := r.Get(ctx, req.NamespacedName, schedule)
55 if err != nil {
56 if !k8sError.IsNotFound(err) {
57 r.Log.Error(err, "unable to get chaos")
58 }
59 return ctrl.Result{}, nil
60 }
61
62 list, err := r.ActiveLister.ListActiveJobs(ctx, schedule)
63 if err != nil {
64 r.Recorder.Event(schedule, recorder.Failed{
65 Activity: "list active jobs",
66 Err: err.Error(),
67 })
68 return ctrl.Result{}, nil
69 }
70
71 active := []v1.ObjectReference{}
72 items := reflect.ValueOf(list).Elem().FieldByName("Items")
73 for i := 0; i < items.Len(); i++ {
74 item := items.Index(i).Addr().Interface().(runtime.Object)
75
76 ref, err := reference.GetReference(r.scheme, item)
77 if err != nil {
78 r.Log.Error(err, "fail to get reference")
79 r.Recorder.Event(schedule, recorder.Failed{
80 Activity: "get reference from object",
81 Err: err.Error(),
82 })
83 return ctrl.Result{}, nil
84 }
85
86 active = append(active, *ref)
87 }
88 sort.Slice(active, func(i, j int) bool {
89 return active[i].Name < active[j].Name
90 })
91 if reflect.DeepEqual(active, schedule.Status.Active) {
92 r.Log.Info("don't need to update active")
93 return ctrl.Result{}, nil
94 }
95
96 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
97 r.Log.Info("updating active", "active", active)
98 schedule = schedule.DeepCopyObject().(*v1alpha1.Schedule)
99
100 if err := r.Client.Get(ctx, req.NamespacedName, schedule); err != nil {
101 r.Log.Error(err, "unable to get schedule")
102 return err
103 }
104
105 schedule.Status.Active = active
106 return r.Client.Update(ctx, schedule)
107 })
108 if updateError != nil {
109 r.Log.Error(updateError, "fail to update")
110 r.Recorder.Event(schedule, recorder.Failed{
111 Activity: "update active",
112 Err: updateError.Error(),
113 })
114 return ctrl.Result{}, nil
115 }
116
117 return ctrl.Result{}, nil
118 }
119
120 type Objs struct {
121 fx.In
122
123 Objs []types.Object `group:"objs"`
124 }
125
const (
	// controllerName identifies this controller: Bootstrap passes it to
	// config.ShouldSpawnController, uses it as the controller's name, and
	// derives the logger and recorder names from it.
	controllerName = "schedule-active"
)
127
128 func Bootstrap(mgr ctrl.Manager, client client.Client, log logr.Logger, objs Objs, scheme *runtime.Scheme, lister *utils.ActiveLister, recorderBuilder *recorder.RecorderBuilder) error {
129 if !config.ShouldSpawnController(controllerName) {
130 return nil
131 }
132 builder := builder.Default(mgr).
133 For(&v1alpha1.Schedule{}).
134 Named(controllerName)
135
136 for _, obj := range objs.Objs {
137
138 builder = builder.Owns(obj.Object)
139 }
140 builder = builder.Owns(&v1alpha1.Workflow{})
141
142 return builder.Complete(&Reconciler{
143 scheme,
144 client,
145 log.WithName(controllerName),
146 lister,
147 recorderBuilder.Build(controllerName),
148 })
149 }
150