1
2
3
4
5
6
7
8
9
10
11
12
13
14 package core
15
16 import (
17 "context"
18 "encoding/json"
19 "strings"
20 "time"
21
22 "github.com/pkg/errors"
23 corev1 "k8s.io/api/core/v1"
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/apimachinery/pkg/types"
26 "sigs.k8s.io/controller-runtime/pkg/client"
27
28 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
29 wfcontrollers "github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers"
30 )
31
// WorkflowRepository is the access layer for workflows: listing their
// metadata, reading a single workflow in detail, and creating, updating or
// deleting one.
type WorkflowRepository interface {
	// List returns the metadata of workflows without a namespace argument.
	List(ctx context.Context) ([]WorkflowMeta, error)
	// ListByNamespace returns the metadata of workflows for the given namespace.
	ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error)
	// Create stores the given workflow and returns its detail view.
	Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error)
	// Get returns the detail view of the workflow identified by namespace and name.
	Get(ctx context.Context, namespace, name string) (WorkflowDetail, error)
	// Delete removes the workflow identified by namespace and name.
	Delete(ctx context.Context, namespace, name string) error
	// Update replaces the workflow identified by namespace and name with the
	// given one and returns the resulting detail view.
	Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error)
}
40
// WorkflowStatus is the coarse, string-valued state of a workflow as exposed
// through WorkflowMeta.
type WorkflowStatus string

const (
	// WorkflowRunning is serialized as "running".
	WorkflowRunning WorkflowStatus = "running"
	// WorkflowSucceed is serialized as "finished" (note the value differs from
	// the constant name).
	WorkflowSucceed WorkflowStatus = "finished"
	// WorkflowFailed is serialized as "failed".
	WorkflowFailed WorkflowStatus = "failed"
	// WorkflowUnknown is serialized as "unknown".
	WorkflowUnknown WorkflowStatus = "unknown"
)
49
50
// WorkflowMeta is the summary of a workflow. It carries both gorm and JSON
// struct tags, so the same type is used for persistence and for serialization.
type WorkflowMeta struct {
	// ID is the gorm primary key.
	ID uint `gorm:"primary_key" json:"id"`
	// UID is indexed under the name "workflow_uid".
	UID string `gorm:"index:workflow_uid" json:"uid"`
	// Namespace of the workflow.
	Namespace string `json:"namespace"`
	// Name of the workflow.
	Name string `json:"name"`
	// Entry is the entry template name taken from the workflow spec.
	Entry string `json:"entry"`
	// CreatedAt is serialized as "created_at".
	CreatedAt time.Time `json:"created_at"`
	// EndTime is kept as a string and serialized as "end_time".
	EndTime string `json:"end_time"`
	// Status is omitted from JSON when empty.
	Status WorkflowStatus `json:"status,omitempty"`
	// Archived is never included in JSON output.
	Archived bool `json:"-"`
}
62
// WorkflowDetail extends WorkflowMeta with the node topology of the workflow
// and a description of the underlying Kubernetes object.
type WorkflowDetail struct {
	// WorkflowMeta is embedded so its fields are part of the detail view.
	WorkflowMeta `json:",inline"`
	// Topology lists the nodes that belong to the workflow.
	Topology Topology `json:"topology"`
	// KubeObject describes the workflow custom resource (type meta, object
	// meta subset and spec).
	KubeObject KubeObjectDesc `json:"kube_object,omitempty"`
}
68
69
// Topology describes a workflow as a flat list of its nodes.
type Topology struct {
	// Nodes holds every node of the workflow.
	Nodes []Node `json:"nodes"`
}
73
// NodeState is the string-valued state of a single workflow node.
type NodeState string

const (
	// NodeRunning is serialized as "Running".
	NodeRunning NodeState = "Running"
	// NodeSucceed is serialized as "Succeed".
	NodeSucceed NodeState = "Succeed"
	// NodeFailed is serialized as "Failed".
	NodeFailed NodeState = "Failed"
)
81
82
// Node is the representation of one workflow node inside a Topology.
type Node struct {
	// Name is the name of the node.
	Name string `json:"name"`
	// Type is the kind of the node (see NodeType).
	Type NodeType `json:"type"`
	// State is the current state of the node.
	State NodeState `json:"state"`
	// Serial carries the children of a serial node; omitted from JSON when nil.
	Serial *NodeSerial `json:"serial,omitempty"`
	// Parallel carries the children of a parallel node; omitted from JSON when nil.
	Parallel *NodeParallel `json:"parallel,omitempty"`
	// ConditionalBranches carries the branches of a task node; omitted from
	// JSON when empty.
	ConditionalBranches []ConditionalBranch `json:"conditional_branches,omitempty"`
	// Template is the name of the template the node was created from.
	Template string `json:"template"`
	// UID is the unique identifier of the node.
	UID string `json:"uid"`
}
93
// NodeNameWithTemplate pairs a node name with the template it belongs to.
// Both fields are omitted from JSON when empty.
type NodeNameWithTemplate struct {
	// Name is the node name; it may be empty when only the template is known.
	Name string `json:"name,omitempty"`
	// Template is the template name.
	Template string `json:"template,omitempty"`
}
98
99
100
101
// NodeSerial holds the children of a serial node.
type NodeSerial struct {
	// Children lists the child entries as node-name/template pairs.
	Children []NodeNameWithTemplate `json:"children"`
}
105
106
// NodeParallel holds the children of a parallel node.
type NodeParallel struct {
	// Children lists the child entries as node-name/template pairs.
	Children []NodeNameWithTemplate `json:"children"`
}
110
// ConditionalBranch is one branch of a task node: the node/template it points
// to together with the expression attached to the branch.
type ConditionalBranch struct {
	// NodeNameWithTemplate identifies the branch target.
	NodeNameWithTemplate `json:",inline,omitempty"`
	// Expression is the branch expression; omitted from JSON when empty.
	Expression string `json:"expression,omitempty"`
}
115
116
117
118
119
120
121
// NodeType is the string-valued kind of a workflow node.
type NodeType string

const (
	// ChaosNode is the node type used for chaos template types.
	ChaosNode NodeType = "ChaosNode"

	// SerialNode is the node type for serial templates.
	SerialNode NodeType = "SerialNode"

	// ParallelNode is the node type for parallel templates.
	ParallelNode NodeType = "ParallelNode"

	// SuspendNode is the node type for suspend templates.
	SuspendNode NodeType = "SuspendNode"

	// TaskNode is the node type for task templates.
	TaskNode NodeType = "TaskNode"
)
140
// nodeTypeTemplateTypeMapping translates the non-chaos template types of the
// workflow API into the NodeType values used by this package. Chaos template
// types are not listed here; mappingTemplateType handles them separately.
var nodeTypeTemplateTypeMapping = map[v1alpha1.TemplateType]NodeType{
	v1alpha1.TypeSerial:   SerialNode,
	v1alpha1.TypeParallel: ParallelNode,
	v1alpha1.TypeSuspend:  SuspendNode,
	v1alpha1.TypeTask:     TaskNode,
}
147
// KubeWorkflowRepository is a workflow repository that reads and writes
// workflow resources through a controller-runtime client.
type KubeWorkflowRepository struct {
	// kubeclient is the client used for every Get/List/Create/Update/Delete call.
	kubeclient client.Client
}
151
152 func NewKubeWorkflowRepository(kubeclient client.Client) *KubeWorkflowRepository {
153 return &KubeWorkflowRepository{kubeclient: kubeclient}
154 }
155
156 func (it *KubeWorkflowRepository) Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
157 err := it.kubeclient.Create(ctx, &workflow)
158 if err != nil {
159 return WorkflowDetail{}, err
160 }
161
162 return it.Get(ctx, workflow.Namespace, workflow.Name)
163 }
164
165 func (it *KubeWorkflowRepository) Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
166 current := v1alpha1.Workflow{}
167
168 err := it.kubeclient.Get(ctx, types.NamespacedName{
169 Namespace: namespace,
170 Name: name,
171 }, ¤t)
172 if err != nil {
173 return WorkflowDetail{}, err
174 }
175 workflow.ObjectMeta.ResourceVersion = current.ObjectMeta.ResourceVersion
176
177 err = it.kubeclient.Update(ctx, &workflow)
178 if err != nil {
179 return WorkflowDetail{}, err
180 }
181
182 return it.Get(ctx, workflow.Namespace, workflow.Name)
183 }
184
185 func (it *KubeWorkflowRepository) ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error) {
186 workflowList := v1alpha1.WorkflowList{}
187
188 err := it.kubeclient.List(ctx, &workflowList, &client.ListOptions{
189 Namespace: namespace,
190 })
191 if err != nil {
192 return nil, err
193 }
194
195 var result []WorkflowMeta
196 for _, item := range workflowList.Items {
197 result = append(result, convertWorkflow(item))
198 }
199
200 return result, nil
201 }
202
203 func (it *KubeWorkflowRepository) List(ctx context.Context) ([]WorkflowMeta, error) {
204 return it.ListByNamespace(ctx, "")
205 }
206
207 func (it *KubeWorkflowRepository) Get(ctx context.Context, namespace, name string) (WorkflowDetail, error) {
208 kubeWorkflow := v1alpha1.Workflow{}
209
210 err := it.kubeclient.Get(ctx, types.NamespacedName{
211 Namespace: namespace,
212 Name: name,
213 }, &kubeWorkflow)
214 if err != nil {
215 return WorkflowDetail{}, err
216 }
217
218 workflowNodes := v1alpha1.WorkflowNodeList{}
219
220 selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
221 MatchLabels: map[string]string{
222 v1alpha1.LabelWorkflow: kubeWorkflow.Name,
223 },
224 })
225 if err != nil {
226 return WorkflowDetail{}, err
227 }
228
229 err = it.kubeclient.List(ctx, &workflowNodes, &client.ListOptions{
230 Namespace: namespace,
231 LabelSelector: selector,
232 })
233 if err != nil {
234 return WorkflowDetail{}, err
235 }
236
237 return convertWorkflowDetail(kubeWorkflow, workflowNodes.Items)
238 }
239
240 func (it *KubeWorkflowRepository) Delete(ctx context.Context, namespace, name string) error {
241 kubeWorkflow := v1alpha1.Workflow{}
242
243 err := it.kubeclient.Get(ctx, types.NamespacedName{
244 Namespace: namespace,
245 Name: name,
246 }, &kubeWorkflow)
247 if err != nil {
248 return err
249 }
250
251 return it.kubeclient.Delete(ctx, &kubeWorkflow)
252 }
253
254 func convertWorkflow(kubeWorkflow v1alpha1.Workflow) WorkflowMeta {
255 result := WorkflowMeta{
256 Namespace: kubeWorkflow.Namespace,
257 Name: kubeWorkflow.Name,
258 Entry: kubeWorkflow.Spec.Entry,
259 UID: string(kubeWorkflow.UID),
260 }
261
262 if kubeWorkflow.Status.StartTime != nil {
263 result.CreatedAt = kubeWorkflow.Status.StartTime.Time
264 }
265
266 if kubeWorkflow.Status.EndTime != nil {
267 result.EndTime = kubeWorkflow.Status.EndTime.Format(time.RFC3339)
268 }
269
270 if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionAccomplished, corev1.ConditionTrue) {
271 result.Status = WorkflowSucceed
272 } else if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionScheduled, corev1.ConditionTrue) {
273 result.Status = WorkflowRunning
274 } else {
275 result.Status = WorkflowUnknown
276 }
277
278
279
280 return result
281 }
282
283 func convertWorkflowDetail(kubeWorkflow v1alpha1.Workflow, kubeNodes []v1alpha1.WorkflowNode) (WorkflowDetail, error) {
284 nodes := make([]Node, 0)
285
286 for _, item := range kubeNodes {
287 node, err := convertWorkflowNode(item)
288 if err != nil {
289 return WorkflowDetail{}, nil
290 }
291
292 nodes = append(nodes, node)
293 }
294
295 result := WorkflowDetail{
296 WorkflowMeta: convertWorkflow(kubeWorkflow),
297 Topology: Topology{
298 Nodes: nodes,
299 },
300 KubeObject: KubeObjectDesc{
301 TypeMeta: kubeWorkflow.TypeMeta,
302 Meta: KubeObjectMeta{
303 Name: kubeWorkflow.Name,
304 Namespace: kubeWorkflow.Namespace,
305 Labels: kubeWorkflow.Labels,
306 Annotations: kubeWorkflow.Annotations,
307 },
308 Spec: kubeWorkflow.Spec,
309 },
310 }
311
312 return result, nil
313 }
314
315 func convertWorkflowNode(kubeWorkflowNode v1alpha1.WorkflowNode) (Node, error) {
316 templateType, err := mappingTemplateType(kubeWorkflowNode.Spec.Type)
317 if err != nil {
318 return Node{}, err
319 }
320
321 result := Node{
322 Name: kubeWorkflowNode.Name,
323 Type: templateType,
324 Serial: nil,
325 Parallel: nil,
326 Template: kubeWorkflowNode.Spec.TemplateName,
327 UID: string(kubeWorkflowNode.UID),
328 }
329
330 if kubeWorkflowNode.Spec.Type == v1alpha1.TypeSerial {
331 var nodes []string
332 for _, child := range kubeWorkflowNode.Status.FinishedChildren {
333 nodes = append(nodes, child.Name)
334 }
335 for _, child := range kubeWorkflowNode.Status.ActiveChildren {
336 nodes = append(nodes, child.Name)
337 }
338 result.Serial = &NodeSerial{
339 Children: composeSerialTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes),
340 }
341 } else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeParallel {
342 var nodes []string
343 for _, child := range kubeWorkflowNode.Status.FinishedChildren {
344 nodes = append(nodes, child.Name)
345 }
346 for _, child := range kubeWorkflowNode.Status.ActiveChildren {
347 nodes = append(nodes, child.Name)
348 }
349 result.Parallel = &NodeParallel{
350 Children: composeParallelTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes),
351 }
352 } else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeTask {
353 var nodes []string
354 for _, child := range kubeWorkflowNode.Status.FinishedChildren {
355 nodes = append(nodes, child.Name)
356 }
357 for _, child := range kubeWorkflowNode.Status.ActiveChildren {
358 nodes = append(nodes, child.Name)
359 }
360 result.ConditionalBranches = composeTaskConditionalBranches(kubeWorkflowNode.Spec.ConditionalBranches, nodes)
361 }
362
363 if wfcontrollers.WorkflowNodeFinished(kubeWorkflowNode.Status) {
364 result.State = NodeSucceed
365 } else {
366 result.State = NodeRunning
367 }
368
369 return result, nil
370 }
371
372
373 func composeSerialTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
374 var result []NodeNameWithTemplate
375 for _, node := range nodes {
376
377 templateName := node[0:strings.LastIndex(node, "-")]
378 result = append(result, NodeNameWithTemplate{Name: node, Template: templateName})
379 }
380 for _, task := range children[len(nodes):] {
381 result = append(result, NodeNameWithTemplate{Template: task})
382 }
383 return result
384 }
385
386 func composeParallelTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
387 var result []NodeNameWithTemplate
388 for _, task := range children {
389 result = append(result, NodeNameWithTemplate{
390 Name: "",
391 Template: task,
392 })
393 }
394 for _, node := range nodes {
395 for i, item := range result {
396 if len(item.Name) == 0 && strings.HasPrefix(node, item.Template) {
397 result[i].Name = node
398 break
399 }
400 }
401 }
402 return result
403 }
404
405 func composeTaskConditionalBranches(conditionalBranches []v1alpha1.ConditionalBranch, nodes []string) []ConditionalBranch {
406 var result []ConditionalBranch
407 for _, item := range conditionalBranches {
408 nodeName := ""
409 for _, node := range nodes {
410 if strings.HasPrefix(node, item.Target) {
411 nodeName = node
412 }
413 }
414 result = append(result,
415 ConditionalBranch{
416 NodeNameWithTemplate: NodeNameWithTemplate{
417 Name: nodeName,
418 Template: item.Target,
419 },
420 Expression: item.Expression,
421 })
422 }
423
424 return result
425 }
426
427 func mappingTemplateType(templateType v1alpha1.TemplateType) (NodeType, error) {
428 if v1alpha1.IsChaosTemplateType(templateType) {
429 return ChaosNode, nil
430 } else if target, ok := nodeTypeTemplateTypeMapping[templateType]; ok {
431 return target, nil
432 } else {
433 return "", errors.Errorf("can not resolve such type called %s", templateType)
434 }
435 }
436
437
// WorkflowStore is the persistence interface for WorkflowEntity records.
// NOTE(review): how implementations treat empty namespace/name arguments is
// not visible in this file; confirm against the concrete store.
type WorkflowStore interface {
	// List returns full entities selected by namespace, name and archived flag.
	List(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowEntity, error)
	// ListMeta is the metadata-only counterpart of List.
	ListMeta(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowMeta, error)
	// FindByID looks up a single entity by its numeric ID.
	FindByID(ctx context.Context, ID uint) (*WorkflowEntity, error)
	// FindByUID looks up a single entity by its UID.
	FindByUID(ctx context.Context, UID string) (*WorkflowEntity, error)
	// FindMetaByUID looks up only the metadata of the entity with the given UID.
	FindMetaByUID(ctx context.Context, UID string) (*WorkflowMeta, error)
	// Save persists the given entity.
	Save(ctx context.Context, entity *WorkflowEntity) error
	// DeleteByUID removes the entity with the given UID.
	DeleteByUID(ctx context.Context, UID string) error
	// DeleteByUIDs removes the entities with the given UIDs.
	DeleteByUIDs(ctx context.Context, UIDs []string) error
	// MarkAsArchived flags the entities selected by namespace and name as archived.
	MarkAsArchived(ctx context.Context, namespace, name string) error
	// MarkAsArchivedWithUID flags the entity with the given UID as archived.
	MarkAsArchivedWithUID(ctx context.Context, UID string) error
}
450
451
// WorkflowEntity is the persisted form of a workflow: its metadata plus the
// serialized workflow resource.
type WorkflowEntity struct {
	// WorkflowMeta is embedded, so its columns are part of the entity.
	WorkflowMeta
	// Workflow holds the workflow custom resource encoded as JSON; the gorm
	// tag sets the column size to 32768.
	Workflow string `gorm:"size:32768"`
}
456
457 func WorkflowCR2WorkflowEntity(workflow *v1alpha1.Workflow) (*WorkflowEntity, error) {
458 if workflow == nil {
459 return nil, nil
460
461 }
462 jsonContent, err := json.Marshal(workflow)
463 if err != nil {
464 return nil, err
465 }
466 return &WorkflowEntity{
467 WorkflowMeta: convertWorkflow(*workflow),
468 Workflow: string(jsonContent),
469 }, nil
470
471 }
472
473 func WorkflowEntity2WorkflowCR(entity *WorkflowEntity) (*v1alpha1.Workflow, error) {
474 if entity == nil {
475 return nil, nil
476 }
477 result := v1alpha1.Workflow{}
478 err := json.Unmarshal([]byte(entity.Workflow), &result)
479 if err != nil {
480 return nil, err
481 }
482 return &result, nil
483 }
484
485 func WorkflowEntity2WorkflowDetail(entity *WorkflowEntity) (*WorkflowDetail, error) {
486 workflowCustomResource, err := WorkflowEntity2WorkflowCR(entity)
487 if err != nil {
488 return nil, err
489 }
490 return &WorkflowDetail{
491 WorkflowMeta: entity.WorkflowMeta,
492 KubeObject: KubeObjectDesc{
493 TypeMeta: workflowCustomResource.TypeMeta,
494 Meta: KubeObjectMeta{
495 Name: workflowCustomResource.Name,
496 Namespace: workflowCustomResource.Namespace,
497 Labels: workflowCustomResource.Labels,
498 Annotations: workflowCustomResource.Annotations,
499 },
500 Spec: workflowCustomResource.Spec,
501 },
502 }, nil
503 }
504