...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core/workflow.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package core
    17  
    18  import (
    19  	"context"
    20  	"encoding/json"
    21  	"strings"
    22  	"time"
    23  
    24  	"github.com/pkg/errors"
    25  	corev1 "k8s.io/api/core/v1"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/types"
    28  	"sigs.k8s.io/controller-runtime/pkg/client"
    29  
    30  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    31  	wfcontrollers "github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers"
    32  )
    33  
// WorkflowRepository describes the read/write operations available on
// workflows. KubeWorkflowRepository in this file implements these methods on
// top of a Kubernetes client.
type WorkflowRepository interface {
	// List returns the metadata of workflows without a namespace argument.
	List(ctx context.Context) ([]WorkflowMeta, error)
	// ListByNamespace returns the metadata of the workflows in namespace.
	ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error)
	// Create creates workflow and returns its detail.
	Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error)
	// Get returns the detail of the workflow identified by namespace and name.
	Get(ctx context.Context, namespace, name string) (WorkflowDetail, error)
	// Delete removes the workflow identified by namespace and name.
	Delete(ctx context.Context, namespace, name string) error
	// Update replaces the workflow identified by namespace and name with
	// workflow and returns the resulting detail.
	Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error)
}
    42  
// WorkflowStatus is the overall state of a workflow as exposed through
// WorkflowMeta.Status.
type WorkflowStatus string

// Values of WorkflowStatus. Note that WorkflowSucceed is spelled "finished"
// on the wire. convertWorkflow in this file currently only produces
// WorkflowSucceed, WorkflowRunning and WorkflowUnknown; WorkflowFailed is not
// assigned there yet (see its TODO).
const (
	WorkflowRunning WorkflowStatus = "running"
	WorkflowSucceed WorkflowStatus = "finished"
	WorkflowFailed  WorkflowStatus = "failed"
	WorkflowUnknown WorkflowStatus = "unknown"
)
    51  
// WorkflowMeta defines the root structure of a workflow.
//
// It carries both gorm and json tags: it is embedded in WorkflowEntity as the
// column set of a stored row and in WorkflowDetail as part of the JSON
// response. Archived is excluded from JSON output (`json:"-"`).
type WorkflowMeta struct {
	ID        uint      `gorm:"primary_key" json:"id"`
	UID       string    `gorm:"index:workflow_uid" json:"uid"`
	Namespace string    `json:"namespace"`
	Name      string    `json:"name"`
	Entry     string    `json:"entry"` // the entry node name
	CreatedAt time.Time `json:"created_at"`
	// FinishTime represents the time when the workflow was deleted from Kubernetes.
	FinishTime *time.Time `json:"finish_time"`
	// EndTime represents the time when the workflow completed all steps.
	EndTime  string         `json:"end_time"`
	Status   WorkflowStatus `json:"status,omitempty"`
	Archived bool           `json:"-"`
}
    67  
// WorkflowDetail is the full description of a workflow: its WorkflowMeta
// (embedded), the Topology built from its workflow nodes, and a description
// of the underlying Kubernetes object.
type WorkflowDetail struct {
	WorkflowMeta `json:",inline"`
	Topology     Topology       `json:"topology"`
	KubeObject   KubeObjectDesc `json:"kube_object,omitempty"`
}
    73  
// Topology describes the process of a workflow as the list of its nodes.
type Topology struct {
	Nodes []Node `json:"nodes"`
}
    78  
// NodeState is the state of a single workflow node as exposed through
// Node.State.
type NodeState string

// Values of NodeState. convertWorkflowNode in this file only assigns
// NodeSucceed (node finished) or NodeRunning (otherwise); NodeFailed is not
// assigned there.
const (
	NodeRunning NodeState = "Running"
	NodeSucceed NodeState = "Succeed"
	NodeFailed  NodeState = "Failed"
)
    86  
// Node defines a single step of a workflow.
//
// Serial, Parallel and ConditionalBranches describe the children of the node;
// convertWorkflowNode fills at most one of them, depending on whether the node
// is a serial, parallel or task node, and leaves them empty for other types.
type Node struct {
	Name                string                 `json:"name"`
	Type                NodeType               `json:"type"`
	State               NodeState              `json:"state"`
	Serial              []NodeNameWithTemplate `json:"serial,omitempty"`
	Parallel            []NodeNameWithTemplate `json:"parallel,omitempty"`
	ConditionalBranches []ConditionalBranch    `json:"conditional_branches,omitempty"`
	Template            string                 `json:"template"`
	UID                 string                 `json:"uid"`
}
    98  
// NodeNameWithTemplate pairs a template name with the name of the workflow
// node created from it. Name is left empty by the compose* helpers in this
// file while no matching node is known for the template.
type NodeNameWithTemplate struct {
	Name     string `json:"name,omitempty"`
	Template string `json:"template,omitempty"`
}
   103  
// ConditionalBranch is one branch of a task node: the target template (with
// the matching node name, if any) together with the branch expression.
type ConditionalBranch struct {
	NodeNameWithTemplate `json:",inline,omitempty"`
	Expression           string `json:"expression,omitempty"`
}
   108  
// NodeType represents the type of a workflow node.
//
// Five node types are defined:
// ChaosNode, SerialNode, ParallelNode, SuspendNode and TaskNode.
//
// The constants are declared right below this type.
type NodeType string

const (
	// ChaosNode represents a node that performs a single Chaos Experiment.
	ChaosNode NodeType = "ChaosNode"

	// SerialNode represents a node that performs its templates one after another.
	SerialNode NodeType = "SerialNode"

	// ParallelNode represents a node that performs its templates in parallel.
	ParallelNode NodeType = "ParallelNode"

	// SuspendNode represents a node that performs a wait operation.
	SuspendNode NodeType = "SuspendNode"

	// TaskNode represents a node that performs a user-defined task.
	TaskNode NodeType = "TaskNode"
)
   133  
// nodeTypeTemplateTypeMapping translates the non-chaos template types into the
// NodeType values used by this package. Chaos template types are not listed
// here; mappingTemplateType handles them before consulting this map.
var nodeTypeTemplateTypeMapping = map[v1alpha1.TemplateType]NodeType{
	v1alpha1.TypeSerial:   SerialNode,
	v1alpha1.TypeParallel: ParallelNode,
	v1alpha1.TypeSuspend:  SuspendNode,
	v1alpha1.TypeTask:     TaskNode,
}
   140  
// KubeWorkflowRepository performs workflow operations against Kubernetes
// through a controller-runtime client.
type KubeWorkflowRepository struct {
	kubeclient client.Client
}
   144  
   145  func NewKubeWorkflowRepository(kubeclient client.Client) *KubeWorkflowRepository {
   146  	return &KubeWorkflowRepository{kubeclient: kubeclient}
   147  }
   148  
   149  func (it *KubeWorkflowRepository) Create(ctx context.Context, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
   150  	err := it.kubeclient.Create(ctx, &workflow)
   151  	if err != nil {
   152  		return WorkflowDetail{}, err
   153  	}
   154  
   155  	return it.Get(ctx, workflow.Namespace, workflow.Name)
   156  }
   157  
   158  func (it *KubeWorkflowRepository) Update(ctx context.Context, namespace, name string, workflow v1alpha1.Workflow) (WorkflowDetail, error) {
   159  	current := v1alpha1.Workflow{}
   160  
   161  	err := it.kubeclient.Get(ctx, types.NamespacedName{
   162  		Namespace: namespace,
   163  		Name:      name,
   164  	}, &current)
   165  	if err != nil {
   166  		return WorkflowDetail{}, err
   167  	}
   168  	workflow.ObjectMeta.ResourceVersion = current.ObjectMeta.ResourceVersion
   169  
   170  	err = it.kubeclient.Update(ctx, &workflow)
   171  	if err != nil {
   172  		return WorkflowDetail{}, err
   173  	}
   174  
   175  	return it.Get(ctx, workflow.Namespace, workflow.Name)
   176  }
   177  
   178  func (it *KubeWorkflowRepository) ListByNamespace(ctx context.Context, namespace string) ([]WorkflowMeta, error) {
   179  	workflowList := v1alpha1.WorkflowList{}
   180  
   181  	err := it.kubeclient.List(ctx, &workflowList, &client.ListOptions{
   182  		Namespace: namespace,
   183  	})
   184  	if err != nil {
   185  		return nil, err
   186  	}
   187  
   188  	var result []WorkflowMeta
   189  	for _, item := range workflowList.Items {
   190  		result = append(result, convertWorkflow(item))
   191  	}
   192  
   193  	return result, nil
   194  }
   195  
   196  func (it *KubeWorkflowRepository) List(ctx context.Context) ([]WorkflowMeta, error) {
   197  	return it.ListByNamespace(ctx, "")
   198  }
   199  
   200  func (it *KubeWorkflowRepository) Get(ctx context.Context, namespace, name string) (WorkflowDetail, error) {
   201  	kubeWorkflow := v1alpha1.Workflow{}
   202  
   203  	err := it.kubeclient.Get(ctx, types.NamespacedName{
   204  		Namespace: namespace,
   205  		Name:      name,
   206  	}, &kubeWorkflow)
   207  	if err != nil {
   208  		return WorkflowDetail{}, err
   209  	}
   210  
   211  	workflowNodes := v1alpha1.WorkflowNodeList{}
   212  	// labeling workflow nodes, see pkg/workflow/controllers/new_node.go
   213  	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   214  		MatchLabels: map[string]string{
   215  			v1alpha1.LabelWorkflow: kubeWorkflow.Name,
   216  		},
   217  	})
   218  	if err != nil {
   219  		return WorkflowDetail{}, err
   220  	}
   221  
   222  	err = it.kubeclient.List(ctx, &workflowNodes, &client.ListOptions{
   223  		Namespace:     namespace,
   224  		LabelSelector: selector,
   225  	})
   226  	if err != nil {
   227  		return WorkflowDetail{}, err
   228  	}
   229  
   230  	return convertWorkflowDetail(kubeWorkflow, workflowNodes.Items)
   231  }
   232  
   233  func (it *KubeWorkflowRepository) Delete(ctx context.Context, namespace, name string) error {
   234  	kubeWorkflow := v1alpha1.Workflow{}
   235  
   236  	err := it.kubeclient.Get(ctx, types.NamespacedName{
   237  		Namespace: namespace,
   238  		Name:      name,
   239  	}, &kubeWorkflow)
   240  	if err != nil {
   241  		return err
   242  	}
   243  
   244  	return it.kubeclient.Delete(ctx, &kubeWorkflow)
   245  }
   246  
   247  func convertWorkflow(kubeWorkflow v1alpha1.Workflow) WorkflowMeta {
   248  	result := WorkflowMeta{
   249  		Namespace: kubeWorkflow.Namespace,
   250  		Name:      kubeWorkflow.Name,
   251  		Entry:     kubeWorkflow.Spec.Entry,
   252  		UID:       string(kubeWorkflow.UID),
   253  	}
   254  
   255  	if kubeWorkflow.Status.StartTime != nil {
   256  		result.CreatedAt = kubeWorkflow.Status.StartTime.Time
   257  	}
   258  
   259  	if kubeWorkflow.Status.EndTime != nil {
   260  		result.EndTime = kubeWorkflow.Status.EndTime.Format(time.RFC3339)
   261  	}
   262  
   263  	if kubeWorkflow.GetDeletionTimestamp() != nil {
   264  		result.FinishTime = &kubeWorkflow.GetDeletionTimestamp().Time
   265  	}
   266  
   267  	if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionAccomplished, corev1.ConditionTrue) {
   268  		result.Status = WorkflowSucceed
   269  	} else if wfcontrollers.WorkflowConditionEqualsTo(kubeWorkflow.Status, v1alpha1.WorkflowConditionScheduled, corev1.ConditionTrue) {
   270  		result.Status = WorkflowRunning
   271  	} else {
   272  		result.Status = WorkflowUnknown
   273  	}
   274  
   275  	// TODO: status failed
   276  
   277  	return result
   278  }
   279  
   280  func convertWorkflowDetail(kubeWorkflow v1alpha1.Workflow, kubeNodes []v1alpha1.WorkflowNode) (WorkflowDetail, error) {
   281  	nodes := make([]Node, 0)
   282  
   283  	for _, item := range kubeNodes {
   284  		node, err := convertWorkflowNode(item)
   285  		if err != nil {
   286  			return WorkflowDetail{}, nil
   287  		}
   288  
   289  		nodes = append(nodes, node)
   290  	}
   291  
   292  	result := WorkflowDetail{
   293  		WorkflowMeta: convertWorkflow(kubeWorkflow),
   294  		Topology: Topology{
   295  			Nodes: nodes,
   296  		},
   297  		KubeObject: KubeObjectDesc{
   298  			TypeMeta: kubeWorkflow.TypeMeta,
   299  			Meta: KubeObjectMeta{
   300  				Name:        kubeWorkflow.Name,
   301  				Namespace:   kubeWorkflow.Namespace,
   302  				Labels:      kubeWorkflow.Labels,
   303  				Annotations: kubeWorkflow.Annotations,
   304  			},
   305  			Spec: kubeWorkflow.Spec,
   306  		},
   307  	}
   308  
   309  	return result, nil
   310  }
   311  
   312  func convertWorkflowNode(kubeWorkflowNode v1alpha1.WorkflowNode) (Node, error) {
   313  	templateType, err := mappingTemplateType(kubeWorkflowNode.Spec.Type)
   314  	if err != nil {
   315  		return Node{}, err
   316  	}
   317  
   318  	result := Node{
   319  		Name:     kubeWorkflowNode.Name,
   320  		Type:     templateType,
   321  		Serial:   nil,
   322  		Parallel: nil,
   323  		Template: kubeWorkflowNode.Spec.TemplateName,
   324  		UID:      string(kubeWorkflowNode.UID),
   325  	}
   326  
   327  	if kubeWorkflowNode.Spec.Type == v1alpha1.TypeSerial {
   328  		var nodes []string
   329  		for _, child := range kubeWorkflowNode.Status.FinishedChildren {
   330  			nodes = append(nodes, child.Name)
   331  		}
   332  		for _, child := range kubeWorkflowNode.Status.ActiveChildren {
   333  			nodes = append(nodes, child.Name)
   334  		}
   335  		result.Serial = composeSerialTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes)
   336  
   337  	} else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeParallel {
   338  		var nodes []string
   339  		for _, child := range kubeWorkflowNode.Status.FinishedChildren {
   340  			nodes = append(nodes, child.Name)
   341  		}
   342  		for _, child := range kubeWorkflowNode.Status.ActiveChildren {
   343  			nodes = append(nodes, child.Name)
   344  		}
   345  		result.Parallel = composeParallelTaskAndNodes(kubeWorkflowNode.Spec.Children, nodes)
   346  
   347  	} else if kubeWorkflowNode.Spec.Type == v1alpha1.TypeTask {
   348  		var nodes []string
   349  		for _, child := range kubeWorkflowNode.Status.FinishedChildren {
   350  			nodes = append(nodes, child.Name)
   351  		}
   352  		for _, child := range kubeWorkflowNode.Status.ActiveChildren {
   353  			nodes = append(nodes, child.Name)
   354  		}
   355  		result.ConditionalBranches = composeTaskConditionalBranches(kubeWorkflowNode.Spec.ConditionalBranches, nodes)
   356  	}
   357  
   358  	if wfcontrollers.WorkflowNodeFinished(kubeWorkflowNode.Status) {
   359  		result.State = NodeSucceed
   360  	} else {
   361  		result.State = NodeRunning
   362  	}
   363  
   364  	return result, nil
   365  }
   366  
   367  // composeSerialTaskAndNodes need nodes to be ordered with its creation time
   368  func composeSerialTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
   369  	var result []NodeNameWithTemplate
   370  	for _, node := range nodes {
   371  		// TODO: that reverse the generated name, maybe we could use WorkflowNode.TemplateName in the future
   372  		templateName := node[0:strings.LastIndex(node, "-")]
   373  		result = append(result, NodeNameWithTemplate{Name: node, Template: templateName})
   374  	}
   375  	for _, task := range children[len(nodes):] {
   376  		result = append(result, NodeNameWithTemplate{Template: task})
   377  	}
   378  	return result
   379  }
   380  
   381  func composeParallelTaskAndNodes(children []string, nodes []string) []NodeNameWithTemplate {
   382  	var result []NodeNameWithTemplate
   383  	for _, task := range children {
   384  		result = append(result, NodeNameWithTemplate{
   385  			Name:     "",
   386  			Template: task,
   387  		})
   388  	}
   389  	for _, node := range nodes {
   390  		for i, item := range result {
   391  			if len(item.Name) == 0 && strings.HasPrefix(node, item.Template) {
   392  				result[i].Name = node
   393  				break
   394  			}
   395  		}
   396  	}
   397  	return result
   398  }
   399  
   400  func composeTaskConditionalBranches(conditionalBranches []v1alpha1.ConditionalBranch, nodes []string) []ConditionalBranch {
   401  	var result []ConditionalBranch
   402  	for _, item := range conditionalBranches {
   403  		nodeName := ""
   404  		for _, node := range nodes {
   405  			if strings.HasPrefix(node, item.Target) {
   406  				nodeName = node
   407  			}
   408  		}
   409  		result = append(result,
   410  			ConditionalBranch{
   411  				NodeNameWithTemplate: NodeNameWithTemplate{
   412  					Name:     nodeName,
   413  					Template: item.Target,
   414  				},
   415  				Expression: item.Expression,
   416  			})
   417  	}
   418  
   419  	return result
   420  }
   421  
   422  func mappingTemplateType(templateType v1alpha1.TemplateType) (NodeType, error) {
   423  	if v1alpha1.IsChaosTemplateType(templateType) {
   424  		return ChaosNode, nil
   425  	} else if target, ok := nodeTypeTemplateTypeMapping[templateType]; ok {
   426  		return target, nil
   427  	} else {
   428  		return "", errors.Errorf("can not resolve such type called %s", templateType)
   429  	}
   430  }
   431  
// WorkflowStore is the storage interface for workflow records
// (WorkflowEntity and WorkflowMeta). It is not quite like the other stores.
//
// NOTE(review): the exact filtering and deletion semantics of the methods
// below are defined by the implementation, which is not part of this file;
// the per-method notes only restate what the signatures show.
type WorkflowStore interface {
	// List returns workflow entities, selected by namespace, name and the archived flag.
	List(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowEntity, error)
	// ListMeta is the counterpart of List that returns only WorkflowMeta values.
	ListMeta(ctx context.Context, namespace, name string, archived bool) ([]*WorkflowMeta, error)
	// FindByID looks up a workflow entity by its ID.
	FindByID(ctx context.Context, ID uint) (*WorkflowEntity, error)
	// FindByUID looks up a workflow entity by its UID.
	FindByUID(ctx context.Context, UID string) (*WorkflowEntity, error)
	// FindMetaByUID looks up the WorkflowMeta of a workflow by its UID.
	FindMetaByUID(ctx context.Context, UID string) (*WorkflowMeta, error)
	// Save stores entity.
	Save(ctx context.Context, entity *WorkflowEntity) error
	// DeleteByUID deletes the record with the given UID.
	DeleteByUID(ctx context.Context, UID string) error
	// DeleteByUIDs deletes the records with the given UIDs.
	DeleteByUIDs(ctx context.Context, UIDs []string) error
	// DeleteByFinishTime deletes records based on their finish time and ttl.
	// Presumably records finished longer ago than ttl are removed — TODO confirm.
	DeleteByFinishTime(ctx context.Context, ttl time.Duration) error
	// MarkAsArchived marks the records selected by namespace and name as archived.
	MarkAsArchived(ctx context.Context, namespace, name string) error
	// MarkAsArchivedWithUID marks the record with the given UID as archived.
	MarkAsArchivedWithUID(ctx context.Context, UID string) error
}
   446  
// WorkflowEntity is the gorm entity for a workflow; one value corresponds to
// one row of data. Besides the embedded WorkflowMeta columns it stores the
// whole Workflow custom resource as JSON text in Workflow (written by
// WorkflowCR2WorkflowEntity, read back by WorkflowEntity2WorkflowCR).
type WorkflowEntity struct {
	WorkflowMeta
	Workflow string `gorm:"type:text;size:32768"`
}
   452  
   453  func WorkflowCR2WorkflowEntity(workflow *v1alpha1.Workflow) (*WorkflowEntity, error) {
   454  	if workflow == nil {
   455  		return nil, nil
   456  
   457  	}
   458  	jsonContent, err := json.Marshal(workflow)
   459  	if err != nil {
   460  		return nil, err
   461  	}
   462  
   463  	return &WorkflowEntity{
   464  		WorkflowMeta: convertWorkflow(*workflow),
   465  		Workflow:     string(jsonContent),
   466  	}, nil
   467  
   468  }
   469  
   470  func WorkflowEntity2WorkflowCR(entity *WorkflowEntity) (*v1alpha1.Workflow, error) {
   471  	if entity == nil {
   472  		return nil, nil
   473  	}
   474  	result := v1alpha1.Workflow{}
   475  	err := json.Unmarshal([]byte(entity.Workflow), &result)
   476  	if err != nil {
   477  		return nil, err
   478  	}
   479  	return &result, nil
   480  }
   481  
   482  func WorkflowEntity2WorkflowDetail(entity *WorkflowEntity) (*WorkflowDetail, error) {
   483  	workflowCustomResource, err := WorkflowEntity2WorkflowCR(entity)
   484  	if err != nil {
   485  		return nil, err
   486  	}
   487  	return &WorkflowDetail{
   488  		WorkflowMeta: entity.WorkflowMeta,
   489  		KubeObject: KubeObjectDesc{
   490  			TypeMeta: workflowCustomResource.TypeMeta,
   491  			Meta: KubeObjectMeta{
   492  				Name:        workflowCustomResource.Name,
   493  				Namespace:   workflowCustomResource.Namespace,
   494  				Labels:      workflowCustomResource.Labels,
   495  				Annotations: workflowCustomResource.Annotations,
   496  			},
   497  			Spec: workflowCustomResource.Spec,
   498  		},
   499  	}, nil
   500  }
   501