1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21 "sort"
22 "strings"
23
24 "github.com/go-logr/logr"
25 corev1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 )
31
32 func SetCondition(status *v1alpha1.WorkflowNodeStatus, condition v1alpha1.WorkflowNodeCondition) {
33 currentCond := GetCondition(*status, condition.Type)
34 if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
35 return
36 }
37 newConditions := filterOutCondition(status.Conditions, condition.Type)
38 status.Conditions = append(newConditions, condition)
39 }
40
41 func GetCondition(status v1alpha1.WorkflowNodeStatus, conditionType v1alpha1.WorkflowNodeConditionType) *v1alpha1.WorkflowNodeCondition {
42 for _, item := range status.Conditions {
43 if item.Type == conditionType {
44 return &item
45 }
46 }
47 return nil
48 }
49
50 func ConditionEqualsTo(status v1alpha1.WorkflowNodeStatus, conditionType v1alpha1.WorkflowNodeConditionType, expected corev1.ConditionStatus) bool {
51 condition := GetCondition(status, conditionType)
52 if condition == nil {
53 return false
54 }
55 return condition.Status == expected
56 }
57
58 func filterOutCondition(conditions []v1alpha1.WorkflowNodeCondition, except v1alpha1.WorkflowNodeConditionType) []v1alpha1.WorkflowNodeCondition {
59 var newConditions []v1alpha1.WorkflowNodeCondition
60 for _, c := range conditions {
61 if c.Type == except {
62 continue
63 }
64 newConditions = append(newConditions, c)
65 }
66 return newConditions
67 }
68
69 func WorkflowNodeFinished(status v1alpha1.WorkflowNodeStatus) bool {
70 return ConditionEqualsTo(status, v1alpha1.ConditionAccomplished, corev1.ConditionTrue) ||
71 ConditionEqualsTo(status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue) ||
72 ConditionEqualsTo(status, v1alpha1.ConditionAborted, corev1.ConditionTrue)
73 }
74
75 func WorkflowAborted(workflow v1alpha1.Workflow) bool {
76 return workflow.Annotations[v1alpha1.WorkflowAnnotationAbort] == "true"
77 }
78
79 func SetWorkflowCondition(status *v1alpha1.WorkflowStatus, condition v1alpha1.WorkflowCondition) {
80 currentCond := GetWorkflowCondition(*status, condition.Type)
81 if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
82 return
83 }
84 newConditions := filterOutWorkflowCondition(status.Conditions, condition.Type)
85 status.Conditions = append(newConditions, condition)
86 }
87
88 func GetWorkflowCondition(status v1alpha1.WorkflowStatus, conditionType v1alpha1.WorkflowConditionType) *v1alpha1.WorkflowCondition {
89 for _, item := range status.Conditions {
90 if item.Type == conditionType {
91 return &item
92 }
93 }
94 return nil
95 }
96
97 func WorkflowConditionEqualsTo(status v1alpha1.WorkflowStatus, conditionType v1alpha1.WorkflowConditionType, expected corev1.ConditionStatus) bool {
98 condition := GetWorkflowCondition(status, conditionType)
99 if condition == nil {
100 return false
101 }
102 return condition.Status == expected
103 }
104
105 func filterOutWorkflowCondition(conditions []v1alpha1.WorkflowCondition, except v1alpha1.WorkflowConditionType) []v1alpha1.WorkflowCondition {
106 var newConditions []v1alpha1.WorkflowCondition
107 for _, c := range conditions {
108 if c.Type == except {
109 continue
110 }
111 newConditions = append(newConditions, c)
112 }
113 return newConditions
114 }
115
116 type SortByCreationTimestamp []v1alpha1.WorkflowNode
117
118 func (it SortByCreationTimestamp) Len() int {
119 return len(it)
120 }
121
122 func (it SortByCreationTimestamp) Less(i, j int) bool {
123 return it[j].GetCreationTimestamp().After(it[i].GetCreationTimestamp().Time)
124 }
125
126 func (it SortByCreationTimestamp) Swap(i, j int) {
127 it[i], it[j] = it[j], it[i]
128 }
129
130 type ChildNodesFetcher struct {
131 kubeClient client.Client
132 logger logr.Logger
133 }
134
135 func NewChildNodesFetcher(kubeClient client.Client, logger logr.Logger) *ChildNodesFetcher {
136 return &ChildNodesFetcher{kubeClient: kubeClient, logger: logger}
137 }
138
139
140
141 func (it *ChildNodesFetcher) fetchChildNodes(ctx context.Context, node v1alpha1.WorkflowNode) (activeChildNodes []v1alpha1.WorkflowNode, finishedChildNodes []v1alpha1.WorkflowNode, err error) {
142 childNodes := v1alpha1.WorkflowNodeList{}
143 controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
144 MatchLabels: map[string]string{
145 v1alpha1.LabelControlledBy: node.Name,
146 },
147 })
148
149 if err != nil {
150 it.logger.Error(err, "failed to build label selector with filtering children workflow node controlled by current node",
151 "current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
152 return nil, nil, err
153 }
154
155 err = it.kubeClient.List(ctx, &childNodes, &client.ListOptions{
156 Namespace: node.Namespace,
157 LabelSelector: controlledByThisNode,
158 })
159
160 if err != nil {
161 it.logger.Error(err, "failed to list children workflow node controlled by current node",
162 "current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
163 return nil, nil, err
164 }
165
166 sortedChildNodes := SortByCreationTimestamp(childNodes.Items)
167 sort.Sort(sortedChildNodes)
168
169 it.logger.V(4).Info("list children node", "current node",
170 "current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
171 len(sortedChildNodes), "children", sortedChildNodes)
172
173 var activeChildren []v1alpha1.WorkflowNode
174 var finishedChildren []v1alpha1.WorkflowNode
175
176 for _, item := range sortedChildNodes {
177 childNode := item
178 if WorkflowNodeFinished(childNode.Status) {
179 finishedChildren = append(finishedChildren, childNode)
180 } else {
181 activeChildren = append(activeChildren, childNode)
182 }
183 }
184 return activeChildren, finishedChildren, nil
185 }
186
// getTaskNameFromGeneratedName returns the part of generatedNodeName that
// precedes its last "-". When the name contains no "-", it is returned
// unchanged.
func getTaskNameFromGeneratedName(generatedNodeName string) string {
	if dash := strings.LastIndexByte(generatedNodeName, '-'); dash >= 0 {
		return generatedNodeName[:dash]
	}
	return generatedNodeName
}
194
195
// setDifference returns the distinct elements of former that do not appear
// in latter.
//
// The result follows the order in which elements first occur in former, so
// the output is deterministic for a given input. Duplicates in former are
// emitted once. The result is nil when no element qualifies.
func setDifference(former []string, latter []string) []string {
	// skip holds everything that must not be emitted: all of latter, plus
	// each element of former once it has been emitted.
	skip := make(map[string]struct{}, len(former)+len(latter))
	for _, item := range latter {
		skip[item] = struct{}{}
	}

	var result []string
	for _, item := range former {
		if _, found := skip[item]; found {
			continue
		}
		skip[item] = struct{}{}
		result = append(result, item)
	}
	return result
}
214