1
2
3
4
5
6
7
8
9
10
11
12
13
14 package controllers
15
16 import (
17 "context"
18 "fmt"
19 "strings"
20 "time"
21
22 . "github.com/onsi/ginkgo"
23 . "github.com/onsi/gomega"
24 corev1 "k8s.io/api/core/v1"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/types"
27 "k8s.io/client-go/util/retry"
28 "k8s.io/utils/pointer"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 )
33
34
35 var _ = Describe("Workflow", func() {
36 var ns string
37 BeforeEach(func() {
38 ctx := context.TODO()
39 newNs := corev1.Namespace{
40 ObjectMeta: metav1.ObjectMeta{
41 GenerateName: "chaos-mesh-",
42 },
43 Spec: corev1.NamespaceSpec{},
44 }
45 Expect(kubeClient.Create(ctx, &newNs)).To(Succeed())
46 ns = newNs.Name
47 By(fmt.Sprintf("create new namespace %s", ns))
48 })
49
50 AfterEach(func() {
51 ctx := context.TODO()
52 nsToDelete := corev1.Namespace{}
53 Expect(kubeClient.Get(ctx, types.NamespacedName{Name: ns}, &nsToDelete)).To(Succeed())
54 Expect(kubeClient.Delete(ctx, &nsToDelete)).To(Succeed())
55 By(fmt.Sprintf("cleanup namespace %s", ns))
56 })
57
58 Context("with deadline", func() {
59 Context("on suspend", func() {
60 It("should do nothing except waiting", func() {
61 ctx := context.TODO()
62 now := time.Now()
63 duration := 5 * time.Second
64 toleratedJitter := 3 * time.Second
65
66 By("create simple suspend node")
67 startTime := metav1.NewTime(now)
68 deadline := metav1.NewTime(now.Add(duration))
69 node := v1alpha1.WorkflowNode{
70 ObjectMeta: metav1.ObjectMeta{
71 Namespace: ns,
72 GenerateName: "suspend-node-",
73 },
74 Spec: v1alpha1.WorkflowNodeSpec{
75 WorkflowName: "",
76 Type: v1alpha1.TypeSuspend,
77 StartTime: &startTime,
78 Deadline: &deadline,
79 },
80 }
81 Expect(kubeClient.Create(ctx, &node)).To(Succeed())
82
83
84
85 By("assert this node is finished")
86 Eventually(func() bool {
87 updatedNode := v1alpha1.WorkflowNode{}
88 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
89 return ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue)
90 }, duration+toleratedJitter, time.Second).Should(BeTrue())
91
92 Eventually(func() bool {
93 updatedNode := v1alpha1.WorkflowNode{}
94 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
95 return WorkflowNodeFinished(updatedNode.Status)
96 }, toleratedJitter, time.Second).Should(BeTrue())
97 })
98 })
99
100 Context("on chaos node with chaos", func() {
101 It("should delete chaos as soon as deadline exceed", func() {
102 ctx := context.TODO()
103 now := time.Now()
104 duration := 5 * time.Second
105 toleratedJitter := 3 * time.Second
106
107 By("create simple chaos node with pod chaos")
108 startTime := metav1.NewTime(now)
109 deadline := metav1.NewTime(now.Add(duration))
110 node := v1alpha1.WorkflowNode{
111 ObjectMeta: metav1.ObjectMeta{
112 Namespace: ns,
113 GenerateName: "pod-chaos-",
114 },
115 Spec: v1alpha1.WorkflowNodeSpec{
116 WorkflowName: "",
117 Type: v1alpha1.TypePodChaos,
118 StartTime: &startTime,
119 Deadline: &deadline,
120 EmbedChaos: &v1alpha1.EmbedChaos{
121 PodChaos: &v1alpha1.PodChaosSpec{
122 ContainerSelector: v1alpha1.ContainerSelector{
123 PodSelector: v1alpha1.PodSelector{
124 Selector: v1alpha1.PodSelectorSpec{
125 Namespaces: []string{ns},
126 LabelSelectors: map[string]string{
127 "app": "not-actually-exist",
128 },
129 },
130 Mode: v1alpha1.AllPodMode,
131 },
132 ContainerNames: nil,
133 },
134 Action: v1alpha1.PodKillAction,
135 },
136 },
137 },
138 }
139 Expect(kubeClient.Create(ctx, &node)).To(Succeed())
140
141 By("assert that pod chaos CR is created")
142 Eventually(func() bool {
143 updatedNode := v1alpha1.WorkflowNode{}
144 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
145 if !ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionChaosInjected, corev1.ConditionTrue) {
146 return false
147 }
148 chaos := v1alpha1.PodChaos{}
149 err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: updatedNode.Status.ChaosResource.Name}, &chaos)
150 return err == nil
151 }, toleratedJitter, time.Second).Should(BeTrue())
152
153 By("assert that pod chaos should be purged")
154 Eventually(func() bool {
155 podChaosList := v1alpha1.PodChaosList{}
156 Expect(kubeClient.List(ctx, &podChaosList, &client.ListOptions{Namespace: ns})).To(Succeed())
157 return len(podChaosList.Items) == 0
158 }, duration+toleratedJitter, time.Second).Should(BeTrue())
159 })
160 })
161
162 Context("on chaos node with schedule", func() {
163 It("should delete schedule as soon as deadline exceed", func() {
164 ctx := context.TODO()
165 now := time.Now()
166 duration := 5 * time.Second
167 toleratedJitter := 3 * time.Second
168
169 By("create simple chaos node with pod chaos")
170 startTime := metav1.NewTime(now)
171 deadline := metav1.NewTime(now.Add(duration))
172 node := v1alpha1.WorkflowNode{
173 ObjectMeta: metav1.ObjectMeta{
174 Namespace: ns,
175 GenerateName: "pod-chaos-",
176 },
177 Spec: v1alpha1.WorkflowNodeSpec{
178 WorkflowName: "",
179 Type: v1alpha1.TypeSchedule,
180 StartTime: &startTime,
181 Deadline: &deadline,
182 Schedule: &v1alpha1.ScheduleSpec{
183 Schedule: "@every 1s",
184 StartingDeadlineSeconds: nil,
185 ConcurrencyPolicy: v1alpha1.AllowConcurrent,
186 HistoryLimit: 5,
187 Type: v1alpha1.ScheduleTypePodChaos,
188 ScheduleItem: v1alpha1.ScheduleItem{
189 EmbedChaos: v1alpha1.EmbedChaos{
190 PodChaos: &v1alpha1.PodChaosSpec{
191 ContainerSelector: v1alpha1.ContainerSelector{
192 PodSelector: v1alpha1.PodSelector{
193 Selector: v1alpha1.PodSelectorSpec{
194 Namespaces: []string{ns},
195 LabelSelectors: map[string]string{
196 "app": "not-actually-exist",
197 },
198 },
199 Mode: v1alpha1.AllPodMode,
200 },
201 ContainerNames: nil,
202 },
203 Action: v1alpha1.PodKillAction,
204 },
205 },
206 },
207 },
208 },
209 }
210 Expect(kubeClient.Create(ctx, &node)).To(Succeed())
211
212 By("assert that schedule CR is created")
213 Eventually(func() bool {
214 updatedNode := v1alpha1.WorkflowNode{}
215 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
216 if !ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionChaosInjected, corev1.ConditionTrue) {
217 return false
218 }
219 schedule := v1alpha1.Schedule{}
220 err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: updatedNode.Status.ChaosResource.Name}, &schedule)
221 return err == nil
222 }, toleratedJitter, time.Second).Should(BeTrue())
223
224 By("assert that schedule should be purged")
225 Eventually(func() bool {
226 scheduleList := v1alpha1.ScheduleList{}
227 Expect(kubeClient.List(ctx, &scheduleList, &client.ListOptions{Namespace: ns})).To(Succeed())
228 return len(scheduleList.Items) == 0
229 }, duration+toleratedJitter, time.Second).Should(BeTrue())
230 })
231 })
232
233 Context("on serial", func() {
234 It("should shutdown all children of serial", func() {
235 ctx := context.TODO()
236 serialDuration := 3 * time.Second
237 durationOfSubTask1 := time.Second
238 durationOfSubTask2 := 5 * time.Second
239 durationOfSubTask3 := 5 * time.Second
240 toleratedJitter := 2 * time.Second
241
242 maxConsisting := durationOfSubTask1 + durationOfSubTask2 + durationOfSubTask3
243
244 workflow := v1alpha1.Workflow{
245 ObjectMeta: metav1.ObjectMeta{
246 Namespace: ns,
247 GenerateName: "fake-workflow-serial-",
248 },
249 Spec: v1alpha1.WorkflowSpec{
250 Entry: "entry-serial",
251 Templates: []v1alpha1.Template{{
252 Name: "entry-serial",
253 Type: v1alpha1.TypeSerial,
254 Deadline: pointer.StringPtr(serialDuration.String()),
255 Children: []string{
256 "serial-task-1",
257 "serial-task-2",
258 "serial-task-3",
259 },
260 }, {
261 Name: "serial-task-1",
262 Type: v1alpha1.TypeSuspend,
263 Deadline: pointer.StringPtr(durationOfSubTask1.String()),
264 }, {
265 Name: "serial-task-2",
266 Type: v1alpha1.TypeSuspend,
267 Deadline: pointer.StringPtr(durationOfSubTask2.String()),
268 }, {
269 Name: "serial-task-3",
270 Type: v1alpha1.TypeSuspend,
271 Deadline: pointer.StringPtr(durationOfSubTask3.String()),
272 }},
273 },
274 Status: v1alpha1.WorkflowStatus{},
275 }
276
277 By("create workflow with serial entry")
278 Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
279
280 By("task 1 should be created")
281 task1Name := ""
282 Eventually(func() bool {
283 workflowNodes := v1alpha1.WorkflowNodeList{}
284 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
285 for _, item := range workflowNodes.Items {
286 if strings.HasPrefix(item.Name, "serial-task-1") {
287 task1Name = item.Name
288 return true
289 }
290 }
291 return false
292 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
293
294 Expect(task1Name).NotTo(BeEmpty())
295
296 By("task 1 will be DeadlineExceed by itself")
297 Eventually(func() bool {
298 taskNode1 := v1alpha1.WorkflowNode{}
299 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task1Name}, &taskNode1)).To(Succeed())
300 condition := GetCondition(taskNode1.Status, v1alpha1.ConditionDeadlineExceed)
301 if condition == nil {
302 return false
303 }
304 if condition.Status != corev1.ConditionTrue {
305 return false
306 }
307 if condition.Reason != v1alpha1.NodeDeadlineExceed {
308 return false
309 }
310 return true
311 }, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
312
313 By("task 2 should be created")
314 task2Name := ""
315 Eventually(func() bool {
316 workflowNodes := v1alpha1.WorkflowNodeList{}
317 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
318 for _, item := range workflowNodes.Items {
319 if strings.HasPrefix(item.Name, "serial-task-2") {
320 task2Name = item.Name
321 return true
322 }
323 }
324 return false
325 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
326 Expect(task2Name).NotTo(BeEmpty())
327
328 By("task 2 should be DeadlineExceed by parent")
329 taskNode2 := v1alpha1.WorkflowNode{}
330 Eventually(func() bool {
331 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task2Name}, &taskNode2)).To(Succeed())
332 condition := GetCondition(taskNode2.Status, v1alpha1.ConditionDeadlineExceed)
333 if condition == nil {
334 return false
335 }
336 if condition.Status != corev1.ConditionTrue {
337 return false
338 }
339 if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
340 return false
341 }
342 return true
343 }, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
344
345 By("entry serial should also be DeadlineExceed by itself")
346 entryNode := v1alpha1.WorkflowNode{}
347 entryNodeName := taskNode2.Labels[v1alpha1.LabelControlledBy]
348 Expect(entryNodeName).NotTo(BeEmpty())
349 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: entryNodeName}, &entryNode)).To(Succeed())
350 condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
351 Expect(condition).NotTo(BeNil())
352 Expect(condition.Status).To(Equal(corev1.ConditionTrue))
353 Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
354
355 By("task 3 should NEVER be created")
356 Consistently(
357 func() bool {
358 workflowNodes := v1alpha1.WorkflowNodeList{}
359 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
360 for _, item := range workflowNodes.Items {
361 if strings.HasPrefix(item.Name, "serial-task-3") {
362 return false
363 }
364 }
365 return true
366 },
367 maxConsisting+toleratedJitter, time.Second).Should(BeTrue())
368 })
369 })
370
371 Context("on parallel", func() {
372 It("should shutdown all children of parallel", func() {
373 ctx := context.TODO()
374 parallelDuration := 3 * time.Second
375 durationOfSubTask1 := time.Second
376 durationOfSubTask2 := 5 * time.Second
377 durationOfSubTask3 := 5 * time.Second
378 toleratedJitter := 2 * time.Second
379
380 workflow := v1alpha1.Workflow{
381 ObjectMeta: metav1.ObjectMeta{
382 Namespace: ns,
383 GenerateName: "fake-workflow-parallel-",
384 },
385 Spec: v1alpha1.WorkflowSpec{
386 Entry: "entry-parallel",
387 Templates: []v1alpha1.Template{{
388 Name: "entry-parallel",
389 Type: v1alpha1.TypeParallel,
390 Deadline: pointer.StringPtr(parallelDuration.String()),
391 Children: []string{
392 "parallel-task-1",
393 "parallel-task-2",
394 "parallel-task-3",
395 },
396 }, {
397 Name: "parallel-task-1",
398 Type: v1alpha1.TypeSuspend,
399 Deadline: pointer.StringPtr(durationOfSubTask1.String()),
400 }, {
401 Name: "parallel-task-2",
402 Type: v1alpha1.TypeSuspend,
403 Deadline: pointer.StringPtr(durationOfSubTask2.String()),
404 }, {
405 Name: "parallel-task-3",
406 Type: v1alpha1.TypeSuspend,
407 Deadline: pointer.StringPtr(durationOfSubTask3.String()),
408 }},
409 },
410 Status: v1alpha1.WorkflowStatus{},
411 }
412
413 By("create workflow with parallel entry")
414 Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
415
416 By("task 1,task 2,task 3 should be created")
417 task1Name := ""
418 task2Name := ""
419 task3Name := ""
420 Eventually(func() bool {
421 workflowNodes := v1alpha1.WorkflowNodeList{}
422 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
423 for _, item := range workflowNodes.Items {
424 if strings.HasPrefix(item.Name, "parallel-task-1") {
425 task1Name = item.Name
426 return true
427 }
428 }
429 return false
430 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
431 Eventually(func() bool {
432 workflowNodes := v1alpha1.WorkflowNodeList{}
433 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
434 for _, item := range workflowNodes.Items {
435 if strings.HasPrefix(item.Name, "parallel-task-2") {
436 task2Name = item.Name
437 return true
438 }
439 }
440 return false
441 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
442 Eventually(func() bool {
443 workflowNodes := v1alpha1.WorkflowNodeList{}
444 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
445 for _, item := range workflowNodes.Items {
446 if strings.HasPrefix(item.Name, "parallel-task-3") {
447 task3Name = item.Name
448 return true
449 }
450 }
451 return false
452 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
453
454 Expect(task1Name).NotTo(BeEmpty())
455 Expect(task2Name).NotTo(BeEmpty())
456 Expect(task3Name).NotTo(BeEmpty())
457
458 By("task 1 should be DeadlineExceed by itself")
459 Eventually(func() bool {
460 taskNode := v1alpha1.WorkflowNode{}
461 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task1Name}, &taskNode)).To(Succeed())
462 condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
463 if condition == nil {
464 return false
465 }
466 if condition.Status != corev1.ConditionTrue {
467 return false
468 }
469 if condition.Reason != v1alpha1.NodeDeadlineExceed {
470 return false
471 }
472 return true
473 }, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
474
475 By("task 2 and task 3 should be DeadlineExceed by parent")
476 for _, nodeName := range []string{task2Name, task3Name} {
477 Eventually(func() bool {
478 taskNode := v1alpha1.WorkflowNode{}
479 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: nodeName}, &taskNode)).To(Succeed())
480 condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
481 if condition == nil {
482 return false
483 }
484 if condition.Status != corev1.ConditionTrue {
485 return false
486 }
487 if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
488 return false
489 }
490 return true
491 }, parallelDuration+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
492 }
493 By("entry parallel should also be DeadlineExceed by itself")
494 updateWorkflow := v1alpha1.Workflow{}
495 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: workflow.Name}, &updateWorkflow)).To(Succeed())
496 entryNodeName := updateWorkflow.Status.EntryNode
497 Expect(entryNodeName).NotTo(BeNil())
498 Expect(*entryNodeName).NotTo(BeEmpty())
499 entryNode := v1alpha1.WorkflowNode{}
500 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: *entryNodeName}, &entryNode)).To(Succeed())
501 condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
502 Expect(condition).NotTo(BeNil())
503 Expect(condition.Status).To(Equal(corev1.ConditionTrue))
504 Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
505 })
506 })
507
508 Context("nested serial or parallel", func() {
509 It("should shutdown children recursively", func() {
510 ctx := context.TODO()
511 parallelDuration := 3 * time.Second
512 durationOfSuspend := 10 * time.Second
513 toleratedJitter := 2 * time.Second
514
515 workflow := v1alpha1.Workflow{
516 ObjectMeta: metav1.ObjectMeta{
517 Namespace: ns,
518 GenerateName: "fake-workflow-parallel-",
519 },
520 Spec: v1alpha1.WorkflowSpec{
521 Entry: "entry-parallel",
522 Templates: []v1alpha1.Template{{
523 Name: "entry-parallel",
524 Type: v1alpha1.TypeParallel,
525 Deadline: pointer.StringPtr(parallelDuration.String()),
526 Children: []string{
527 "parallel-level-1",
528 },
529 }, {
530 Name: "parallel-level-1",
531 Type: v1alpha1.TypeParallel,
532 Children: []string{
533 "parallel-level-2",
534 },
535 }, {
536 Name: "parallel-level-2",
537 Type: v1alpha1.TypeParallel,
538 Children: []string{
539 "suspend-task",
540 },
541 }, {
542 Name: "suspend-task",
543 Type: v1alpha1.TypeSuspend,
544 Deadline: pointer.StringPtr(durationOfSuspend.String()),
545 }},
546 },
547 Status: v1alpha1.WorkflowStatus{},
548 }
549
550 By("create workflow with parallel entry")
551 Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
552
553 By("all the node should be created")
554 parallelLevel1NodeName := ""
555 parallelLevel2NodeName := ""
556 suspendTaskNodeName := ""
557 Eventually(func() bool {
558 workflowNodes := v1alpha1.WorkflowNodeList{}
559 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
560 for _, item := range workflowNodes.Items {
561 if strings.HasPrefix(item.Name, "parallel-level-1") {
562 parallelLevel1NodeName = item.Name
563 return true
564 }
565 }
566 return false
567 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
568 Eventually(func() bool {
569 workflowNodes := v1alpha1.WorkflowNodeList{}
570 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
571 for _, item := range workflowNodes.Items {
572 if strings.HasPrefix(item.Name, "parallel-level-2") {
573 parallelLevel2NodeName = item.Name
574 return true
575 }
576 }
577 return false
578 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
579 Eventually(func() bool {
580 workflowNodes := v1alpha1.WorkflowNodeList{}
581 Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
582 for _, item := range workflowNodes.Items {
583 if strings.HasPrefix(item.Name, "suspend-task") {
584 suspendTaskNodeName = item.Name
585 return true
586 }
587 }
588 return false
589 }, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
590
591 Expect(parallelLevel1NodeName).NotTo(BeEmpty())
592 Expect(parallelLevel2NodeName).NotTo(BeEmpty())
593 Expect(suspendTaskNodeName).NotTo(BeEmpty())
594
595 By("parallel level 1, parallel level 2 and suspend task should be DeadlineExceed by parent")
596 for _, nodeName := range []string{parallelLevel1NodeName, parallelLevel2NodeName, suspendTaskNodeName} {
597 Eventually(func() bool {
598 taskNode := v1alpha1.WorkflowNode{}
599 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: nodeName}, &taskNode)).To(Succeed())
600 condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
601 if condition == nil {
602 return false
603 }
604 if condition.Status != corev1.ConditionTrue {
605 return false
606 }
607 if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
608 return false
609 }
610 return true
611 }, parallelDuration+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
612 }
613
614 By("entry parallel should also be DeadlineExceed by itself")
615 updateWorkflow := v1alpha1.Workflow{}
616 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: workflow.Name}, &updateWorkflow)).To(Succeed())
617 entryNodeName := updateWorkflow.Status.EntryNode
618 Expect(entryNodeName).NotTo(BeNil())
619 Expect(*entryNodeName).NotTo(BeEmpty())
620 entryNode := v1alpha1.WorkflowNode{}
621 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: *entryNodeName}, &entryNode)).To(Succeed())
622 condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
623 Expect(condition).NotTo(BeNil())
624 Expect(condition.Status).To(Equal(corev1.ConditionTrue))
625 Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
626
627 })
628 })
629
630 Context("if this node is already in DeadlineExceed because of ParentNodeDeadlineExceed", func() {
631 It("should omit the next coming deadline", func() {
632 ctx := context.TODO()
633 now := time.Now()
634 duration := 5 * time.Second
635 toleratedJitter := 3 * time.Second
636
637 startTime := metav1.NewTime(now)
638 deadline := metav1.NewTime(now.Add(duration))
639
640 By("create one empty podchaos workflow node, with deadline: 3s")
641 node := v1alpha1.WorkflowNode{
642 ObjectMeta: metav1.ObjectMeta{
643 Namespace: ns,
644 GenerateName: "pod-chaos-",
645 },
646 Spec: v1alpha1.WorkflowNodeSpec{
647 WorkflowName: "",
648 Type: v1alpha1.TypePodChaos,
649 StartTime: &startTime,
650 Deadline: &deadline,
651 EmbedChaos: &v1alpha1.EmbedChaos{
652 PodChaos: &v1alpha1.PodChaosSpec{
653 ContainerSelector: v1alpha1.ContainerSelector{
654 PodSelector: v1alpha1.PodSelector{
655 Selector: v1alpha1.PodSelectorSpec{
656 Namespaces: []string{ns},
657 LabelSelectors: map[string]string{
658 "app": "not-actually-exist",
659 },
660 },
661 Mode: v1alpha1.AllPodMode,
662 },
663 ContainerNames: nil,
664 },
665 Action: v1alpha1.PodKillAction,
666 },
667 },
668 },
669 Status: v1alpha1.WorkflowNodeStatus{},
670 }
671 Expect(kubeClient.Create(ctx, &node)).To(Succeed())
672 By("manually set condition ConditionDeadlineExceed to true, because of v1alpha1.ParentNodeDeadlineExceed")
673 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
674 deadlineExceedNode := v1alpha1.WorkflowNode{}
675
676 err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &deadlineExceedNode)
677 if err != nil {
678 return err
679 }
680 deadlineExceedNode.Status.Conditions = []v1alpha1.WorkflowNodeCondition{
681 {
682 Type: v1alpha1.ConditionDeadlineExceed,
683 Status: corev1.ConditionTrue,
684 Reason: v1alpha1.ParentNodeDeadlineExceed,
685 },
686 }
687 err = kubeClient.Status().Update(ctx, &deadlineExceedNode)
688 if err != nil {
689 return err
690 }
691 return nil
692 })
693 Expect(updateError).To(BeNil())
694 By("after 3 seconds, the condition ConditionDeadlineExceed should not be modified")
695 Consistently(func() bool {
696 updatedNode := v1alpha1.WorkflowNode{}
697 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
698
699 condition := GetCondition(updatedNode.Status, v1alpha1.ConditionDeadlineExceed)
700 if condition == nil {
701 return false
702 }
703 if condition.Status != corev1.ConditionTrue {
704 return false
705 }
706 if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
707 return false
708 }
709 return true
710 },
711 duration+toleratedJitter, time.Second,
712 ).Should(BeTrue())
713 })
714
715 It("should NOT omit the next coming deadline otherwise", func() {
716 ctx := context.TODO()
717 now := time.Now()
718 duration := 5 * time.Second
719 toleratedJitter := 3 * time.Second
720
721 startTime := metav1.NewTime(now)
722 deadline := metav1.NewTime(now.Add(duration))
723
724 By("create one empty podchaos workflow node, with deadline: 3s")
725 node := v1alpha1.WorkflowNode{
726 ObjectMeta: metav1.ObjectMeta{
727 Namespace: ns,
728 GenerateName: "pod-chaos-",
729 },
730 Spec: v1alpha1.WorkflowNodeSpec{
731 WorkflowName: "",
732 Type: v1alpha1.TypePodChaos,
733 StartTime: &startTime,
734 Deadline: &deadline,
735 EmbedChaos: &v1alpha1.EmbedChaos{
736 PodChaos: &v1alpha1.PodChaosSpec{
737 ContainerSelector: v1alpha1.ContainerSelector{
738 PodSelector: v1alpha1.PodSelector{
739 Selector: v1alpha1.PodSelectorSpec{
740 Namespaces: []string{ns},
741 LabelSelectors: map[string]string{
742 "app": "not-actually-exist",
743 },
744 },
745 Mode: v1alpha1.AllPodMode,
746 },
747 ContainerNames: nil,
748 },
749 Action: v1alpha1.PodKillAction,
750 },
751 },
752 },
753 Status: v1alpha1.WorkflowNodeStatus{},
754 }
755 Expect(kubeClient.Create(ctx, &node)).To(Succeed())
756 By("manually set condition ConditionDeadlineExceed to true, but NOT caused by v1alpha1.ParentNodeDeadlineExceed")
757 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
758 deadlineExceedNode := v1alpha1.WorkflowNode{}
759
760 err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &deadlineExceedNode)
761 if err != nil {
762 return err
763 }
764 deadlineExceedNode.Status.Conditions = []v1alpha1.WorkflowNodeCondition{
765 {
766 Type: v1alpha1.ConditionDeadlineExceed,
767 Status: corev1.ConditionTrue,
768 Reason: v1alpha1.NodeDeadlineExceed,
769 },
770 }
771 err = kubeClient.Status().Update(ctx, &deadlineExceedNode)
772 if err != nil {
773 return err
774 }
775 return nil
776 })
777 Expect(updateError).To(BeNil())
778 By("condition ConditionDeadlineExceed should be corrected soon")
779 Eventually(func() bool {
780 updatedNode := v1alpha1.WorkflowNode{}
781 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
782 return ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionFalse)
783 },
784 toleratedJitter,
785 time.Second)
786 By("after 5 seconds, the condition ConditionDeadlineExceed should not be modified, caused by NodeDeadlineExceed itself")
787 Eventually(func() bool {
788 updatedNode := v1alpha1.WorkflowNode{}
789 Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
790
791 condition := GetCondition(updatedNode.Status, v1alpha1.ConditionDeadlineExceed)
792 if condition == nil {
793 return false
794 }
795 if condition.Status != corev1.ConditionTrue {
796 return false
797 }
798 if condition.Reason != v1alpha1.NodeDeadlineExceed {
799 return false
800 }
801 return true
802 },
803 duration+toleratedJitter, time.Second,
804 ).Should(BeTrue())
805 })
806 })
807 })
808 })
809