...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20
21 "github.com/go-logr/logr"
22 "github.com/pkg/errors"
23 corev1 "k8s.io/api/core/v1"
24 "k8s.io/client-go/util/retry"
25 "sigs.k8s.io/controller-runtime/pkg/client"
26 "sigs.k8s.io/controller-runtime/pkg/reconcile"
27
28 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
29 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
30 )
31
32 type AbortWorkflowReconciler struct {
33 *ChildNodesFetcher
34 kubeClient client.Client
35 eventRecorder recorder.ChaosRecorder
36 logger logr.Logger
37 }
38
39 func NewAbortWorkflowReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *AbortWorkflowReconciler {
40 return &AbortWorkflowReconciler{
41 ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
42 kubeClient: kubeClient,
43 eventRecorder: eventRecorder,
44 logger: logger,
45 }
46 }
47
48
49
50 func (it *AbortWorkflowReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
51 workflow := v1alpha1.Workflow{}
52 err := it.kubeClient.Get(ctx, request.NamespacedName, &workflow)
53 if err != nil {
54 return reconcile.Result{}, client.IgnoreNotFound(err)
55 }
56
57 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
58 workflowNeedUpdate := v1alpha1.Workflow{}
59 err := it.kubeClient.Get(ctx, request.NamespacedName, &workflowNeedUpdate)
60 if err != nil {
61 return errors.Wrapf(err, "get workflow")
62 }
63
64 entryNodes, err := fetchEntryNode(ctx, it.kubeClient, workflowNeedUpdate)
65 if err != nil {
66 return errors.Wrapf(err, "fetch entry nodes of workflow")
67 }
68
69 if len(entryNodes) == 0 {
70 it.logger.Info("omit set abort condition, workflow has no entry node", "key", request.NamespacedName)
71 return nil
72 }
73 if len(entryNodes) > 1 {
74 it.logger.Info("there are more than 1 entry nodes of workflow", "key", request.NamespacedName)
75 }
76
77 entryNode := entryNodes[0]
78 if WorkflowAborted(workflowNeedUpdate) {
79 if !ConditionEqualsTo(entryNode.Status, v1alpha1.ConditionAborted, corev1.ConditionTrue) {
80 it.eventRecorder.Event(&entryNode, recorder.WorkflowAborted{WorkflowName: workflow.Name})
81 }
82 SetCondition(&entryNode.Status, v1alpha1.WorkflowNodeCondition{
83 Type: v1alpha1.ConditionAborted,
84 Status: corev1.ConditionTrue,
85 Reason: v1alpha1.WorkflowAborted,
86 })
87 } else {
88 SetCondition(&entryNode.Status, v1alpha1.WorkflowNodeCondition{
89 Type: v1alpha1.ConditionAborted,
90 Status: corev1.ConditionFalse,
91 Reason: "",
92 })
93 }
94
95 return client.IgnoreNotFound(it.kubeClient.Status().Update(ctx, &entryNode))
96 })
97
98 return reconcile.Result{}, client.IgnoreNotFound(updateError)
99 }
100