...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/core/workflow.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/core

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package core
    15  
    16  import (
    17  	"context"
    18  	"encoding/json"
    19  	"strings"
    20  	"time"
    21  
    22  	"github.com/pkg/errors"
    23  	corev1 "k8s.io/api/core/v1"
    24  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	"sigs.k8s.io/controller-runtime/pkg/client"
    27  
    28  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    29  	wfcontrollers "github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers"
    30  )
    31  
    32  type WorkflowRepository interface {
    33  	List(ctx context.Context) ([]WorkflowMeta, error)
    34  	ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error)
    35  	Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error)
    36  	Get(ctx context.Context, namespace, name string) (WorkflowDetail, error)
    37  	Delete(ctx context.Context, namespace, name string) error
    38  	Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error)
    39  }
    40  
    41  type WorkflowStatus string
    42  
    43  const (
    44  	WorkflowRunning WorkflowStatus = "running"
    45  	WorkflowSucceed WorkflowStatus = "finished"
    46  	WorkflowFailed  WorkflowStatus = "failed"
    47  	WorkflowUnknown WorkflowStatus = "unknown"
    48  )
    49  
    50  // WorkflowMeta defines the root structure of a workflow.
    51  type WorkflowMeta struct {
    52  	ID        uint           `gorm:"primary_key" json:"id"`
    53  	UID       string         `gorm:"index:workflow_uid" json:"uid"`
    54  	Namespace string         `json:"namespace"`
    55  	Name      string         `json:"name"`
    56  	Entry     string         `json:"entry"` // the entry node name
    57  	CreatedAt time.Time      `json:"created_at"`
    58  	EndTime   string         `json:"end_time"`
    59  	Status    WorkflowStatus `json:"status,omitempty"`
    60  	Archived  bool           `json:"-"`
    61  }
    62  
    63  type WorkflowDetail struct {
    64  	WorkflowMeta `json:",inline"`
    65  	Topology     Topology       `json:"topology"`
    66  	KubeObject   KubeObjectDesc `json:"kube_object,omitempty"`
    67  }
    68  
    69  // Topology describes the process of a workflow.
    70  type Topology struct {
    71  	Nodes []Node `json:"nodes"`
    72  }
    73  
    74  type NodeState string
    75  
    76  const (
    77  	NodeRunning NodeState = "Running"
    78  	NodeSucceed NodeState = "Succeed"
    79  	NodeFailed  NodeState = "Failed"
    80  )
    81  
    82  // Node defines a single step of a workflow.
    83  type Node struct {
    84  	Name                string              `json:"name"`
    85  	Type                NodeType            `json:"type"`
    86  	State               NodeState           `json:"state"`
    87  	Serial              *NodeSerial         `json:"serial,omitempty"`
    88  	Parallel            *NodeParallel       `json:"parallel,omitempty"`
    89  	ConditionalBranches []ConditionalBranch `json:"conditional_branches,omitempty"`
    90  	Template            string              `json:"template"`
    91  	UID                 string              `json:"uid"`
    92  }
    93  
    94  type NodeNameWithTemplate struct {
    95  	Name     string `json:"name,omitempty"`
    96  	Template string `json:"template,omitempty"`
    97  }
    98  
    99  // FIXME: unnecessary another wrap of []NodeNameWithTemplate, like struct NodeSerial and NodeParallel
   100  
   101  // NodeSerial defines SerialNode's specific fields.
   102  type NodeSerial struct {
   103  	Children []NodeNameWithTemplate `json:"children"`
   104  }
   105  
   106  // NodeParallel defines ParallelNode's specific fields.
   107  type NodeParallel struct {
   108  	Children []NodeNameWithTemplate `json:"children"`
   109  }
   110  
   111  type ConditionalBranch struct {
   112  	NodeNameWithTemplate `json:",inline,omitempty"`
   113  	Expression           string `json:"expression,omitempty"`
   114  }
   115  
   116  // NodeType represents the type of a workflow node.
   117  //
   118  // There will be five types can be referred as NodeType:
   119  // ChaosNode, SerialNode, ParallelNode, SuspendNode, TaskNode.
   120  //
   121  // Const definitions can be found below this type.
   122  type NodeType string
   123  
   124  const (
   125  	// ChaosNode represents a node will perform a single Chaos Experiment.
   126  	ChaosNode NodeType = "ChaosNode"
   127  
   128  	// SerialNode represents a node that will perform continuous templates.
   129  	SerialNode NodeType = "SerialNode"
   130  
   131  	// ParallelNode represents a node that will perform parallel templates.
   132  	ParallelNode NodeType = "ParallelNode"
   133  
   134  	// SuspendNode represents a node that will perform wait operation.
   135  	SuspendNode NodeType = "SuspendNode"
   136  
   137  	// TaskNode represents a node that will perform user-defined task.
   138  	TaskNode NodeType = "TaskNode"
   139  )
   140  
   141  var nodeTypeTemplateTypeMapping = map[v1alpha1.TemplateType]NodeType{
   142  	v1alpha1.TypeSerial:   SerialNode,
   143  	v1alpha1.TypeParallel: ParallelNode,
   144  	v1alpha1.TypeSuspend:  SuspendNode,
   145  	v1alpha1.TypeTask:     TaskNode,
   146  }
   147  
   148  type KubeWorkflowRepository struct {
   149  	kubeclient client.Client
   150  }
   151  
   152  func NewKubeWorkflowRepository(kubeclient client.Client) *KubeWorkflowRepository {
   153  	return &KubeWorkflowRepository{kubeclient: kubeclient}
   154  }
   155  
   156  func (it *KubeWorkflowRepository) Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
   157  	err := it.kubeclient.Create(ctx, &workflow)
   158  	if err != nil {
   159  		return WorkflowDetail{}, err
   160  	}
   161  
   162  	return it.Get(ctx, workflow.Namespace, workflow.Name)
   163  }
   164  
   165  func (it *KubeWorkflowRepository) Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
   166  	current := v1alpha1.Workflow{}
   167  
   168  	err := it.kubeclient.Get(ctx, types.NamespacedName{
   169  		Namespace: namespace,
   170  		Name:      name,
   171  	}, &current)
   172  	if err != nil {
   173  		return WorkflowDetail{}, err
   174  	}
   175  	workflow.ObjectMeta.ResourceVersion = current.ObjectMeta.ResourceVersion
   176  
   177  	err = it.kubeclient.Update(ctx, &workflow)
   178  	if err != nil {
   179  		return WorkflowDetail{}, err
   180  	}
   181  
   182  	return it.Get(ctx, workflow.Namespace, workflow.Name)
   183  }
   184  
   185  func (it *KubeWorkflowRepository) ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error) {
   186  	workflowList := v1alpha1.WorkflowList{}
   187  
   188  	err := it.kubeclient.List(ctx, &workflowList, &client.ListOptions{
   189  		Namespace: namespace,
   190  	})
   191  	if err != nil {
   192  		return nil, err
   193  	}
   194  
   195  	var result []WorkflowMeta
   196  	for _, item := range workflowList.Items {
   197  		result = append(result, convertWorkflow(item))
   198  	}
   199  
   200  	return result, nil
   201  }
   202  
   203  func (it *KubeWorkflowRepository) List(ctx context.Context) ([]WorkflowMeta, error) {
   204  	return it.ListByNamespace(ctx, "")
   205  }
   206  
   207  func (it *KubeWorkflowRepository) Get(ctx context.Context, namespace, name string) (WorkflowDetail, error) {
   208  	kubeWorkflow := v1alpha1.Workflow{}
   209  
   210  	err := it.kubeclient.Get(ctx, types.NamespacedName{
   211  		Namespace: namespace,
   212  		Name:      name,
   213  	}, &kubeWorkflow)
   214  	if err != nil {
   215  		return WorkflowDetail{}, err
   216  	}
   217  
   218  	workflowNodes := v1alpha1.WorkflowNodeList{}
   219  	// labeling workflow nodes, see pkg/workflow/controllers/new_node.go
   220  	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   221  		MatchLabels: map[string]string{
   222  			v1alpha1.LabelWorkflow: kubeWorkflow.Name,
   223  		},
   224  	})
   225  	if err != nil {
   226  		return WorkflowDetail{}, err
   227  	}
   228  
   229  	err = it.kubeclient.List(ctx, &workflowNodes, &client.ListOptions{
   230  		Namespace:     namespace,
   231  		LabelSelector: selector,
   232  	})
   233  	if err != nil {
   234  		return WorkflowDetail{}, err
   235  	}
   236  
   237  	return convertWorkflowDetail(kubeWorkflow, workflowNodes.Items)
   238  }
   239  
   240  func (it *KubeWorkflowRepository) Delete(ctx context.Context, namespace, name string) error {
   241  	kubeWorkflow := v1alpha1.Workflow{}
   242  
   243  	err := it.kubeclient.Get(ctx, types.NamespacedName{
   244  		Namespace: namespace,
   245  		Name:      name,
   246  	}, &kubeWorkflow)
   247  	if err != nil {
   248  		return err
   249  	}
   250  
   251  	return it.kubeclient.Delete(ctx, &kubeWorkflow)
   252  }
   253  
   254  func convertWorkflow(kubeWorkflow v1alpha1.Workflow) WorkflowMeta {
   255  	result := WorkflowMeta{
   256  		Namespace: kubeWorkflow.Namespace,
   257  		Name:      kubeWorkflow.Name,
   258  		Entry:     kubeWorkflow.Spec.Entry,
   259  		UID:       string(kubeWorkflow.UID),
   260  	}
   261  
   262  	if kubeWorkflow.Status.StartTime != nil {
   263  		result.CreatedAt = kubeWorkflow.Status.StartTime.Time
   264  	}
   265  
   266  	if kubeWorkflow.Status.EndTime != nil {
   267  		result.EndTime = kubeWorkflow.Status.EndTime.Format(time.RFC3339)
   268  	}
   269  
   270  	if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionAccomplished, corev1.ConditionTrue) {
   271  		result.Status = WorkflowSucceed
   272  	} else if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionScheduled, corev1.ConditionTrue) {
   273  		result.Status = WorkflowRunning
   274  	} else {
   275  		result.Status = WorkflowUnknown
   276  	}
   277  
   278  	// TODO: status failed
   279  
   280  	return result
   281  }
   282  
   283  func convertWorkflowDetail(kubeWorkflow v1alpha1.Workflow, kubeNodes []v1alpha1.WorkflowNode) (WorkflowDetail, error) {
   284  	nodes := make([]Node, 0)
   285  
   286  	for _, item := range kubeNodes {
   287  		node, err := convertWorkflowNode(item)
   288  		if err != nil {
   289  			return WorkflowDetail{}, nil
   290  		}
   291  
   292  		nodes = append(nodes, node)
   293  	}
   294  
   295  	result := WorkflowDetail{
   296  		WorkflowMeta: convertWorkflow(kubeWorkflow),
   297  		Topology: Topology{
   298  			Nodes: nodes,
   299  		},
   300  		KubeObject: KubeObjectDesc{
   301  			TypeMeta: kubeWorkflow.TypeMeta,
   302  			Meta: KubeObjectMeta{
   303  				Name:        kubeWorkflow.Name,
   304  				Namespace:   kubeWorkflow.Namespace,
   305  				Labels:      kubeWorkflow.Labels,
   306  				Annotations: kubeWorkflow.Annotations,
   307  			},
   308  			Spec: kubeWorkflow.Spec,
   309  		},
   310  	}
   311  
   312  	return result, nil
   313  }
   314  
   315  func convertWorkflowNode(kubeWorkflowNode v1alpha1.WorkflowNode) (Node, error) {
   316  	templateType, err := mappingTemplateType(kubeWorkflowNode.Spec.Type)
   317  	if err != nil {
   318  		return Node{}, err
   319  	}
   320  
   321  	result := Node{
   322  		Name:     kubeWorkflowNode.Name,
   323  		Type:     templateType,
   324  		Serial:   nil,
   325  		Parallel: nil,
   326  		Template: kubeWorkflowNode.Spec.TemplateName,
   327  		UID:      string(kubeWorkflowNode.UID),
   328  	}
   329  
   330  	if kubeWorkflowNode.Spec.Type == v1alpha1.TypeSerial {
   331  		var nodes []string
   332  		for _, child := range kubeWorkflowNode.Status.FinishedChildren {
   333  			nodes = append(nodes, child.Name)
   334  		}
   335  		for _, child := range kubeWorkflowNode.Status.ActiveChildren {
   336  			nodes = append(nodes, child.Name)
   337  		}
   338  		result.Serial = &NodeSerial{
   339  			Children: composeSerialTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes),
   340  		}
   341  	} else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeParallel {
   342  		var nodes []string
   343  		for _, child := range kubeWorkflowNode.Status.FinishedChildren {
   344  			nodes = append(nodes, child.Name)
   345  		}
   346  		for _, child := range kubeWorkflowNode.Status.ActiveChildren {
   347  			nodes = append(nodes, child.Name)
   348  		}
   349  		result.Parallel = &NodeParallel{
   350  			Children: composeParallelTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes),
   351  		}
   352  	} else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeTask {
   353  		var nodes []string
   354  		for _, child := range kubeWorkflowNode.Status.FinishedChildren {
   355  			nodes = append(nodes, child.Name)
   356  		}
   357  		for _, child := range kubeWorkflowNode.Status.ActiveChildren {
   358  			nodes = append(nodes, child.Name)
   359  		}
   360  		result.ConditionalBranches = composeTaskConditionalBranches(kubeWorkflowNode.Spec.ConditionalBranches, nodes)
   361  	}
   362  
   363  	if wfcontrollers.WorkflowNodeFinished(kubeWorkflowNode.Status) {
   364  		result.State = NodeSucceed
   365  	} else {
   366  		result.State = NodeRunning
   367  	}
   368  
   369  	return result, nil
   370  }
   371  
   372  // composeSerialTaskAndNodes need nodes to be ordered with its creation time
   373  func composeSerialTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
   374  	var result []NodeNameWithTemplate
   375  	for _, node := range nodes {
   376  		// TODO: that reverse the generated name, maybe we could use WorkflowNode.TemplateName in the future
   377  		templateName := node[0:strings.LastIndex(node, "-")]
   378  		result = append(result, NodeNameWithTemplate{Name: node, Template: templateName})
   379  	}
   380  	for _, task := range children[len(nodes):] {
   381  		result = append(result, NodeNameWithTemplate{Template: task})
   382  	}
   383  	return result
   384  }
   385  
   386  func composeParallelTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
   387  	var result []NodeNameWithTemplate
   388  	for _, task := range children {
   389  		result = append(result, NodeNameWithTemplate{
   390  			Name:     "",
   391  			Template: task,
   392  		})
   393  	}
   394  	for _, node := range nodes {
   395  		for i, item := range result {
   396  			if len(item.Name) == 0 && strings.HasPrefix(node, item.Template) {
   397  				result[i].Name = node
   398  				break
   399  			}
   400  		}
   401  	}
   402  	return result
   403  }
   404  
   405  func composeTaskConditionalBranches(conditionalBranches []v1alpha1.ConditionalBranch, nodes []string) []ConditionalBranch {
   406  	var result []ConditionalBranch
   407  	for _, item := range conditionalBranches {
   408  		nodeName := ""
   409  		for _, node := range nodes {
   410  			if strings.HasPrefix(node, item.Target) {
   411  				nodeName = node
   412  			}
   413  		}
   414  		result = append(result,
   415  			ConditionalBranch{
   416  				NodeNameWithTemplate: NodeNameWithTemplate{
   417  					Name:     nodeName,
   418  					Template: item.Target,
   419  				},
   420  				Expression: item.Expression,
   421  			})
   422  	}
   423  
   424  	return result
   425  }
   426  
   427  func mappingTemplateType(templateType v1alpha1.TemplateType) (NodeType, error) {
   428  	if v1alpha1.IsChaosTemplateType(templateType) {
   429  		return ChaosNode, nil
   430  	} else if target, ok := nodeTypeTemplateTypeMapping[templateType]; ok {
   431  		return target, nil
   432  	} else {
   433  		return "", errors.Errorf("can not resolve such type called %s", templateType)
   434  	}
   435  }
   436  
   437  // The WorkflowStore of workflow is not so similar with others store.
   438  type WorkflowStore interface {
   439  	List(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowEntity, error)
   440  	ListMeta(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowMeta, error)
   441  	FindByID(ctx context.Context, ID uint) (*WorkflowEntity, error)
   442  	FindByUID(ctx context.Context, UID string) (*WorkflowEntity, error)
   443  	FindMetaByUID(ctx context.Context, UID string) (*WorkflowMeta, error)
   444  	Save(ctx context.Context, entity *WorkflowEntity) error
   445  	DeleteByUID(ctx context.Context, UID string) error
   446  	DeleteByUIDs(ctx context.Context, UIDs []string) error
   447  	MarkAsArchived(ctx context.Context, namespace, name string) error
   448  	MarkAsArchivedWithUID(ctx context.Context, UID string) error
   449  }
   450  
   451  // WorkflowEntity is the gorm entity, refers to a row of data
   452  type WorkflowEntity struct {
   453  	WorkflowMeta
   454  	Workflow string `gorm:"size:32768"`
   455  }
   456  
   457  func WorkflowCR2WorkflowEntity(workflow *v1alpha1.Workflow) (*WorkflowEntity, error) {
   458  	if workflow == nil {
   459  		return nil, nil
   460  
   461  	}
   462  	jsonContent, err := json.Marshal(workflow)
   463  	if err != nil {
   464  		return nil, err
   465  	}
   466  	return &WorkflowEntity{
   467  		WorkflowMeta: convertWorkflow(*workflow),
   468  		Workflow:     string(jsonContent),
   469  	}, nil
   470  
   471  }
   472  
   473  func WorkflowEntity2WorkflowCR(entity *WorkflowEntity) (*v1alpha1.Workflow, error) {
   474  	if entity == nil {
   475  		return nil, nil
   476  	}
   477  	result := v1alpha1.Workflow{}
   478  	err := json.Unmarshal([]byte(entity.Workflow), &result)
   479  	if err != nil {
   480  		return nil, err
   481  	}
   482  	return &result, nil
   483  }
   484  
   485  func WorkflowEntity2WorkflowDetail(entity *WorkflowEntity) (*WorkflowDetail, error) {
   486  	workflowCustomResource, err := WorkflowEntity2WorkflowCR(entity)
   487  	if err != nil {
   488  		return nil, err
   489  	}
   490  	return &WorkflowDetail{
   491  		WorkflowMeta: entity.WorkflowMeta,
   492  		KubeObject: KubeObjectDesc{
   493  			TypeMeta: workflowCustomResource.TypeMeta,
   494  			Meta: KubeObjectMeta{
   495  				Name:        workflowCustomResource.Name,
   496  				Namespace:   workflowCustomResource.Namespace,
   497  				Labels:      workflowCustomResource.Labels,
   498  				Annotations: workflowCustomResource.Annotations,
   499  			},
   500  			Spec: workflowCustomResource.Spec,
   501  		},
   502  	}, nil
   503  }
   504