1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21
22 "github.com/go-logr/logr"
23 "github.com/pkg/errors"
24 corev1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/types"
26 "k8s.io/client-go/util/retry"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28 "sigs.k8s.io/controller-runtime/pkg/reconcile"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
32 )
33
34 type AbortNodeReconciler struct {
35 *ChildNodesFetcher
36 kubeClient client.Client
37 eventRecorder recorder.ChaosRecorder
38 logger logr.Logger
39 }
40
41 func NewAbortNodeReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *AbortNodeReconciler {
42 return &AbortNodeReconciler{
43 ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
44 kubeClient: kubeClient,
45 eventRecorder: eventRecorder,
46 logger: logger,
47 }
48 }
49
50
51
52
53
54 func (it *AbortNodeReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
55 node := v1alpha1.WorkflowNode{}
56 err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
57 if err != nil {
58 return reconcile.Result{}, client.IgnoreNotFound(err)
59 }
60
61 if !ConditionEqualsTo(node.Status, v1alpha1.ConditionAborted, corev1.ConditionTrue) {
62 return reconcile.Result{}, nil
63 }
64
65 if node.Spec.Type != v1alpha1.TypeStatusCheck {
66
67 return reconcile.Result{}, it.propagateAbortToChildren(ctx, &node)
68 }
69
70 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
71 if err := it.abortWorkflow(ctx, node); client.IgnoreNotFound(err) != nil {
72 return errors.Wrapf(err, "abort parent workflow")
73 }
74 return nil
75 })
76
77 return reconcile.Result{}, client.IgnoreNotFound(updateError)
78 }
79
80 func (it *AbortNodeReconciler) propagateAbortToChildren(ctx context.Context, parent *v1alpha1.WorkflowNode) error {
81 switch parent.Spec.Type {
82 case v1alpha1.TypeSerial, v1alpha1.TypeParallel, v1alpha1.TypeTask:
83 activeChildNodes, _, err := it.ChildNodesFetcher.fetchChildNodes(ctx, *parent)
84 if err != nil {
85 return errors.Wrap(err, "fetch children nodes")
86 }
87 for _, childNode := range activeChildNodes {
88 childNode := childNode
89
90 if WorkflowNodeFinished(childNode.Status) {
91 it.logger.Info("child node already finished, skip for propagate abort", "node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name))
92 continue
93 }
94
95 err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
96 nodeNeedUpdate := v1alpha1.WorkflowNode{}
97 err := it.kubeClient.Get(ctx, types.NamespacedName{
98 Namespace: childNode.Namespace,
99 Name: childNode.Name,
100 }, &nodeNeedUpdate)
101 if err != nil {
102 return errors.Wrap(err, "get child workflow node")
103 }
104 if ConditionEqualsTo(nodeNeedUpdate.Status, v1alpha1.ConditionAborted, corev1.ConditionTrue) {
105 it.logger.Info("omit propagate abort to children, child already aborted",
106 "node", fmt.Sprintf("%s/%s", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name),
107 "parent node", fmt.Sprintf("%s/%s", parent.Namespace, parent.Name),
108 )
109 return nil
110 }
111 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
112 Type: v1alpha1.ConditionAborted,
113 Status: corev1.ConditionTrue,
114 Reason: v1alpha1.ParentNodeAborted,
115 })
116 it.eventRecorder.Event(&nodeNeedUpdate, recorder.ParentNodeAborted{ParentNodeName: parent.Name})
117 return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
118 })
119 if err != nil {
120 return errors.Wrap(err, "update status of child workflow node")
121 }
122 it.logger.Info("propagate abort for child node",
123 "child node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name),
124 "parent node", fmt.Sprintf("%s/%s", parent.Namespace, parent.Name),
125 )
126 }
127 return nil
128 default:
129 it.logger.V(4).Info("no need to propagate with this type of workflow node", "type", parent.Spec.Type)
130 return nil
131 }
132 }
133
134 func (it *AbortNodeReconciler) abortWorkflow(ctx context.Context, node v1alpha1.WorkflowNode) error {
135 parentWorkflow, err := getParentWorkflow(ctx, it.kubeClient, node)
136 if err != nil {
137 return errors.WithStack(err)
138 }
139 if WorkflowAborted(*parentWorkflow) {
140 return nil
141 }
142
143 it.logger.Info("add abort annotation to parent workflow",
144 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
145 "workflow", fmt.Sprintf("%s/%s", parentWorkflow.Namespace, parentWorkflow.Name))
146 parentWorkflow.Annotations[v1alpha1.WorkflowAnnotationAbort] = "true"
147 return it.kubeClient.Update(ctx, parentWorkflow)
148 }
149