1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21 "sort"
22 "time"
23
24 "github.com/go-logr/logr"
25 "github.com/pkg/errors"
26 corev1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/client-go/util/retry"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30 "sigs.k8s.io/controller-runtime/pkg/reconcile"
31
32 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
33 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
34 )
35
36 type ChaosNodeReconciler struct {
37 kubeClient client.Client
38 eventRecorder recorder.ChaosRecorder
39 logger logr.Logger
40 }
41
42 func NewChaosNodeReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *ChaosNodeReconciler {
43 return &ChaosNodeReconciler{kubeClient: kubeClient, eventRecorder: eventRecorder, logger: logger}
44 }
45
46 func (it *ChaosNodeReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
47
48 startTime := time.Now()
49 defer func() {
50 it.logger.V(4).Info("Finished syncing for chaos node",
51 "node", request.NamespacedName,
52 "duration", time.Since(startTime),
53 )
54 }()
55
56 node := v1alpha1.WorkflowNode{}
57
58 err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
59 if err != nil {
60 return reconcile.Result{}, client.IgnoreNotFound(err)
61 }
62
63 if !v1alpha1.IsChaosTemplateType(node.Spec.Type) {
64 return reconcile.Result{}, nil
65 }
66
67 it.logger.V(4).Info("resolve chaos node", "node", request)
68
69 if node.Spec.Type == v1alpha1.TypeSchedule {
70 err := it.syncSchedule(ctx, node)
71 if err != nil {
72 return reconcile.Result{}, err
73 }
74 } else {
75 err = it.syncChaosResources(ctx, node)
76 if err != nil {
77 return reconcile.Result{}, err
78 }
79 }
80
81 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
82 nodeNeedUpdate := v1alpha1.WorkflowNode{}
83 err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
84 if err != nil {
85 return client.IgnoreNotFound(err)
86 }
87
88 if nodeNeedUpdate.Spec.Type == v1alpha1.TypeSchedule {
89
90 scheduleList, err := it.fetchChildrenSchedule(ctx, nodeNeedUpdate)
91 if err != nil {
92 return client.IgnoreNotFound(err)
93 }
94 if len(scheduleList) > 1 {
95 it.logger.Info("the number of schedule custom resource affected by chaos node is more than 1",
96 "chaos node", fmt.Sprintf("%s/%s", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name),
97 "schedule custom resources", scheduleList,
98 )
99 }
100 if len(scheduleList) > 0 {
101 scheduleObject := scheduleList[0]
102 group := scheduleObject.GetObjectKind().GroupVersionKind().Group
103 chaosRef := corev1.TypedLocalObjectReference{
104 APIGroup: &group,
105 Kind: scheduleObject.GetObjectKind().GroupVersionKind().Kind,
106 Name: scheduleObject.GetName(),
107 }
108 nodeNeedUpdate.Status.ChaosResource = &chaosRef
109 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
110 Type: v1alpha1.ConditionChaosInjected,
111 Status: corev1.ConditionTrue,
112 Reason: v1alpha1.ChaosCRCreated,
113 })
114 } else {
115 nodeNeedUpdate.Status.ChaosResource = nil
116 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
117 Type: v1alpha1.ConditionChaosInjected,
118 Status: corev1.ConditionFalse,
119 Reason: v1alpha1.ChaosCRNotExists,
120 })
121 }
122
123 return client.IgnoreNotFound(it.kubeClient.Status().Update(ctx, &nodeNeedUpdate))
124 }
125
126
127 chaosList, err := it.fetchChildrenChaosCustomResource(ctx, nodeNeedUpdate)
128 if err != nil {
129 return client.IgnoreNotFound(err)
130 }
131 if len(chaosList) > 1 {
132 it.logger.Info("the number of chaos custom resource affected by chaos node is more than 1",
133 "chaos node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
134 "chaos custom resources", chaosList,
135 )
136 }
137
138 if len(chaosList) > 0 {
139 chaosObject := chaosList[0]
140 group := chaosObject.GetObjectKind().GroupVersionKind().Group
141 chaosRef := corev1.TypedLocalObjectReference{
142 APIGroup: &group,
143 Kind: chaosObject.GetObjectKind().GroupVersionKind().Kind,
144 Name: chaosObject.GetName(),
145 }
146 nodeNeedUpdate.Status.ChaosResource = &chaosRef
147 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
148 Type: v1alpha1.ConditionChaosInjected,
149 Status: corev1.ConditionTrue,
150 Reason: v1alpha1.ChaosCRCreated,
151 })
152 } else {
153 nodeNeedUpdate.Status.ChaosResource = nil
154 SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
155 Type: v1alpha1.ConditionChaosInjected,
156 Status: corev1.ConditionFalse,
157 Reason: v1alpha1.ChaosCRNotExists,
158 })
159 }
160
161 return client.IgnoreNotFound(it.kubeClient.Status().Update(ctx, &nodeNeedUpdate))
162 })
163
164 return reconcile.Result{}, updateError
165 }
166
167 func (it *ChaosNodeReconciler) syncSchedule(ctx context.Context, node v1alpha1.WorkflowNode) error {
168 scheduleList, err := it.fetchChildrenSchedule(ctx, node)
169 if err != nil {
170 return err
171 }
172 if WorkflowNodeFinished(node.Status) {
173
174 for _, item := range scheduleList {
175 item := item
176 err := it.kubeClient.Delete(ctx, &item)
177 if client.IgnoreNotFound(err) != nil {
178 it.logger.Error(err, "failed to delete schedule CR for workflow chaos node",
179 "namespace", node.Namespace,
180 "chaos node", node.Name,
181 "schedule CR name", item.GetName(),
182 )
183 it.eventRecorder.Event(&node, recorder.ChaosCustomResourceDeleteFailed{
184 Name: item.GetName(),
185 Kind: item.GetObjectKind().GroupVersionKind().Kind,
186 })
187 } else {
188 it.eventRecorder.Event(&node, recorder.ChaosCustomResourceDeleted{
189 Name: item.GetName(),
190 Kind: item.GetObjectKind().GroupVersionKind().Kind,
191 })
192 }
193 }
194 return nil
195 }
196 if len(scheduleList) == 0 {
197 return it.createSchedule(ctx, node)
198 } else if len(scheduleList) > 1 {
199
200
201 var scheduleCrToRemove []string
202 for _, item := range scheduleList[1:] {
203 scheduleCrToRemove = append(scheduleCrToRemove, item.GetName())
204 }
205
206 it.logger.Info("removing duplicated schedule custom resource",
207 "chaos node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
208 "schedule cr to remove", scheduleCrToRemove,
209 )
210
211 for _, item := range scheduleList[1:] {
212
213 item := item
214 err := it.kubeClient.Delete(ctx, &item)
215 if client.IgnoreNotFound(err) != nil {
216 it.logger.Error(err, "failed to delete schedule CR for workflow chaos node",
217 "namespace", node.Namespace,
218 "chaos node", node.Name,
219 "schedule CR name", item.GetName(),
220 )
221 }
222 }
223 } else {
224 it.logger.V(4).Info("do not need spawn or remove schedule CR")
225 }
226 return nil
227
228 }
229
230 func (it *ChaosNodeReconciler) syncChaosResources(ctx context.Context, node v1alpha1.WorkflowNode) error {
231
232 chaosList, err := it.fetchChildrenChaosCustomResource(ctx, node)
233 if err != nil {
234 return err
235 }
236
237 if WorkflowNodeFinished(node.Status) {
238
239 for _, item := range chaosList {
240
241 item := item
242
243 err := it.kubeClient.Delete(ctx, item)
244 if client.IgnoreNotFound(err) != nil {
245 it.logger.Error(err, "failed to delete chaos CR for workflow chaos node",
246 "namespace", node.Namespace,
247 "chaos node", node.Name,
248 "chaos CR name", item.GetName(),
249 )
250 it.eventRecorder.Event(&node, recorder.ChaosCustomResourceDeleteFailed{
251 Name: item.GetName(),
252 Kind: item.GetObjectKind().GroupVersionKind().Kind,
253 })
254 } else {
255 it.eventRecorder.Event(&node, recorder.ChaosCustomResourceDeleted{
256 Name: item.GetName(),
257 Kind: item.GetObjectKind().GroupVersionKind().Kind,
258 })
259 }
260 }
261 return nil
262 }
263
264 if len(chaosList) == 0 {
265 return it.createChaos(ctx, node)
266 } else if len(chaosList) > 1 {
267
268 var chaosCrToRemove []string
269 for _, item := range chaosList[1:] {
270 chaosCrToRemove = append(chaosCrToRemove, item.GetName())
271 }
272
273 it.logger.Info("removing duplicated chaos custom resource",
274 "chaos node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
275 "chaos cr to remove", chaosCrToRemove,
276 )
277
278 for _, item := range chaosList[1:] {
279
280 item := item
281 err := it.kubeClient.Delete(ctx, item)
282 if client.IgnoreNotFound(err) != nil {
283 it.logger.Error(err, "failed to delete chaos CR for workflow chaos node",
284 "namespace", node.Namespace,
285 "chaos node", node.Name,
286 "chaos CR name", item.GetName(),
287 )
288 }
289 }
290 } else {
291 it.logger.V(4).Info("do not need spawn or remove chaos CR")
292 }
293
294
295
296 return nil
297 }
298
299
300 func (it *ChaosNodeReconciler) createChaos(ctx context.Context, node v1alpha1.WorkflowNode) error {
301
302 chaosObject, err := node.Spec.EmbedChaos.SpawnNewObject(node.Spec.Type)
303 if err != nil {
304 return err
305 }
306
307 chaosObject.SetGenerateName(fmt.Sprintf("%s-", node.Name))
308 chaosObject.SetNamespace(node.Namespace)
309 chaosObject.SetOwnerReferences(append(chaosObject.GetOwnerReferences(), metav1.OwnerReference{
310 APIVersion: node.APIVersion,
311 Kind: node.Kind,
312 Name: node.Name,
313 UID: node.UID,
314 Controller: &isController,
315 BlockOwnerDeletion: &blockOwnerDeletion,
316 }))
317 chaosObject.SetLabels(map[string]string{
318 v1alpha1.LabelControlledBy: node.Name,
319 v1alpha1.LabelWorkflow: node.Spec.WorkflowName,
320 })
321
322 err = it.kubeClient.Create(ctx, chaosObject)
323 if err != nil {
324 it.eventRecorder.Event(&node, recorder.ChaosCustomResourceCreateFailed{})
325 it.logger.Error(err, "failed to create chaos")
326 return nil
327 }
328 it.logger.Info("chaos object created", "namespace", chaosObject.GetNamespace(), "name", chaosObject.GetName(), "parent node", node)
329 it.eventRecorder.Event(&node, recorder.ChaosCustomResourceCreated{
330 Name: chaosObject.GetName(),
331 Kind: chaosObject.GetObjectKind().GroupVersionKind().Kind,
332 })
333 return nil
334 }
335
336 func (it *ChaosNodeReconciler) fetchChildrenChaosCustomResource(ctx context.Context, node v1alpha1.WorkflowNode) ([]v1alpha1.GenericChaos, error) {
337 genericChaosList, err := node.Spec.EmbedChaos.SpawnNewList(node.Spec.Type)
338 if err != nil {
339 return nil, err
340 }
341 controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
342 MatchLabels: map[string]string{
343 v1alpha1.LabelControlledBy: node.Name,
344 },
345 })
346 if err != nil {
347 it.logger.Error(err, "failed to build label selector with filtering children workflow node controlled by current node",
348 "current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
349 return nil, err
350 }
351
352 err = it.kubeClient.List(ctx, genericChaosList, &client.ListOptions{
353 LabelSelector: controlledByThisNode,
354 })
355 if err != nil {
356 return nil, err
357 }
358
359 var sorted SortGenericChaosByCreationTimestamp = genericChaosList.GetItems()
360 sort.Sort(sorted)
361 return sorted, err
362 }
363
364 func (it ChaosNodeReconciler) createSchedule(ctx context.Context, node v1alpha1.WorkflowNode) error {
365 if node.Spec.Schedule == nil {
366 return errors.New("invalid workfow node, the spec of schedule is nil")
367 }
368 scheduleToCreate := v1alpha1.Schedule{
369 TypeMeta: metav1.TypeMeta{},
370 ObjectMeta: metav1.ObjectMeta{
371 Namespace: node.Namespace,
372 GenerateName: fmt.Sprintf("%s-", node.Name),
373 Labels: map[string]string{
374 v1alpha1.LabelControlledBy: node.Name,
375 v1alpha1.LabelWorkflow: node.Spec.WorkflowName,
376 },
377 OwnerReferences: []metav1.OwnerReference{
378 {
379 APIVersion: node.APIVersion,
380 Kind: node.Kind,
381 Name: node.Name,
382 UID: node.UID,
383 Controller: &isController,
384 BlockOwnerDeletion: &blockOwnerDeletion,
385 },
386 },
387 },
388 Spec: *node.Spec.Schedule,
389 }
390 err := it.kubeClient.Create(ctx, &scheduleToCreate)
391 if err != nil {
392 it.eventRecorder.Event(&node, recorder.ChaosCustomResourceCreateFailed{})
393 it.logger.Error(err, "failed to create schedule CR")
394 return nil
395 }
396 it.logger.Info("schedule CR created", "namespace", scheduleToCreate.GetNamespace(), "name", scheduleToCreate.GetName())
397 it.eventRecorder.Event(&node, recorder.ChaosCustomResourceCreated{
398 Name: scheduleToCreate.GetName(),
399 Kind: scheduleToCreate.GetObjectKind().GroupVersionKind().Kind,
400 })
401 return nil
402
403 }
404
405 func (it *ChaosNodeReconciler) fetchChildrenSchedule(ctx context.Context, node v1alpha1.WorkflowNode) ([]v1alpha1.Schedule, error) {
406 var scheduleList v1alpha1.ScheduleList
407 controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
408 MatchLabels: map[string]string{
409 v1alpha1.LabelControlledBy: node.Name,
410 },
411 })
412 if err != nil {
413 it.logger.Error(err, "failed to build label selector with filtering children workflow node controlled by current node",
414 "current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
415 return nil, err
416 }
417 err = it.kubeClient.List(ctx, &scheduleList, &client.ListOptions{
418 LabelSelector: controlledByThisNode,
419 })
420 if err != nil {
421 return nil, err
422 }
423 var sorted SortScheduleByCreationTimestamp = scheduleList.Items
424 sort.Sort(sorted)
425 return sorted, err
426 }
427
428 type SortGenericChaosByCreationTimestamp []v1alpha1.GenericChaos
429
430 func (it SortGenericChaosByCreationTimestamp) Len() int {
431 return len(it)
432 }
433
434 func (it SortGenericChaosByCreationTimestamp) Less(i, j int) bool {
435 return it[j].GetCreationTimestamp().After(it[i].GetCreationTimestamp().Time)
436 }
437
438 func (it SortGenericChaosByCreationTimestamp) Swap(i, j int) {
439 it[i], it[j] = it[j], it[i]
440 }
441
442 type SortScheduleByCreationTimestamp []v1alpha1.Schedule
443
444 func (it SortScheduleByCreationTimestamp) Len() int {
445 return len(it)
446 }
447
448 func (it SortScheduleByCreationTimestamp) Less(i, j int) bool {
449 return it[j].GetCreationTimestamp().After(it[i].GetCreationTimestamp().Time)
450 }
451
452 func (it SortScheduleByCreationTimestamp) Swap(i, j int) {
453 it[i], it[j] = it[j], it[i]
454 }
455