1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21 "time"
22
23 "github.com/go-logr/logr"
24 corev1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/types"
26 "k8s.io/client-go/util/retry"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28 "sigs.k8s.io/controller-runtime/pkg/reconcile"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
32 )
33
34
35 type ParallelNodeReconciler struct {
36 *ChildNodesFetcher
37 kubeClient client.Client
38 eventRecorder recorder.ChaosRecorder
39 logger logr.Logger
40 }
41
42 func NewParallelNodeReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *ParallelNodeReconciler {
43 return &ParallelNodeReconciler{
44 ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
45 kubeClient: kubeClient,
46 eventRecorder: eventRecorder,
47 logger: logger,
48 }
49 }
50
51
52 func (it *ParallelNodeReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
53 startTime := time.Now()
54 defer func() {
55 it.logger.V(4).Info("Finished syncing for parallel node",
56 "node", request.NamespacedName,
57 "duration", time.Since(startTime),
58 )
59 }()
60
61 node := v1alpha1.WorkflowNode{}
62 err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
63 if err != nil {
64 return reconcile.Result{}, client.IgnoreNotFound(err)
65 }
66
67
68 if node.Spec.Type != v1alpha1.TypeParallel {
69 return reconcile.Result{}, nil
70 }
71
72 it.logger.V(4).Info("resolve parallel node", "node", request)
73
74
75 err = it.syncChildNodes(ctx, node)
76 if err != nil {
77 return reconcile.Result{}, err
78 }
79
80
81 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
82 nodeNeedUpdate := v1alpha1.WorkflowNode{}
83 err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
84 if err != nil {
85 return err
86 }
87
88 activeChildren, finishedChildren, err := it.fetchChildNodes(ctx, nodeNeedUpdate)
89 if err != nil {
90 return err
91 }
92
93 nodeNeedUpdate.Status.FinishedChildren = nil
94 for _, finishedChild := range finishedChildren {
95 nodeNeedUpdate.Status.FinishedChildren = append(nodeNeedUpdate.Status.FinishedChildren,
96 corev1.LocalObjectReference{
97 Name: finishedChild.Name,
98 })
99 }
100
101 nodeNeedUpdate.Status.ActiveChildren = nil
102 for _, activeChild := range activeChildren {
103 nodeNeedUpdate.Status.ActiveChildren = append(nodeNeedUpdate.Status.ActiveChildren,
104 corev1.LocalObjectReference{
105 Name: activeChild.Name,
106 })
107 }
108
109
110 if len(finishedChildren) == len(nodeNeedUpdate.Spec.Children) {
111 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
112 Type: v1alpha1.ConditionAccomplished,
113 Status: corev1.ConditionTrue,
114 Reason: "",
115 })
116 it.eventRecorder.Event(&nodeNeedUpdate, recorder.NodeAccomplished{})
117 } else {
118 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
119 Type: v1alpha1.ConditionAccomplished,
120 Status: corev1.ConditionFalse,
121 Reason: "",
122 })
123 }
124
125 return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
126 })
127
128 if updateError != nil {
129 it.logger.Error(err, "failed to update the status of node", "node", request)
130 return reconcile.Result{}, updateError
131 }
132
133 return reconcile.Result{}, nil
134 }
135
136 func (it *ParallelNodeReconciler) syncChildNodes(ctx context.Context, node v1alpha1.WorkflowNode) error {
137
138
139 if len(node.Spec.Children) == 0 {
140 it.logger.V(4).Info("empty parallel node, NOOP",
141 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
142 )
143 return nil
144 }
145
146 if WorkflowNodeFinished(node.Status) {
147 return nil
148 }
149
150 activeChildNodes, finishedChildNodes, err := it.fetchChildNodes(ctx, node)
151 if err != nil {
152 return err
153 }
154 existsChildNodes := append(activeChildNodes, finishedChildNodes...)
155
156 var taskNamesOfNodes []string
157 for _, childNode := range existsChildNodes {
158 taskNamesOfNodes = append(taskNamesOfNodes, getTaskNameFromGeneratedName(childNode.GetName()))
159 }
160
161 var tasksToStartup []string
162
163 if len(existsChildNodes) == 0 {
164 tasksToStartup = node.Spec.Children
165 }
166
167
168 if len(existsChildNodes) > 0 && (len(setDifference(taskNamesOfNodes, node.Spec.Children)) > 0 || len(setDifference(node.Spec.Children, taskNamesOfNodes)) > 0) {
169 tasksToStartup = node.Spec.Children
170
171 var nodesToCleanup []string
172 for _, item := range existsChildNodes {
173 nodesToCleanup = append(nodesToCleanup, item.Name)
174 }
175 it.eventRecorder.Event(&node, recorder.RerunBySpecChanged{CleanedChildrenNode: nodesToCleanup})
176
177 for _, childNode := range existsChildNodes {
178
179 err := it.kubeClient.Delete(ctx, &childNode)
180 if err != nil {
181 it.logger.Error(err, "failed to delete outdated child node",
182 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
183 "child node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name),
184 )
185 }
186 }
187
188 }
189
190 if len(tasksToStartup) == 0 {
191 it.logger.Info("no need to spawn new child node", "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
192 return nil
193 }
194
195 parentWorkflow := v1alpha1.Workflow{}
196 err = it.kubeClient.Get(ctx, types.NamespacedName{
197 Namespace: node.Namespace,
198 Name: node.Spec.WorkflowName,
199 }, &parentWorkflow)
200 if err != nil {
201 it.logger.Error(err, "failed to fetch parent workflow",
202 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
203 "workflow name", node.Spec.WorkflowName)
204 return err
205 }
206
207 childNodes, err := renderNodesByTemplates(&parentWorkflow, &node, tasksToStartup...)
208 if err != nil {
209 it.logger.Error(err, "failed to render children childNodes",
210 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
211 return err
212 }
213
214 var childrenNames []string
215 for _, childNode := range childNodes {
216 err := it.kubeClient.Create(ctx, childNode)
217 if err != nil {
218 it.logger.Error(err, "failed to create child node",
219 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
220 "child node", childNode)
221 return err
222 }
223 childrenNames = append(childrenNames, childNode.Name)
224 }
225 it.eventRecorder.Event(&node, recorder.NodesCreated{ChildNodes: childrenNames})
226 it.logger.Info("parallel node spawn new child node",
227 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
228 "child node", childrenNames)
229
230 return nil
231 }
232