1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package core
17
18 import (
19 "context"
20 "encoding/json"
21 "strings"
22 "time"
23
24 "github.com/pkg/errors"
25 corev1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/types"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 wfcontrollers "github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers"
32 )
33
// WorkflowRepository describes the read/write operations available on
// workflows. KubeWorkflowRepository in this file implements it on top of a
// Kubernetes client.
type WorkflowRepository interface {
	// List returns the metadata of workflows without a namespace filter.
	List(ctx context.Context) ([]WorkflowMeta, error)
	// ListByNamespace returns the metadata of workflows in the given namespace.
	ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error)
	// Create creates the given workflow and returns its detail.
	Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error)
	// Get returns the detail of the workflow identified by namespace and name.
	Get(ctx context.Context, namespace, name string) (WorkflowDetail, error)
	// Delete removes the workflow identified by namespace and name.
	Delete(ctx context.Context, namespace, name string) error
	// Update replaces the workflow identified by namespace and name with the
	// given workflow and returns the resulting detail.
	Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error)
}
42
// WorkflowStatus is the coarse-grained status string exposed in WorkflowMeta.
type WorkflowStatus string

const (
	// WorkflowRunning is reported by convertWorkflow when the workflow's
	// Scheduled condition is true and it is not yet accomplished.
	WorkflowRunning WorkflowStatus = "running"
	// WorkflowSucceed is reported by convertWorkflow when the workflow's
	// Accomplished condition is true. Note that it serializes as "finished".
	WorkflowSucceed WorkflowStatus = "finished"
	// WorkflowFailed marks a failed workflow.
	// NOTE(review): no code visible in this file assigns this value.
	WorkflowFailed WorkflowStatus = "failed"
	// WorkflowUnknown is reported by convertWorkflow when neither of the
	// conditions above is true.
	WorkflowUnknown WorkflowStatus = "unknown"
)
51
52
// WorkflowMeta is the summary of a workflow. It carries both gorm and JSON
// tags and is embedded in WorkflowDetail and WorkflowEntity.
type WorkflowMeta struct {
	// ID is the gorm primary key.
	ID uint `gorm:"primary_key" json:"id"`
	// UID is filled by convertWorkflow with the UID of the Kubernetes object.
	UID string `gorm:"index:workflow_uid" json:"uid"`
	// Namespace is the namespace of the workflow object.
	Namespace string `json:"namespace"`
	// Name is the name of the workflow object.
	Name string `json:"name"`
	// Entry is copied from the workflow's Spec.Entry.
	Entry string `json:"entry"`
	// CreatedAt is set by convertWorkflow from Status.StartTime when present.
	CreatedAt time.Time `json:"created_at"`

	// FinishTime is set by convertWorkflow from the object's deletion
	// timestamp when present; nil otherwise.
	FinishTime *time.Time `json:"finish_time"`

	// EndTime is Status.EndTime formatted as RFC 3339, or empty when the
	// workflow has no end time.
	EndTime string `json:"end_time"`
	// Status is the coarse status derived from the workflow conditions.
	Status WorkflowStatus `json:"status,omitempty"`
	// Archived is never serialized to JSON.
	Archived bool `json:"-"`
}
67
// WorkflowDetail is the full view of a workflow: its metadata, the topology
// of its nodes and a description of the underlying Kubernetes object.
type WorkflowDetail struct {
	// WorkflowMeta is embedded, so encoding/json emits its fields at the top
	// level of the object.
	WorkflowMeta `json:",inline"`
	// Topology lists the nodes that belong to the workflow.
	Topology Topology `json:"topology"`
	// KubeObject describes the workflow's Kubernetes object (type meta,
	// name/namespace/labels/annotations and spec).
	KubeObject KubeObjectDesc `json:"kube_object,omitempty"`
}
73
74
// Topology holds the nodes of a workflow.
type Topology struct {
	// Nodes is the list of converted workflow nodes.
	Nodes []Node `json:"nodes"`
}
78
// NodeState is the state string reported for a workflow node.
type NodeState string

const (
	// NodeRunning is assigned by convertWorkflowNode to nodes that are not
	// finished.
	NodeRunning NodeState = "Running"
	// NodeSucceed is assigned by convertWorkflowNode to finished nodes.
	NodeSucceed NodeState = "Succeed"
	// NodeFailed marks a failed node.
	// NOTE(review): no code visible in this file assigns this value.
	NodeFailed NodeState = "Failed"
)
86
87
// Node is the dashboard representation of a single workflow node.
type Node struct {
	// Name is the name of the workflow node object.
	Name string `json:"name"`
	// Type is the node type resolved by mappingTemplateType.
	Type NodeType `json:"type"`
	// State is NodeSucceed for finished nodes and NodeRunning otherwise.
	State NodeState `json:"state"`
	// Serial is populated only for serial nodes.
	Serial []NodeNameWithTemplate `json:"serial,omitempty"`
	// Parallel is populated only for parallel nodes.
	Parallel []NodeNameWithTemplate `json:"parallel,omitempty"`
	// ConditionalBranches is populated only for task nodes.
	ConditionalBranches []ConditionalBranch `json:"conditional_branches,omitempty"`
	// Template is the template name taken from the node's spec.
	Template string `json:"template"`
	// UID is the UID of the workflow node object.
	UID string `json:"uid"`
}
98
// NodeNameWithTemplate pairs a template name with the name of the node
// created from it. The compose* helpers in this file leave Name empty for
// templates that have no matching node yet.
type NodeNameWithTemplate struct {
	// Name is the name of the created node, or empty if none exists yet.
	Name string `json:"name,omitempty"`
	// Template is the template name.
	Template string `json:"template,omitempty"`
}
103
// ConditionalBranch is one branch of a task node: the target template (and
// its node, if created) together with the branch expression.
type ConditionalBranch struct {
	// NodeNameWithTemplate is embedded, so encoding/json emits its fields at
	// the same level as Expression.
	NodeNameWithTemplate `json:",inline,omitempty"`
	// Expression is the branch's expression, copied from the workflow node spec.
	Expression string `json:"expression,omitempty"`
}
108
109
110
111
112
113
114
// NodeType is the kind of a workflow node as exposed by the dashboard.
type NodeType string

const (
	// ChaosNode is returned by mappingTemplateType for every template type
	// that v1alpha1.IsChaosTemplateType accepts.
	ChaosNode NodeType = "ChaosNode"

	// SerialNode corresponds to v1alpha1.TypeSerial.
	SerialNode NodeType = "SerialNode"

	// ParallelNode corresponds to v1alpha1.TypeParallel.
	ParallelNode NodeType = "ParallelNode"

	// SuspendNode corresponds to v1alpha1.TypeSuspend.
	SuspendNode NodeType = "SuspendNode"

	// TaskNode corresponds to v1alpha1.TypeTask.
	TaskNode NodeType = "TaskNode"
)
133
// nodeTypeTemplateTypeMapping maps the non-chaos template types to their
// dashboard node types. mappingTemplateType consults it after checking for
// chaos template types.
var nodeTypeTemplateTypeMapping = map[v1alpha1.TemplateType]NodeType{
	v1alpha1.TypeSerial:   SerialNode,
	v1alpha1.TypeParallel: ParallelNode,
	v1alpha1.TypeSuspend:  SuspendNode,
	v1alpha1.TypeTask:     TaskNode,
}
140
// KubeWorkflowRepository provides workflow operations backed by a
// controller-runtime client.
type KubeWorkflowRepository struct {
	// kubeclient is the client used for every Kubernetes API call.
	kubeclient client.Client
}
144
145 func NewKubeWorkflowRepository(kubeclient client.Client) *KubeWorkflowRepository {
146 return &KubeWorkflowRepository{kubeclient: kubeclient}
147 }
148
149 func (it *KubeWorkflowRepository) Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
150 err := it.kubeclient.Create(ctx, &workflow)
151 if err != nil {
152 return WorkflowDetail{}, err
153 }
154
155 return it.Get(ctx, workflow.Namespace, workflow.Name)
156 }
157
158 func (it *KubeWorkflowRepository) Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
159 current := v1alpha1.Workflow{}
160
161 err := it.kubeclient.Get(ctx, types.NamespacedName{
162 Namespace: namespace,
163 Name: name,
164 }, ¤t)
165 if err != nil {
166 return WorkflowDetail{}, err
167 }
168 workflow.ObjectMeta.ResourceVersion = current.ObjectMeta.ResourceVersion
169
170 err = it.kubeclient.Update(ctx, &workflow)
171 if err != nil {
172 return WorkflowDetail{}, err
173 }
174
175 return it.Get(ctx, workflow.Namespace, workflow.Name)
176 }
177
178 func (it *KubeWorkflowRepository) ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error) {
179 workflowList := v1alpha1.WorkflowList{}
180
181 err := it.kubeclient.List(ctx, &workflowList, &client.ListOptions{
182 Namespace: namespace,
183 })
184 if err != nil {
185 return nil, err
186 }
187
188 var result []WorkflowMeta
189 for _, item := range workflowList.Items {
190 result = append(result, convertWorkflow(item))
191 }
192
193 return result, nil
194 }
195
196 func (it *KubeWorkflowRepository) List(ctx context.Context) ([]WorkflowMeta, error) {
197 return it.ListByNamespace(ctx, "")
198 }
199
200 func (it *KubeWorkflowRepository) Get(ctx context.Context, namespace, name string) (WorkflowDetail, error) {
201 kubeWorkflow := v1alpha1.Workflow{}
202
203 err := it.kubeclient.Get(ctx, types.NamespacedName{
204 Namespace: namespace,
205 Name: name,
206 }, &kubeWorkflow)
207 if err != nil {
208 return WorkflowDetail{}, err
209 }
210
211 workflowNodes := v1alpha1.WorkflowNodeList{}
212
213 selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
214 MatchLabels: map[string]string{
215 v1alpha1.LabelWorkflow: kubeWorkflow.Name,
216 },
217 })
218 if err != nil {
219 return WorkflowDetail{}, err
220 }
221
222 err = it.kubeclient.List(ctx, &workflowNodes, &client.ListOptions{
223 Namespace: namespace,
224 LabelSelector: selector,
225 })
226 if err != nil {
227 return WorkflowDetail{}, err
228 }
229
230 return convertWorkflowDetail(kubeWorkflow, workflowNodes.Items)
231 }
232
233 func (it *KubeWorkflowRepository) Delete(ctx context.Context, namespace, name string) error {
234 kubeWorkflow := v1alpha1.Workflow{}
235
236 err := it.kubeclient.Get(ctx, types.NamespacedName{
237 Namespace: namespace,
238 Name: name,
239 }, &kubeWorkflow)
240 if err != nil {
241 return err
242 }
243
244 return it.kubeclient.Delete(ctx, &kubeWorkflow)
245 }
246
247 func convertWorkflow(kubeWorkflow v1alpha1.Workflow) WorkflowMeta {
248 result := WorkflowMeta{
249 Namespace: kubeWorkflow.Namespace,
250 Name: kubeWorkflow.Name,
251 Entry: kubeWorkflow.Spec.Entry,
252 UID: string(kubeWorkflow.UID),
253 }
254
255 if kubeWorkflow.Status.StartTime != nil {
256 result.CreatedAt = kubeWorkflow.Status.StartTime.Time
257 }
258
259 if kubeWorkflow.Status.EndTime != nil {
260 result.EndTime = kubeWorkflow.Status.EndTime.Format(time.RFC3339)
261 }
262
263 if kubeWorkflow.GetDeletionTimestamp() != nil {
264 result.FinishTime = &kubeWorkflow.GetDeletionTimestamp().Time
265 }
266
267 if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionAccomplished, corev1.ConditionTrue) {
268 result.Status = WorkflowSucceed
269 } else if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionScheduled, corev1.ConditionTrue) {
270 result.Status = WorkflowRunning
271 } else {
272 result.Status = WorkflowUnknown
273 }
274
275
276
277 return result
278 }
279
280 func convertWorkflowDetail(kubeWorkflow v1alpha1.Workflow, kubeNodes []v1alpha1.WorkflowNode) (WorkflowDetail, error) {
281 nodes := make([]Node, 0)
282
283 for _, item := range kubeNodes {
284 node, err := convertWorkflowNode(item)
285 if err != nil {
286 return WorkflowDetail{}, nil
287 }
288
289 nodes = append(nodes, node)
290 }
291
292 result := WorkflowDetail{
293 WorkflowMeta: convertWorkflow(kubeWorkflow),
294 Topology: Topology{
295 Nodes: nodes,
296 },
297 KubeObject: KubeObjectDesc{
298 TypeMeta: kubeWorkflow.TypeMeta,
299 Meta: KubeObjectMeta{
300 Name: kubeWorkflow.Name,
301 Namespace: kubeWorkflow.Namespace,
302 Labels: kubeWorkflow.Labels,
303 Annotations: kubeWorkflow.Annotations,
304 },
305 Spec: kubeWorkflow.Spec,
306 },
307 }
308
309 return result, nil
310 }
311
312 func convertWorkflowNode(kubeWorkflowNode v1alpha1.WorkflowNode) (Node, error) {
313 templateType, err := mappingTemplateType(kubeWorkflowNode.Spec.Type)
314 if err != nil {
315 return Node{}, err
316 }
317
318 result := Node{
319 Name: kubeWorkflowNode.Name,
320 Type: templateType,
321 Serial: nil,
322 Parallel: nil,
323 Template: kubeWorkflowNode.Spec.TemplateName,
324 UID: string(kubeWorkflowNode.UID),
325 }
326
327 if kubeWorkflowNode.Spec.Type == v1alpha1.TypeSerial {
328 var nodes []string
329 for _, child := range kubeWorkflowNode.Status.FinishedChildren {
330 nodes = append(nodes, child.Name)
331 }
332 for _, child := range kubeWorkflowNode.Status.ActiveChildren {
333 nodes = append(nodes, child.Name)
334 }
335 result.Serial = composeSerialTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes)
336
337 } else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeParallel {
338 var nodes []string
339 for _, child := range kubeWorkflowNode.Status.FinishedChildren {
340 nodes = append(nodes, child.Name)
341 }
342 for _, child := range kubeWorkflowNode.Status.ActiveChildren {
343 nodes = append(nodes, child.Name)
344 }
345 result.Parallel = composeParallelTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes)
346
347 } else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeTask {
348 var nodes []string
349 for _, child := range kubeWorkflowNode.Status.FinishedChildren {
350 nodes = append(nodes, child.Name)
351 }
352 for _, child := range kubeWorkflowNode.Status.ActiveChildren {
353 nodes = append(nodes, child.Name)
354 }
355 result.ConditionalBranches = composeTaskConditionalBranches(kubeWorkflowNode.Spec.ConditionalBranches, nodes)
356 }
357
358 if wfcontrollers.WorkflowNodeFinished(kubeWorkflowNode.Status) {
359 result.State = NodeSucceed
360 } else {
361 result.State = NodeRunning
362 }
363
364 return result, nil
365 }
366
367
368 func composeSerialTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
369 var result []NodeNameWithTemplate
370 for _, node := range nodes {
371
372 templateName := node[0:strings.LastIndex(node, "-")]
373 result = append(result, NodeNameWithTemplate{Name: node, Template: templateName})
374 }
375 for _, task := range children[len(nodes):] {
376 result = append(result, NodeNameWithTemplate{Template: task})
377 }
378 return result
379 }
380
381 func composeParallelTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
382 var result []NodeNameWithTemplate
383 for _, task := range children {
384 result = append(result, NodeNameWithTemplate{
385 Name: "",
386 Template: task,
387 })
388 }
389 for _, node := range nodes {
390 for i, item := range result {
391 if len(item.Name) == 0 && strings.HasPrefix(node, item.Template) {
392 result[i].Name = node
393 break
394 }
395 }
396 }
397 return result
398 }
399
400 func composeTaskConditionalBranches(conditionalBranches []v1alpha1.ConditionalBranch, nodes []string) []ConditionalBranch {
401 var result []ConditionalBranch
402 for _, item := range conditionalBranches {
403 nodeName := ""
404 for _, node := range nodes {
405 if strings.HasPrefix(node, item.Target) {
406 nodeName = node
407 }
408 }
409 result = append(result,
410 ConditionalBranch{
411 NodeNameWithTemplate: NodeNameWithTemplate{
412 Name: nodeName,
413 Template: item.Target,
414 },
415 Expression: item.Expression,
416 })
417 }
418
419 return result
420 }
421
422 func mappingTemplateType(templateType v1alpha1.TemplateType) (NodeType, error) {
423 if v1alpha1.IsChaosTemplateType(templateType) {
424 return ChaosNode, nil
425 } else if target, ok := nodeTypeTemplateTypeMapping[templateType]; ok {
426 return target, nil
427 } else {
428 return "", errors.Errorf("can not resolve such type called %s", templateType)
429 }
430 }
431
432
// WorkflowStore is the persistence interface for WorkflowEntity records.
//
// NOTE(review): no implementation is visible in this file; the per-method
// notes below are inferred from the signatures and should be confirmed
// against the implementation.
type WorkflowStore interface {
	// List returns stored entities; presumably filtered by namespace, name
	// and the archived flag.
	List(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowEntity, error)
	// ListMeta is the metadata-only counterpart of List.
	ListMeta(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowMeta, error)
	// FindByID looks up an entity by its numeric ID.
	FindByID(ctx context.Context, ID uint) (*WorkflowEntity, error)
	// FindByUID looks up an entity by its UID.
	FindByUID(ctx context.Context, UID string) (*WorkflowEntity, error)
	// FindMetaByUID looks up the metadata of an entity by its UID.
	FindMetaByUID(ctx context.Context, UID string) (*WorkflowMeta, error)
	// Save persists the given entity.
	Save(ctx context.Context, entity *WorkflowEntity) error
	// DeleteByUID deletes the entity with the given UID.
	DeleteByUID(ctx context.Context, UID string) error
	// DeleteByUIDs deletes the entities with the given UIDs.
	DeleteByUIDs(ctx context.Context, UIDs []string) error
	// DeleteByFinishTime deletes entities based on their finish time and ttl;
	// presumably those finished longer ago than ttl — TODO confirm.
	DeleteByFinishTime(ctx context.Context, ttl time.Duration) error
	// MarkAsArchived marks entities selected by namespace and name as archived.
	MarkAsArchived(ctx context.Context, namespace, name string) error
	// MarkAsArchivedWithUID marks the entity with the given UID as archived.
	MarkAsArchivedWithUID(ctx context.Context, UID string) error
}
446
447
// WorkflowEntity is the stored form of a workflow: its metadata plus the
// workflow object serialized as JSON.
type WorkflowEntity struct {
	// WorkflowMeta holds the summary fields of the workflow.
	WorkflowMeta
	// Workflow is the JSON encoding of the workflow object, as produced by
	// WorkflowCR2WorkflowEntity.
	Workflow string `gorm:"type:text;size:32768"`
}
452
453 func WorkflowCR2WorkflowEntity(workflow *v1alpha1.Workflow) (*WorkflowEntity, error) {
454 if workflow == nil {
455 return nil, nil
456
457 }
458 jsonContent, err := json.Marshal(workflow)
459 if err != nil {
460 return nil, err
461 }
462
463 return &WorkflowEntity{
464 WorkflowMeta: convertWorkflow(*workflow),
465 Workflow: string(jsonContent),
466 }, nil
467
468 }
469
470 func WorkflowEntity2WorkflowCR(entity *WorkflowEntity) (*v1alpha1.Workflow, error) {
471 if entity == nil {
472 return nil, nil
473 }
474 result := v1alpha1.Workflow{}
475 err := json.Unmarshal([]byte(entity.Workflow), &result)
476 if err != nil {
477 return nil, err
478 }
479 return &result, nil
480 }
481
482 func WorkflowEntity2WorkflowDetail(entity *WorkflowEntity) (*WorkflowDetail, error) {
483 workflowCustomResource, err := WorkflowEntity2WorkflowCR(entity)
484 if err != nil {
485 return nil, err
486 }
487 return &WorkflowDetail{
488 WorkflowMeta: entity.WorkflowMeta,
489 KubeObject: KubeObjectDesc{
490 TypeMeta: workflowCustomResource.TypeMeta,
491 Meta: KubeObjectMeta{
492 Name: workflowCustomResource.Name,
493 Namespace: workflowCustomResource.Namespace,
494 Labels: workflowCustomResource.Labels,
495 Annotations: workflowCustomResource.Annotations,
496 },
497 Spec: workflowCustomResource.Spec,
498 },
499 }, nil
500 }
501