1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21 "strings"
22 "time"
23
24 "github.com/go-logr/logr"
25 corev1 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/types"
27 "k8s.io/client-go/util/retry"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29 "sigs.k8s.io/controller-runtime/pkg/reconcile"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
33 )
34
35
// SerialNodeReconciler reconciles WorkflowNodes of type Serial. It starts the
// node's children one at a time, in the order of Spec.Children. It also keeps
// the node's ActiveChildren, FinishedChildren and Accomplished condition in sync.
type SerialNodeReconciler struct {
	// Embedded so that fetchChildNodes can be called directly on the reconciler
	// (presumably defined on ChildNodesFetcher; confirm).
	*ChildNodesFetcher
	// kubeClient reads and writes WorkflowNodes and their parent Workflow.
	kubeClient client.Client
	// eventRecorder emits node lifecycle events (children created, rerun, accomplished).
	eventRecorder recorder.ChaosRecorder
	logger        logr.Logger
}
42
43 func NewSerialNodeReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *SerialNodeReconciler {
44 return &SerialNodeReconciler{
45 ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
46 kubeClient: kubeClient,
47 eventRecorder: eventRecorder,
48 logger: logger,
49 }
50 }
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 func (it *SerialNodeReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
69 startTime := time.Now()
70 defer func() {
71 it.logger.V(4).Info("Finished syncing for serial node",
72 "node", request.NamespacedName,
73 "duration", time.Since(startTime),
74 )
75 }()
76
77 node := v1alpha1.WorkflowNode{}
78 err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
79 if err != nil {
80 return reconcile.Result{}, client.IgnoreNotFound(err)
81 }
82
83
84 if node.Spec.Type != v1alpha1.TypeSerial {
85 return reconcile.Result{}, nil
86 }
87
88 it.logger.V(4).Info("resolve serial node", "node", request)
89
90
91 err = it.syncChildNodes(ctx, node)
92 if err != nil {
93 return reconcile.Result{}, err
94 }
95
96
97 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
98 nodeNeedUpdate := v1alpha1.WorkflowNode{}
99 err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
100 if err != nil {
101 return err
102 }
103
104 activeChildren, finishedChildren, err := it.fetchChildNodes(ctx, nodeNeedUpdate)
105 if err != nil {
106 return err
107 }
108
109 nodeNeedUpdate.Status.FinishedChildren = nil
110 for _, finishedChild := range finishedChildren {
111 nodeNeedUpdate.Status.FinishedChildren = append(nodeNeedUpdate.Status.FinishedChildren,
112 corev1.LocalObjectReference{
113 Name: finishedChild.Name,
114 })
115 }
116
117 nodeNeedUpdate.Status.ActiveChildren = nil
118 for _, activeChild := range activeChildren {
119 nodeNeedUpdate.Status.ActiveChildren = append(nodeNeedUpdate.Status.ActiveChildren,
120 corev1.LocalObjectReference{
121 Name: activeChild.Name,
122 })
123 }
124
125 if len(activeChildren) > 1 {
126 it.logger.Info("warning: serial node has more than 1 active children", "namespace", nodeNeedUpdate.Namespace, "name", nodeNeedUpdate.Name, "children", nodeNeedUpdate.Status.ActiveChildren)
127 }
128
129
130 if len(finishedChildren) == len(nodeNeedUpdate.Spec.Children) {
131 if !WorkflowNodeFinished(nodeNeedUpdate.Status) {
132 it.eventRecorder.Event(&nodeNeedUpdate, recorder.NodeAccomplished{})
133 }
134 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
135 Type: v1alpha1.ConditionAccomplished,
136 Status: corev1.ConditionTrue,
137 Reason: "",
138 })
139 } else {
140 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
141 Type: v1alpha1.ConditionAccomplished,
142 Status: corev1.ConditionFalse,
143 Reason: "",
144 })
145 }
146
147 return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
148 })
149
150 if updateError != nil {
151 it.logger.Error(err, "failed to update the status of node", "node", request)
152 return reconcile.Result{}, updateError
153 }
154
155 return reconcile.Result{}, nil
156 }
157
158
159
160
161
162
163 func (it *SerialNodeReconciler) syncChildNodes(ctx context.Context, node v1alpha1.WorkflowNode) error {
164
165
166 if len(node.Spec.Children) == 0 {
167 it.logger.V(4).Info("empty serial node, NOOP",
168 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
169 )
170 return nil
171 }
172
173 if WorkflowNodeFinished(node.Status) {
174 return nil
175 }
176
177 activeChildNodes, finishedChildNodes, err := it.fetchChildNodes(ctx, node)
178 if err != nil {
179 return err
180 }
181 var taskToStartup string
182 if len(activeChildNodes) == 0 {
183
184 for index, task := range node.Spec.Children {
185
186
187
188
189
190
191
192
193 if index < len(finishedChildNodes) {
194
195
196
197
198
199
200 if strings.HasPrefix(task, finishedChildNodes[index].Name) {
201
202 taskToStartup = task
203
204
205
206 nodesToDelete := finishedChildNodes[index:]
207
208 if len(nodesToDelete) > 0 {
209 var nodesToCleanup []string
210 for _, item := range nodesToDelete {
211 nodesToCleanup = append(nodesToCleanup, item.Name)
212 }
213 it.eventRecorder.Event(&node, recorder.RerunBySpecChanged{CleanedChildrenNode: nodesToCleanup})
214
215 for _, refToDelete := range nodesToDelete {
216 nodeToDelete := v1alpha1.WorkflowNode{}
217 err := it.kubeClient.Get(ctx, types.NamespacedName{
218 Namespace: node.Namespace,
219 Name: refToDelete.Name,
220 }, &nodeToDelete)
221 if client.IgnoreNotFound(err) != nil {
222 it.logger.Error(err, "failed to fetch outdated child node",
223 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
224 "child node", fmt.Sprintf("%s/%s", node.Namespace, nodeToDelete.Name))
225 }
226 err = it.kubeClient.Delete(ctx, &nodeToDelete)
227 if client.IgnoreNotFound(err) != nil {
228 it.logger.Error(err, "failed to fetch outdated child node",
229 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
230 "child node", fmt.Sprintf("%s/%s", node.Namespace, nodeToDelete.Name))
231 }
232 }
233 }
234 break
235 }
236 } else {
237
238 taskToStartup = task
239 break
240 }
241 }
242 } else {
243 it.logger.V(4).Info("serial node has active child/children, skip scheduling",
244 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
245 "active children", activeChildNodes)
246 }
247
248 if len(taskToStartup) == 0 {
249 it.logger.Info("no need to spawn new child node", "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
250 return nil
251 }
252
253 parentWorkflow := v1alpha1.Workflow{}
254 err = it.kubeClient.Get(ctx, types.NamespacedName{
255 Namespace: node.Namespace,
256 Name: node.Spec.WorkflowName,
257 }, &parentWorkflow)
258 if err != nil {
259 it.logger.Error(err, "failed to fetch parent workflow",
260 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
261 "workflow name", node.Spec.WorkflowName)
262 return err
263 }
264
265 childNodes, err := renderNodesByTemplates(&parentWorkflow, &node, taskToStartup)
266 if err != nil {
267 it.logger.Error(err, "failed to render children childNodes",
268 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
269 return err
270 }
271
272 var childrenNames []string
273 for _, childNode := range childNodes {
274 err := it.kubeClient.Create(ctx, childNode)
275 if err != nil {
276 it.logger.Error(err, "failed to create child node",
277 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
278 "child node", childNode)
279 return err
280 }
281 childrenNames = append(childrenNames, childNode.Name)
282 }
283 it.eventRecorder.Event(&node, recorder.NodesCreated{ChildNodes: childrenNames})
284 it.logger.Info("serial node spawn new child node",
285 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
286 "child node", childrenNames)
287
288 return nil
289 }
290