...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/deadline_reconciler_test.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package controllers
    15  
    16  import (
    17  	"context"
    18  	"fmt"
    19  	"strings"
    20  	"time"
    21  
    22  	. "github.com/onsi/ginkgo"
    23  	. "github.com/onsi/gomega"
    24  	corev1 "k8s.io/api/core/v1"
    25  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    26  	"k8s.io/apimachinery/pkg/types"
    27  	"k8s.io/client-go/util/retry"
    28  	"k8s.io/utils/pointer"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  
    31  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    32  )
    33  
    34  // integration tests
    35  var _ = Describe("Workflow", func() {
    36  	var ns string
    37  	BeforeEach(func() {
    38  		ctx := context.TODO()
    39  		newNs := corev1.Namespace{
    40  			ObjectMeta: metav1.ObjectMeta{
    41  				GenerateName: "chaos-mesh-",
    42  			},
    43  			Spec: corev1.NamespaceSpec{},
    44  		}
    45  		Expect(kubeClient.Create(ctx, &newNs)).To(Succeed())
    46  		ns = newNs.Name
    47  		By(fmt.Sprintf("create new namespace %s", ns))
    48  	})
    49  
    50  	AfterEach(func() {
    51  		ctx := context.TODO()
    52  		nsToDelete := corev1.Namespace{}
    53  		Expect(kubeClient.Get(ctx, types.NamespacedName{Name: ns}, &nsToDelete)).To(Succeed())
    54  		Expect(kubeClient.Delete(ctx, &nsToDelete)).To(Succeed())
    55  		By(fmt.Sprintf("cleanup namespace %s", ns))
    56  	})
    57  
    58  	Context("with deadline", func() {
    59  		Context("on suspend", func() {
    60  			It("should do nothing except waiting", func() {
    61  				ctx := context.TODO()
    62  				now := time.Now()
    63  				duration := 5 * time.Second
    64  				toleratedJitter := 3 * time.Second
    65  
    66  				By("create simple suspend node")
    67  				startTime := metav1.NewTime(now)
    68  				deadline := metav1.NewTime(now.Add(duration))
    69  				node := v1alpha1.WorkflowNode{
    70  					ObjectMeta: metav1.ObjectMeta{
    71  						Namespace:    ns,
    72  						GenerateName: "suspend-node-",
    73  					},
    74  					Spec: v1alpha1.WorkflowNodeSpec{
    75  						WorkflowName: "",
    76  						Type:         v1alpha1.TypeSuspend,
    77  						StartTime:    &startTime,
    78  						Deadline:     &deadline,
    79  					},
    80  				}
    81  				Expect(kubeClient.Create(ctx, &node)).To(Succeed())
    82  
    83  				// TODO: no other side effects
    84  
    85  				By("assert this node is finished")
    86  				Eventually(func() bool {
    87  					updatedNode := v1alpha1.WorkflowNode{}
    88  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
    89  					return ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue)
    90  				}, duration+toleratedJitter, time.Second).Should(BeTrue())
    91  
    92  				Eventually(func() bool {
    93  					updatedNode := v1alpha1.WorkflowNode{}
    94  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
    95  					return WorkflowNodeFinished(updatedNode.Status)
    96  				}, toleratedJitter, time.Second).Should(BeTrue())
    97  			})
    98  		})
    99  
   100  		Context("on chaos node with chaos", func() {
   101  			It("should delete chaos as soon as deadline exceed", func() {
   102  				ctx := context.TODO()
   103  				now := time.Now()
   104  				duration := 5 * time.Second
   105  				toleratedJitter := 3 * time.Second
   106  
   107  				By("create simple chaos node with pod chaos")
   108  				startTime := metav1.NewTime(now)
   109  				deadline := metav1.NewTime(now.Add(duration))
   110  				node := v1alpha1.WorkflowNode{
   111  					ObjectMeta: metav1.ObjectMeta{
   112  						Namespace:    ns,
   113  						GenerateName: "pod-chaos-",
   114  					},
   115  					Spec: v1alpha1.WorkflowNodeSpec{
   116  						WorkflowName: "",
   117  						Type:         v1alpha1.TypePodChaos,
   118  						StartTime:    &startTime,
   119  						Deadline:     &deadline,
   120  						EmbedChaos: &v1alpha1.EmbedChaos{
   121  							PodChaos: &v1alpha1.PodChaosSpec{
   122  								ContainerSelector: v1alpha1.ContainerSelector{
   123  									PodSelector: v1alpha1.PodSelector{
   124  										Selector: v1alpha1.PodSelectorSpec{
   125  											Namespaces: []string{ns},
   126  											LabelSelectors: map[string]string{
   127  												"app": "not-actually-exist",
   128  											},
   129  										},
   130  										Mode: v1alpha1.AllPodMode,
   131  									},
   132  									ContainerNames: nil,
   133  								},
   134  								Action: v1alpha1.PodKillAction,
   135  							},
   136  						},
   137  					},
   138  				}
   139  				Expect(kubeClient.Create(ctx, &node)).To(Succeed())
   140  
   141  				By("assert that pod chaos CR is created")
   142  				Eventually(func() bool {
   143  					updatedNode := v1alpha1.WorkflowNode{}
   144  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
   145  					if !ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionChaosInjected, corev1.ConditionTrue) {
   146  						return false
   147  					}
   148  					chaos := v1alpha1.PodChaos{}
   149  					err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: updatedNode.Status.ChaosResource.Name}, &chaos)
   150  					return err == nil
   151  				}, toleratedJitter, time.Second).Should(BeTrue())
   152  
   153  				By("assert that pod chaos should be purged")
   154  				Eventually(func() bool {
   155  					podChaosList := v1alpha1.PodChaosList{}
   156  					Expect(kubeClient.List(ctx, &podChaosList, &client.ListOptions{Namespace: ns})).To(Succeed())
   157  					return len(podChaosList.Items) == 0
   158  				}, duration+toleratedJitter, time.Second).Should(BeTrue())
   159  			})
   160  		})
   161  
   162  		Context("on chaos node with schedule", func() {
   163  			It("should delete schedule as soon as deadline exceed", func() {
   164  				ctx := context.TODO()
   165  				now := time.Now()
   166  				duration := 5 * time.Second
   167  				toleratedJitter := 3 * time.Second
   168  
   169  				By("create simple chaos node with pod chaos")
   170  				startTime := metav1.NewTime(now)
   171  				deadline := metav1.NewTime(now.Add(duration))
   172  				node := v1alpha1.WorkflowNode{
   173  					ObjectMeta: metav1.ObjectMeta{
   174  						Namespace:    ns,
   175  						GenerateName: "pod-chaos-",
   176  					},
   177  					Spec: v1alpha1.WorkflowNodeSpec{
   178  						WorkflowName: "",
   179  						Type:         v1alpha1.TypeSchedule,
   180  						StartTime:    &startTime,
   181  						Deadline:     &deadline,
   182  						Schedule: &v1alpha1.ScheduleSpec{
   183  							Schedule:                "@every 1s",
   184  							StartingDeadlineSeconds: nil,
   185  							ConcurrencyPolicy:       v1alpha1.AllowConcurrent,
   186  							HistoryLimit:            5,
   187  							Type:                    v1alpha1.ScheduleTypePodChaos,
   188  							ScheduleItem: v1alpha1.ScheduleItem{
   189  								EmbedChaos: v1alpha1.EmbedChaos{
   190  									PodChaos: &v1alpha1.PodChaosSpec{
   191  										ContainerSelector: v1alpha1.ContainerSelector{
   192  											PodSelector: v1alpha1.PodSelector{
   193  												Selector: v1alpha1.PodSelectorSpec{
   194  													Namespaces: []string{ns},
   195  													LabelSelectors: map[string]string{
   196  														"app": "not-actually-exist",
   197  													},
   198  												},
   199  												Mode: v1alpha1.AllPodMode,
   200  											},
   201  											ContainerNames: nil,
   202  										},
   203  										Action: v1alpha1.PodKillAction,
   204  									},
   205  								},
   206  							},
   207  						},
   208  					},
   209  				}
   210  				Expect(kubeClient.Create(ctx, &node)).To(Succeed())
   211  
   212  				By("assert that schedule CR is created")
   213  				Eventually(func() bool {
   214  					updatedNode := v1alpha1.WorkflowNode{}
   215  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
   216  					if !ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionChaosInjected, corev1.ConditionTrue) {
   217  						return false
   218  					}
   219  					schedule := v1alpha1.Schedule{}
   220  					err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: updatedNode.Status.ChaosResource.Name}, &schedule)
   221  					return err == nil
   222  				}, toleratedJitter, time.Second).Should(BeTrue())
   223  
   224  				By("assert that schedule should be purged")
   225  				Eventually(func() bool {
   226  					scheduleList := v1alpha1.ScheduleList{}
   227  					Expect(kubeClient.List(ctx, &scheduleList, &client.ListOptions{Namespace: ns})).To(Succeed())
   228  					return len(scheduleList.Items) == 0
   229  				}, duration+toleratedJitter, time.Second).Should(BeTrue())
   230  			})
   231  		})
   232  
   233  		Context("on serial", func() {
   234  			It("should shutdown all children of serial", func() {
   235  				ctx := context.TODO()
   236  				serialDuration := 3 * time.Second
   237  				durationOfSubTask1 := time.Second
   238  				durationOfSubTask2 := 5 * time.Second
   239  				durationOfSubTask3 := 5 * time.Second
   240  				toleratedJitter := 2 * time.Second
   241  
   242  				maxConsisting := durationOfSubTask1 + durationOfSubTask2 + durationOfSubTask3
   243  
   244  				workflow := v1alpha1.Workflow{
   245  					ObjectMeta: metav1.ObjectMeta{
   246  						Namespace:    ns,
   247  						GenerateName: "fake-workflow-serial-",
   248  					},
   249  					Spec: v1alpha1.WorkflowSpec{
   250  						Entry: "entry-serial",
   251  						Templates: []v1alpha1.Template{{
   252  							Name:     "entry-serial",
   253  							Type:     v1alpha1.TypeSerial,
   254  							Deadline: pointer.StringPtr(serialDuration.String()),
   255  							Children: []string{
   256  								"serial-task-1",
   257  								"serial-task-2",
   258  								"serial-task-3",
   259  							},
   260  						}, {
   261  							Name:     "serial-task-1",
   262  							Type:     v1alpha1.TypeSuspend,
   263  							Deadline: pointer.StringPtr(durationOfSubTask1.String()),
   264  						}, {
   265  							Name:     "serial-task-2",
   266  							Type:     v1alpha1.TypeSuspend,
   267  							Deadline: pointer.StringPtr(durationOfSubTask2.String()),
   268  						}, {
   269  							Name:     "serial-task-3",
   270  							Type:     v1alpha1.TypeSuspend,
   271  							Deadline: pointer.StringPtr(durationOfSubTask3.String()),
   272  						}},
   273  					},
   274  					Status: v1alpha1.WorkflowStatus{},
   275  				}
   276  
   277  				By("create workflow with serial entry")
   278  				Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
   279  
   280  				By("task 1 should be created")
   281  				task1Name := ""
   282  				Eventually(func() bool {
   283  					workflowNodes := v1alpha1.WorkflowNodeList{}
   284  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   285  					for _, item := range workflowNodes.Items {
   286  						if strings.HasPrefix(item.Name, "serial-task-1") {
   287  							task1Name = item.Name
   288  							return true
   289  						}
   290  					}
   291  					return false
   292  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   293  
   294  				Expect(task1Name).NotTo(BeEmpty())
   295  
   296  				By("task 1 will be DeadlineExceed by itself")
   297  				Eventually(func() bool {
   298  					taskNode1 := v1alpha1.WorkflowNode{}
   299  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task1Name}, &taskNode1)).To(Succeed())
   300  					condition := GetCondition(taskNode1.Status, v1alpha1.ConditionDeadlineExceed)
   301  					if condition == nil {
   302  						return false
   303  					}
   304  					if condition.Status != corev1.ConditionTrue {
   305  						return false
   306  					}
   307  					if condition.Reason != v1alpha1.NodeDeadlineExceed {
   308  						return false
   309  					}
   310  					return true
   311  				}, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   312  
   313  				By("task 2 should be created")
   314  				task2Name := ""
   315  				Eventually(func() bool {
   316  					workflowNodes := v1alpha1.WorkflowNodeList{}
   317  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   318  					for _, item := range workflowNodes.Items {
   319  						if strings.HasPrefix(item.Name, "serial-task-2") {
   320  							task2Name = item.Name
   321  							return true
   322  						}
   323  					}
   324  					return false
   325  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   326  				Expect(task2Name).NotTo(BeEmpty())
   327  
   328  				By("task 2 should be DeadlineExceed by parent")
   329  				taskNode2 := v1alpha1.WorkflowNode{}
   330  				Eventually(func() bool {
   331  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task2Name}, &taskNode2)).To(Succeed())
   332  					condition := GetCondition(taskNode2.Status, v1alpha1.ConditionDeadlineExceed)
   333  					if condition == nil {
   334  						return false
   335  					}
   336  					if condition.Status != corev1.ConditionTrue {
   337  						return false
   338  					}
   339  					if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
   340  						return false
   341  					}
   342  					return true
   343  				}, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   344  
   345  				By("entry serial should also be DeadlineExceed by itself")
   346  				entryNode := v1alpha1.WorkflowNode{}
   347  				entryNodeName := taskNode2.Labels[v1alpha1.LabelControlledBy]
   348  				Expect(entryNodeName).NotTo(BeEmpty())
   349  				Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: entryNodeName}, &entryNode)).To(Succeed())
   350  				condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
   351  				Expect(condition).NotTo(BeNil())
   352  				Expect(condition.Status).To(Equal(corev1.ConditionTrue))
   353  				Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
   354  
   355  				By("task 3 should NEVER be created")
   356  				Consistently(
   357  					func() bool {
   358  						workflowNodes := v1alpha1.WorkflowNodeList{}
   359  						Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   360  						for _, item := range workflowNodes.Items {
   361  							if strings.HasPrefix(item.Name, "serial-task-3") {
   362  								return false
   363  							}
   364  						}
   365  						return true
   366  					},
   367  					maxConsisting+toleratedJitter, time.Second).Should(BeTrue())
   368  			})
   369  		})
   370  
   371  		Context("on parallel", func() {
   372  			It("should shutdown all children of parallel", func() {
   373  				ctx := context.TODO()
   374  				parallelDuration := 3 * time.Second
   375  				durationOfSubTask1 := time.Second
   376  				durationOfSubTask2 := 5 * time.Second
   377  				durationOfSubTask3 := 5 * time.Second
   378  				toleratedJitter := 2 * time.Second
   379  
   380  				workflow := v1alpha1.Workflow{
   381  					ObjectMeta: metav1.ObjectMeta{
   382  						Namespace:    ns,
   383  						GenerateName: "fake-workflow-parallel-",
   384  					},
   385  					Spec: v1alpha1.WorkflowSpec{
   386  						Entry: "entry-parallel",
   387  						Templates: []v1alpha1.Template{{
   388  							Name:     "entry-parallel",
   389  							Type:     v1alpha1.TypeParallel,
   390  							Deadline: pointer.StringPtr(parallelDuration.String()),
   391  							Children: []string{
   392  								"parallel-task-1",
   393  								"parallel-task-2",
   394  								"parallel-task-3",
   395  							},
   396  						}, {
   397  							Name:     "parallel-task-1",
   398  							Type:     v1alpha1.TypeSuspend,
   399  							Deadline: pointer.StringPtr(durationOfSubTask1.String()),
   400  						}, {
   401  							Name:     "parallel-task-2",
   402  							Type:     v1alpha1.TypeSuspend,
   403  							Deadline: pointer.StringPtr(durationOfSubTask2.String()),
   404  						}, {
   405  							Name:     "parallel-task-3",
   406  							Type:     v1alpha1.TypeSuspend,
   407  							Deadline: pointer.StringPtr(durationOfSubTask3.String()),
   408  						}},
   409  					},
   410  					Status: v1alpha1.WorkflowStatus{},
   411  				}
   412  
   413  				By("create workflow with parallel entry")
   414  				Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
   415  
   416  				By("task 1,task 2,task 3 should be created")
   417  				task1Name := ""
   418  				task2Name := ""
   419  				task3Name := ""
   420  				Eventually(func() bool {
   421  					workflowNodes := v1alpha1.WorkflowNodeList{}
   422  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   423  					for _, item := range workflowNodes.Items {
   424  						if strings.HasPrefix(item.Name, "parallel-task-1") {
   425  							task1Name = item.Name
   426  							return true
   427  						}
   428  					}
   429  					return false
   430  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   431  				Eventually(func() bool {
   432  					workflowNodes := v1alpha1.WorkflowNodeList{}
   433  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   434  					for _, item := range workflowNodes.Items {
   435  						if strings.HasPrefix(item.Name, "parallel-task-2") {
   436  							task2Name = item.Name
   437  							return true
   438  						}
   439  					}
   440  					return false
   441  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   442  				Eventually(func() bool {
   443  					workflowNodes := v1alpha1.WorkflowNodeList{}
   444  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   445  					for _, item := range workflowNodes.Items {
   446  						if strings.HasPrefix(item.Name, "parallel-task-3") {
   447  							task3Name = item.Name
   448  							return true
   449  						}
   450  					}
   451  					return false
   452  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   453  
   454  				Expect(task1Name).NotTo(BeEmpty())
   455  				Expect(task2Name).NotTo(BeEmpty())
   456  				Expect(task3Name).NotTo(BeEmpty())
   457  
   458  				By("task 1 should be DeadlineExceed by itself")
   459  				Eventually(func() bool {
   460  					taskNode := v1alpha1.WorkflowNode{}
   461  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task1Name}, &taskNode)).To(Succeed())
   462  					condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
   463  					if condition == nil {
   464  						return false
   465  					}
   466  					if condition.Status != corev1.ConditionTrue {
   467  						return false
   468  					}
   469  					if condition.Reason != v1alpha1.NodeDeadlineExceed {
   470  						return false
   471  					}
   472  					return true
   473  				}, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   474  
   475  				By("task 2 and task 3 should be DeadlineExceed by parent")
   476  				for _, nodeName := range []string{task2Name, task3Name} {
   477  					Eventually(func() bool {
   478  						taskNode := v1alpha1.WorkflowNode{}
   479  						Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: nodeName}, &taskNode)).To(Succeed())
   480  						condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
   481  						if condition == nil {
   482  							return false
   483  						}
   484  						if condition.Status != corev1.ConditionTrue {
   485  							return false
   486  						}
   487  						if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
   488  							return false
   489  						}
   490  						return true
   491  					}, parallelDuration+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   492  				}
   493  				By("entry parallel should also be DeadlineExceed by itself")
   494  				updateWorkflow := v1alpha1.Workflow{}
   495  				Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: workflow.Name}, &updateWorkflow)).To(Succeed())
   496  				entryNodeName := updateWorkflow.Status.EntryNode
   497  				Expect(entryNodeName).NotTo(BeNil())
   498  				Expect(*entryNodeName).NotTo(BeEmpty())
   499  				entryNode := v1alpha1.WorkflowNode{}
   500  				Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: *entryNodeName}, &entryNode)).To(Succeed())
   501  				condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
   502  				Expect(condition).NotTo(BeNil())
   503  				Expect(condition.Status).To(Equal(corev1.ConditionTrue))
   504  				Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
   505  			})
   506  		})
   507  
   508  		Context("nested serial or parallel", func() {
   509  			It("should shutdown children recursively", func() {
   510  				ctx := context.TODO()
   511  				parallelDuration := 3 * time.Second
   512  				durationOfSuspend := 10 * time.Second
   513  				toleratedJitter := 2 * time.Second
   514  
   515  				workflow := v1alpha1.Workflow{
   516  					ObjectMeta: metav1.ObjectMeta{
   517  						Namespace:    ns,
   518  						GenerateName: "fake-workflow-parallel-",
   519  					},
   520  					Spec: v1alpha1.WorkflowSpec{
   521  						Entry: "entry-parallel",
   522  						Templates: []v1alpha1.Template{{
   523  							Name:     "entry-parallel",
   524  							Type:     v1alpha1.TypeParallel,
   525  							Deadline: pointer.StringPtr(parallelDuration.String()),
   526  							Children: []string{
   527  								"parallel-level-1",
   528  							},
   529  						}, {
   530  							Name: "parallel-level-1",
   531  							Type: v1alpha1.TypeParallel,
   532  							Children: []string{
   533  								"parallel-level-2",
   534  							},
   535  						}, {
   536  							Name: "parallel-level-2",
   537  							Type: v1alpha1.TypeParallel,
   538  							Children: []string{
   539  								"suspend-task",
   540  							},
   541  						}, {
   542  							Name:     "suspend-task",
   543  							Type:     v1alpha1.TypeSuspend,
   544  							Deadline: pointer.StringPtr(durationOfSuspend.String()),
   545  						}},
   546  					},
   547  					Status: v1alpha1.WorkflowStatus{},
   548  				}
   549  
   550  				By("create workflow with parallel entry")
   551  				Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
   552  
   553  				By("all the node should be created")
   554  				parallelLevel1NodeName := ""
   555  				parallelLevel2NodeName := ""
   556  				suspendTaskNodeName := ""
   557  				Eventually(func() bool {
   558  					workflowNodes := v1alpha1.WorkflowNodeList{}
   559  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   560  					for _, item := range workflowNodes.Items {
   561  						if strings.HasPrefix(item.Name, "parallel-level-1") {
   562  							parallelLevel1NodeName = item.Name
   563  							return true
   564  						}
   565  					}
   566  					return false
   567  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   568  				Eventually(func() bool {
   569  					workflowNodes := v1alpha1.WorkflowNodeList{}
   570  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   571  					for _, item := range workflowNodes.Items {
   572  						if strings.HasPrefix(item.Name, "parallel-level-2") {
   573  							parallelLevel2NodeName = item.Name
   574  							return true
   575  						}
   576  					}
   577  					return false
   578  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   579  				Eventually(func() bool {
   580  					workflowNodes := v1alpha1.WorkflowNodeList{}
   581  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   582  					for _, item := range workflowNodes.Items {
   583  						if strings.HasPrefix(item.Name, "suspend-task") {
   584  							suspendTaskNodeName = item.Name
   585  							return true
   586  						}
   587  					}
   588  					return false
   589  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   590  
   591  				Expect(parallelLevel1NodeName).NotTo(BeEmpty())
   592  				Expect(parallelLevel2NodeName).NotTo(BeEmpty())
   593  				Expect(suspendTaskNodeName).NotTo(BeEmpty())
   594  
   595  				By("parallel level 1, parallel level 2 and suspend task should be DeadlineExceed by parent")
   596  				for _, nodeName := range []string{parallelLevel1NodeName, parallelLevel2NodeName, suspendTaskNodeName} {
   597  					Eventually(func() bool {
   598  						taskNode := v1alpha1.WorkflowNode{}
   599  						Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: nodeName}, &taskNode)).To(Succeed())
   600  						condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
   601  						if condition == nil {
   602  							return false
   603  						}
   604  						if condition.Status != corev1.ConditionTrue {
   605  							return false
   606  						}
   607  						if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
   608  							return false
   609  						}
   610  						return true
   611  					}, parallelDuration+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   612  				}
   613  
   614  				By("entry parallel should also be DeadlineExceed by itself")
   615  				updateWorkflow := v1alpha1.Workflow{}
   616  				Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: workflow.Name}, &updateWorkflow)).To(Succeed())
   617  				entryNodeName := updateWorkflow.Status.EntryNode
   618  				Expect(entryNodeName).NotTo(BeNil())
   619  				Expect(*entryNodeName).NotTo(BeEmpty())
   620  				entryNode := v1alpha1.WorkflowNode{}
   621  				Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: *entryNodeName}, &entryNode)).To(Succeed())
   622  				condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
   623  				Expect(condition).NotTo(BeNil())
   624  				Expect(condition.Status).To(Equal(corev1.ConditionTrue))
   625  				Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
   626  
   627  			})
   628  		})
   629  
   630  		Context("if this node is already in DeadlineExceed because of ParentNodeDeadlineExceed", func() {
   631  			It("should omit the next coming deadline", func() {
   632  				ctx := context.TODO()
   633  				now := time.Now()
   634  				duration := 5 * time.Second
   635  				toleratedJitter := 3 * time.Second
   636  
   637  				startTime := metav1.NewTime(now)
   638  				deadline := metav1.NewTime(now.Add(duration))
   639  
   640  				By("create one empty podchaos workflow node, with deadline: 3s")
   641  				node := v1alpha1.WorkflowNode{
   642  					ObjectMeta: metav1.ObjectMeta{
   643  						Namespace:    ns,
   644  						GenerateName: "pod-chaos-",
   645  					},
   646  					Spec: v1alpha1.WorkflowNodeSpec{
   647  						WorkflowName: "",
   648  						Type:         v1alpha1.TypePodChaos,
   649  						StartTime:    &startTime,
   650  						Deadline:     &deadline,
   651  						EmbedChaos: &v1alpha1.EmbedChaos{
   652  							PodChaos: &v1alpha1.PodChaosSpec{
   653  								ContainerSelector: v1alpha1.ContainerSelector{
   654  									PodSelector: v1alpha1.PodSelector{
   655  										Selector: v1alpha1.PodSelectorSpec{
   656  											Namespaces: []string{ns},
   657  											LabelSelectors: map[string]string{
   658  												"app": "not-actually-exist",
   659  											},
   660  										},
   661  										Mode: v1alpha1.AllPodMode,
   662  									},
   663  									ContainerNames: nil,
   664  								},
   665  								Action: v1alpha1.PodKillAction,
   666  							},
   667  						},
   668  					},
   669  					Status: v1alpha1.WorkflowNodeStatus{},
   670  				}
   671  				Expect(kubeClient.Create(ctx, &node)).To(Succeed())
   672  				By("manually set condition ConditionDeadlineExceed to true, because of v1alpha1.ParentNodeDeadlineExceed")
   673  				updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
   674  					deadlineExceedNode := v1alpha1.WorkflowNode{}
   675  
   676  					err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &deadlineExceedNode)
   677  					if err != nil {
   678  						return err
   679  					}
   680  					deadlineExceedNode.Status.Conditions = []v1alpha1.WorkflowNodeCondition{
   681  						{
   682  							Type:   v1alpha1.ConditionDeadlineExceed,
   683  							Status: corev1.ConditionTrue,
   684  							Reason: v1alpha1.ParentNodeDeadlineExceed,
   685  						},
   686  					}
   687  					err = kubeClient.Status().Update(ctx, &deadlineExceedNode)
   688  					if err != nil {
   689  						return err
   690  					}
   691  					return nil
   692  				})
   693  				Expect(updateError).To(BeNil())
   694  				By("after 3 seconds, the condition ConditionDeadlineExceed should not be modified")
   695  				Consistently(func() bool {
   696  					updatedNode := v1alpha1.WorkflowNode{}
   697  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
   698  
   699  					condition := GetCondition(updatedNode.Status, v1alpha1.ConditionDeadlineExceed)
   700  					if condition == nil {
   701  						return false
   702  					}
   703  					if condition.Status != corev1.ConditionTrue {
   704  						return false
   705  					}
   706  					if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
   707  						return false
   708  					}
   709  					return true
   710  				},
   711  					duration+toleratedJitter, time.Second,
   712  				).Should(BeTrue())
   713  			})
   714  
   715  			It("should NOT omit the next coming deadline otherwise", func() {
   716  				ctx := context.TODO()
   717  				now := time.Now()
   718  				duration := 5 * time.Second
   719  				toleratedJitter := 3 * time.Second
   720  
   721  				startTime := metav1.NewTime(now)
   722  				deadline := metav1.NewTime(now.Add(duration))
   723  
   724  				By("create one empty podchaos workflow node, with deadline: 3s")
   725  				node := v1alpha1.WorkflowNode{
   726  					ObjectMeta: metav1.ObjectMeta{
   727  						Namespace:    ns,
   728  						GenerateName: "pod-chaos-",
   729  					},
   730  					Spec: v1alpha1.WorkflowNodeSpec{
   731  						WorkflowName: "",
   732  						Type:         v1alpha1.TypePodChaos,
   733  						StartTime:    &startTime,
   734  						Deadline:     &deadline,
   735  						EmbedChaos: &v1alpha1.EmbedChaos{
   736  							PodChaos: &v1alpha1.PodChaosSpec{
   737  								ContainerSelector: v1alpha1.ContainerSelector{
   738  									PodSelector: v1alpha1.PodSelector{
   739  										Selector: v1alpha1.PodSelectorSpec{
   740  											Namespaces: []string{ns},
   741  											LabelSelectors: map[string]string{
   742  												"app": "not-actually-exist",
   743  											},
   744  										},
   745  										Mode: v1alpha1.AllPodMode,
   746  									},
   747  									ContainerNames: nil,
   748  								},
   749  								Action: v1alpha1.PodKillAction,
   750  							},
   751  						},
   752  					},
   753  					Status: v1alpha1.WorkflowNodeStatus{},
   754  				}
   755  				Expect(kubeClient.Create(ctx, &node)).To(Succeed())
   756  				By("manually set condition ConditionDeadlineExceed to true, but NOT caused by v1alpha1.ParentNodeDeadlineExceed")
   757  				updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
   758  					deadlineExceedNode := v1alpha1.WorkflowNode{}
   759  
   760  					err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &deadlineExceedNode)
   761  					if err != nil {
   762  						return err
   763  					}
   764  					deadlineExceedNode.Status.Conditions = []v1alpha1.WorkflowNodeCondition{
   765  						{
   766  							Type:   v1alpha1.ConditionDeadlineExceed,
   767  							Status: corev1.ConditionTrue,
   768  							Reason: v1alpha1.NodeDeadlineExceed,
   769  						},
   770  					}
   771  					err = kubeClient.Status().Update(ctx, &deadlineExceedNode)
   772  					if err != nil {
   773  						return err
   774  					}
   775  					return nil
   776  				})
   777  				Expect(updateError).To(BeNil())
   778  				By("condition ConditionDeadlineExceed should be corrected soon")
   779  				Eventually(func() bool {
   780  					updatedNode := v1alpha1.WorkflowNode{}
   781  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
   782  					return ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionFalse)
   783  				},
   784  					toleratedJitter,
   785  					time.Second)
   786  				By("after 5 seconds, the condition ConditionDeadlineExceed should not be modified, caused by NodeDeadlineExceed itself")
   787  				Eventually(func() bool {
   788  					updatedNode := v1alpha1.WorkflowNode{}
   789  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
   790  
   791  					condition := GetCondition(updatedNode.Status, v1alpha1.ConditionDeadlineExceed)
   792  					if condition == nil {
   793  						return false
   794  					}
   795  					if condition.Status != corev1.ConditionTrue {
   796  						return false
   797  					}
   798  					if condition.Reason != v1alpha1.NodeDeadlineExceed {
   799  						return false
   800  					}
   801  					return true
   802  				},
   803  					duration+toleratedJitter, time.Second,
   804  				).Should(BeTrue())
   805  			})
   806  		})
   807  	})
   808  })
   809