1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package controllers
17
18 import (
19 "context"
20 "fmt"
21 "sort"
22 "time"
23
24 "github.com/go-logr/logr"
25 "github.com/pkg/errors"
26 corev1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/client-go/util/retry"
29 "k8s.io/utils/pointer"
30 "sigs.k8s.io/controller-runtime/pkg/client"
31 "sigs.k8s.io/controller-runtime/pkg/reconcile"
32
33 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
34 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
35 )
36
37
38 type WorkflowEntryReconciler struct {
39 kubeClient client.Client
40 eventRecorder recorder.ChaosRecorder
41 logger logr.Logger
42 }
43
44 func NewWorkflowEntryReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *WorkflowEntryReconciler {
45 return &WorkflowEntryReconciler{kubeClient: kubeClient, eventRecorder: eventRecorder, logger: logger}
46 }
47
// Reconcile makes sure a Workflow has exactly one entry WorkflowNode and
// mirrors that node's progress into the Workflow status.
//
// Steps:
//  1. fetch the Workflow (a missing Workflow is not an error);
//  2. list its entry nodes; spawn one if there are none, or delete every node
//     but the first if there are several;
//  3. re-read the Workflow and its entry nodes and update the Scheduled /
//     Accomplished conditions plus StartTime / EndTime, retrying on conflict.
//
// The WorkflowAccomplished event is emitted only after the status update that
// set EndTime has been persisted, so conflict retries cannot emit it twice.
func (it *WorkflowEntryReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	startTime := time.Now()
	defer func() {
		it.logger.V(4).Info("Finished syncing for workflow",
			"workflow", request.NamespacedName,
			"duration", time.Since(startTime),
		)
	}()

	workflow := v1alpha1.Workflow{}
	if err := it.kubeClient.Get(ctx, request.NamespacedName, &workflow); err != nil {
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}

	entryNodes, err := fetchEntryNode(ctx, it.kubeClient, workflow)
	if err != nil {
		it.logger.Error(err, "failed to list entry nodes of workflow",
			"workflow", request.NamespacedName)
		return reconcile.Result{}, err
	}

	switch {
	case len(entryNodes) == 0:
		it.spawnMissingEntryNode(ctx, request, &workflow)
	case len(entryNodes) > 1:
		it.deleteRedundantEntryNodes(ctx, request, entryNodes)
	}

	// newlyAccomplished is overwritten by every retry attempt, so after the
	// loop it reflects only the attempt whose status update was persisted.
	newlyAccomplished := false
	updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		var attemptErr error
		newlyAccomplished, attemptErr = it.syncWorkflowStatus(ctx, request, startTime)
		return attemptErr
	})
	if updateError == nil && newlyAccomplished {
		it.eventRecorder.Event(&workflow, recorder.WorkflowAccomplished{})
	}

	return reconcile.Result{}, client.IgnoreNotFound(updateError)
}

// spawnMissingEntryNode creates the entry node of a workflow that has none and
// records an EntryCreated event. On failure it records an InvalidEntry event and
// logs the error. The error is intentionally not returned, so Reconcile carries
// on with the status update.
// NOTE(review): this also means a transient API error is not retried via
// requeue — confirm this is intended.
func (it *WorkflowEntryReconciler) spawnMissingEntryNode(ctx context.Context, request reconcile.Request, workflow *v1alpha1.Workflow) {
	spawnedEntryNode, err := it.spawnEntryNode(ctx, *workflow)
	if err != nil {
		it.eventRecorder.Event(workflow, recorder.InvalidEntry{
			EntryTemplate: workflow.Spec.Entry,
		})
		it.logger.Error(err, "failed to spawn new entry node of workflow",
			"workflow", request.NamespacedName,
			"entry", workflow.Spec.Entry)
		return
	}
	it.logger.Info(
		"entry node for workflow created",
		"workflow", request.NamespacedName,
		"entry node", fmt.Sprintf("%s/%s", spawnedEntryNode.Namespace, spawnedEntryNode.Name),
	)
	it.eventRecorder.Event(workflow, recorder.EntryCreated{Entry: spawnedEntryNode.Name})
}

// deleteRedundantEntryNodes keeps entryNodes[0] (the list comes from
// fetchEntryNode, ordered by SortByCreationTimestamp) and deletes the rest.
// Delete failures are logged and otherwise ignored. A node that is already gone
// is not treated as a failure.
func (it *WorkflowEntryReconciler) deleteRedundantEntryNodes(ctx context.Context, request reconcile.Request, entryNodes []v1alpha1.WorkflowNode) {
	nodeNames := make([]string, 0, len(entryNodes))
	for i := range entryNodes {
		nodeNames = append(nodeNames, entryNodes[i].GetName())
	}
	it.logger.Info("there are more than 1 entry nodes of workflow, cleaning up except first one",
		"workflow", request.NamespacedName,
		"entry nodes", nodeNames,
	)

	for i := 1; i < len(entryNodes); i++ {
		redundantEntryNode := &entryNodes[i]
		if err := it.kubeClient.Delete(ctx, redundantEntryNode); client.IgnoreNotFound(err) != nil {
			it.logger.Error(err,
				"failed to delete redundant entry node",
				"workflow", request.NamespacedName,
				"redundant entry node", fmt.Sprintf("%s/%s", redundantEntryNode.Namespace, redundantEntryNode.Name),
			)
		}
	}
}

// syncWorkflowStatus performs a single attempt at refreshing the Workflow
// status. It re-reads the Workflow and its entry nodes, so a retry after a
// conflict works on fresh data. It then sets the Scheduled / Accomplished
// conditions, EntryNode, StartTime and EndTime, and writes the status back.
//
// The bool result reports whether this attempt set EndTime for the first time.
// It is only meaningful when the returned error is nil.
func (it *WorkflowEntryReconciler) syncWorkflowStatus(ctx context.Context, request reconcile.Request, startTime time.Time) (bool, error) {
	workflowNeedUpdate := v1alpha1.Workflow{}
	err := it.kubeClient.Get(ctx, request.NamespacedName, &workflowNeedUpdate)
	if err != nil {
		it.logger.Error(err,
			"failed to fetch the latest state of workflow",
			"workflow", request.NamespacedName,
		)
		return false, err
	}

	entryNodes, err := fetchEntryNode(ctx, it.kubeClient, workflowNeedUpdate)
	if err != nil {
		it.logger.Error(err,
			"failed to list entry nodes of workflow",
			"workflow", request.NamespacedName,
		)
		return false, err
	}

	newlyAccomplished := false
	if len(entryNodes) > 0 {
		if len(entryNodes) > 1 {
			nodeNames := make([]string, 0, len(entryNodes))
			for i := range entryNodes {
				nodeNames = append(nodeNames, entryNodes[i].GetName())
			}
			it.logger.Info("there are more than 1 entry nodes of workflow",
				"workflow", request.NamespacedName,
				"entry nodes", nodeNames,
			)
		}

		workflowNeedUpdate.Status.EntryNode = pointer.String(entryNodes[0].Name)
		SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
			Type:   v1alpha1.WorkflowConditionScheduled,
			Status: corev1.ConditionTrue,
			Reason: "",
		})

		if WorkflowNodeFinished(entryNodes[0].Status) {
			SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
				Type:   v1alpha1.WorkflowConditionAccomplished,
				Status: corev1.ConditionTrue,
				Reason: "",
			})
			if workflowNeedUpdate.Status.EndTime == nil {
				now := metav1.NewTime(time.Now())
				workflowNeedUpdate.Status.EndTime = &now
				// The event is emitted by the caller once this update is persisted.
				newlyAccomplished = true
			}
		} else {
			SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
				Type:   v1alpha1.WorkflowConditionAccomplished,
				Status: corev1.ConditionFalse,
				Reason: "",
			})
			workflowNeedUpdate.Status.EndTime = nil
		}
	} else {
		SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
			Type:   v1alpha1.WorkflowConditionScheduled,
			Status: corev1.ConditionFalse,
			Reason: "",
		})
		SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
			Type:   v1alpha1.WorkflowConditionAccomplished,
			Status: corev1.ConditionFalse,
			Reason: "",
		})
		workflowNeedUpdate.Status.EndTime = nil
	}

	if workflowNeedUpdate.Status.StartTime == nil {
		tmp := metav1.NewTime(startTime)
		workflowNeedUpdate.Status.StartTime = &tmp
	}

	if err = it.kubeClient.Status().Update(ctx, &workflowNeedUpdate); err != nil {
		it.logger.Error(err, "failed to update workflowNeedUpdate status",
			"workflow", request.NamespacedName,
		)
		return false, err
	}
	return newlyAccomplished, nil
}
204
205
206
207
208
209 func fetchEntryNode(ctx context.Context, kubeClient client.Client, workflow v1alpha1.Workflow) ([]v1alpha1.WorkflowNode, error) {
210 entryNodesList := v1alpha1.WorkflowNodeList{}
211 controlledByWorkflow, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
212 MatchLabels: map[string]string{
213 v1alpha1.LabelControlledBy: workflow.Name,
214 },
215 })
216 if err != nil {
217 return nil, errors.Wrap(err, "build label selector")
218 }
219
220 err = kubeClient.List(ctx, &entryNodesList, &client.ListOptions{
221 Namespace: workflow.Namespace,
222 LabelSelector: controlledByWorkflow,
223 })
224 if err != nil {
225 return nil, errors.Wrapf(err, "list entry workflow node")
226 }
227
228 sortedEntryNodes := SortByCreationTimestamp(entryNodesList.Items)
229 sort.Sort(sortedEntryNodes)
230
231 return sortedEntryNodes, nil
232 }
233
234
// spawnEntryNode renders workflow.Spec.Entry into WorkflowNodes and creates the
// first rendered node through the Kubernetes client.
//
// If the template renders to more than one node, only the first is created and
// an info log lists the rendered node names. An error is returned in three
// cases: rendering fails, rendering yields no node at all, or Create fails.
// On success the returned node is the object that was passed to Create.
func (it *WorkflowEntryReconciler) spawnEntryNode(ctx context.Context, workflow v1alpha1.Workflow) (*v1alpha1.WorkflowNode, error) {
	workflowKey := fmt.Sprintf("%s/%s", workflow.Namespace, workflow.Name)

	nodes, err := renderNodesByTemplates(&workflow, nil, workflow.Spec.Entry)
	if err != nil {
		it.logger.Error(err, "failed to render entry node of workflow",
			"workflow", workflowKey,
			"entry", workflow.Spec.Entry,
		)
		return nil, err
	}

	// Guard the nodes[0] access below. Without it, an empty render with a nil
	// error would panic the controller.
	if len(nodes) == 0 {
		return nil, errors.Errorf("entry template %q of workflow %s rendered no workflow node",
			workflow.Spec.Entry, workflowKey)
	}

	if len(nodes) > 1 {
		nodeNames := make([]string, 0, len(nodes))
		for _, node := range nodes {
			nodeNames = append(nodeNames, node.Name)
		}
		it.logger.Info("the results of entry nodes are more than 1, will only pick the first one",
			"workflow", workflowKey,
			"nodes", nodeNames,
		)
	}

	entryNode := nodes[0]
	if err := it.kubeClient.Create(ctx, entryNode); err != nil {
		it.logger.Error(err, "failed to create workflow nodes",
			"workflow", workflowKey,
			"entry node", entryNode.Name,
		)
		return nil, err
	}
	it.logger.Info("entry workflow node created",
		"workflow", workflowKey,
		"entry node", entryNode.Name,
	)

	return entryNode, nil
}
263