1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package statuscheck
17
18 import (
19 "context"
20 "fmt"
21 "time"
22
23 "github.com/go-logr/logr"
24 "github.com/pkg/errors"
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/client-go/util/retry"
28 ctrl "sigs.k8s.io/controller-runtime"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
33 )
34
35 type Reconciler struct {
36 logger logr.Logger
37 kubeClient client.Client
38 eventRecorder recorder.ChaosRecorder
39 manager Manager
40 }
41
42 func NewReconciler(logger logr.Logger, kubeClient client.Client, eventRecorder recorder.ChaosRecorder, manager Manager) *Reconciler {
43 return &Reconciler{
44 kubeClient: kubeClient,
45 eventRecorder: eventRecorder,
46 logger: logger,
47 manager: manager,
48 }
49 }
50
51 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
52 startTime := time.Now()
53 obj := &v1alpha1.StatusCheck{}
54 if err := r.kubeClient.Get(ctx, req.NamespacedName, obj); err != nil {
55 if apierrors.IsNotFound(err) {
56 r.logger.Info("status check is deleted", "statuscheck", req.NamespacedName)
57
58 r.manager.Delete(req.NamespacedName)
59 return ctrl.Result{}, nil
60 }
61 return ctrl.Result{}, errors.Wrapf(err, "get status check '%s'", req.NamespacedName.String())
62 }
63
64
65 if obj.IsCompleted() {
66 r.logger.V(1).Info("status check is already completed", "statuscheck", req.NamespacedName)
67 return ctrl.Result{}, nil
68 }
69
70 result, ok := r.manager.Get(*obj)
71 if !ok {
72
73 err := r.manager.Add(*obj)
74 if err != nil {
75 return ctrl.Result{}, errors.Wrapf(err, "add status check '%s' to manager", req.NamespacedName.String())
76 }
77 result, _ = r.manager.Get(*obj)
78 }
79
80 updateError := retry.RetryOnConflict(retry.DefaultBackoff, r.updateStatus(ctx, req, result, startTime))
81
82 return ctrl.Result{RequeueAfter: time.Duration(obj.Spec.IntervalSeconds) * time.Second}, client.IgnoreNotFound(updateError)
83 }
84
85 func (r *Reconciler) updateStatus(ctx context.Context, req ctrl.Request, result Result, startTime time.Time) func() error {
86 return func() error {
87 statusCheck := &v1alpha1.StatusCheck{}
88 if err := r.kubeClient.Get(ctx, req.NamespacedName, statusCheck); err != nil {
89 return errors.Wrapf(err, "get status check '%s'", req.NamespacedName.String())
90 }
91
92 if statusCheck.Status.StartTime == nil {
93 statusCheck.Status.StartTime = &metav1.Time{Time: startTime}
94 }
95 statusCheck.Status.Count = result.Count
96 statusCheck.Status.Records = result.Records
97
98 conditions, err := r.generateConditions(*statusCheck)
99 if err != nil {
100 return errors.Wrapf(err, "generate conditions for status check '%s'", req.NamespacedName.String())
101 }
102
103 if conditions.isCompleted() {
104 if statusCheck.Status.CompletionTime == nil {
105 statusCheck.Status.CompletionTime = &metav1.Time{Time: time.Now()}
106 }
107 r.manager.Complete(*statusCheck)
108 r.eventRecorder.Event(statusCheck, recorder.StatusCheckCompleted{Msg: conditions[v1alpha1.StatusCheckConditionCompleted].Reason})
109 r.logger.Info("status check is completed", "statuscheck", req.NamespacedName)
110 }
111 if conditions.isDurationExceed() {
112 r.eventRecorder.Event(statusCheck, recorder.StatusCheckDurationExceed{})
113 }
114 if conditions.isSuccessThresholdExceed() {
115 r.eventRecorder.Event(statusCheck, recorder.StatusCheckSuccessThresholdExceed{})
116 }
117 if conditions.isFailureThresholdExceed() {
118 r.eventRecorder.Event(statusCheck, recorder.StatusCheckFailureThresholdExceed{})
119 }
120
121 statusCheck.Status.Conditions = toConditionList(conditions)
122
123 r.logger.V(1).Info("update status of status check", "statuscheck", req.NamespacedName)
124 return r.kubeClient.Status().Update(ctx, statusCheck)
125 }
126 }
127
128 func (r *Reconciler) generateConditions(statusCheck v1alpha1.StatusCheck) (conditionMap, error) {
129 conditions := toConditionMap(statusCheck.Status.Conditions)
130
131 if err := setDurationExceedCondition(statusCheck, conditions); err != nil {
132 return nil, errors.Wrapf(err, "set duration exceed condition for status check '%s'", fmt.Sprintf("%s/%s", statusCheck.Namespace, statusCheck.Name))
133 }
134 setFailureThresholdExceedCondition(statusCheck, conditions)
135 setSuccessThresholdExceedCondition(statusCheck, conditions)
136
137
138 setCompletedCondition(statusCheck, conditions)
139
140 return conditions, nil
141 }
142