...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package gc
17
18 import (
19 "context"
20 "fmt"
21 "reflect"
22 "sort"
23 "time"
24
25 "github.com/go-logr/logr"
26 "go.uber.org/fx"
27 corev1 "k8s.io/api/core/v1"
28 k8sError "k8s.io/apimachinery/pkg/api/errors"
29 "k8s.io/apimachinery/pkg/runtime"
30 ctrl "sigs.k8s.io/controller-runtime"
31 "sigs.k8s.io/controller-runtime/pkg/client"
32
33 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
34 "github.com/chaos-mesh/chaos-mesh/controllers/config"
35 "github.com/chaos-mesh/chaos-mesh/controllers/schedule/utils"
36 "github.com/chaos-mesh/chaos-mesh/controllers/types"
37 "github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
38 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
39 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
40 "github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers"
41 )
42
43 type Reconciler struct {
44 client.Client
45 Log logr.Logger
46 Recorder recorder.ChaosRecorder
47
48 ActiveLister *utils.ActiveLister
49 }
50
51 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
52
53
54 schedule := &v1alpha1.Schedule{}
55 err := r.Get(ctx, req.NamespacedName, schedule)
56 if err != nil {
57 if !k8sError.IsNotFound(err) {
58 r.Log.Error(err, "unable to get schedule chaos")
59 }
60 return ctrl.Result{}, nil
61 }
62
63 list, err := r.ActiveLister.ListActiveJobs(ctx, schedule)
64 if err != nil {
65 r.Recorder.Event(schedule, recorder.Failed{
66 Activity: "list active jobs",
67 Err: err.Error(),
68 })
69 return ctrl.Result{}, nil
70 }
71
72 items := reflect.ValueOf(list).Elem().FieldByName("Items")
73 metaItems := []client.Object{}
74 for i := 0; i < items.Len(); i++ {
75 item := items.Index(i).Addr().Interface().(client.Object)
76 metaItems = append(metaItems, item)
77 }
78
79 sort.Slice(metaItems, func(x, y int) bool {
80 return metaItems[x].GetCreationTimestamp().Time.Before(metaItems[y].GetCreationTimestamp().Time)
81 })
82
83 exceededHistory := len(metaItems) - schedule.Spec.HistoryLimit
84
85 requeuAfter := time.Duration(0)
86 if exceededHistory > 0 {
87 for _, obj := range metaItems[0:exceededHistory] {
88 innerObj, ok := obj.(v1alpha1.InnerObject)
89 if ok {
90 finished, untilStop := controller.IsChaosFinishedWithUntilStop(innerObj, time.Now())
91
92 if !finished {
93 if untilStop != 0 {
94 if requeuAfter == 0 || requeuAfter > untilStop {
95 requeuAfter = untilStop
96 }
97 continue
98 }
99
100
101 r.Log.Info("untilStop is 0 when the chaos has not finished")
102 }
103 } else {
104 if schedule.Spec.Type == v1alpha1.ScheduleTypeWorkflow {
105 workflow, ok := obj.(*v1alpha1.Workflow)
106 if ok {
107 finished := controllers.WorkflowConditionEqualsTo(workflow.Status, v1alpha1.WorkflowConditionAccomplished, corev1.ConditionTrue)
108
109 if !finished {
110 continue
111 }
112 }
113 }
114 }
115 err := r.Client.Delete(ctx, obj)
116 if err != nil && !k8sError.IsNotFound(err) {
117 r.Recorder.Event(schedule, recorder.Failed{
118 Activity: fmt.Sprintf("delete %s/%s", obj.GetNamespace(), obj.GetName()),
119 Err: err.Error(),
120 })
121 }
122 }
123 }
124
125 return ctrl.Result{
126 RequeueAfter: requeuAfter,
127 }, nil
128 }
129
130 type Objs struct {
131 fx.In
132
133 ScheduleObjs []types.Object `group:"schedule"`
134 Objs []types.Object `group:"objs"`
135 }
136
// controllerName identifies this controller. Bootstrap uses it for the
// spawn check, the controller name, the logger name and the event recorder.
const controllerName = "schedule-gc"
138
139 func Bootstrap(mgr ctrl.Manager, client client.Client, log logr.Logger, objs Objs, scheme *runtime.Scheme, lister *utils.ActiveLister, recorderBuilder *recorder.RecorderBuilder) error {
140 if !config.ShouldSpawnController(controllerName) {
141 return nil
142 }
143 builder := builder.Default(mgr).
144 For(&v1alpha1.Schedule{}).
145 Named(controllerName)
146
147 for _, obj := range objs.Objs {
148
149 builder.Owns(obj.Object)
150 }
151
152 builder = builder.Owns(&v1alpha1.Workflow{})
153
154 return builder.Complete(&Reconciler{
155 client,
156 log.WithName(controllerName),
157 recorderBuilder.Build(controllerName),
158 lister,
159 })
160 }
161