1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21 "time"
22
23 "github.com/go-logr/logr"
24 "github.com/pkg/errors"
25 corev1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/types"
28 "k8s.io/client-go/util/retry"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30 "sigs.k8s.io/controller-runtime/pkg/reconcile"
31
32 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
33 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
34 )
35
36 type StatusCheckReconciler struct {
37 kubeClient client.Client
38 eventRecorder recorder.ChaosRecorder
39 logger logr.Logger
40 }
41
42 func NewStatusCheckReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *StatusCheckReconciler {
43 return &StatusCheckReconciler{kubeClient: kubeClient, eventRecorder: eventRecorder, logger: logger}
44 }
45
46 func (it *StatusCheckReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
47 startTime := time.Now()
48 defer func() {
49 it.logger.V(4).Info("finished syncing for status check node",
50 "node", request.NamespacedName,
51 "duration", time.Since(startTime),
52 )
53 }()
54
55 node := v1alpha1.WorkflowNode{}
56 err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
57 if err != nil {
58 return reconcile.Result{}, client.IgnoreNotFound(err)
59 }
60 if node.Spec.Type != v1alpha1.TypeStatusCheck {
61 return reconcile.Result{}, nil
62 }
63
64 it.logger.V(4).Info("resolve status check node", "node", request)
65 if err := it.syncStatusCheck(ctx, request, node); err != nil {
66 return reconcile.Result{}, errors.Wrap(err, "sync status check")
67 }
68
69 updateError := retry.RetryOnConflict(retry.DefaultRetry, it.updateNodeStatus(ctx, request))
70
71 return reconcile.Result{}, updateError
72 }
73
74 func (it *StatusCheckReconciler) syncStatusCheck(ctx context.Context, request reconcile.Request, node v1alpha1.WorkflowNode) error {
75 statusChecks, err := it.fetchChildrenStatusCheck(ctx, node)
76 if err != nil {
77 return errors.Wrap(err, "fetch children status check")
78 }
79
80 if WorkflowNodeFinished(node.Status) {
81 for _, item := range statusChecks {
82
83 item := item
84 err := it.kubeClient.Delete(ctx, &item)
85 if client.IgnoreNotFound(err) != nil {
86 it.logger.Error(err, "failed to delete StatusCheck for workflow status check node",
87 "namespace", node.Namespace,
88 "nodeName", node.Name,
89 "statusCheckName", item.GetName(),
90 )
91 it.eventRecorder.Event(&node, recorder.StatusCheckDeletedFailed{Name: item.GetName()})
92 } else {
93 it.eventRecorder.Event(&node, recorder.StatusCheckDeleted{Name: item.GetName()})
94 }
95 }
96 return nil
97 }
98
99 if len(statusChecks) == 0 {
100 parentWorkflow, err := getParentWorkflow(ctx, it.kubeClient, node)
101 if err != nil {
102 return errors.WithStack(err)
103 }
104 spawnedStatusCheck, err := it.spawnStatusCheck(ctx, &node, parentWorkflow)
105 if err != nil {
106 it.eventRecorder.Event(&node, recorder.StatusCheckCreatedFailed{Name: spawnedStatusCheck.GetName()})
107 return errors.Wrap(err, "spawn status check")
108 }
109 it.eventRecorder.Event(&node, recorder.StatusCheckCreated{Name: spawnedStatusCheck.GetName()})
110 } else if len(statusChecks) > 1 {
111 var statusCheckToRemove []string
112 for _, item := range statusChecks[1:] {
113 statusCheckToRemove = append(statusCheckToRemove, item.GetName())
114 }
115 it.logger.Info("removing duplicated StatusCheck",
116 "node", request,
117 "statusCheckToRemove", statusCheckToRemove)
118
119 for _, item := range statusChecks[1:] {
120
121 item := item
122 err := it.kubeClient.Delete(ctx, &item)
123 if client.IgnoreNotFound(err) != nil {
124 it.logger.Error(err, "failed to delete StatusCheck for workflow status check node",
125 "namespace", node.Namespace,
126 "node", node.Name,
127 "statusCheck", item.GetName(),
128 )
129 }
130 }
131 } else {
132 it.logger.V(4).Info("do not need spawn or remove StatusCheck")
133 }
134
135 return nil
136 }
137
138 func (it *StatusCheckReconciler) updateNodeStatus(ctx context.Context, request reconcile.Request) func() error {
139 return func() error {
140 node := v1alpha1.WorkflowNode{}
141 if err := it.kubeClient.Get(ctx, request.NamespacedName, &node); err != nil {
142 return client.IgnoreNotFound(err)
143 }
144
145 statusChecks, err := it.fetchChildrenStatusCheck(ctx, node)
146 if err != nil {
147 return client.IgnoreNotFound(err)
148 }
149 if len(statusChecks) > 1 {
150 it.logger.Info("the number of StatusCheck affected by status check node is more than 1",
151 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
152 "statusCheck", statusChecks,
153 )
154 } else if len(statusChecks) == 0 {
155 it.logger.Info("the number of StatusCheck affected by status check node is 0",
156 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
157 )
158 return nil
159 }
160
161 statusCheck := statusChecks[0]
162 if statusCheck.IsCompleted() {
163 SetCondition(&node.Status, v1alpha1.WorkflowNodeCondition{
164 Type: v1alpha1.ConditionAccomplished,
165 Status: corev1.ConditionTrue,
166 Reason: v1alpha1.StatusCheckCompleted,
167 })
168 } else {
169 SetCondition(&node.Status, v1alpha1.WorkflowNodeCondition{
170 Type: v1alpha1.ConditionAccomplished,
171 Status: corev1.ConditionFalse,
172 Reason: "",
173 })
174 }
175
176 if node.Spec.AbortWithStatusCheck && needToAbort(statusCheck) {
177 SetCondition(&node.Status, v1alpha1.WorkflowNodeCondition{
178 Type: v1alpha1.ConditionAborted,
179 Status: corev1.ConditionTrue,
180 Reason: v1alpha1.StatusCheckNotExceedSuccessThreshold,
181 })
182 } else {
183 SetCondition(&node.Status, v1alpha1.WorkflowNodeCondition{
184 Type: v1alpha1.ConditionAborted,
185 Status: corev1.ConditionFalse,
186 Reason: "",
187 })
188 }
189
190 return client.IgnoreNotFound(it.kubeClient.Status().Update(ctx, &node))
191 }
192 }
193
194 func (it *StatusCheckReconciler) fetchChildrenStatusCheck(ctx context.Context, node v1alpha1.WorkflowNode) ([]v1alpha1.StatusCheck, error) {
195 controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
196 MatchLabels: map[string]string{
197 v1alpha1.LabelControlledBy: node.Name,
198 },
199 })
200 if err != nil {
201 return nil, errors.Wrap(err, "build label selector")
202 }
203
204 var childStatusChecks v1alpha1.StatusCheckList
205 if err = it.kubeClient.List(ctx, &childStatusChecks, &client.ListOptions{LabelSelector: controlledByThisNode}); err != nil {
206 return nil, errors.Wrap(err, "list child status checks")
207 }
208 return childStatusChecks.Items, nil
209 }
210
211 func (it *StatusCheckReconciler) spawnStatusCheck(ctx context.Context, node *v1alpha1.WorkflowNode, workflow *v1alpha1.Workflow) (*v1alpha1.StatusCheck, error) {
212 if node.Spec.StatusCheck == nil {
213 return nil, errors.Errorf("node %s/%s does not contains spec of Target", node.Namespace, node.Name)
214 }
215 statusCheckSpec := node.Spec.StatusCheck.DeepCopy()
216 statusCheck := v1alpha1.StatusCheck{
217 ObjectMeta: metav1.ObjectMeta{
218 GenerateName: fmt.Sprintf("%s-", node.Name),
219 Namespace: node.Namespace,
220 Labels: map[string]string{
221 v1alpha1.LabelControlledBy: node.Name,
222 v1alpha1.LabelWorkflow: workflow.Name,
223 },
224 OwnerReferences: []metav1.OwnerReference{
225 {
226 APIVersion: ApiVersion,
227 Kind: KindWorkflowNode,
228 Name: node.Name,
229 UID: node.UID,
230 Controller: &isController,
231 BlockOwnerDeletion: &blockOwnerDeletion,
232 },
233 },
234 Finalizers: []string{metav1.FinalizerDeleteDependents},
235 },
236 Spec: *statusCheckSpec,
237 }
238 if err := it.kubeClient.Create(ctx, &statusCheck); err != nil {
239 return nil, errors.Wrap(err, "create status check")
240 }
241 return &statusCheck, nil
242 }
243
244 func getParentWorkflow(ctx context.Context, kubeClient client.Client, node v1alpha1.WorkflowNode) (*v1alpha1.Workflow, error) {
245 workflowName, ok := node.Labels[v1alpha1.LabelWorkflow]
246 if !ok {
247 return nil, errors.Errorf("node %s/%s does not contains label %s", node.Namespace, node.Name, v1alpha1.LabelWorkflow)
248 }
249 parentWorkflow := v1alpha1.Workflow{}
250 if err := kubeClient.Get(ctx, types.NamespacedName{
251 Namespace: node.Namespace,
252 Name: workflowName,
253 }, &parentWorkflow); err != nil {
254 return nil, errors.Wrap(err, "get parent workflow")
255 }
256 return &parentWorkflow, nil
257 }
258
259 func needToAbort(statusCheck v1alpha1.StatusCheck) bool {
260 if !statusCheck.IsCompleted() {
261 return false
262 }
263 for _, condition := range statusCheck.Status.Conditions {
264 if condition.Type == v1alpha1.StatusCheckConditionSuccessThresholdExceed &&
265 condition.Status != corev1.ConditionTrue {
266 return true
267 }
268 }
269 return false
270 }
271