1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21 "time"
22
23 "github.com/go-logr/logr"
24 corev1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/types"
26 "k8s.io/client-go/util/retry"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28 "sigs.k8s.io/controller-runtime/pkg/reconcile"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
32 )
33
34
35 type ParallelNodeReconciler struct {
36 *ChildNodesFetcher
37 kubeClient client.Client
38 eventRecorder recorder.ChaosRecorder
39 logger logr.Logger
40 }
41
42 func NewParallelNodeReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *ParallelNodeReconciler {
43 return &ParallelNodeReconciler{
44 ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
45 kubeClient: kubeClient,
46 eventRecorder: eventRecorder,
47 logger: logger,
48 }
49 }
50
51
52 func (it *ParallelNodeReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
53 startTime := time.Now()
54 defer func() {
55 it.logger.V(4).Info("Finished syncing for parallel node",
56 "node", request.NamespacedName,
57 "duration", time.Since(startTime),
58 )
59 }()
60
61 node := v1alpha1.WorkflowNode{}
62 err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
63 if err != nil {
64 return reconcile.Result{}, client.IgnoreNotFound(err)
65 }
66
67
68 if node.Spec.Type != v1alpha1.TypeParallel {
69 return reconcile.Result{}, nil
70 }
71
72 it.logger.V(4).Info("resolve parallel node", "node", request)
73
74
75 err = it.syncChildNodes(ctx, node)
76 if err != nil {
77 return reconcile.Result{}, err
78 }
79
80
81 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
82 nodeNeedUpdate := v1alpha1.WorkflowNode{}
83 err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
84 if err != nil {
85 return err
86 }
87
88 activeChildren, finishedChildren, err := it.fetchChildNodes(ctx, nodeNeedUpdate)
89 if err != nil {
90 return err
91 }
92
93 nodeNeedUpdate.Status.FinishedChildren = nil
94 for _, finishedChild := range finishedChildren {
95 nodeNeedUpdate.Status.FinishedChildren = append(nodeNeedUpdate.Status.FinishedChildren,
96 corev1.LocalObjectReference{
97 Name: finishedChild.Name,
98 })
99 }
100
101 nodeNeedUpdate.Status.ActiveChildren = nil
102 for _, activeChild := range activeChildren {
103 nodeNeedUpdate.Status.ActiveChildren = append(nodeNeedUpdate.Status.ActiveChildren,
104 corev1.LocalObjectReference{
105 Name: activeChild.Name,
106 })
107 }
108
109
110 if len(finishedChildren) == len(nodeNeedUpdate.Spec.Children) {
111 if !WorkflowNodeFinished(nodeNeedUpdate.Status) {
112 it.eventRecorder.Event(&nodeNeedUpdate, recorder.NodeAccomplished{})
113 }
114 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
115 Type: v1alpha1.ConditionAccomplished,
116 Status: corev1.ConditionTrue,
117 Reason: "",
118 })
119 } else {
120 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
121 Type: v1alpha1.ConditionAccomplished,
122 Status: corev1.ConditionFalse,
123 Reason: "",
124 })
125 }
126
127 return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
128 })
129
130 if updateError != nil {
131 it.logger.Error(err, "failed to update the status of node", "node", request)
132 return reconcile.Result{}, updateError
133 }
134
135 return reconcile.Result{}, nil
136 }
137
138 func (it *ParallelNodeReconciler) syncChildNodes(ctx context.Context, node v1alpha1.WorkflowNode) error {
139
140
141 if len(node.Spec.Children) == 0 {
142 it.logger.V(4).Info("empty parallel node, NOOP",
143 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
144 )
145 return nil
146 }
147
148 if WorkflowNodeFinished(node.Status) {
149 return nil
150 }
151
152 activeChildNodes, finishedChildNodes, err := it.fetchChildNodes(ctx, node)
153 if err != nil {
154 return err
155 }
156 existsChildNodes := append(activeChildNodes, finishedChildNodes...)
157
158 var taskNamesOfNodes []string
159 for _, childNode := range existsChildNodes {
160 taskNamesOfNodes = append(taskNamesOfNodes, getTaskNameFromGeneratedName(childNode.GetName()))
161 }
162
163 var tasksToStartup []string
164
165 if len(existsChildNodes) == 0 {
166 tasksToStartup = node.Spec.Children
167 }
168
169
170 if len(existsChildNodes) > 0 && (len(setDifference(taskNamesOfNodes, node.Spec.Children)) > 0 || len(setDifference(node.Spec.Children, taskNamesOfNodes)) > 0) {
171 tasksToStartup = node.Spec.Children
172
173 var nodesToCleanup []string
174 for _, item := range existsChildNodes {
175 nodesToCleanup = append(nodesToCleanup, item.Name)
176 }
177 it.eventRecorder.Event(&node, recorder.RerunBySpecChanged{CleanedChildrenNode: nodesToCleanup})
178
179 for _, childNode := range existsChildNodes {
180
181 err := it.kubeClient.Delete(ctx, &childNode)
182 if err != nil {
183 it.logger.Error(err, "failed to delete outdated child node",
184 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
185 "child node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name),
186 )
187 }
188 }
189
190 }
191
192 if len(tasksToStartup) == 0 {
193 it.logger.Info("no need to spawn new child node", "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
194 return nil
195 }
196
197 parentWorkflow := v1alpha1.Workflow{}
198 err = it.kubeClient.Get(ctx, types.NamespacedName{
199 Namespace: node.Namespace,
200 Name: node.Spec.WorkflowName,
201 }, &parentWorkflow)
202 if err != nil {
203 it.logger.Error(err, "failed to fetch parent workflow",
204 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
205 "workflow name", node.Spec.WorkflowName)
206 return err
207 }
208
209 childNodes, err := renderNodesByTemplates(&parentWorkflow, &node, tasksToStartup...)
210 if err != nil {
211 it.logger.Error(err, "failed to render children childNodes",
212 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
213 return err
214 }
215
216 var childrenNames []string
217 for _, childNode := range childNodes {
218 err := it.kubeClient.Create(ctx, childNode)
219 if err != nil {
220 it.logger.Error(err, "failed to create child node",
221 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
222 "child node", childNode)
223 return err
224 }
225 childrenNames = append(childrenNames, childNode.Name)
226 }
227 it.eventRecorder.Event(&node, recorder.NodesCreated{ChildNodes: childrenNames})
228 it.logger.Info("parallel node spawn new child node",
229 "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
230 "child node", childrenNames)
231
232 return nil
233 }
234