1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21 "sort"
22 "time"
23
24 "github.com/go-logr/logr"
25 corev1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/client-go/util/retry"
28 "k8s.io/utils/pointer"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30 "sigs.k8s.io/controller-runtime/pkg/reconcile"
31
32 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
33 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
34 )
35
36
// WorkflowEntryReconciler reconciles Workflow objects: it makes sure a Workflow
// has an entry WorkflowNode and mirrors that node's state into the Workflow
// status.
type WorkflowEntryReconciler struct {
	// kubeClient is used to get/list/create/delete Workflows and WorkflowNodes
	// and to update the Workflow status subresource.
	kubeClient client.Client
	// eventRecorder emits events on the reconciled Workflow.
	eventRecorder recorder.ChaosRecorder
	// logger receives diagnostic output produced during reconciliation.
	logger logr.Logger
}
42
43 func NewWorkflowEntryReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *WorkflowEntryReconciler {
44 return &WorkflowEntryReconciler{kubeClient: kubeClient, eventRecorder: eventRecorder, logger: logger}
45 }
46
47 func (it *WorkflowEntryReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
48 startTime := time.Now()
49 defer func() {
50 it.logger.V(4).Info("Finished syncing for workflow",
51 "node", request.NamespacedName,
52 "duration", time.Since(startTime),
53 )
54 }()
55
56 workflow := v1alpha1.Workflow{}
57 err := it.kubeClient.Get(ctx, request.NamespacedName, &workflow)
58 if err != nil {
59 return reconcile.Result{}, client.IgnoreNotFound(err)
60 }
61
62 entryNodes, err := it.fetchEntryNode(ctx, workflow)
63 if err != nil {
64 it.logger.Error(err, "failed to list entry nodes of workflow",
65 "workflow", request.NamespacedName)
66 return reconcile.Result{}, err
67 }
68
69 if len(entryNodes) == 0 {
70 func() {
71
72 spawnedEntryNode, err := it.spawnEntryNode(ctx, workflow)
73 if err != nil {
74 it.eventRecorder.Event(&workflow, recorder.InvalidEntry{
75 EntryTemplate: workflow.Spec.Entry,
76 })
77 it.logger.Error(err, "failed to spawn new entry node of workflow",
78 "workflow", request.NamespacedName,
79 "entry", workflow.Spec.Entry)
80
81 return
82 }
83 it.logger.Info(
84 "entry node for workflow created",
85 "workflow", request.NamespacedName,
86 "entry node", fmt.Sprintf("%s/%s", spawnedEntryNode.Namespace, spawnedEntryNode.Name),
87 )
88 it.eventRecorder.Event(&workflow, recorder.EntryCreated{Entry: spawnedEntryNode.Name})
89 }()
90 }
91
92 if len(entryNodes) > 1 {
93 var nodeNames []string
94 for _, node := range entryNodes {
95 nodeNames = append(nodeNames, node.GetName())
96 }
97 it.logger.Info("there are more than 1 entry nodes of workflow, cleaning up except first one",
98 "workflow", request.NamespacedName,
99 "entry nodes", nodeNames,
100 )
101 for _, redundantEntryNode := range entryNodes[1:] {
102 redundantEntryNode := redundantEntryNode
103
104 err := it.kubeClient.Delete(ctx, &redundantEntryNode)
105 if err != nil {
106 it.logger.Error(err,
107 "failed to delete redundant entry node",
108 "workflow", request.NamespacedName,
109 "redundant entry node", fmt.Sprintf("%s/%s", redundantEntryNode.Namespace, redundantEntryNode.Name),
110 )
111 }
112 }
113 }
114
115
116 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
117 workflowNeedUpdate := v1alpha1.Workflow{}
118 err := it.kubeClient.Get(ctx, request.NamespacedName, &workflowNeedUpdate)
119 if err != nil {
120 it.logger.Error(err,
121 "failed to fetch the latest state of workflow",
122 "workflow", request.NamespacedName,
123 )
124 return err
125 }
126
127 entryNodes, err := it.fetchEntryNode(ctx, workflowNeedUpdate)
128 if err != nil {
129 it.logger.Error(err,
130 "failed to list entry nodes of workflow",
131 "workflow", request.NamespacedName,
132 )
133 return err
134 }
135
136 if len(entryNodes) > 0 {
137 if len(entryNodes) > 1 {
138 var nodeNames []string
139 for _, node := range entryNodes {
140 nodeNames = append(nodeNames, node.GetName())
141 }
142 it.logger.Info("there are more than 1 entry nodes of workflow",
143 "workflow", request.NamespacedName,
144 "entry nodes", nodeNames,
145 )
146 }
147
148 workflowNeedUpdate.Status.EntryNode = pointer.StringPtr(entryNodes[0].Name)
149 SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
150 Type: v1alpha1.WorkflowConditionScheduled,
151 Status: corev1.ConditionTrue,
152 Reason: "",
153 })
154
155 if WorkflowNodeFinished(entryNodes[0].Status) {
156 SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
157 Type: v1alpha1.WorkflowConditionAccomplished,
158 Status: corev1.ConditionTrue,
159 Reason: "",
160 })
161 if workflowNeedUpdate.Status.EndTime == nil {
162 now := metav1.NewTime(time.Now())
163 workflowNeedUpdate.Status.EndTime = &now
164 it.eventRecorder.Event(&workflow, recorder.WorkflowAccomplished{})
165 }
166 } else {
167 SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
168 Type: v1alpha1.WorkflowConditionAccomplished,
169 Status: corev1.ConditionFalse,
170 Reason: "",
171 })
172 workflowNeedUpdate.Status.EndTime = nil
173 }
174 } else {
175 SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
176 Type: v1alpha1.WorkflowConditionScheduled,
177 Status: corev1.ConditionFalse,
178 Reason: "",
179 })
180 SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
181 Type: v1alpha1.WorkflowConditionAccomplished,
182 Status: corev1.ConditionFalse,
183 Reason: "",
184 })
185 workflowNeedUpdate.Status.EndTime = nil
186 }
187
188 if workflowNeedUpdate.Status.StartTime == nil {
189 tmp := metav1.NewTime(startTime)
190 workflowNeedUpdate.Status.StartTime = &tmp
191 }
192
193 err = it.kubeClient.Status().Update(ctx, &workflowNeedUpdate)
194 if err != nil {
195 it.logger.Error(err, "failed to update workflowNeedUpdate status")
196 return err
197 }
198 return nil
199 })
200
201 return reconcile.Result{}, client.IgnoreNotFound(updateError)
202 }
203
204
205
206
207
208 func (it *WorkflowEntryReconciler) fetchEntryNode(ctx context.Context, workflow v1alpha1.Workflow) ([]v1alpha1.WorkflowNode, error) {
209 entryNodesList := v1alpha1.WorkflowNodeList{}
210 controlledByWorkflow, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
211 MatchLabels: map[string]string{
212 v1alpha1.LabelControlledBy: workflow.Name,
213 },
214 })
215 if err != nil {
216 it.logger.Error(err, "failed to build label selector with filtering entry workflow node controlled by current workflow",
217 "workflow", fmt.Sprintf("%s/%s", workflow.Namespace, workflow.Name))
218 return nil, err
219 }
220
221 err = it.kubeClient.List(ctx, &entryNodesList, &client.ListOptions{
222 Namespace: workflow.Namespace,
223 LabelSelector: controlledByWorkflow,
224 })
225 if err != nil {
226 it.logger.Error(err, "failed to list entry workflow node controlled by workflow",
227 "workflow", fmt.Sprintf("%s/%s", workflow.Namespace, workflow.Name))
228 return nil, err
229 }
230
231 sortedEntryNodes := SortByCreationTimestamp(entryNodesList.Items)
232 sort.Sort(sortedEntryNodes)
233
234 return sortedEntryNodes, nil
235 }
236
237
238 func (it *WorkflowEntryReconciler) spawnEntryNode(ctx context.Context, workflow v1alpha1.Workflow) (*v1alpha1.WorkflowNode, error) {
239
240 nodes, err := renderNodesByTemplates(&workflow, nil, workflow.Spec.Entry)
241 if err != nil {
242 it.logger.Error(err, "failed create entry node", "workflow", workflow.Name, "entry", workflow.Spec.Entry)
243 return nil, err
244 }
245
246 if len(nodes) > 1 {
247 it.logger.Info("the results of entry nodes are more than 1, will only pick the first one",
248 "workflow", fmt.Sprintf("%s/%s", workflow.Namespace, workflow.Name),
249 "nodes", nodes,
250 )
251 }
252
253 entryNode := nodes[0]
254 err = it.kubeClient.Create(ctx, entryNode)
255 if err != nil {
256 it.logger.Info("failed to create workflow nodes")
257 return nil, err
258 }
259 it.logger.Info("entry workflow node created",
260 "workflow", fmt.Sprintf("%s/%s", workflow.Namespace, workflow.Name),
261 "entry node", entryNode.Name,
262 )
263
264 return entryNode, nil
265 }
266