1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21 "strings"
22 "time"
23
24 . "github.com/onsi/ginkgo/v2"
25 . "github.com/onsi/gomega"
26 corev1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/types"
29 "k8s.io/client-go/util/retry"
30 "k8s.io/utils/pointer"
31 "sigs.k8s.io/controller-runtime/pkg/client"
32
33 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
34 )
35
36
37 var _ = Describe("Workflow", func() {
38 var ns string
39 BeforeEach(func() {
40 ctx := context.TODO()
41 newNs := corev1.Namespace{
42 ObjectMeta: metav1.ObjectMeta{
43 GenerateName: "chaos-mesh-",
44 },
45 Spec: corev1.NamespaceSpec{},
46 }
47 Expect(kubeClient.Create(ctx, &newNs)).To(Succeed())
48 ns = newNs.Name
49 By(fmt.Sprintf("create new namespace %s", ns))
50 })
51
52 AfterEach(func() {
53 ctx := context.TODO()
54 nsToDelete := corev1.Namespace{}
55 Expect(kubeClient.Get(ctx, types.NamespacedName{Name: ns}, &nsToDelete)).To(Succeed())
56 Expect(kubeClient.Delete(ctx, &nsToDelete)).To(Succeed())
57 By(fmt.Sprintf("cleanup namespace %s", ns))
58 })
59
60 Context("with deadline", func() {
61 Context("on suspend", func() {
62 It("should do nothing except waiting", func() {
63 ctx := context.TODO()
64 now := time.Now()
65 duration := 5 * time.Second
66 toleratedJitter := 3 * time.Second
67
68 By("create simple suspend node")
69 startTime := metav1.NewTime(now)
70 deadline := metav1.NewTime(now.Add(duration))
71 node := v1alpha1.WorkflowNode{
72 ObjectMeta: metav1.ObjectMeta{
73 Namespace: ns,
74 GenerateName: "suspend-node-",
75 },
76 Spec: v1alpha1.WorkflowNodeSpec{
77 WorkflowName: "",
78 Type: v1alpha1.TypeSuspend,
79 StartTime: &startTime,
80 Deadline: &deadline,
81 },
82 }
83 Expect(kubeClient.Create(ctx, &node)).To(Succeed())
84
85
86
87 By("assert this node is finished")
88 Eventually(func() bool {
89 updatedNode := v1alpha1.WorkflowNode{}
90 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
91 return ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue)
92 }, duration+toleratedJitter, time.Second).Should(BeTrue())
93
94 Eventually(func() bool {
95 updatedNode := v1alpha1.WorkflowNode{}
96 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
97 return WorkflowNodeFinished(updatedNode.Status)
98 }, toleratedJitter, time.Second).Should(BeTrue())
99 })
100 })
101
102 Context("on chaos node with chaos", func() {
103 It("should delete chaos as soon as deadline exceed", func() {
104 ctx := context.TODO()
105 now := time.Now()
106 duration := 5 * time.Second
107 toleratedJitter := 3 * time.Second
108
109 By("create simple chaos node with pod chaos")
110 startTime := metav1.NewTime(now)
111 deadline := metav1.NewTime(now.Add(duration))
112 node := v1alpha1.WorkflowNode{
113 ObjectMeta: metav1.ObjectMeta{
114 Namespace: ns,
115 GenerateName: "pod-chaos-",
116 },
117 Spec: v1alpha1.WorkflowNodeSpec{
118 WorkflowName: "",
119 Type: v1alpha1.TypePodChaos,
120 StartTime: &startTime,
121 Deadline: &deadline,
122 EmbedChaos: &v1alpha1.EmbedChaos{
123 PodChaos: &v1alpha1.PodChaosSpec{
124 ContainerSelector: v1alpha1.ContainerSelector{
125 PodSelector: v1alpha1.PodSelector{
126 Selector: v1alpha1.PodSelectorSpec{
127 GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
128 Namespaces: []string{ns},
129 LabelSelectors: map[string]string{
130 "app": "not-actually-exist",
131 },
132 },
133 },
134 Mode: v1alpha1.AllMode,
135 },
136 ContainerNames: nil,
137 },
138 Action: v1alpha1.PodKillAction,
139 },
140 },
141 },
142 }
143 Expect(kubeClient.Create(ctx, &node)).To(Succeed())
144
145 By("assert that pod chaos CR is created")
146 Eventually(func() bool {
147 updatedNode := v1alpha1.WorkflowNode{}
148 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
149 if !ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionChaosInjected, corev1.ConditionTrue) {
150 return false
151 }
152 chaos := v1alpha1.PodChaos{}
153 err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: updatedNode.Status.ChaosResource.Name}, &chaos)
154 return err == nil
155 }, toleratedJitter, time.Second).Should(BeTrue())
156
157 By("assert that pod chaos should be purged")
158 Eventually(func() bool {
159 podChaosList := v1alpha1.PodChaosList{}
160 Expect(kubeClient.List(ctx, &podChaosList, &client.ListOptions{Namespace: ns})).To(Succeed())
161 return len(podChaosList.Items) == 0
162 }, duration+toleratedJitter, time.Second).Should(BeTrue())
163 })
164 })
165
166 Context("on chaos node with schedule", func() {
167 It("should delete schedule as soon as deadline exceed", func() {
168 ctx := context.TODO()
169 now := time.Now()
170 duration := 5 * time.Second
171 toleratedJitter := 3 * time.Second
172
173 By("create simple chaos node with pod chaos")
174 startTime := metav1.NewTime(now)
175 deadline := metav1.NewTime(now.Add(duration))
176 node := v1alpha1.WorkflowNode{
177 ObjectMeta: metav1.ObjectMeta{
178 Namespace: ns,
179 GenerateName: "pod-chaos-",
180 },
181 Spec: v1alpha1.WorkflowNodeSpec{
182 WorkflowName: "",
183 Type: v1alpha1.TypeSchedule,
184 StartTime: &startTime,
185 Deadline: &deadline,
186 Schedule: &v1alpha1.ScheduleSpec{
187 Schedule: "@every 1s",
188 StartingDeadlineSeconds: nil,
189 ConcurrencyPolicy: v1alpha1.AllowConcurrent,
190 HistoryLimit: 5,
191 Type: v1alpha1.ScheduleTypePodChaos,
192 ScheduleItem: v1alpha1.ScheduleItem{
193 EmbedChaos: v1alpha1.EmbedChaos{
194 PodChaos: &v1alpha1.PodChaosSpec{
195 ContainerSelector: v1alpha1.ContainerSelector{
196 PodSelector: v1alpha1.PodSelector{
197 Selector: v1alpha1.PodSelectorSpec{
198 GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
199 Namespaces: []string{ns},
200 LabelSelectors: map[string]string{
201 "app": "not-actually-exist",
202 },
203 },
204 },
205 Mode: v1alpha1.AllMode,
206 },
207 ContainerNames: nil,
208 },
209 Action: v1alpha1.PodKillAction,
210 },
211 },
212 },
213 },
214 },
215 }
216 Expect(kubeClient.Create(ctx, &node)).To(Succeed())
217
218 By("assert that schedule CR is created")
219 Eventually(func() bool {
220 updatedNode := v1alpha1.WorkflowNode{}
221 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
222 if !ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionChaosInjected, corev1.ConditionTrue) {
223 return false
224 }
225 schedule := v1alpha1.Schedule{}
226 err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: updatedNode.Status.ChaosResource.Name}, &schedule)
227 return err == nil
228 }, toleratedJitter, time.Second).Should(BeTrue())
229
230 By("assert that schedule should be purged")
231 Eventually(func() bool {
232 scheduleList := v1alpha1.ScheduleList{}
233 Expect(kubeClient.List(ctx, &scheduleList, &client.ListOptions{Namespace: ns})).To(Succeed())
234 return len(scheduleList.Items) == 0
235 }, duration+toleratedJitter, time.Second).Should(BeTrue())
236 })
237 })
238
239 Context("on serial", func() {
240 It("should shutdown all children of serial", func() {
241 ctx := context.TODO()
242 serialDuration := 3 * time.Second
243 durationOfSubTask1 := time.Second
244 durationOfSubTask2 := 5 * time.Second
245 durationOfSubTask3 := 5 * time.Second
246 toleratedJitter := 2 * time.Second
247
248 maxConsisting := durationOfSubTask1 + durationOfSubTask2 + durationOfSubTask3
249
250 workflow := v1alpha1.Workflow{
251 ObjectMeta: metav1.ObjectMeta{
252 Namespace: ns,
253 GenerateName: "fake-workflow-serial-",
254 },
255 Spec: v1alpha1.WorkflowSpec{
256 Entry: "entry-serial",
257 Templates: []v1alpha1.Template{{
258 Name: "entry-serial",
259 Type: v1alpha1.TypeSerial,
260 Deadline: pointer.StringPtr(serialDuration.String()),
261 Children: []string{
262 "serial-task-1",
263 "serial-task-2",
264 "serial-task-3",
265 },
266 }, {
267 Name: "serial-task-1",
268 Type: v1alpha1.TypeSuspend,
269 Deadline: pointer.StringPtr(durationOfSubTask1.String()),
270 }, {
271 Name: "serial-task-2",
272 Type: v1alpha1.TypeSuspend,
273 Deadline: pointer.StringPtr(durationOfSubTask2.String()),
274 }, {
275 Name: "serial-task-3",
276 Type: v1alpha1.TypeSuspend,
277 Deadline: pointer.StringPtr(durationOfSubTask3.String()),
278 }},
279 },
280 Status: v1alpha1.WorkflowStatus{},
281 }
282
283 By("create workflow with serial entry")
284 Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
285
286 By("task 1 should be created")
287 task1Name := ""
288 Eventually(func() bool {
289 workflowNodes := v1alpha1.WorkflowNodeList{}
290 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
291 for _, item := range workflowNodes.Items {
292 if strings.HasPrefix(item.Name, "serial-task-1") {
293 task1Name = item.Name
294 return true
295 }
296 }
297 return false
298 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
299
300 Expect(task1Name).NotTo(BeEmpty())
301
302 By("task 1 will be DeadlineExceed by itself")
303 Eventually(func() bool {
304 taskNode1 := v1alpha1.WorkflowNode{}
305 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task1Name}, &taskNode1)).To(Succeed())
306 condition := GetCondition(taskNode1.Status, v1alpha1.ConditionDeadlineExceed)
307 if condition == nil {
308 return false
309 }
310 if condition.Status != corev1.ConditionTrue {
311 return false
312 }
313 if condition.Reason != v1alpha1.NodeDeadlineExceed {
314 return false
315 }
316 return true
317 }, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
318
319 By("task 2 should be created")
320 task2Name := ""
321 Eventually(func() bool {
322 workflowNodes := v1alpha1.WorkflowNodeList{}
323 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
324 for _, item := range workflowNodes.Items {
325 if strings.HasPrefix(item.Name, "serial-task-2") {
326 task2Name = item.Name
327 return true
328 }
329 }
330 return false
331 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
332 Expect(task2Name).NotTo(BeEmpty())
333
334 By("task 2 should be DeadlineExceed by parent")
335 taskNode2 := v1alpha1.WorkflowNode{}
336 Eventually(func() bool {
337 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task2Name}, &taskNode2)).To(Succeed())
338 condition := GetCondition(taskNode2.Status, v1alpha1.ConditionDeadlineExceed)
339 if condition == nil {
340 return false
341 }
342 if condition.Status != corev1.ConditionTrue {
343 return false
344 }
345 if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
346 return false
347 }
348 return true
349 }, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
350
351 By("entry serial should also be DeadlineExceed by itself")
352 entryNode := v1alpha1.WorkflowNode{}
353 entryNodeName := taskNode2.Labels[v1alpha1.LabelControlledBy]
354 Expect(entryNodeName).NotTo(BeEmpty())
355 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: entryNodeName}, &entryNode)).To(Succeed())
356 condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
357 Expect(condition).NotTo(BeNil())
358 Expect(condition.Status).To(Equal(corev1.ConditionTrue))
359 Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
360
361 By("task 3 should NEVER be created")
362 Consistently(
363 func() bool {
364 workflowNodes := v1alpha1.WorkflowNodeList{}
365 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
366 for _, item := range workflowNodes.Items {
367 if strings.HasPrefix(item.Name, "serial-task-3") {
368 return false
369 }
370 }
371 return true
372 },
373 maxConsisting+toleratedJitter, time.Second).Should(BeTrue())
374 })
375 })
376
377 Context("on parallel", func() {
378 It("should shutdown all children of parallel", func() {
379 ctx := context.TODO()
380 parallelDuration := 3 * time.Second
381 durationOfSubTask1 := time.Second
382 durationOfSubTask2 := 5 * time.Second
383 durationOfSubTask3 := 5 * time.Second
384 toleratedJitter := 2 * time.Second
385
386 workflow := v1alpha1.Workflow{
387 ObjectMeta: metav1.ObjectMeta{
388 Namespace: ns,
389 GenerateName: "fake-workflow-parallel-",
390 },
391 Spec: v1alpha1.WorkflowSpec{
392 Entry: "entry-parallel",
393 Templates: []v1alpha1.Template{{
394 Name: "entry-parallel",
395 Type: v1alpha1.TypeParallel,
396 Deadline: pointer.StringPtr(parallelDuration.String()),
397 Children: []string{
398 "parallel-task-1",
399 "parallel-task-2",
400 "parallel-task-3",
401 },
402 }, {
403 Name: "parallel-task-1",
404 Type: v1alpha1.TypeSuspend,
405 Deadline: pointer.StringPtr(durationOfSubTask1.String()),
406 }, {
407 Name: "parallel-task-2",
408 Type: v1alpha1.TypeSuspend,
409 Deadline: pointer.StringPtr(durationOfSubTask2.String()),
410 }, {
411 Name: "parallel-task-3",
412 Type: v1alpha1.TypeSuspend,
413 Deadline: pointer.StringPtr(durationOfSubTask3.String()),
414 }},
415 },
416 Status: v1alpha1.WorkflowStatus{},
417 }
418
419 By("create workflow with parallel entry")
420 Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
421
422 By("task 1,task 2,task 3 should be created")
423 task1Name := ""
424 task2Name := ""
425 task3Name := ""
426 Eventually(func() bool {
427 workflowNodes := v1alpha1.WorkflowNodeList{}
428 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
429 for _, item := range workflowNodes.Items {
430 if strings.HasPrefix(item.Name, "parallel-task-1") {
431 task1Name = item.Name
432 return true
433 }
434 }
435 return false
436 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
437 Eventually(func() bool {
438 workflowNodes := v1alpha1.WorkflowNodeList{}
439 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
440 for _, item := range workflowNodes.Items {
441 if strings.HasPrefix(item.Name, "parallel-task-2") {
442 task2Name = item.Name
443 return true
444 }
445 }
446 return false
447 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
448 Eventually(func() bool {
449 workflowNodes := v1alpha1.WorkflowNodeList{}
450 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
451 for _, item := range workflowNodes.Items {
452 if strings.HasPrefix(item.Name, "parallel-task-3") {
453 task3Name = item.Name
454 return true
455 }
456 }
457 return false
458 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
459
460 Expect(task1Name).NotTo(BeEmpty())
461 Expect(task2Name).NotTo(BeEmpty())
462 Expect(task3Name).NotTo(BeEmpty())
463
464 By("task 1 should be DeadlineExceed by itself")
465 Eventually(func() bool {
466 taskNode := v1alpha1.WorkflowNode{}
467 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task1Name}, &taskNode)).To(Succeed())
468 condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
469 if condition == nil {
470 return false
471 }
472 if condition.Status != corev1.ConditionTrue {
473 return false
474 }
475 if condition.Reason != v1alpha1.NodeDeadlineExceed {
476 return false
477 }
478 return true
479 }, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
480
481 By("task 2 and task 3 should be DeadlineExceed by parent")
482 for _, nodeName := range []string{task2Name, task3Name} {
483 Eventually(func() bool {
484 taskNode := v1alpha1.WorkflowNode{}
485 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: nodeName}, &taskNode)).To(Succeed())
486 condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
487 if condition == nil {
488 return false
489 }
490 if condition.Status != corev1.ConditionTrue {
491 return false
492 }
493 if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
494 return false
495 }
496 return true
497 }, parallelDuration+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
498 }
499 By("entry parallel should also be DeadlineExceed by itself")
500 updateWorkflow := v1alpha1.Workflow{}
501 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: workflow.Name}, &updateWorkflow)).To(Succeed())
502 entryNodeName := updateWorkflow.Status.EntryNode
503 Expect(entryNodeName).NotTo(BeNil())
504 Expect(*entryNodeName).NotTo(BeEmpty())
505 entryNode := v1alpha1.WorkflowNode{}
506 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: *entryNodeName}, &entryNode)).To(Succeed())
507 condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
508 Expect(condition).NotTo(BeNil())
509 Expect(condition.Status).To(Equal(corev1.ConditionTrue))
510 Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
511 })
512 })
513
514 Context("nested serial or parallel", func() {
515 It("should shutdown children recursively", func() {
516 ctx := context.TODO()
517 parallelDuration := 3 * time.Second
518 durationOfSuspend := 10 * time.Second
519 toleratedJitter := 2 * time.Second
520
521 workflow := v1alpha1.Workflow{
522 ObjectMeta: metav1.ObjectMeta{
523 Namespace: ns,
524 GenerateName: "fake-workflow-parallel-",
525 },
526 Spec: v1alpha1.WorkflowSpec{
527 Entry: "entry-parallel",
528 Templates: []v1alpha1.Template{{
529 Name: "entry-parallel",
530 Type: v1alpha1.TypeParallel,
531 Deadline: pointer.StringPtr(parallelDuration.String()),
532 Children: []string{
533 "parallel-level-1",
534 },
535 }, {
536 Name: "parallel-level-1",
537 Type: v1alpha1.TypeParallel,
538 Children: []string{
539 "parallel-level-2",
540 },
541 }, {
542 Name: "parallel-level-2",
543 Type: v1alpha1.TypeParallel,
544 Children: []string{
545 "suspend-task",
546 },
547 }, {
548 Name: "suspend-task",
549 Type: v1alpha1.TypeSuspend,
550 Deadline: pointer.StringPtr(durationOfSuspend.String()),
551 }},
552 },
553 Status: v1alpha1.WorkflowStatus{},
554 }
555
556 By("create workflow with parallel entry")
557 Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
558
559 By("all the node should be created")
560 parallelLevel1NodeName := ""
561 parallelLevel2NodeName := ""
562 suspendTaskNodeName := ""
563 Eventually(func() bool {
564 workflowNodes := v1alpha1.WorkflowNodeList{}
565 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
566 for _, item := range workflowNodes.Items {
567 if strings.HasPrefix(item.Name, "parallel-level-1") {
568 parallelLevel1NodeName = item.Name
569 return true
570 }
571 }
572 return false
573 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
574 Eventually(func() bool {
575 workflowNodes := v1alpha1.WorkflowNodeList{}
576 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
577 for _, item := range workflowNodes.Items {
578 if strings.HasPrefix(item.Name, "parallel-level-2") {
579 parallelLevel2NodeName = item.Name
580 return true
581 }
582 }
583 return false
584 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
585 Eventually(func() bool {
586 workflowNodes := v1alpha1.WorkflowNodeList{}
587 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
588 for _, item := range workflowNodes.Items {
589 if strings.HasPrefix(item.Name, "suspend-task") {
590 suspendTaskNodeName = item.Name
591 return true
592 }
593 }
594 return false
595 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
596
597 Expect(parallelLevel1NodeName).NotTo(BeEmpty())
598 Expect(parallelLevel2NodeName).NotTo(BeEmpty())
599 Expect(suspendTaskNodeName).NotTo(BeEmpty())
600
601 By("parallel level 1, parallel level 2 and suspend task should be DeadlineExceed by parent")
602 for _, nodeName := range []string{parallelLevel1NodeName, parallelLevel2NodeName, suspendTaskNodeName} {
603 Eventually(func() bool {
604 taskNode := v1alpha1.WorkflowNode{}
605 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: nodeName}, &taskNode)).To(Succeed())
606 condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
607 if condition == nil {
608 return false
609 }
610 if condition.Status != corev1.ConditionTrue {
611 return false
612 }
613 if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
614 return false
615 }
616 return true
617 }, parallelDuration+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
618 }
619
620 By("entry parallel should also be DeadlineExceed by itself")
621 updateWorkflow := v1alpha1.Workflow{}
622 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: workflow.Name}, &updateWorkflow)).To(Succeed())
623 entryNodeName := updateWorkflow.Status.EntryNode
624 Expect(entryNodeName).NotTo(BeNil())
625 Expect(*entryNodeName).NotTo(BeEmpty())
626 entryNode := v1alpha1.WorkflowNode{}
627 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: *entryNodeName}, &entryNode)).To(Succeed())
628 condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
629 Expect(condition).NotTo(BeNil())
630 Expect(condition.Status).To(Equal(corev1.ConditionTrue))
631 Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
632
633 })
634 })
635
636 Context("if this node is already in DeadlineExceed because of ParentNodeDeadlineExceed", func() {
637 It("should omit the next coming deadline", func() {
638 ctx := context.TODO()
639 now := time.Now()
640 duration := 5 * time.Second
641 toleratedJitter := 3 * time.Second
642
643 startTime := metav1.NewTime(now)
644 deadline := metav1.NewTime(now.Add(duration))
645
646 By("create one empty podchaos workflow node, with deadline: 3s")
647 node := v1alpha1.WorkflowNode{
648 ObjectMeta: metav1.ObjectMeta{
649 Namespace: ns,
650 GenerateName: "pod-chaos-",
651 },
652 Spec: v1alpha1.WorkflowNodeSpec{
653 WorkflowName: "",
654 Type: v1alpha1.TypePodChaos,
655 StartTime: &startTime,
656 Deadline: &deadline,
657 EmbedChaos: &v1alpha1.EmbedChaos{
658 PodChaos: &v1alpha1.PodChaosSpec{
659 ContainerSelector: v1alpha1.ContainerSelector{
660 PodSelector: v1alpha1.PodSelector{
661 Selector: v1alpha1.PodSelectorSpec{
662 GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
663 Namespaces: []string{ns},
664 LabelSelectors: map[string]string{
665 "app": "not-actually-exist",
666 },
667 },
668 },
669 Mode: v1alpha1.AllMode,
670 },
671 ContainerNames: nil,
672 },
673 Action: v1alpha1.PodKillAction,
674 },
675 },
676 },
677 Status: v1alpha1.WorkflowNodeStatus{},
678 }
679 Expect(kubeClient.Create(ctx, &node)).To(Succeed())
680 By("manually set condition ConditionDeadlineExceed to true, because of v1alpha1.ParentNodeDeadlineExceed")
681 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
682 deadlineExceedNode := v1alpha1.WorkflowNode{}
683
684 err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &deadlineExceedNode)
685 if err != nil {
686 return err
687 }
688 deadlineExceedNode.Status.Conditions = []v1alpha1.WorkflowNodeCondition{
689 {
690 Type: v1alpha1.ConditionDeadlineExceed,
691 Status: corev1.ConditionTrue,
692 Reason: v1alpha1.ParentNodeDeadlineExceed,
693 },
694 }
695 err = kubeClient.Status().Update(ctx, &deadlineExceedNode)
696 if err != nil {
697 return err
698 }
699 return nil
700 })
701 Expect(updateError).To(BeNil())
702 By("after 3 seconds, the condition ConditionDeadlineExceed should not be modified")
703 Consistently(func() bool {
704 updatedNode := v1alpha1.WorkflowNode{}
705 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
706
707 condition := GetCondition(updatedNode.Status, v1alpha1.ConditionDeadlineExceed)
708 if condition == nil {
709 return false
710 }
711 if condition.Status != corev1.ConditionTrue {
712 return false
713 }
714 if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
715 return false
716 }
717 return true
718 },
719 duration+toleratedJitter, time.Second,
720 ).Should(BeTrue())
721 })
722
723 It("should NOT omit the next coming deadline otherwise", func() {
724 ctx := context.TODO()
725 now := time.Now()
726 duration := 5 * time.Second
727 toleratedJitter := 3 * time.Second
728
729 startTime := metav1.NewTime(now)
730 deadline := metav1.NewTime(now.Add(duration))
731
732 By("create one empty podchaos workflow node, with deadline: 3s")
733 node := v1alpha1.WorkflowNode{
734 ObjectMeta: metav1.ObjectMeta{
735 Namespace: ns,
736 GenerateName: "pod-chaos-",
737 },
738 Spec: v1alpha1.WorkflowNodeSpec{
739 WorkflowName: "",
740 Type: v1alpha1.TypePodChaos,
741 StartTime: &startTime,
742 Deadline: &deadline,
743 EmbedChaos: &v1alpha1.EmbedChaos{
744 PodChaos: &v1alpha1.PodChaosSpec{
745 ContainerSelector: v1alpha1.ContainerSelector{
746 PodSelector: v1alpha1.PodSelector{
747 Selector: v1alpha1.PodSelectorSpec{
748 GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
749 Namespaces: []string{ns},
750 LabelSelectors: map[string]string{
751 "app": "not-actually-exist",
752 },
753 },
754 },
755 Mode: v1alpha1.AllMode,
756 },
757 ContainerNames: nil,
758 },
759 Action: v1alpha1.PodKillAction,
760 },
761 },
762 },
763 Status: v1alpha1.WorkflowNodeStatus{},
764 }
765 Expect(kubeClient.Create(ctx, &node)).To(Succeed())
766 By("manually set condition ConditionDeadlineExceed to true, but NOT caused by v1alpha1.ParentNodeDeadlineExceed")
767 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
768 deadlineExceedNode := v1alpha1.WorkflowNode{}
769
770 err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &deadlineExceedNode)
771 if err != nil {
772 return err
773 }
774 deadlineExceedNode.Status.Conditions = []v1alpha1.WorkflowNodeCondition{
775 {
776 Type: v1alpha1.ConditionDeadlineExceed,
777 Status: corev1.ConditionTrue,
778 Reason: v1alpha1.NodeDeadlineExceed,
779 },
780 }
781 err = kubeClient.Status().Update(ctx, &deadlineExceedNode)
782 if err != nil {
783 return err
784 }
785 return nil
786 })
787 Expect(updateError).To(BeNil())
788 By("condition ConditionDeadlineExceed should be corrected soon")
789 Eventually(func() bool {
790 updatedNode := v1alpha1.WorkflowNode{}
791 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
792 return ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionFalse)
793 },
794 toleratedJitter,
795 time.Second)
796 By("after 5 seconds, the condition ConditionDeadlineExceed should not be modified, caused by NodeDeadlineExceed itself")
797 Eventually(func() bool {
798 updatedNode := v1alpha1.WorkflowNode{}
799 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
800
801 condition := GetCondition(updatedNode.Status, v1alpha1.ConditionDeadlineExceed)
802 if condition == nil {
803 return false
804 }
805 if condition.Status != corev1.ConditionTrue {
806 return false
807 }
808 if condition.Reason != v1alpha1.NodeDeadlineExceed {
809 return false
810 }
811 return true
812 },
813 duration+toleratedJitter, time.Second,
814 ).Should(BeTrue())
815 })
816 })
817 })
818 })
819