1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21 "sort"
22 "strings"
23
24 "github.com/go-logr/logr"
25 corev1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 )
31
32 func SetCondition(status *v1alpha1.WorkflowNodeStatus, condition v1alpha1.WorkflowNodeCondition) {
33 currentCond := GetCondition(*status, condition.Type)
34 if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
35 return
36 }
37 newConditions := filterOutCondition(status.Conditions, condition.Type)
38 status.Conditions = append(newConditions, condition)
39 }
40
41 func GetCondition(status v1alpha1.WorkflowNodeStatus, conditionType v1alpha1.WorkflowNodeConditionType) *v1alpha1.WorkflowNodeCondition {
42 for _, item := range status.Conditions {
43 if item.Type == conditionType {
44 return &item
45 }
46 }
47 return nil
48 }
49
50 func ConditionEqualsTo(status v1alpha1.WorkflowNodeStatus, conditionType v1alpha1.WorkflowNodeConditionType, expected corev1.ConditionStatus) bool {
51 condition := GetCondition(status, conditionType)
52 if condition == nil {
53 return false
54 }
55 return condition.Status == expected
56 }
57
58 func filterOutCondition(conditions []v1alpha1.WorkflowNodeCondition, except v1alpha1.WorkflowNodeConditionType) []v1alpha1.WorkflowNodeCondition {
59 var newConditions []v1alpha1.WorkflowNodeCondition
60 for _, c := range conditions {
61 if c.Type == except {
62 continue
63 }
64 newConditions = append(newConditions, c)
65 }
66 return newConditions
67 }
68
69 func WorkflowNodeFinished(status v1alpha1.WorkflowNodeStatus) bool {
70 return ConditionEqualsTo(status, v1alpha1.ConditionAccomplished, corev1.ConditionTrue) ||
71 ConditionEqualsTo(status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue)
72 }
73
74 func SetWorkflowCondition(status *v1alpha1.WorkflowStatus, condition v1alpha1.WorkflowCondition) {
75 currentCond := GetWorkflowCondition(*status, condition.Type)
76 if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
77 return
78 }
79 newConditions := filterOutWorkflowCondition(status.Conditions, condition.Type)
80 status.Conditions = append(newConditions, condition)
81 }
82
83 func GetWorkflowCondition(status v1alpha1.WorkflowStatus, conditionType v1alpha1.WorkflowConditionType) *v1alpha1.WorkflowCondition {
84 for _, item := range status.Conditions {
85 if item.Type == conditionType {
86 return &item
87 }
88 }
89 return nil
90 }
91
92 func WorkflowConditionEqualsTo(status v1alpha1.WorkflowStatus, conditionType v1alpha1.WorkflowConditionType, expected corev1.ConditionStatus) bool {
93 condition := GetWorkflowCondition(status, conditionType)
94 if condition == nil {
95 return false
96 }
97 return condition.Status == expected
98 }
99
100 func filterOutWorkflowCondition(conditions []v1alpha1.WorkflowCondition, except v1alpha1.WorkflowConditionType) []v1alpha1.WorkflowCondition {
101 var newConditions []v1alpha1.WorkflowCondition
102 for _, c := range conditions {
103 if c.Type == except {
104 continue
105 }
106 newConditions = append(newConditions, c)
107 }
108 return newConditions
109 }
110
111 type SortByCreationTimestamp []v1alpha1.WorkflowNode
112
113 func (it SortByCreationTimestamp) Len() int {
114 return len(it)
115 }
116
117 func (it SortByCreationTimestamp) Less(i, j int) bool {
118 return it[j].GetCreationTimestamp().After(it[i].GetCreationTimestamp().Time)
119 }
120
121 func (it SortByCreationTimestamp) Swap(i, j int) {
122 it[i], it[j] = it[j], it[i]
123 }
124
125 type ChildNodesFetcher struct {
126 kubeClient client.Client
127 logger logr.Logger
128 }
129
130 func NewChildNodesFetcher(kubeClient client.Client, logger logr.Logger) *ChildNodesFetcher {
131 return &ChildNodesFetcher{kubeClient: kubeClient, logger: logger}
132 }
133
134
135
136 func (it *ChildNodesFetcher) fetchChildNodes(ctx context.Context, node v1alpha1.WorkflowNode) (activeChildNodes []v1alpha1.WorkflowNode, finishedChildNodes []v1alpha1.WorkflowNode, err error) {
137 childNodes := v1alpha1.WorkflowNodeList{}
138 controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
139 MatchLabels: map[string]string{
140 v1alpha1.LabelControlledBy: node.Name,
141 },
142 })
143
144 if err != nil {
145 it.logger.Error(err, "failed to build label selector with filtering children workflow node controlled by current node",
146 "current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
147 return nil, nil, err
148 }
149
150 err = it.kubeClient.List(ctx, &childNodes, &client.ListOptions{
151 Namespace: node.Namespace,
152 LabelSelector: controlledByThisNode,
153 })
154
155 if err != nil {
156 it.logger.Error(err, "failed to list children workflow node controlled by current node",
157 "current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
158 return nil, nil, err
159 }
160
161 sortedChildNodes := SortByCreationTimestamp(childNodes.Items)
162 sort.Sort(sortedChildNodes)
163
164 it.logger.V(4).Info("list children node", "current node",
165 "current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
166 len(sortedChildNodes), "children", sortedChildNodes)
167
168 var activeChildren []v1alpha1.WorkflowNode
169 var finishedChildren []v1alpha1.WorkflowNode
170
171 for _, item := range sortedChildNodes {
172 childNode := item
173 if WorkflowNodeFinished(childNode.Status) {
174 finishedChildren = append(finishedChildren, childNode)
175 } else {
176 activeChildren = append(activeChildren, childNode)
177 }
178 }
179 return activeChildren, finishedChildren, nil
180 }
181
182 func getTaskNameFromGeneratedName(generatedNodeName string) string {
183 index := strings.LastIndex(generatedNodeName, "-")
184 if index < 0 {
185 return generatedNodeName
186 }
187 return generatedNodeName[:index]
188 }
189
190
191 func setDifference(former []string, latter []string) []string {
192 var result []string
193 formerSet := make(map[string]struct{})
194 latterSet := make(map[string]struct{})
195
196 for _, item := range former {
197 formerSet[item] = struct{}{}
198 }
199 for _, item := range latter {
200 latterSet[item] = struct{}{}
201 }
202 for k := range formerSet {
203 if _, ok := latterSet[k]; !ok {
204 result = append(result, k)
205 }
206 }
207 return result
208 }
209