1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21 "time"
22
23 "github.com/go-logr/logr"
24 corev1 "k8s.io/api/core/v1"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/types"
27 "k8s.io/client-go/util/retry"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29 "sigs.k8s.io/controller-runtime/pkg/reconcile"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
33 )
34
35 type DeadlineReconciler struct {
36 *ChildNodesFetcher
37 kubeClient client.Client
38 eventRecorder recorder.ChaosRecorder
39 logger logr.Logger
40 }
41
42 func NewDeadlineReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *DeadlineReconciler {
43 return &DeadlineReconciler{
44 ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
45 kubeClient: kubeClient,
46 eventRecorder: eventRecorder,
47 logger: logger}
48 }
49
50 func (it *DeadlineReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
51 node := v1alpha1.WorkflowNode{}
52
53 err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
54 if err != nil {
55 return reconcile.Result{}, client.IgnoreNotFound(err)
56 }
57
58 if ConditionEqualsTo(node.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue) {
59
60 return reconcile.Result{}, it.propagateDeadlineToChildren(ctx, &node)
61 }
62
63 if node.Spec.Deadline == nil {
64 return reconcile.Result{}, nil
65 }
66
67 now := metav1.NewTime(time.Now())
68 if node.Spec.Deadline.Before(&now) {
69
70 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
71 nodeNeedUpdate := v1alpha1.WorkflowNode{}
72 err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
73 if err != nil {
74 return err
75 }
76
77 if ConditionEqualsTo(nodeNeedUpdate.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue) {
78
79 return nil
80 }
81
82 var reason string
83 if ConditionEqualsTo(nodeNeedUpdate.Status, v1alpha1.ConditionAccomplished, corev1.ConditionTrue) {
84 reason = v1alpha1.NodeDeadlineOmitted
85 } else {
86 reason = v1alpha1.NodeDeadlineExceed
87 }
88
89 if !ConditionEqualsTo(nodeNeedUpdate.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue) && reason == v1alpha1.NodeDeadlineExceed {
90 it.eventRecorder.Event(&node, recorder.DeadlineExceed{})
91 }
92
93 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
94 Type: v1alpha1.ConditionDeadlineExceed,
95 Status: corev1.ConditionTrue,
96 Reason: reason,
97 })
98
99 return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
100 })
101
102 if updateError != nil {
103 return reconcile.Result{}, updateError
104 }
105 it.logger.Info("deadline exceed", "key", request.NamespacedName, "deadline", node.Spec.Deadline.Time)
106 propagateErr := it.propagateDeadlineToChildren(ctx, &node)
107 if propagateErr != nil {
108 it.logger.Error(propagateErr, "failed to propagate to children nodes", "key", request.NamespacedName)
109 return reconcile.Result{}, propagateErr
110 }
111 } else {
112 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
113 nodeNeedUpdate := v1alpha1.WorkflowNode{}
114 err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
115 if err != nil {
116 return err
117 }
118
119 if ConditionEqualsTo(nodeNeedUpdate.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionFalse) {
120
121 return nil
122 } else if ConditionEqualsTo(nodeNeedUpdate.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue) &&
123 GetCondition(nodeNeedUpdate.Status, v1alpha1.ConditionDeadlineExceed).Reason == v1alpha1.ParentNodeDeadlineExceed {
124 return nil
125 }
126
127 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
128 Type: v1alpha1.ConditionDeadlineExceed,
129 Status: corev1.ConditionFalse,
130 Reason: v1alpha1.NodeDeadlineNotExceed,
131 })
132 return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
133 })
134
135 if updateError != nil {
136 return reconcile.Result{}, updateError
137 }
138 duration := node.Spec.Deadline.Time.Sub(now.Time)
139 it.logger.Info("deadline not exceed, requeue after a while", "key", request.NamespacedName, "deadline", node.Spec.Deadline.Time,
140 "duration", duration)
141 return reconcile.Result{
142 RequeueAfter: duration,
143 }, nil
144 }
145
146 return reconcile.Result{}, nil
147 }
148
149 func (it *DeadlineReconciler) propagateDeadlineToChildren(ctx context.Context, parent *v1alpha1.WorkflowNode) error {
150 switch parent.Spec.Type {
151 case v1alpha1.TypeSerial, v1alpha1.TypeParallel, v1alpha1.TypeTask:
152 activeChildNodes, _, err := it.ChildNodesFetcher.fetchChildNodes(ctx, *parent)
153 if err != nil {
154 return err
155 }
156 for _, childNode := range activeChildNodes {
157 childNode := childNode
158
159 if WorkflowNodeFinished(childNode.Status) {
160 it.logger.Info("child node already finished, skip for propagate deadline", "node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name))
161 continue
162 }
163
164 err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
165 nodeNeedUpdate := v1alpha1.WorkflowNode{}
166 err := it.kubeClient.Get(ctx, types.NamespacedName{
167 Namespace: childNode.Namespace,
168 Name: childNode.Name,
169 }, &nodeNeedUpdate)
170 if err != nil {
171 return err
172 }
173 if ConditionEqualsTo(nodeNeedUpdate.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue) {
174 it.logger.Info("omit propagate deadline to children, child already in deadline exceed",
175 "node", fmt.Sprintf("%s/%s", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name),
176 "parent node", fmt.Sprintf("%s/%s", parent.Namespace, parent.Name),
177 )
178 return nil
179 }
180 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
181 Type: v1alpha1.ConditionDeadlineExceed,
182 Status: corev1.ConditionTrue,
183 Reason: v1alpha1.ParentNodeDeadlineExceed,
184 })
185 it.eventRecorder.Event(&nodeNeedUpdate, recorder.ParentNodeDeadlineExceed{ParentNodeName: parent.Name})
186 return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
187 })
188 if err != nil {
189 return err
190 }
191 it.logger.Info("propagate deadline for child node",
192 "child node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name),
193 "parent node", fmt.Sprintf("%s/%s", parent.Namespace, parent.Name),
194 )
195 }
196 return nil
197 default:
198 it.logger.V(4).Info("no need to propagate with this type of workflow node", "type", parent.Spec.Type)
199 return nil
200 }
201 }
202