1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "encoding/json"
21 "fmt"
22 "time"
23
24 "github.com/go-logr/logr"
25 "github.com/pkg/errors"
26 corev1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/types"
29 "k8s.io/client-go/rest"
30 "k8s.io/client-go/util/retry"
31 "sigs.k8s.io/controller-runtime/pkg/client"
32 "sigs.k8s.io/controller-runtime/pkg/reconcile"
33
34 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
35 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
36 "github.com/chaos-mesh/chaos-mesh/pkg/workflow/task"
37 "github.com/chaos-mesh/chaos-mesh/pkg/workflow/task/collector"
38 )
39
// TaskReconciler reconciles WorkflowNodes whose Spec.Type is TypeTask. It
// spawns the task pod for the node, evaluates the node's conditional branches
// once that pod terminates, and creates child WorkflowNodes for the branches
// that evaluated True.
type TaskReconciler struct {
	// ChildNodesFetcher is embedded for looking up the node's child
	// WorkflowNodes (presumably it supplies fetchChildNodes; it is defined
	// outside this file section).
	*ChildNodesFetcher
	// kubeClient is used for all reads, creates, deletes, and status updates.
	kubeClient client.Client
	// restConfig is handed to collector.DefaultCollector when collecting the
	// context of a finished task pod.
	restConfig *rest.Config
	// eventRecorder emits Kubernetes events on the reconciled WorkflowNode.
	eventRecorder recorder.ChaosRecorder
	logger        logr.Logger
}
47
48 func NewTaskReconciler(kubeClient client.Client, restConfig *rest.Config, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *TaskReconciler {
49 return &TaskReconciler{
50 ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
51 kubeClient: kubeClient,
52 restConfig: restConfig,
53 eventRecorder: eventRecorder,
54 logger: logger,
55 }
56 }
57
// Reconcile drives one Task-type WorkflowNode through its lifecycle:
//
//  1. If no pod labelled as controlled by the node exists, a task pod is
//     spawned from the node's Task spec. The parent Workflow is found through
//     the node's LabelWorkflow label.
//  2. While the (first) task pod has not reached PodSucceeded/PodFailed, every
//     conditional branch is recorded as ConditionUnknown. Once it has, and the
//     branches are not evaluated yet, the pod's context is collected, stored as
//     JSON in Status.ConditionalBranchesStatus.Context, and used to evaluate
//     the branches.
//  3. When all branches are evaluated, child nodes are synced to the branches
//     that evaluated True, and ActiveChildren/FinishedChildren are refreshed.
//     The Accomplished condition becomes True once the number of finished
//     children equals the number of selected branches.
//
// Nodes of any other type are ignored, and a node that no longer exists is not
// an error. Failures to spawn the pod, evaluate the branches, or persist the
// status are returned so controller-runtime requeues the request. They are not
// only logged, which would leave a node whose pod already finished stuck.
func (it *TaskReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	startTime := time.Now()
	defer func() {
		it.logger.V(4).Info("Finished syncing for task node",
			"node", request.NamespacedName,
			"duration", time.Since(startTime),
		)
	}()

	node := v1alpha1.WorkflowNode{}
	err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
	if err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}

	if node.Spec.Type != v1alpha1.TypeTask {
		return reconcile.Result{}, nil
	}

	it.logger.V(4).Info("resolve task node", "node", request)

	pods, err := it.FetchPodControlledByThisWorkflowNode(ctx, node)
	if err != nil {
		return reconcile.Result{}, err
	}

	// Step 1: make sure a task pod exists. A freshly spawned pod is not part of
	// `pods`, so this reconcile continues down the "pod not finished" path.
	if len(pods) == 0 {
		workflowName, ok := node.Labels[v1alpha1.LabelWorkflow]
		if !ok {
			return reconcile.Result{}, errors.Errorf("node %s/%s does not contains label %s", node.Namespace, node.Name, v1alpha1.LabelWorkflow)
		}
		parentWorkflow := v1alpha1.Workflow{}
		if err := it.kubeClient.Get(ctx, types.NamespacedName{
			Namespace: node.Namespace,
			Name:      workflowName,
		}, &parentWorkflow); err != nil {
			return reconcile.Result{}, err
		}
		spawnedPod, err := it.SpawnTaskPod(ctx, &node, &parentWorkflow)
		if err != nil {
			it.logger.Error(err, "failed to spawn pod for Task Node", "node", request)
			it.eventRecorder.Event(&node, recorder.TaskPodSpawnFailed{})
			return reconcile.Result{}, err
		}
		it.eventRecorder.Event(&node, recorder.TaskPodSpawned{PodName: spawnedPod.Name})
	}

	if len(pods) > 1 {
		var podNames []string
		for _, pod := range pods {
			podNames = append(podNames, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
		}
		it.logger.Info("unexpected more than 1 pod created by task node, it will pick random one",
			"node", request,
			"pods", podNames,
			"picked", fmt.Sprintf("%s/%s", pods[0].Namespace, pods[0].Name),
		)
	}

	// Step 2: evaluate the conditional branches once the task pod has
	// terminated; otherwise report every branch as Unknown.
	if len(pods) > 0 && (pods[0].Status.Phase == corev1.PodFailed || pods[0].Status.Phase == corev1.PodSucceeded) {
		evaluated, err := it.conditionalBranchesEvaluated(ctx, node)
		if err != nil {
			return reconcile.Result{}, err
		}
		if !evaluated {
			it.eventRecorder.Event(&node, recorder.TaskPodPodCompleted{PodName: pods[0].Name})

			// Filled by the attempt that finally succeeds. The event is emitted
			// only after the status is persisted, so conflict retries do not
			// produce duplicate events.
			var selectedBranches []string
			updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
				nodeNeedUpdate := v1alpha1.WorkflowNode{}
				err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
				if err != nil {
					return err
				}
				// Spec.Task is dereferenced below. Fail cleanly instead of panicking.
				if nodeNeedUpdate.Spec.Task == nil {
					return errors.Errorf("node %s/%s does not contain spec of Task", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name)
				}

				if nodeNeedUpdate.Status.ConditionalBranchesStatus == nil {
					nodeNeedUpdate.Status.ConditionalBranchesStatus = &v1alpha1.ConditionalBranchesStatus{}
				}

				defaultCollector := collector.DefaultCollector(it.kubeClient, it.restConfig, pods[0].Namespace, pods[0].Name, nodeNeedUpdate.Spec.Task.Container.Name)
				env, err := defaultCollector.CollectContext(ctx)
				if err != nil {
					it.logger.Error(err, "failed to fetch env from task",
						"task", fmt.Sprintf("%s/%s", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name),
					)
					return err
				}
				if env != nil {
					jsonString, err := json.Marshal(env)
					if err != nil {
						// Best effort: storing the context is skipped, but the
						// branches are still evaluated from env.
						it.logger.Error(err, "failed to convert env to json",
							"task", fmt.Sprintf("%s/%s", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name),
							"env", env)
					} else {
						nodeNeedUpdate.Status.ConditionalBranchesStatus.Context = []string{string(jsonString)}
					}
				}

				evaluator := task.NewEvaluator(it.logger, it.kubeClient)
				evaluateConditionBranches, err := evaluator.EvaluateConditionBranches(nodeNeedUpdate.Spec.ConditionalBranches, env)
				if err != nil {
					it.logger.Error(err, "failed to evaluate expression",
						"task", fmt.Sprintf("%s/%s", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name),
					)
					return err
				}

				nodeNeedUpdate.Status.ConditionalBranchesStatus.Branches = evaluateConditionBranches

				selectedBranches = nil
				for _, item := range evaluateConditionBranches {
					if item.EvaluationResult == corev1.ConditionTrue {
						selectedBranches = append(selectedBranches, item.Target)
					}
				}

				return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
			})
			if client.IgnoreNotFound(updateError) != nil {
				it.logger.Error(updateError, "failed to update the condition status of task",
					"task", request)
				// Returning the error requeues the request. Otherwise nothing
				// would retry the evaluation of an already-finished pod.
				return reconcile.Result{}, updateError
			}
			if updateError == nil {
				it.eventRecorder.Event(&node, recorder.ConditionalBranchesSelected{SelectedBranches: selectedBranches})
			}
		}
	} else {
		updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
			nodeNeedUpdate := v1alpha1.WorkflowNode{}
			err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
			if err != nil {
				return err
			}

			if nodeNeedUpdate.Status.ConditionalBranchesStatus == nil {
				nodeNeedUpdate.Status.ConditionalBranchesStatus = &v1alpha1.ConditionalBranchesStatus{}
			}

			var branches []v1alpha1.ConditionalBranchStatus
			for _, conditionalTask := range nodeNeedUpdate.Spec.ConditionalBranches {
				branches = append(branches, v1alpha1.ConditionalBranchStatus{
					Target:           conditionalTask.Target,
					EvaluationResult: corev1.ConditionUnknown,
				})
			}
			nodeNeedUpdate.Status.ConditionalBranchesStatus.Branches = branches

			return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
		})
		if client.IgnoreNotFound(updateError) != nil {
			it.logger.Error(updateError, "failed to update the condition status of task",
				"task", request)
			return reconcile.Result{}, updateError
		}
	}

	// Step 3: once branches are evaluated, sync children and the Accomplished
	// condition.
	var evaluatedNode v1alpha1.WorkflowNode
	err = it.kubeClient.Get(ctx, request.NamespacedName, &evaluatedNode)
	if err != nil {
		// The node may have been deleted while this reconcile was running.
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}
	evaluated, err := it.conditionalBranchesEvaluated(ctx, evaluatedNode)
	if err != nil {
		return reconcile.Result{}, err
	}
	if !evaluated {
		return reconcile.Result{}, nil
	}

	err = it.syncChildNodes(ctx, evaluatedNode)
	if err != nil {
		return reconcile.Result{}, err
	}

	// Targets of the branches that evaluated True. evaluatedNode does not change
	// across retries, so this is computed once, outside the retry loop.
	var tasks []string
	for _, branch := range evaluatedNode.Status.ConditionalBranchesStatus.Branches {
		if branch.EvaluationResult == corev1.ConditionTrue {
			tasks = append(tasks, branch.Target)
		}
	}

	// Set by the attempt that finally succeeds: true when this update moves the
	// node into the finished state, so NodeAccomplished is emitted exactly once.
	accomplishedNow := false
	updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		nodeNeedUpdate := v1alpha1.WorkflowNode{}
		err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
		if err != nil {
			return err
		}

		activeChildren, finishedChildren, err := it.fetchChildNodes(ctx, nodeNeedUpdate)
		if err != nil {
			return err
		}

		nodeNeedUpdate.Status.FinishedChildren = nil
		for _, finishedChild := range finishedChildren {
			nodeNeedUpdate.Status.FinishedChildren = append(nodeNeedUpdate.Status.FinishedChildren,
				corev1.LocalObjectReference{
					Name: finishedChild.Name,
				})
		}

		nodeNeedUpdate.Status.ActiveChildren = nil
		for _, activeChild := range activeChildren {
			nodeNeedUpdate.Status.ActiveChildren = append(nodeNeedUpdate.Status.ActiveChildren,
				corev1.LocalObjectReference{
					Name: activeChild.Name,
				})
		}

		evaluated, err := it.conditionalBranchesEvaluated(ctx, nodeNeedUpdate)
		if err != nil {
			return err
		}
		accomplishedNow = false
		if evaluated && len(finishedChildren) == len(tasks) {
			accomplishedNow = !WorkflowNodeFinished(nodeNeedUpdate.Status)
			SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
				Type:   v1alpha1.ConditionAccomplished,
				Status: corev1.ConditionTrue,
				Reason: "",
			})
		} else {
			SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
				Type:   v1alpha1.ConditionAccomplished,
				Status: corev1.ConditionFalse,
				Reason: "",
			})
		}

		return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
	})
	if updateError == nil && accomplishedNow {
		it.eventRecorder.Event(&evaluatedNode, recorder.NodeAccomplished{})
	}

	return reconcile.Result{}, client.IgnoreNotFound(updateError)
}
307
// syncChildNodes makes the child WorkflowNodes of a task node match the
// branches whose condition evaluated to True.
//
// evaluatedNode.Status.ConditionalBranchesStatus must be non-nil. It is
// dereferenced unconditionally, and the caller only invokes this after
// conditionalBranchesEvaluated returned true.
//
// Behaviour:
//   - No branch selected: no-op.
//   - The existing children (compared by the task name recovered from each
//     child's generated name) already cover exactly the selected targets: no-op.
//   - Otherwise all existing children are deleted, a RerunBySpecChanged event is
//     recorded, and new children are rendered from the parent Workflow via
//     renderNodesByTemplates and created.
//
// If deleting an outdated child fails for any reason other than NotFound, no
// replacement children are created and the first such error is returned, so
// the reconcile is retried. Creating replacements next to undeleted stale
// children would leave both sets alive and cause delete/recreate churn.
func (it *TaskReconciler) syncChildNodes(ctx context.Context, evaluatedNode v1alpha1.WorkflowNode) error {
	nodeKey := fmt.Sprintf("%s/%s", evaluatedNode.Namespace, evaluatedNode.Name)

	var tasks []string
	for _, branch := range evaluatedNode.Status.ConditionalBranchesStatus.Branches {
		if branch.EvaluationResult == corev1.ConditionTrue {
			tasks = append(tasks, branch.Target)
		}
	}

	if len(tasks) == 0 {
		it.logger.V(4).Info("0 condition of branch in task node is True, Noop",
			"node", nodeKey,
		)
		return nil
	}

	activeChildNodes, finishedChildNodes, err := it.fetchChildNodes(ctx, evaluatedNode)
	if err != nil {
		return err
	}
	// activeChildNodes is not used afterwards, so sharing its backing array is harmless.
	existsChildNodes := append(activeChildNodes, finishedChildNodes...)

	if len(existsChildNodes) > 0 {
		var taskNamesOfNodes []string
		for _, childNode := range existsChildNodes {
			taskNamesOfNodes = append(taskNamesOfNodes, getTaskNameFromGeneratedName(childNode.GetName()))
		}

		if len(setDifference(taskNamesOfNodes, tasks)) == 0 && len(setDifference(tasks, taskNamesOfNodes)) == 0 {
			// Children already match the selected branches.
			return nil
		}

		var nodesToCleanup []string
		for _, item := range existsChildNodes {
			nodesToCleanup = append(nodesToCleanup, item.Name)
		}
		it.eventRecorder.Event(&evaluatedNode, recorder.RerunBySpecChanged{CleanedChildrenNode: nodesToCleanup})

		// Attempt every deletion, but remember the first real failure.
		var deleteErr error
		for i := range existsChildNodes {
			childNode := &existsChildNodes[i]
			if err := it.kubeClient.Delete(ctx, childNode); client.IgnoreNotFound(err) != nil {
				it.logger.Error(err, "failed to delete outdated child node",
					"node", nodeKey,
					"child node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name),
				)
				if deleteErr == nil {
					deleteErr = err
				}
			}
		}
		if deleteErr != nil {
			return deleteErr
		}
	}

	parentWorkflow := v1alpha1.Workflow{}
	err = it.kubeClient.Get(ctx, types.NamespacedName{
		Namespace: evaluatedNode.Namespace,
		Name:      evaluatedNode.Spec.WorkflowName,
	}, &parentWorkflow)
	if err != nil {
		it.logger.Error(err, "failed to fetch parent workflow",
			"node", nodeKey,
			"workflow name", evaluatedNode.Spec.WorkflowName)
		return err
	}

	childNodes, err := renderNodesByTemplates(&parentWorkflow, &evaluatedNode, tasks...)
	if err != nil {
		it.logger.Error(err, "failed to render children childNodes",
			"node", nodeKey)
		return err
	}

	var childrenNames []string
	for _, childNode := range childNodes {
		if err := it.kubeClient.Create(ctx, childNode); err != nil {
			it.logger.Error(err, "failed to create child node",
				"node", nodeKey,
				"child node", childNode)
			return err
		}
		childrenNames = append(childrenNames, childNode.Name)
	}
	it.eventRecorder.Event(&evaluatedNode, recorder.NodesCreated{ChildNodes: childrenNames})
	it.logger.Info("task node spawn new child node",
		"node", nodeKey,
		"child node", childrenNames)

	return nil
}
400
// FetchPodControlledByThisWorkflowNode returns the pods in node's namespace
// whose LabelControlledBy label equals node.Name. These are the task pods that
// SpawnTaskPod creates for the node.
//
// The list is restricted to node.Namespace. WorkflowNode names are only unique
// per namespace, so an unscoped list would also return the task pods of a
// same-named node in another namespace.
func (it *TaskReconciler) FetchPodControlledByThisWorkflowNode(ctx context.Context, node v1alpha1.WorkflowNode) ([]corev1.Pod, error) {
	controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
		MatchLabels: map[string]string{
			v1alpha1.LabelControlledBy: node.Name,
		},
	})
	if err != nil {
		// Presumably only reachable when node.Name is not a valid label value.
		it.logger.Error(err, "failed to build label selector with filtering children workflow node controlled by current node",
			"current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
		return nil, err
	}

	var childPods corev1.PodList
	err = it.kubeClient.List(ctx, &childPods, &client.ListOptions{
		Namespace:     node.Namespace,
		LabelSelector: controlledByThisNode,
	})
	if err != nil {
		return nil, err
	}
	return childPods.Items, nil
}
424
// SpawnTaskPod creates the pod that runs node's Task and returns the created
// object as filled in by the client.
//
// The pod is created in node's namespace with a name generated from
// "<node name>-". It is labelled with the controlling node (LabelControlledBy)
// and the owning workflow (LabelWorkflow), owned by node as its controller,
// and given the foreground-deletion finalizer. An error is returned if the
// node has no Task spec, if the pod spec cannot be derived from it, or if the
// create call fails.
func (it *TaskReconciler) SpawnTaskPod(ctx context.Context, node *v1alpha1.WorkflowNode, workflow *v1alpha1.Workflow) (*corev1.Pod, error) {
	taskSpec := node.Spec.Task
	if taskSpec == nil {
		return nil, errors.Errorf("node %s/%s does not contains spec of Target", node.Namespace, node.Name)
	}

	podSpec, err := task.SpawnPodForTask(*taskSpec)
	if err != nil {
		return nil, err
	}

	podLabels := map[string]string{
		v1alpha1.LabelControlledBy: node.Name,
		v1alpha1.LabelWorkflow:     workflow.Name,
	}
	controllerRef := metav1.OwnerReference{
		APIVersion:         ApiVersion,
		Kind:               KindWorkflowNode,
		Name:               node.Name,
		UID:                node.UID,
		Controller:         &isController,
		BlockOwnerDeletion: &blockOwnerDeletion,
	}

	taskPod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName:    fmt.Sprintf("%s-", node.Name),
			Namespace:       node.Namespace,
			Labels:          podLabels,
			OwnerReferences: []metav1.OwnerReference{controllerRef},
			Finalizers:      []string{metav1.FinalizerDeleteDependents},
		},
		Spec: podSpec,
	}

	if err := it.kubeClient.Create(ctx, taskPod); err != nil {
		return nil, err
	}
	return taskPod, nil
}
462
// conditionalBranchesEvaluated reports whether node's conditional branches
// already have evaluation results.
//
// It returns false (with a nil error) in any of these cases:
//   - the node has no task pod;
//   - the first task pod is in neither PodSucceeded nor PodFailed;
//   - Status.ConditionalBranchesStatus is nil;
//   - the number of recorded branches differs from Spec.ConditionalBranches;
//   - any recorded branch is still ConditionUnknown.
//
// The only error source is listing the node's pods.
func (it *TaskReconciler) conditionalBranchesEvaluated(ctx context.Context, node v1alpha1.WorkflowNode) (bool, error) {
	pods, err := it.FetchPodControlledByThisWorkflowNode(ctx, node)
	if err != nil {
		return false, err
	}
	if len(pods) == 0 {
		return false, nil
	}

	switch pods[0].Status.Phase {
	case corev1.PodSucceeded, corev1.PodFailed:
		// Terminated: branch results may be available.
	default:
		return false, nil
	}

	branchStatus := node.Status.ConditionalBranchesStatus
	if branchStatus == nil || len(branchStatus.Branches) != len(node.Spec.ConditionalBranches) {
		return false, nil
	}

	for i := range branchStatus.Branches {
		if branchStatus.Branches[i].EvaluationResult == corev1.ConditionUnknown {
			return false, nil
		}
	}
	return true, nil
}
494