...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core/workflow.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package core
    17  
    18  import (
    19  	"context"
    20  	"encoding/json"
    21  	"strings"
    22  	"time"
    23  
    24  	"github.com/pkg/errors"
    25  	corev1 "k8s.io/api/core/v1"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/types"
    28  	"sigs.k8s.io/controller-runtime/pkg/client"
    29  
    30  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    31  	wfcontrollers "github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers"
    32  )
    33  
// WorkflowRepository declares the list, create, get, delete and update
// operations on workflows. KubeWorkflowRepository in this file implements
// every method.
type WorkflowRepository interface {
	// List returns workflow metadata without taking a namespace argument.
	List(ctx context.Context) ([]WorkflowMeta, error)
	// ListByNamespace returns workflow metadata for the given namespace.
	ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error)
	// Create stores workflow and returns its detail.
	Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error)
	// Get returns the detail of the workflow identified by namespace and name.
	Get(ctx context.Context, namespace, name string) (WorkflowDetail, error)
	// Delete removes the workflow identified by namespace and name.
	Delete(ctx context.Context, namespace, name string) error
	// Update replaces the workflow identified by namespace and name and
	// returns its detail.
	Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error)
}
    42  
// WorkflowStatus is the status string carried in WorkflowMeta.Status.
type WorkflowStatus string

const (
	// WorkflowRunning is assigned by convertWorkflow when the Scheduled
	// condition is true and the Accomplished condition is not.
	WorkflowRunning WorkflowStatus = "running"
	// WorkflowSucceed is assigned by convertWorkflow when the Accomplished
	// condition is true. Note that its string value is "finished".
	WorkflowSucceed WorkflowStatus = "finished"
	// WorkflowFailed is declared but never assigned by convertWorkflow
	// (see the TODO there).
	WorkflowFailed  WorkflowStatus = "failed"
	// WorkflowUnknown is assigned by convertWorkflow when neither of the
	// conditions above is true.
	WorkflowUnknown WorkflowStatus = "unknown"
)
    51  
// WorkflowMeta defines the root structure of a workflow.
//
// It carries both gorm and json tags, and is embedded in WorkflowDetail and
// WorkflowEntity.
type WorkflowMeta struct {
	ID        uint           `gorm:"primary_key" json:"id"`
	UID       string         `gorm:"index:workflow_uid" json:"uid"`
	Namespace string         `json:"namespace"`
	Name      string         `json:"name"`
	Entry     string         `json:"entry"` // the entry node name
	// CreatedAt is filled by convertWorkflow from the workflow's
	// Status.StartTime; it stays the zero time when StartTime is nil.
	CreatedAt time.Time      `json:"created_at"`
	// EndTime is Status.EndTime formatted as RFC3339 by convertWorkflow;
	// it stays empty when EndTime is nil.
	EndTime   string         `json:"end_time"`
	Status    WorkflowStatus `json:"status,omitempty"`
	// Archived is excluded from the JSON output.
	Archived  bool           `json:"-"`
}
    64  
// WorkflowDetail is a WorkflowMeta extended with the node topology and a
// description of the underlying Kubernetes object.
type WorkflowDetail struct {
	WorkflowMeta `json:",inline"`
	// Topology lists the converted workflow nodes.
	Topology     Topology       `json:"topology"`
	// KubeObject holds the type meta, object meta and spec of the workflow.
	KubeObject   KubeObjectDesc `json:"kube_object,omitempty"`
}
    70  
// Topology describes the process of a workflow as a flat list of its nodes.
type Topology struct {
	// Nodes is serialized under the "nodes" key.
	Nodes []Node `json:"nodes"`
}
    75  
// NodeState is the state string carried in Node.State.
type NodeState string

const (
	// NodeRunning is assigned by convertWorkflowNode to nodes that are not
	// finished.
	NodeRunning NodeState = "Running"
	// NodeSucceed is assigned by convertWorkflowNode to finished nodes.
	NodeSucceed NodeState = "Succeed"
	// NodeFailed is declared but never assigned by convertWorkflowNode.
	NodeFailed  NodeState = "Failed"
)
    83  
// Node defines a single step of a workflow.
//
// convertWorkflowNode builds it from a v1alpha1.WorkflowNode; at most one of
// Serial, Parallel and ConditionalBranches is populated, depending on the
// node's template type.
type Node struct {
	Name                string                 `json:"name"`
	Type                NodeType               `json:"type"`
	State               NodeState              `json:"state"`
	// Serial is populated for serial nodes only.
	Serial              []NodeNameWithTemplate `json:"serial,omitempty"`
	// Parallel is populated for parallel nodes only.
	Parallel            []NodeNameWithTemplate `json:"parallel,omitempty"`
	// ConditionalBranches is populated for task nodes only.
	ConditionalBranches []ConditionalBranch    `json:"conditional_branches,omitempty"`
	// Template is the template name taken from the node's spec.
	Template            string                 `json:"template"`
	UID                 string                 `json:"uid"`
}
    95  
// NodeNameWithTemplate pairs a template name with the name of the node
// created from it. The compose* helpers in this file leave Name empty for
// templates that have no matching node.
type NodeNameWithTemplate struct {
	Name     string `json:"name,omitempty"`
	Template string `json:"template,omitempty"`
}
   100  
// ConditionalBranch describes one branch of a task node: the target template
// (and matching node name, if any) together with the branch expression.
type ConditionalBranch struct {
	NodeNameWithTemplate `json:",inline,omitempty"`
	// Expression is copied from the v1alpha1.ConditionalBranch it was built from.
	Expression           string `json:"expression,omitempty"`
}
   105  
// NodeType represents the type of a workflow node.
//
// Five values are defined: ChaosNode, SerialNode, ParallelNode, SuspendNode
// and TaskNode.
//
// Their const definitions can be found below this type.
type NodeType string

const (
	// ChaosNode represents a node that performs a single Chaos Experiment.
	ChaosNode NodeType = "ChaosNode"

	// SerialNode represents a node that performs its templates one after another.
	SerialNode NodeType = "SerialNode"

	// ParallelNode represents a node that performs its templates in parallel.
	ParallelNode NodeType = "ParallelNode"

	// SuspendNode represents a node that performs a wait operation.
	SuspendNode NodeType = "SuspendNode"

	// TaskNode represents a node that performs a user-defined task.
	TaskNode NodeType = "TaskNode"
)
   130  
// nodeTypeTemplateTypeMapping maps the serial, parallel, suspend and task
// template types to their NodeType. mappingTemplateType consults it only
// after its chaos-template check, which is why ChaosNode has no entry here.
var nodeTypeTemplateTypeMapping = map[v1alpha1.TemplateType]NodeType{
	v1alpha1.TypeSerial:   SerialNode,
	v1alpha1.TypeParallel: ParallelNode,
	v1alpha1.TypeSuspend:  SuspendNode,
	v1alpha1.TypeTask:     TaskNode,
}
   137  
// KubeWorkflowRepository performs workflow operations through a
// controller-runtime client.Client.
type KubeWorkflowRepository struct {
	// kubeclient is the client every method of this type goes through.
	kubeclient client.Client
}
   141  
   142  func NewKubeWorkflowRepository(kubeclient client.Client) *KubeWorkflowRepository {
   143  	return &KubeWorkflowRepository{kubeclient: kubeclient}
   144  }
   145  
   146  func (it *KubeWorkflowRepository) Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
   147  	err := it.kubeclient.Create(ctx, &workflow)
   148  	if err != nil {
   149  		return WorkflowDetail{}, err
   150  	}
   151  
   152  	return it.Get(ctx, workflow.Namespace, workflow.Name)
   153  }
   154  
   155  func (it *KubeWorkflowRepository) Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
   156  	current := v1alpha1.Workflow{}
   157  
   158  	err := it.kubeclient.Get(ctx, types.NamespacedName{
   159  		Namespace: namespace,
   160  		Name:      name,
   161  	}, &current)
   162  	if err != nil {
   163  		return WorkflowDetail{}, err
   164  	}
   165  	workflow.ObjectMeta.ResourceVersion = current.ObjectMeta.ResourceVersion
   166  
   167  	err = it.kubeclient.Update(ctx, &workflow)
   168  	if err != nil {
   169  		return WorkflowDetail{}, err
   170  	}
   171  
   172  	return it.Get(ctx, workflow.Namespace, workflow.Name)
   173  }
   174  
   175  func (it *KubeWorkflowRepository) ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error) {
   176  	workflowList := v1alpha1.WorkflowList{}
   177  
   178  	err := it.kubeclient.List(ctx, &workflowList, &client.ListOptions{
   179  		Namespace: namespace,
   180  	})
   181  	if err != nil {
   182  		return nil, err
   183  	}
   184  
   185  	var result []WorkflowMeta
   186  	for _, item := range workflowList.Items {
   187  		result = append(result, convertWorkflow(item))
   188  	}
   189  
   190  	return result, nil
   191  }
   192  
   193  func (it *KubeWorkflowRepository) List(ctx context.Context) ([]WorkflowMeta, error) {
   194  	return it.ListByNamespace(ctx, "")
   195  }
   196  
   197  func (it *KubeWorkflowRepository) Get(ctx context.Context, namespace, name string) (WorkflowDetail, error) {
   198  	kubeWorkflow := v1alpha1.Workflow{}
   199  
   200  	err := it.kubeclient.Get(ctx, types.NamespacedName{
   201  		Namespace: namespace,
   202  		Name:      name,
   203  	}, &kubeWorkflow)
   204  	if err != nil {
   205  		return WorkflowDetail{}, err
   206  	}
   207  
   208  	workflowNodes := v1alpha1.WorkflowNodeList{}
   209  	// labeling workflow nodes, see pkg/workflow/controllers/new_node.go
   210  	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   211  		MatchLabels: map[string]string{
   212  			v1alpha1.LabelWorkflow: kubeWorkflow.Name,
   213  		},
   214  	})
   215  	if err != nil {
   216  		return WorkflowDetail{}, err
   217  	}
   218  
   219  	err = it.kubeclient.List(ctx, &workflowNodes, &client.ListOptions{
   220  		Namespace:     namespace,
   221  		LabelSelector: selector,
   222  	})
   223  	if err != nil {
   224  		return WorkflowDetail{}, err
   225  	}
   226  
   227  	return convertWorkflowDetail(kubeWorkflow, workflowNodes.Items)
   228  }
   229  
   230  func (it *KubeWorkflowRepository) Delete(ctx context.Context, namespace, name string) error {
   231  	kubeWorkflow := v1alpha1.Workflow{}
   232  
   233  	err := it.kubeclient.Get(ctx, types.NamespacedName{
   234  		Namespace: namespace,
   235  		Name:      name,
   236  	}, &kubeWorkflow)
   237  	if err != nil {
   238  		return err
   239  	}
   240  
   241  	return it.kubeclient.Delete(ctx, &kubeWorkflow)
   242  }
   243  
   244  func convertWorkflow(kubeWorkflow v1alpha1.Workflow) WorkflowMeta {
   245  	result := WorkflowMeta{
   246  		Namespace: kubeWorkflow.Namespace,
   247  		Name:      kubeWorkflow.Name,
   248  		Entry:     kubeWorkflow.Spec.Entry,
   249  		UID:       string(kubeWorkflow.UID),
   250  	}
   251  
   252  	if kubeWorkflow.Status.StartTime != nil {
   253  		result.CreatedAt = kubeWorkflow.Status.StartTime.Time
   254  	}
   255  
   256  	if kubeWorkflow.Status.EndTime != nil {
   257  		result.EndTime = kubeWorkflow.Status.EndTime.Format(time.RFC3339)
   258  	}
   259  
   260  	if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionAccomplished, corev1.ConditionTrue) {
   261  		result.Status = WorkflowSucceed
   262  	} else if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionScheduled, corev1.ConditionTrue) {
   263  		result.Status = WorkflowRunning
   264  	} else {
   265  		result.Status = WorkflowUnknown
   266  	}
   267  
   268  	// TODO: status failed
   269  
   270  	return result
   271  }
   272  
   273  func convertWorkflowDetail(kubeWorkflow v1alpha1.Workflow, kubeNodes []v1alpha1.WorkflowNode) (WorkflowDetail, error) {
   274  	nodes := make([]Node, 0)
   275  
   276  	for _, item := range kubeNodes {
   277  		node, err := convertWorkflowNode(item)
   278  		if err != nil {
   279  			return WorkflowDetail{}, nil
   280  		}
   281  
   282  		nodes = append(nodes, node)
   283  	}
   284  
   285  	result := WorkflowDetail{
   286  		WorkflowMeta: convertWorkflow(kubeWorkflow),
   287  		Topology: Topology{
   288  			Nodes: nodes,
   289  		},
   290  		KubeObject: KubeObjectDesc{
   291  			TypeMeta: kubeWorkflow.TypeMeta,
   292  			Meta: KubeObjectMeta{
   293  				Name:        kubeWorkflow.Name,
   294  				Namespace:   kubeWorkflow.Namespace,
   295  				Labels:      kubeWorkflow.Labels,
   296  				Annotations: kubeWorkflow.Annotations,
   297  			},
   298  			Spec: kubeWorkflow.Spec,
   299  		},
   300  	}
   301  
   302  	return result, nil
   303  }
   304  
   305  func convertWorkflowNode(kubeWorkflowNode v1alpha1.WorkflowNode) (Node, error) {
   306  	templateType, err := mappingTemplateType(kubeWorkflowNode.Spec.Type)
   307  	if err != nil {
   308  		return Node{}, err
   309  	}
   310  
   311  	result := Node{
   312  		Name:     kubeWorkflowNode.Name,
   313  		Type:     templateType,
   314  		Serial:   nil,
   315  		Parallel: nil,
   316  		Template: kubeWorkflowNode.Spec.TemplateName,
   317  		UID:      string(kubeWorkflowNode.UID),
   318  	}
   319  
   320  	if kubeWorkflowNode.Spec.Type == v1alpha1.TypeSerial {
   321  		var nodes []string
   322  		for _, child := range kubeWorkflowNode.Status.FinishedChildren {
   323  			nodes = append(nodes, child.Name)
   324  		}
   325  		for _, child := range kubeWorkflowNode.Status.ActiveChildren {
   326  			nodes = append(nodes, child.Name)
   327  		}
   328  		result.Serial = composeSerialTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes)
   329  
   330  	} else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeParallel {
   331  		var nodes []string
   332  		for _, child := range kubeWorkflowNode.Status.FinishedChildren {
   333  			nodes = append(nodes, child.Name)
   334  		}
   335  		for _, child := range kubeWorkflowNode.Status.ActiveChildren {
   336  			nodes = append(nodes, child.Name)
   337  		}
   338  		result.Parallel = composeParallelTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes)
   339  
   340  	} else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeTask {
   341  		var nodes []string
   342  		for _, child := range kubeWorkflowNode.Status.FinishedChildren {
   343  			nodes = append(nodes, child.Name)
   344  		}
   345  		for _, child := range kubeWorkflowNode.Status.ActiveChildren {
   346  			nodes = append(nodes, child.Name)
   347  		}
   348  		result.ConditionalBranches = composeTaskConditionalBranches(kubeWorkflowNode.Spec.ConditionalBranches, nodes)
   349  	}
   350  
   351  	if wfcontrollers.WorkflowNodeFinished(kubeWorkflowNode.Status) {
   352  		result.State = NodeSucceed
   353  	} else {
   354  		result.State = NodeRunning
   355  	}
   356  
   357  	return result, nil
   358  }
   359  
   360  // composeSerialTaskAndNodes need nodes to be ordered with its creation time
   361  func composeSerialTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
   362  	var result []NodeNameWithTemplate
   363  	for _, node := range nodes {
   364  		// TODO: that reverse the generated name, maybe we could use WorkflowNode.TemplateName in the future
   365  		templateName := node[0:strings.LastIndex(node, "-")]
   366  		result = append(result, NodeNameWithTemplate{Name: node, Template: templateName})
   367  	}
   368  	for _, task := range children[len(nodes):] {
   369  		result = append(result, NodeNameWithTemplate{Template: task})
   370  	}
   371  	return result
   372  }
   373  
   374  func composeParallelTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
   375  	var result []NodeNameWithTemplate
   376  	for _, task := range children {
   377  		result = append(result, NodeNameWithTemplate{
   378  			Name:     "",
   379  			Template: task,
   380  		})
   381  	}
   382  	for _, node := range nodes {
   383  		for i, item := range result {
   384  			if len(item.Name) == 0 && strings.HasPrefix(node, item.Template) {
   385  				result[i].Name = node
   386  				break
   387  			}
   388  		}
   389  	}
   390  	return result
   391  }
   392  
   393  func composeTaskConditionalBranches(conditionalBranches []v1alpha1.ConditionalBranch, nodes []string) []ConditionalBranch {
   394  	var result []ConditionalBranch
   395  	for _, item := range conditionalBranches {
   396  		nodeName := ""
   397  		for _, node := range nodes {
   398  			if strings.HasPrefix(node, item.Target) {
   399  				nodeName = node
   400  			}
   401  		}
   402  		result = append(result,
   403  			ConditionalBranch{
   404  				NodeNameWithTemplate: NodeNameWithTemplate{
   405  					Name:     nodeName,
   406  					Template: item.Target,
   407  				},
   408  				Expression: item.Expression,
   409  			})
   410  	}
   411  
   412  	return result
   413  }
   414  
   415  func mappingTemplateType(templateType v1alpha1.TemplateType) (NodeType, error) {
   416  	if v1alpha1.IsChaosTemplateType(templateType) {
   417  		return ChaosNode, nil
   418  	} else if target, ok := nodeTypeTemplateTypeMapping[templateType]; ok {
   419  		return target, nil
   420  	} else {
   421  		return "", errors.Errorf("can not resolve such type called %s", templateType)
   422  	}
   423  }
   424  
// WorkflowStore declares the list, lookup, save, delete and archive
// operations on WorkflowEntity and WorkflowMeta records.
//
// Note that it is not very similar to the other stores.
// No implementation is visible in this file; see the implementing type for
// the exact semantics of each method.
type WorkflowStore interface {
	List(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowEntity, error)
	ListMeta(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowMeta, error)
	FindByID(ctx context.Context, ID uint) (*WorkflowEntity, error)
	FindByUID(ctx context.Context, UID string) (*WorkflowEntity, error)
	FindMetaByUID(ctx context.Context, UID string) (*WorkflowMeta, error)
	Save(ctx context.Context, entity *WorkflowEntity) error
	DeleteByUID(ctx context.Context, UID string) error
	DeleteByUIDs(ctx context.Context, UIDs []string) error
	MarkAsArchived(ctx context.Context, namespace, name string) error
	MarkAsArchivedWithUID(ctx context.Context, UID string) error
}
   438  
// WorkflowEntity is the gorm entity; it refers to one row of data.
type WorkflowEntity struct {
	WorkflowMeta
	// Workflow is the JSON encoding of the workflow custom resource, as
	// written by WorkflowCR2WorkflowEntity and read back by
	// WorkflowEntity2WorkflowCR.
	Workflow string `gorm:"type:text;size:32768"`
}
   444  
   445  func WorkflowCR2WorkflowEntity(workflow *v1alpha1.Workflow) (*WorkflowEntity, error) {
   446  	if workflow == nil {
   447  		return nil, nil
   448  
   449  	}
   450  	jsonContent, err := json.Marshal(workflow)
   451  	if err != nil {
   452  		return nil, err
   453  	}
   454  	return &WorkflowEntity{
   455  		WorkflowMeta: convertWorkflow(*workflow),
   456  		Workflow:     string(jsonContent),
   457  	}, nil
   458  
   459  }
   460  
   461  func WorkflowEntity2WorkflowCR(entity *WorkflowEntity) (*v1alpha1.Workflow, error) {
   462  	if entity == nil {
   463  		return nil, nil
   464  	}
   465  	result := v1alpha1.Workflow{}
   466  	err := json.Unmarshal([]byte(entity.Workflow), &result)
   467  	if err != nil {
   468  		return nil, err
   469  	}
   470  	return &result, nil
   471  }
   472  
   473  func WorkflowEntity2WorkflowDetail(entity *WorkflowEntity) (*WorkflowDetail, error) {
   474  	workflowCustomResource, err := WorkflowEntity2WorkflowCR(entity)
   475  	if err != nil {
   476  		return nil, err
   477  	}
   478  	return &WorkflowDetail{
   479  		WorkflowMeta: entity.WorkflowMeta,
   480  		KubeObject: KubeObjectDesc{
   481  			TypeMeta: workflowCustomResource.TypeMeta,
   482  			Meta: KubeObjectMeta{
   483  				Name:        workflowCustomResource.Name,
   484  				Namespace:   workflowCustomResource.Namespace,
   485  				Labels:      workflowCustomResource.Labels,
   486  				Annotations: workflowCustomResource.Annotations,
   487  			},
   488  			Spec: workflowCustomResource.Spec,
   489  		},
   490  	}, nil
   491  }
   492