...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/deadline_reconciler_test.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"strings"
    22  	"time"
    23  
    24  	. "github.com/onsi/ginkgo/v2"
    25  	. "github.com/onsi/gomega"
    26  	corev1 "k8s.io/api/core/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/types"
    29  	"k8s.io/client-go/util/retry"
    30  	"k8s.io/utils/pointer"
    31  	"sigs.k8s.io/controller-runtime/pkg/client"
    32  
    33  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    34  )
    35  
    36  // integration tests
    37  var _ = Describe("Workflow", func() {
    38  	var ns string
    39  	BeforeEach(func() {
    40  		ctx := context.TODO()
    41  		newNs := corev1.Namespace{
    42  			ObjectMeta: metav1.ObjectMeta{
    43  				GenerateName: "chaos-mesh-",
    44  			},
    45  			Spec: corev1.NamespaceSpec{},
    46  		}
    47  		Expect(kubeClient.Create(ctx, &newNs)).To(Succeed())
    48  		ns = newNs.Name
    49  		By(fmt.Sprintf("create new namespace %s", ns))
    50  	})
    51  
    52  	AfterEach(func() {
    53  		ctx := context.TODO()
    54  		nsToDelete := corev1.Namespace{}
    55  		Expect(kubeClient.Get(ctx, types.NamespacedName{Name: ns}, &nsToDelete)).To(Succeed())
    56  		Expect(kubeClient.Delete(ctx, &nsToDelete)).To(Succeed())
    57  		By(fmt.Sprintf("cleanup namespace %s", ns))
    58  	})
    59  
    60  	Context("with deadline", func() {
    61  		Context("on suspend", func() {
    62  			It("should do nothing except waiting", func() {
    63  				ctx := context.TODO()
    64  				now := time.Now()
    65  				duration := 5 * time.Second
    66  				toleratedJitter := 3 * time.Second
    67  
    68  				By("create simple suspend node")
    69  				startTime := metav1.NewTime(now)
    70  				deadline := metav1.NewTime(now.Add(duration))
    71  				node := v1alpha1.WorkflowNode{
    72  					ObjectMeta: metav1.ObjectMeta{
    73  						Namespace:    ns,
    74  						GenerateName: "suspend-node-",
    75  					},
    76  					Spec: v1alpha1.WorkflowNodeSpec{
    77  						WorkflowName: "",
    78  						Type:         v1alpha1.TypeSuspend,
    79  						StartTime:    &startTime,
    80  						Deadline:     &deadline,
    81  					},
    82  				}
    83  				Expect(kubeClient.Create(ctx, &node)).To(Succeed())
    84  
    85  				// TODO: no other side effects
    86  
    87  				By("assert this node is finished")
    88  				Eventually(func() bool {
    89  					updatedNode := v1alpha1.WorkflowNode{}
    90  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
    91  					return ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue)
    92  				}, duration+toleratedJitter, time.Second).Should(BeTrue())
    93  
    94  				Eventually(func() bool {
    95  					updatedNode := v1alpha1.WorkflowNode{}
    96  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
    97  					return WorkflowNodeFinished(updatedNode.Status)
    98  				}, toleratedJitter, time.Second).Should(BeTrue())
    99  			})
   100  		})
   101  
   102  		Context("on chaos node with chaos", func() {
   103  			It("should delete chaos as soon as deadline exceed", func() {
   104  				ctx := context.TODO()
   105  				now := time.Now()
   106  				duration := 5 * time.Second
   107  				toleratedJitter := 3 * time.Second
   108  
   109  				By("create simple chaos node with pod chaos")
   110  				startTime := metav1.NewTime(now)
   111  				deadline := metav1.NewTime(now.Add(duration))
   112  				node := v1alpha1.WorkflowNode{
   113  					ObjectMeta: metav1.ObjectMeta{
   114  						Namespace:    ns,
   115  						GenerateName: "pod-chaos-",
   116  					},
   117  					Spec: v1alpha1.WorkflowNodeSpec{
   118  						WorkflowName: "",
   119  						Type:         v1alpha1.TypePodChaos,
   120  						StartTime:    &startTime,
   121  						Deadline:     &deadline,
   122  						EmbedChaos: &v1alpha1.EmbedChaos{
   123  							PodChaos: &v1alpha1.PodChaosSpec{
   124  								ContainerSelector: v1alpha1.ContainerSelector{
   125  									PodSelector: v1alpha1.PodSelector{
   126  										Selector: v1alpha1.PodSelectorSpec{
   127  											GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
   128  												Namespaces: []string{ns},
   129  												LabelSelectors: map[string]string{
   130  													"app": "not-actually-exist",
   131  												},
   132  											},
   133  										},
   134  										Mode: v1alpha1.AllMode,
   135  									},
   136  									ContainerNames: nil,
   137  								},
   138  								Action: v1alpha1.PodKillAction,
   139  							},
   140  						},
   141  					},
   142  				}
   143  				Expect(kubeClient.Create(ctx, &node)).To(Succeed())
   144  
   145  				By("assert that pod chaos CR is created")
   146  				Eventually(func() bool {
   147  					updatedNode := v1alpha1.WorkflowNode{}
   148  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
   149  					if !ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionChaosInjected, corev1.ConditionTrue) {
   150  						return false
   151  					}
   152  					chaos := v1alpha1.PodChaos{}
   153  					err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: updatedNode.Status.ChaosResource.Name}, &chaos)
   154  					return err == nil
   155  				}, toleratedJitter, time.Second).Should(BeTrue())
   156  
   157  				By("assert that pod chaos should be purged")
   158  				Eventually(func() bool {
   159  					podChaosList := v1alpha1.PodChaosList{}
   160  					Expect(kubeClient.List(ctx, &podChaosList, &client.ListOptions{Namespace: ns})).To(Succeed())
   161  					return len(podChaosList.Items) == 0
   162  				}, duration+toleratedJitter, time.Second).Should(BeTrue())
   163  			})
   164  		})
   165  
   166  		Context("on chaos node with schedule", func() {
   167  			It("should delete schedule as soon as deadline exceed", func() {
   168  				ctx := context.TODO()
   169  				now := time.Now()
   170  				duration := 5 * time.Second
   171  				toleratedJitter := 3 * time.Second
   172  
   173  				By("create simple chaos node with pod chaos")
   174  				startTime := metav1.NewTime(now)
   175  				deadline := metav1.NewTime(now.Add(duration))
   176  				node := v1alpha1.WorkflowNode{
   177  					ObjectMeta: metav1.ObjectMeta{
   178  						Namespace:    ns,
   179  						GenerateName: "pod-chaos-",
   180  					},
   181  					Spec: v1alpha1.WorkflowNodeSpec{
   182  						WorkflowName: "",
   183  						Type:         v1alpha1.TypeSchedule,
   184  						StartTime:    &startTime,
   185  						Deadline:     &deadline,
   186  						Schedule: &v1alpha1.ScheduleSpec{
   187  							Schedule:                "@every 1s",
   188  							StartingDeadlineSeconds: nil,
   189  							ConcurrencyPolicy:       v1alpha1.AllowConcurrent,
   190  							HistoryLimit:            5,
   191  							Type:                    v1alpha1.ScheduleTypePodChaos,
   192  							ScheduleItem: v1alpha1.ScheduleItem{
   193  								EmbedChaos: v1alpha1.EmbedChaos{
   194  									PodChaos: &v1alpha1.PodChaosSpec{
   195  										ContainerSelector: v1alpha1.ContainerSelector{
   196  											PodSelector: v1alpha1.PodSelector{
   197  												Selector: v1alpha1.PodSelectorSpec{
   198  													GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
   199  														Namespaces: []string{ns},
   200  														LabelSelectors: map[string]string{
   201  															"app": "not-actually-exist",
   202  														},
   203  													},
   204  												},
   205  												Mode: v1alpha1.AllMode,
   206  											},
   207  											ContainerNames: nil,
   208  										},
   209  										Action: v1alpha1.PodKillAction,
   210  									},
   211  								},
   212  							},
   213  						},
   214  					},
   215  				}
   216  				Expect(kubeClient.Create(ctx, &node)).To(Succeed())
   217  
   218  				By("assert that schedule CR is created")
   219  				Eventually(func() bool {
   220  					updatedNode := v1alpha1.WorkflowNode{}
   221  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
   222  					if !ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionChaosInjected, corev1.ConditionTrue) {
   223  						return false
   224  					}
   225  					schedule := v1alpha1.Schedule{}
   226  					err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: updatedNode.Status.ChaosResource.Name}, &schedule)
   227  					return err == nil
   228  				}, toleratedJitter, time.Second).Should(BeTrue())
   229  
   230  				By("assert that schedule should be purged")
   231  				Eventually(func() bool {
   232  					scheduleList := v1alpha1.ScheduleList{}
   233  					Expect(kubeClient.List(ctx, &scheduleList, &client.ListOptions{Namespace: ns})).To(Succeed())
   234  					return len(scheduleList.Items) == 0
   235  				}, duration+toleratedJitter, time.Second).Should(BeTrue())
   236  			})
   237  		})
   238  
   239  		Context("on serial", func() {
   240  			It("should shutdown all children of serial", func() {
   241  				ctx := context.TODO()
   242  				serialDuration := 3 * time.Second
   243  				durationOfSubTask1 := time.Second
   244  				durationOfSubTask2 := 5 * time.Second
   245  				durationOfSubTask3 := 5 * time.Second
   246  				toleratedJitter := 2 * time.Second
   247  
   248  				maxConsisting := durationOfSubTask1 + durationOfSubTask2 + durationOfSubTask3
   249  
   250  				workflow := v1alpha1.Workflow{
   251  					ObjectMeta: metav1.ObjectMeta{
   252  						Namespace:    ns,
   253  						GenerateName: "fake-workflow-serial-",
   254  					},
   255  					Spec: v1alpha1.WorkflowSpec{
   256  						Entry: "entry-serial",
   257  						Templates: []v1alpha1.Template{{
   258  							Name:     "entry-serial",
   259  							Type:     v1alpha1.TypeSerial,
   260  							Deadline: pointer.StringPtr(serialDuration.String()),
   261  							Children: []string{
   262  								"serial-task-1",
   263  								"serial-task-2",
   264  								"serial-task-3",
   265  							},
   266  						}, {
   267  							Name:     "serial-task-1",
   268  							Type:     v1alpha1.TypeSuspend,
   269  							Deadline: pointer.StringPtr(durationOfSubTask1.String()),
   270  						}, {
   271  							Name:     "serial-task-2",
   272  							Type:     v1alpha1.TypeSuspend,
   273  							Deadline: pointer.StringPtr(durationOfSubTask2.String()),
   274  						}, {
   275  							Name:     "serial-task-3",
   276  							Type:     v1alpha1.TypeSuspend,
   277  							Deadline: pointer.StringPtr(durationOfSubTask3.String()),
   278  						}},
   279  					},
   280  					Status: v1alpha1.WorkflowStatus{},
   281  				}
   282  
   283  				By("create workflow with serial entry")
   284  				Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
   285  
   286  				By("task 1 should be created")
   287  				task1Name := ""
   288  				Eventually(func() bool {
   289  					workflowNodes := v1alpha1.WorkflowNodeList{}
   290  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   291  					for _, item := range workflowNodes.Items {
   292  						if strings.HasPrefix(item.Name, "serial-task-1") {
   293  							task1Name = item.Name
   294  							return true
   295  						}
   296  					}
   297  					return false
   298  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   299  
   300  				Expect(task1Name).NotTo(BeEmpty())
   301  
   302  				By("task 1 will be DeadlineExceed by itself")
   303  				Eventually(func() bool {
   304  					taskNode1 := v1alpha1.WorkflowNode{}
   305  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task1Name}, &taskNode1)).To(Succeed())
   306  					condition := GetCondition(taskNode1.Status, v1alpha1.ConditionDeadlineExceed)
   307  					if condition == nil {
   308  						return false
   309  					}
   310  					if condition.Status != corev1.ConditionTrue {
   311  						return false
   312  					}
   313  					if condition.Reason != v1alpha1.NodeDeadlineExceed {
   314  						return false
   315  					}
   316  					return true
   317  				}, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   318  
   319  				By("task 2 should be created")
   320  				task2Name := ""
   321  				Eventually(func() bool {
   322  					workflowNodes := v1alpha1.WorkflowNodeList{}
   323  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   324  					for _, item := range workflowNodes.Items {
   325  						if strings.HasPrefix(item.Name, "serial-task-2") {
   326  							task2Name = item.Name
   327  							return true
   328  						}
   329  					}
   330  					return false
   331  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   332  				Expect(task2Name).NotTo(BeEmpty())
   333  
   334  				By("task 2 should be DeadlineExceed by parent")
   335  				taskNode2 := v1alpha1.WorkflowNode{}
   336  				Eventually(func() bool {
   337  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task2Name}, &taskNode2)).To(Succeed())
   338  					condition := GetCondition(taskNode2.Status, v1alpha1.ConditionDeadlineExceed)
   339  					if condition == nil {
   340  						return false
   341  					}
   342  					if condition.Status != corev1.ConditionTrue {
   343  						return false
   344  					}
   345  					if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
   346  						return false
   347  					}
   348  					return true
   349  				}, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   350  
   351  				By("entry serial should also be DeadlineExceed by itself")
   352  				entryNode := v1alpha1.WorkflowNode{}
   353  				entryNodeName := taskNode2.Labels[v1alpha1.LabelControlledBy]
   354  				Expect(entryNodeName).NotTo(BeEmpty())
   355  				Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: entryNodeName}, &entryNode)).To(Succeed())
   356  				condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
   357  				Expect(condition).NotTo(BeNil())
   358  				Expect(condition.Status).To(Equal(corev1.ConditionTrue))
   359  				Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
   360  
   361  				By("task 3 should NEVER be created")
   362  				Consistently(
   363  					func() bool {
   364  						workflowNodes := v1alpha1.WorkflowNodeList{}
   365  						Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   366  						for _, item := range workflowNodes.Items {
   367  							if strings.HasPrefix(item.Name, "serial-task-3") {
   368  								return false
   369  							}
   370  						}
   371  						return true
   372  					},
   373  					maxConsisting+toleratedJitter, time.Second).Should(BeTrue())
   374  			})
   375  		})
   376  
   377  		Context("on parallel", func() {
   378  			It("should shutdown all children of parallel", func() {
   379  				ctx := context.TODO()
   380  				parallelDuration := 3 * time.Second
   381  				durationOfSubTask1 := time.Second
   382  				durationOfSubTask2 := 5 * time.Second
   383  				durationOfSubTask3 := 5 * time.Second
   384  				toleratedJitter := 2 * time.Second
   385  
   386  				workflow := v1alpha1.Workflow{
   387  					ObjectMeta: metav1.ObjectMeta{
   388  						Namespace:    ns,
   389  						GenerateName: "fake-workflow-parallel-",
   390  					},
   391  					Spec: v1alpha1.WorkflowSpec{
   392  						Entry: "entry-parallel",
   393  						Templates: []v1alpha1.Template{{
   394  							Name:     "entry-parallel",
   395  							Type:     v1alpha1.TypeParallel,
   396  							Deadline: pointer.StringPtr(parallelDuration.String()),
   397  							Children: []string{
   398  								"parallel-task-1",
   399  								"parallel-task-2",
   400  								"parallel-task-3",
   401  							},
   402  						}, {
   403  							Name:     "parallel-task-1",
   404  							Type:     v1alpha1.TypeSuspend,
   405  							Deadline: pointer.StringPtr(durationOfSubTask1.String()),
   406  						}, {
   407  							Name:     "parallel-task-2",
   408  							Type:     v1alpha1.TypeSuspend,
   409  							Deadline: pointer.StringPtr(durationOfSubTask2.String()),
   410  						}, {
   411  							Name:     "parallel-task-3",
   412  							Type:     v1alpha1.TypeSuspend,
   413  							Deadline: pointer.StringPtr(durationOfSubTask3.String()),
   414  						}},
   415  					},
   416  					Status: v1alpha1.WorkflowStatus{},
   417  				}
   418  
   419  				By("create workflow with parallel entry")
   420  				Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
   421  
   422  				By("task 1,task 2,task 3 should be created")
   423  				task1Name := ""
   424  				task2Name := ""
   425  				task3Name := ""
   426  				Eventually(func() bool {
   427  					workflowNodes := v1alpha1.WorkflowNodeList{}
   428  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   429  					for _, item := range workflowNodes.Items {
   430  						if strings.HasPrefix(item.Name, "parallel-task-1") {
   431  							task1Name = item.Name
   432  							return true
   433  						}
   434  					}
   435  					return false
   436  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   437  				Eventually(func() bool {
   438  					workflowNodes := v1alpha1.WorkflowNodeList{}
   439  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   440  					for _, item := range workflowNodes.Items {
   441  						if strings.HasPrefix(item.Name, "parallel-task-2") {
   442  							task2Name = item.Name
   443  							return true
   444  						}
   445  					}
   446  					return false
   447  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   448  				Eventually(func() bool {
   449  					workflowNodes := v1alpha1.WorkflowNodeList{}
   450  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   451  					for _, item := range workflowNodes.Items {
   452  						if strings.HasPrefix(item.Name, "parallel-task-3") {
   453  							task3Name = item.Name
   454  							return true
   455  						}
   456  					}
   457  					return false
   458  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   459  
   460  				Expect(task1Name).NotTo(BeEmpty())
   461  				Expect(task2Name).NotTo(BeEmpty())
   462  				Expect(task3Name).NotTo(BeEmpty())
   463  
   464  				By("task 1 should be DeadlineExceed by itself")
   465  				Eventually(func() bool {
   466  					taskNode := v1alpha1.WorkflowNode{}
   467  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: task1Name}, &taskNode)).To(Succeed())
   468  					condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
   469  					if condition == nil {
   470  						return false
   471  					}
   472  					if condition.Status != corev1.ConditionTrue {
   473  						return false
   474  					}
   475  					if condition.Reason != v1alpha1.NodeDeadlineExceed {
   476  						return false
   477  					}
   478  					return true
   479  				}, durationOfSubTask1+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   480  
   481  				By("task 2 and task 3 should be DeadlineExceed by parent")
   482  				for _, nodeName := range []string{task2Name, task3Name} {
   483  					Eventually(func() bool {
   484  						taskNode := v1alpha1.WorkflowNode{}
   485  						Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: nodeName}, &taskNode)).To(Succeed())
   486  						condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
   487  						if condition == nil {
   488  							return false
   489  						}
   490  						if condition.Status != corev1.ConditionTrue {
   491  							return false
   492  						}
   493  						if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
   494  							return false
   495  						}
   496  						return true
   497  					}, parallelDuration+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   498  				}
   499  				By("entry parallel should also be DeadlineExceed by itself")
   500  				updateWorkflow := v1alpha1.Workflow{}
   501  				Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: workflow.Name}, &updateWorkflow)).To(Succeed())
   502  				entryNodeName := updateWorkflow.Status.EntryNode
   503  				Expect(entryNodeName).NotTo(BeNil())
   504  				Expect(*entryNodeName).NotTo(BeEmpty())
   505  				entryNode := v1alpha1.WorkflowNode{}
   506  				Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: *entryNodeName}, &entryNode)).To(Succeed())
   507  				condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
   508  				Expect(condition).NotTo(BeNil())
   509  				Expect(condition.Status).To(Equal(corev1.ConditionTrue))
   510  				Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
   511  			})
   512  		})
   513  
   514  		Context("nested serial or parallel", func() {
   515  			It("should shutdown children recursively", func() {
   516  				ctx := context.TODO()
   517  				parallelDuration := 3 * time.Second
   518  				durationOfSuspend := 10 * time.Second
   519  				toleratedJitter := 2 * time.Second
   520  
   521  				workflow := v1alpha1.Workflow{
   522  					ObjectMeta: metav1.ObjectMeta{
   523  						Namespace:    ns,
   524  						GenerateName: "fake-workflow-parallel-",
   525  					},
   526  					Spec: v1alpha1.WorkflowSpec{
   527  						Entry: "entry-parallel",
   528  						Templates: []v1alpha1.Template{{
   529  							Name:     "entry-parallel",
   530  							Type:     v1alpha1.TypeParallel,
   531  							Deadline: pointer.StringPtr(parallelDuration.String()),
   532  							Children: []string{
   533  								"parallel-level-1",
   534  							},
   535  						}, {
   536  							Name: "parallel-level-1",
   537  							Type: v1alpha1.TypeParallel,
   538  							Children: []string{
   539  								"parallel-level-2",
   540  							},
   541  						}, {
   542  							Name: "parallel-level-2",
   543  							Type: v1alpha1.TypeParallel,
   544  							Children: []string{
   545  								"suspend-task",
   546  							},
   547  						}, {
   548  							Name:     "suspend-task",
   549  							Type:     v1alpha1.TypeSuspend,
   550  							Deadline: pointer.StringPtr(durationOfSuspend.String()),
   551  						}},
   552  					},
   553  					Status: v1alpha1.WorkflowStatus{},
   554  				}
   555  
   556  				By("create workflow with parallel entry")
   557  				Expect(kubeClient.Create(ctx, &workflow)).To(Succeed())
   558  
   559  				By("all the node should be created")
   560  				parallelLevel1NodeName := ""
   561  				parallelLevel2NodeName := ""
   562  				suspendTaskNodeName := ""
   563  				Eventually(func() bool {
   564  					workflowNodes := v1alpha1.WorkflowNodeList{}
   565  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   566  					for _, item := range workflowNodes.Items {
   567  						if strings.HasPrefix(item.Name, "parallel-level-1") {
   568  							parallelLevel1NodeName = item.Name
   569  							return true
   570  						}
   571  					}
   572  					return false
   573  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   574  				Eventually(func() bool {
   575  					workflowNodes := v1alpha1.WorkflowNodeList{}
   576  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   577  					for _, item := range workflowNodes.Items {
   578  						if strings.HasPrefix(item.Name, "parallel-level-2") {
   579  							parallelLevel2NodeName = item.Name
   580  							return true
   581  						}
   582  					}
   583  					return false
   584  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   585  				Eventually(func() bool {
   586  					workflowNodes := v1alpha1.WorkflowNodeList{}
   587  					Expect(kubeClient.List(ctx, &workflowNodes)).To(Succeed())
   588  					for _, item := range workflowNodes.Items {
   589  						if strings.HasPrefix(item.Name, "suspend-task") {
   590  							suspendTaskNodeName = item.Name
   591  							return true
   592  						}
   593  					}
   594  					return false
   595  				}, toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   596  
   597  				Expect(parallelLevel1NodeName).NotTo(BeEmpty())
   598  				Expect(parallelLevel2NodeName).NotTo(BeEmpty())
   599  				Expect(suspendTaskNodeName).NotTo(BeEmpty())
   600  
   601  				By("parallel level 1, parallel level 2 and suspend task should be DeadlineExceed by parent")
   602  				for _, nodeName := range []string{parallelLevel1NodeName, parallelLevel2NodeName, suspendTaskNodeName} {
   603  					Eventually(func() bool {
   604  						taskNode := v1alpha1.WorkflowNode{}
   605  						Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: nodeName}, &taskNode)).To(Succeed())
   606  						condition := GetCondition(taskNode.Status, v1alpha1.ConditionDeadlineExceed)
   607  						if condition == nil {
   608  							return false
   609  						}
   610  						if condition.Status != corev1.ConditionTrue {
   611  							return false
   612  						}
   613  						if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
   614  							return false
   615  						}
   616  						return true
   617  					}, parallelDuration+toleratedJitter, 200*time.Millisecond).Should(BeTrue())
   618  				}
   619  
   620  				By("entry parallel should also be DeadlineExceed by itself")
   621  				updateWorkflow := v1alpha1.Workflow{}
   622  				Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: workflow.Name}, &updateWorkflow)).To(Succeed())
   623  				entryNodeName := updateWorkflow.Status.EntryNode
   624  				Expect(entryNodeName).NotTo(BeNil())
   625  				Expect(*entryNodeName).NotTo(BeEmpty())
   626  				entryNode := v1alpha1.WorkflowNode{}
   627  				Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: *entryNodeName}, &entryNode)).To(Succeed())
   628  				condition := GetCondition(entryNode.Status, v1alpha1.ConditionDeadlineExceed)
   629  				Expect(condition).NotTo(BeNil())
   630  				Expect(condition.Status).To(Equal(corev1.ConditionTrue))
   631  				Expect(condition.Reason).To(Equal(v1alpha1.NodeDeadlineExceed))
   632  
   633  			})
   634  		})
   635  
   636  		Context("if this node is already in DeadlineExceed because of ParentNodeDeadlineExceed", func() {
   637  			It("should omit the next coming deadline", func() {
   638  				ctx := context.TODO()
   639  				now := time.Now()
   640  				duration := 5 * time.Second
   641  				toleratedJitter := 3 * time.Second
   642  
   643  				startTime := metav1.NewTime(now)
   644  				deadline := metav1.NewTime(now.Add(duration))
   645  
   646  				By("create one empty podchaos workflow node, with deadline: 3s")
   647  				node := v1alpha1.WorkflowNode{
   648  					ObjectMeta: metav1.ObjectMeta{
   649  						Namespace:    ns,
   650  						GenerateName: "pod-chaos-",
   651  					},
   652  					Spec: v1alpha1.WorkflowNodeSpec{
   653  						WorkflowName: "",
   654  						Type:         v1alpha1.TypePodChaos,
   655  						StartTime:    &startTime,
   656  						Deadline:     &deadline,
   657  						EmbedChaos: &v1alpha1.EmbedChaos{
   658  							PodChaos: &v1alpha1.PodChaosSpec{
   659  								ContainerSelector: v1alpha1.ContainerSelector{
   660  									PodSelector: v1alpha1.PodSelector{
   661  										Selector: v1alpha1.PodSelectorSpec{
   662  											GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
   663  												Namespaces: []string{ns},
   664  												LabelSelectors: map[string]string{
   665  													"app": "not-actually-exist",
   666  												},
   667  											},
   668  										},
   669  										Mode: v1alpha1.AllMode,
   670  									},
   671  									ContainerNames: nil,
   672  								},
   673  								Action: v1alpha1.PodKillAction,
   674  							},
   675  						},
   676  					},
   677  					Status: v1alpha1.WorkflowNodeStatus{},
   678  				}
   679  				Expect(kubeClient.Create(ctx, &node)).To(Succeed())
   680  				By("manually set condition ConditionDeadlineExceed to true, because of v1alpha1.ParentNodeDeadlineExceed")
   681  				updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
   682  					deadlineExceedNode := v1alpha1.WorkflowNode{}
   683  
   684  					err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &deadlineExceedNode)
   685  					if err != nil {
   686  						return err
   687  					}
   688  					deadlineExceedNode.Status.Conditions = []v1alpha1.WorkflowNodeCondition{
   689  						{
   690  							Type:   v1alpha1.ConditionDeadlineExceed,
   691  							Status: corev1.ConditionTrue,
   692  							Reason: v1alpha1.ParentNodeDeadlineExceed,
   693  						},
   694  					}
   695  					err = kubeClient.Status().Update(ctx, &deadlineExceedNode)
   696  					if err != nil {
   697  						return err
   698  					}
   699  					return nil
   700  				})
   701  				Expect(updateError).To(BeNil())
   702  				By("after 3 seconds, the condition ConditionDeadlineExceed should not be modified")
   703  				Consistently(func() bool {
   704  					updatedNode := v1alpha1.WorkflowNode{}
   705  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
   706  
   707  					condition := GetCondition(updatedNode.Status, v1alpha1.ConditionDeadlineExceed)
   708  					if condition == nil {
   709  						return false
   710  					}
   711  					if condition.Status != corev1.ConditionTrue {
   712  						return false
   713  					}
   714  					if condition.Reason != v1alpha1.ParentNodeDeadlineExceed {
   715  						return false
   716  					}
   717  					return true
   718  				},
   719  					duration+toleratedJitter, time.Second,
   720  				).Should(BeTrue())
   721  			})
   722  
   723  			It("should NOT omit the next coming deadline otherwise", func() {
   724  				ctx := context.TODO()
   725  				now := time.Now()
   726  				duration := 5 * time.Second
   727  				toleratedJitter := 3 * time.Second
   728  
   729  				startTime := metav1.NewTime(now)
   730  				deadline := metav1.NewTime(now.Add(duration))
   731  
   732  				By("create one empty podchaos workflow node, with deadline: 3s")
   733  				node := v1alpha1.WorkflowNode{
   734  					ObjectMeta: metav1.ObjectMeta{
   735  						Namespace:    ns,
   736  						GenerateName: "pod-chaos-",
   737  					},
   738  					Spec: v1alpha1.WorkflowNodeSpec{
   739  						WorkflowName: "",
   740  						Type:         v1alpha1.TypePodChaos,
   741  						StartTime:    &startTime,
   742  						Deadline:     &deadline,
   743  						EmbedChaos: &v1alpha1.EmbedChaos{
   744  							PodChaos: &v1alpha1.PodChaosSpec{
   745  								ContainerSelector: v1alpha1.ContainerSelector{
   746  									PodSelector: v1alpha1.PodSelector{
   747  										Selector: v1alpha1.PodSelectorSpec{
   748  											GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
   749  												Namespaces: []string{ns},
   750  												LabelSelectors: map[string]string{
   751  													"app": "not-actually-exist",
   752  												},
   753  											},
   754  										},
   755  										Mode: v1alpha1.AllMode,
   756  									},
   757  									ContainerNames: nil,
   758  								},
   759  								Action: v1alpha1.PodKillAction,
   760  							},
   761  						},
   762  					},
   763  					Status: v1alpha1.WorkflowNodeStatus{},
   764  				}
   765  				Expect(kubeClient.Create(ctx, &node)).To(Succeed())
   766  				By("manually set condition ConditionDeadlineExceed to true, but NOT caused by v1alpha1.ParentNodeDeadlineExceed")
   767  				updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
   768  					deadlineExceedNode := v1alpha1.WorkflowNode{}
   769  
   770  					err := kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &deadlineExceedNode)
   771  					if err != nil {
   772  						return err
   773  					}
   774  					deadlineExceedNode.Status.Conditions = []v1alpha1.WorkflowNodeCondition{
   775  						{
   776  							Type:   v1alpha1.ConditionDeadlineExceed,
   777  							Status: corev1.ConditionTrue,
   778  							Reason: v1alpha1.NodeDeadlineExceed,
   779  						},
   780  					}
   781  					err = kubeClient.Status().Update(ctx, &deadlineExceedNode)
   782  					if err != nil {
   783  						return err
   784  					}
   785  					return nil
   786  				})
   787  				Expect(updateError).To(BeNil())
   788  				By("condition ConditionDeadlineExceed should be corrected soon")
   789  				Eventually(func() bool {
   790  					updatedNode := v1alpha1.WorkflowNode{}
   791  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
   792  					return ConditionEqualsTo(updatedNode.Status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionFalse)
   793  				},
   794  					toleratedJitter,
   795  					time.Second)
   796  				By("after 5 seconds, the condition ConditionDeadlineExceed should not be modified, caused by NodeDeadlineExceed itself")
   797  				Eventually(func() bool {
   798  					updatedNode := v1alpha1.WorkflowNode{}
   799  					Expect(kubeClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: node.Name}, &updatedNode)).To(Succeed())
   800  
   801  					condition := GetCondition(updatedNode.Status, v1alpha1.ConditionDeadlineExceed)
   802  					if condition == nil {
   803  						return false
   804  					}
   805  					if condition.Status != corev1.ConditionTrue {
   806  						return false
   807  					}
   808  					if condition.Reason != v1alpha1.NodeDeadlineExceed {
   809  						return false
   810  					}
   811  					return true
   812  				},
   813  					duration+toleratedJitter, time.Second,
   814  				).Should(BeTrue())
   815  			})
   816  		})
   817  	})
   818  })
   819