1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package core
17
18 import (
19 "context"
20 "encoding/json"
21 "strings"
22 "time"
23
24 "github.com/pkg/errors"
25 corev1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/types"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 wfcontrollers "github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers"
32 )
33
34 type WorkflowRepository interface {
35 List(ctx context.Context) ([]WorkflowMeta, error)
36 ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error)
37 Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error)
38 Get(ctx context.Context, namespace, name string) (WorkflowDetail, error)
39 Delete(ctx context.Context, namespace, name string) error
40 Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error)
41 }
42
43 type WorkflowStatus string
44
45 const (
46 WorkflowRunning WorkflowStatus = "running"
47 WorkflowSucceed WorkflowStatus = "finished"
48 WorkflowFailed WorkflowStatus = "failed"
49 WorkflowUnknown WorkflowStatus = "unknown"
50 )
51
52
53 type WorkflowMeta struct {
54 ID uint `gorm:"primary_key" json:"id"`
55 UID string `gorm:"index:workflow_uid" json:"uid"`
56 Namespace string `json:"namespace"`
57 Name string `json:"name"`
58 Entry string `json:"entry"`
59 CreatedAt time.Time `json:"created_at"`
60 EndTime string `json:"end_time"`
61 Status WorkflowStatus `json:"status,omitempty"`
62 Archived bool `json:"-"`
63 }
64
65 type WorkflowDetail struct {
66 WorkflowMeta `json:",inline"`
67 Topology Topology `json:"topology"`
68 KubeObject KubeObjectDesc `json:"kube_object,omitempty"`
69 }
70
71
72 type Topology struct {
73 Nodes []Node `json:"nodes"`
74 }
75
76 type NodeState string
77
78 const (
79 NodeRunning NodeState = "Running"
80 NodeSucceed NodeState = "Succeed"
81 NodeFailed NodeState = "Failed"
82 )
83
84
85 type Node struct {
86 Name string `json:"name"`
87 Type NodeType `json:"type"`
88 State NodeState `json:"state"`
89 Serial []NodeNameWithTemplate `json:"serial,omitempty"`
90 Parallel []NodeNameWithTemplate `json:"parallel,omitempty"`
91 ConditionalBranches []ConditionalBranch `json:"conditional_branches,omitempty"`
92 Template string `json:"template"`
93 UID string `json:"uid"`
94 }
95
96 type NodeNameWithTemplate struct {
97 Name string `json:"name,omitempty"`
98 Template string `json:"template,omitempty"`
99 }
100
101 type ConditionalBranch struct {
102 NodeNameWithTemplate `json:",inline,omitempty"`
103 Expression string `json:"expression,omitempty"`
104 }
105
106
107
108
109
110
111
112 type NodeType string
113
114 const (
115
116 ChaosNode NodeType = "ChaosNode"
117
118
119 SerialNode NodeType = "SerialNode"
120
121
122 ParallelNode NodeType = "ParallelNode"
123
124
125 SuspendNode NodeType = "SuspendNode"
126
127
128 TaskNode NodeType = "TaskNode"
129 )
130
131 var nodeTypeTemplateTypeMapping = map[v1alpha1.TemplateType]NodeType{
132 v1alpha1.TypeSerial: SerialNode,
133 v1alpha1.TypeParallel: ParallelNode,
134 v1alpha1.TypeSuspend: SuspendNode,
135 v1alpha1.TypeTask: TaskNode,
136 }
137
138 type KubeWorkflowRepository struct {
139 kubeclient client.Client
140 }
141
142 func NewKubeWorkflowRepository(kubeclient client.Client) *KubeWorkflowRepository {
143 return &KubeWorkflowRepository{kubeclient: kubeclient}
144 }
145
146 func (it *KubeWorkflowRepository) Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
147 err := it.kubeclient.Create(ctx, &workflow)
148 if err != nil {
149 return WorkflowDetail{}, err
150 }
151
152 return it.Get(ctx, workflow.Namespace, workflow.Name)
153 }
154
155 func (it *KubeWorkflowRepository) Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
156 current := v1alpha1.Workflow{}
157
158 err := it.kubeclient.Get(ctx, types.NamespacedName{
159 Namespace: namespace,
160 Name: name,
161 }, ¤t)
162 if err != nil {
163 return WorkflowDetail{}, err
164 }
165 workflow.ObjectMeta.ResourceVersion = current.ObjectMeta.ResourceVersion
166
167 err = it.kubeclient.Update(ctx, &workflow)
168 if err != nil {
169 return WorkflowDetail{}, err
170 }
171
172 return it.Get(ctx, workflow.Namespace, workflow.Name)
173 }
174
175 func (it *KubeWorkflowRepository) ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error) {
176 workflowList := v1alpha1.WorkflowList{}
177
178 err := it.kubeclient.List(ctx, &workflowList, &client.ListOptions{
179 Namespace: namespace,
180 })
181 if err != nil {
182 return nil, err
183 }
184
185 var result []WorkflowMeta
186 for _, item := range workflowList.Items {
187 result = append(result, convertWorkflow(item))
188 }
189
190 return result, nil
191 }
192
193 func (it *KubeWorkflowRepository) List(ctx context.Context) ([]WorkflowMeta, error) {
194 return it.ListByNamespace(ctx, "")
195 }
196
197 func (it *KubeWorkflowRepository) Get(ctx context.Context, namespace, name string) (WorkflowDetail, error) {
198 kubeWorkflow := v1alpha1.Workflow{}
199
200 err := it.kubeclient.Get(ctx, types.NamespacedName{
201 Namespace: namespace,
202 Name: name,
203 }, &kubeWorkflow)
204 if err != nil {
205 return WorkflowDetail{}, err
206 }
207
208 workflowNodes := v1alpha1.WorkflowNodeList{}
209
210 selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
211 MatchLabels: map[string]string{
212 v1alpha1.LabelWorkflow: kubeWorkflow.Name,
213 },
214 })
215 if err != nil {
216 return WorkflowDetail{}, err
217 }
218
219 err = it.kubeclient.List(ctx, &workflowNodes, &client.ListOptions{
220 Namespace: namespace,
221 LabelSelector: selector,
222 })
223 if err != nil {
224 return WorkflowDetail{}, err
225 }
226
227 return convertWorkflowDetail(kubeWorkflow, workflowNodes.Items)
228 }
229
230 func (it *KubeWorkflowRepository) Delete(ctx context.Context, namespace, name string) error {
231 kubeWorkflow := v1alpha1.Workflow{}
232
233 err := it.kubeclient.Get(ctx, types.NamespacedName{
234 Namespace: namespace,
235 Name: name,
236 }, &kubeWorkflow)
237 if err != nil {
238 return err
239 }
240
241 return it.kubeclient.Delete(ctx, &kubeWorkflow)
242 }
243
244 func convertWorkflow(kubeWorkflow v1alpha1.Workflow) WorkflowMeta {
245 result := WorkflowMeta{
246 Namespace: kubeWorkflow.Namespace,
247 Name: kubeWorkflow.Name,
248 Entry: kubeWorkflow.Spec.Entry,
249 UID: string(kubeWorkflow.UID),
250 }
251
252 if kubeWorkflow.Status.StartTime != nil {
253 result.CreatedAt = kubeWorkflow.Status.StartTime.Time
254 }
255
256 if kubeWorkflow.Status.EndTime != nil {
257 result.EndTime = kubeWorkflow.Status.EndTime.Format(time.RFC3339)
258 }
259
260 if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionAccomplished, corev1.ConditionTrue) {
261 result.Status = WorkflowSucceed
262 } else if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionScheduled, corev1.ConditionTrue) {
263 result.Status = WorkflowRunning
264 } else {
265 result.Status = WorkflowUnknown
266 }
267
268
269
270 return result
271 }
272
273 func convertWorkflowDetail(kubeWorkflow v1alpha1.Workflow, kubeNodes []v1alpha1.WorkflowNode) (WorkflowDetail, error) {
274 nodes := make([]Node, 0)
275
276 for _, item := range kubeNodes {
277 node, err := convertWorkflowNode(item)
278 if err != nil {
279 return WorkflowDetail{}, nil
280 }
281
282 nodes = append(nodes, node)
283 }
284
285 result := WorkflowDetail{
286 WorkflowMeta: convertWorkflow(kubeWorkflow),
287 Topology: Topology{
288 Nodes: nodes,
289 },
290 KubeObject: KubeObjectDesc{
291 TypeMeta: kubeWorkflow.TypeMeta,
292 Meta: KubeObjectMeta{
293 Name: kubeWorkflow.Name,
294 Namespace: kubeWorkflow.Namespace,
295 Labels: kubeWorkflow.Labels,
296 Annotations: kubeWorkflow.Annotations,
297 },
298 Spec: kubeWorkflow.Spec,
299 },
300 }
301
302 return result, nil
303 }
304
305 func convertWorkflowNode(kubeWorkflowNode v1alpha1.WorkflowNode) (Node, error) {
306 templateType, err := mappingTemplateType(kubeWorkflowNode.Spec.Type)
307 if err != nil {
308 return Node{}, err
309 }
310
311 result := Node{
312 Name: kubeWorkflowNode.Name,
313 Type: templateType,
314 Serial: nil,
315 Parallel: nil,
316 Template: kubeWorkflowNode.Spec.TemplateName,
317 UID: string(kubeWorkflowNode.UID),
318 }
319
320 if kubeWorkflowNode.Spec.Type == v1alpha1.TypeSerial {
321 var nodes []string
322 for _, child := range kubeWorkflowNode.Status.FinishedChildren {
323 nodes = append(nodes, child.Name)
324 }
325 for _, child := range kubeWorkflowNode.Status.ActiveChildren {
326 nodes = append(nodes, child.Name)
327 }
328 result.Serial = composeSerialTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes)
329
330 } else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeParallel {
331 var nodes []string
332 for _, child := range kubeWorkflowNode.Status.FinishedChildren {
333 nodes = append(nodes, child.Name)
334 }
335 for _, child := range kubeWorkflowNode.Status.ActiveChildren {
336 nodes = append(nodes, child.Name)
337 }
338 result.Parallel = composeParallelTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes)
339
340 } else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeTask {
341 var nodes []string
342 for _, child := range kubeWorkflowNode.Status.FinishedChildren {
343 nodes = append(nodes, child.Name)
344 }
345 for _, child := range kubeWorkflowNode.Status.ActiveChildren {
346 nodes = append(nodes, child.Name)
347 }
348 result.ConditionalBranches = composeTaskConditionalBranches(kubeWorkflowNode.Spec.ConditionalBranches, nodes)
349 }
350
351 if wfcontrollers.WorkflowNodeFinished(kubeWorkflowNode.Status) {
352 result.State = NodeSucceed
353 } else {
354 result.State = NodeRunning
355 }
356
357 return result, nil
358 }
359
360
361 func composeSerialTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
362 var result []NodeNameWithTemplate
363 for _, node := range nodes {
364
365 templateName := node[0:strings.LastIndex(node, "-")]
366 result = append(result, NodeNameWithTemplate{Name: node, Template: templateName})
367 }
368 for _, task := range children[len(nodes):] {
369 result = append(result, NodeNameWithTemplate{Template: task})
370 }
371 return result
372 }
373
374 func composeParallelTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
375 var result []NodeNameWithTemplate
376 for _, task := range children {
377 result = append(result, NodeNameWithTemplate{
378 Name: "",
379 Template: task,
380 })
381 }
382 for _, node := range nodes {
383 for i, item := range result {
384 if len(item.Name) == 0 && strings.HasPrefix(node, item.Template) {
385 result[i].Name = node
386 break
387 }
388 }
389 }
390 return result
391 }
392
393 func composeTaskConditionalBranches(conditionalBranches []v1alpha1.ConditionalBranch, nodes []string) []ConditionalBranch {
394 var result []ConditionalBranch
395 for _, item := range conditionalBranches {
396 nodeName := ""
397 for _, node := range nodes {
398 if strings.HasPrefix(node, item.Target) {
399 nodeName = node
400 }
401 }
402 result = append(result,
403 ConditionalBranch{
404 NodeNameWithTemplate: NodeNameWithTemplate{
405 Name: nodeName,
406 Template: item.Target,
407 },
408 Expression: item.Expression,
409 })
410 }
411
412 return result
413 }
414
415 func mappingTemplateType(templateType v1alpha1.TemplateType) (NodeType, error) {
416 if v1alpha1.IsChaosTemplateType(templateType) {
417 return ChaosNode, nil
418 } else if target, ok := nodeTypeTemplateTypeMapping[templateType]; ok {
419 return target, nil
420 } else {
421 return "", errors.Errorf("can not resolve such type called %s", templateType)
422 }
423 }
424
425
426 type WorkflowStore interface {
427 List(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowEntity, error)
428 ListMeta(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowMeta, error)
429 FindByID(ctx context.Context, ID uint) (*WorkflowEntity, error)
430 FindByUID(ctx context.Context, UID string) (*WorkflowEntity, error)
431 FindMetaByUID(ctx context.Context, UID string) (*WorkflowMeta, error)
432 Save(ctx context.Context, entity *WorkflowEntity) error
433 DeleteByUID(ctx context.Context, UID string) error
434 DeleteByUIDs(ctx context.Context, UIDs []string) error
435 MarkAsArchived(ctx context.Context, namespace, name string) error
436 MarkAsArchivedWithUID(ctx context.Context, UID string) error
437 }
438
439
440 type WorkflowEntity struct {
441 WorkflowMeta
442 Workflow string `gorm:"type:text;size:32768"`
443 }
444
445 func WorkflowCR2WorkflowEntity(workflow *v1alpha1.Workflow) (*WorkflowEntity, error) {
446 if workflow == nil {
447 return nil, nil
448
449 }
450 jsonContent, err := json.Marshal(workflow)
451 if err != nil {
452 return nil, err
453 }
454 return &WorkflowEntity{
455 WorkflowMeta: convertWorkflow(*workflow),
456 Workflow: string(jsonContent),
457 }, nil
458
459 }
460
461 func WorkflowEntity2WorkflowCR(entity *WorkflowEntity) (*v1alpha1.Workflow, error) {
462 if entity == nil {
463 return nil, nil
464 }
465 result := v1alpha1.Workflow{}
466 err := json.Unmarshal([]byte(entity.Workflow), &result)
467 if err != nil {
468 return nil, err
469 }
470 return &result, nil
471 }
472
473 func WorkflowEntity2WorkflowDetail(entity *WorkflowEntity) (*WorkflowDetail, error) {
474 workflowCustomResource, err := WorkflowEntity2WorkflowCR(entity)
475 if err != nil {
476 return nil, err
477 }
478 return &WorkflowDetail{
479 WorkflowMeta: entity.WorkflowMeta,
480 KubeObject: KubeObjectDesc{
481 TypeMeta: workflowCustomResource.TypeMeta,
482 Meta: KubeObjectMeta{
483 Name: workflowCustomResource.Name,
484 Namespace: workflowCustomResource.Namespace,
485 Labels: workflowCustomResource.Labels,
486 Annotations: workflowCustomResource.Annotations,
487 },
488 Spec: workflowCustomResource.Spec,
489 },
490 }, nil
491 }
492