...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/utils.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"sort"
    22  	"strings"
    23  
    24  	"github.com/go-logr/logr"
    25  	corev1 "k8s.io/api/core/v1"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  
    29  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    30  )
    31  
    32  func SetCondition(status *v1alpha1.WorkflowNodeStatus, condition v1alpha1.WorkflowNodeCondition) {
    33  	currentCond := GetCondition(*status, condition.Type)
    34  	if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
    35  		return
    36  	}
    37  	newConditions := filterOutCondition(status.Conditions, condition.Type)
    38  	status.Conditions = append(newConditions, condition)
    39  }
    40  
    41  func GetCondition(status v1alpha1.WorkflowNodeStatus, conditionType v1alpha1.WorkflowNodeConditionType) *v1alpha1.WorkflowNodeCondition {
    42  	for _, item := range status.Conditions {
    43  		if item.Type == conditionType {
    44  			return &item
    45  		}
    46  	}
    47  	return nil
    48  }
    49  
    50  func ConditionEqualsTo(status v1alpha1.WorkflowNodeStatus, conditionType v1alpha1.WorkflowNodeConditionType, expected corev1.ConditionStatus) bool {
    51  	condition := GetCondition(status, conditionType)
    52  	if condition == nil {
    53  		return false
    54  	}
    55  	return condition.Status == expected
    56  }
    57  
    58  func filterOutCondition(conditions []v1alpha1.WorkflowNodeCondition, except v1alpha1.WorkflowNodeConditionType) []v1alpha1.WorkflowNodeCondition {
    59  	var newConditions []v1alpha1.WorkflowNodeCondition
    60  	for _, c := range conditions {
    61  		if c.Type == except {
    62  			continue
    63  		}
    64  		newConditions = append(newConditions, c)
    65  	}
    66  	return newConditions
    67  }
    68  
    69  func WorkflowNodeFinished(status v1alpha1.WorkflowNodeStatus) bool {
    70  	return ConditionEqualsTo(status, v1alpha1.ConditionAccomplished, corev1.ConditionTrue) ||
    71  		ConditionEqualsTo(status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue) ||
    72  		ConditionEqualsTo(status, v1alpha1.ConditionAborted, corev1.ConditionTrue)
    73  }
    74  
    75  func WorkflowAborted(workflow v1alpha1.Workflow) bool {
    76  	return workflow.Annotations[v1alpha1.WorkflowAnnotationAbort] == "true"
    77  }
    78  
    79  func SetWorkflowCondition(status *v1alpha1.WorkflowStatus, condition v1alpha1.WorkflowCondition) {
    80  	currentCond := GetWorkflowCondition(*status, condition.Type)
    81  	if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
    82  		return
    83  	}
    84  	newConditions := filterOutWorkflowCondition(status.Conditions, condition.Type)
    85  	status.Conditions = append(newConditions, condition)
    86  }
    87  
    88  func GetWorkflowCondition(status v1alpha1.WorkflowStatus, conditionType v1alpha1.WorkflowConditionType) *v1alpha1.WorkflowCondition {
    89  	for _, item := range status.Conditions {
    90  		if item.Type == conditionType {
    91  			return &item
    92  		}
    93  	}
    94  	return nil
    95  }
    96  
    97  func WorkflowConditionEqualsTo(status v1alpha1.WorkflowStatus, conditionType v1alpha1.WorkflowConditionType, expected corev1.ConditionStatus) bool {
    98  	condition := GetWorkflowCondition(status, conditionType)
    99  	if condition == nil {
   100  		return false
   101  	}
   102  	return condition.Status == expected
   103  }
   104  
   105  func filterOutWorkflowCondition(conditions []v1alpha1.WorkflowCondition, except v1alpha1.WorkflowConditionType) []v1alpha1.WorkflowCondition {
   106  	var newConditions []v1alpha1.WorkflowCondition
   107  	for _, c := range conditions {
   108  		if c.Type == except {
   109  			continue
   110  		}
   111  		newConditions = append(newConditions, c)
   112  	}
   113  	return newConditions
   114  }
   115  
   116  type SortByCreationTimestamp []v1alpha1.WorkflowNode
   117  
   118  func (it SortByCreationTimestamp) Len() int {
   119  	return len(it)
   120  }
   121  
   122  func (it SortByCreationTimestamp) Less(i, j int) bool {
   123  	return it[j].GetCreationTimestamp().After(it[i].GetCreationTimestamp().Time)
   124  }
   125  
   126  func (it SortByCreationTimestamp) Swap(i, j int) {
   127  	it[i], it[j] = it[j], it[i]
   128  }
   129  
// ChildNodesFetcher looks up the child workflow nodes of a given workflow
// node through a controller-runtime client.
type ChildNodesFetcher struct {
	// kubeClient is the client used to list WorkflowNode objects.
	kubeClient client.Client
	// logger receives the error and verbose messages emitted while fetching.
	logger     logr.Logger
}
   134  
   135  func NewChildNodesFetcher(kubeClient client.Client, logger logr.Logger) *ChildNodesFetcher {
   136  	return &ChildNodesFetcher{kubeClient: kubeClient, logger: logger}
   137  }
   138  
   139  // fetchChildNodes will return children workflow nodes controlled by given node
   140  // Should only be used with Parallel and Serial Node
   141  func (it *ChildNodesFetcher) fetchChildNodes(ctx context.Context, node v1alpha1.WorkflowNode) (activeChildNodes []v1alpha1.WorkflowNode, finishedChildNodes []v1alpha1.WorkflowNode, err error) {
   142  	childNodes := v1alpha1.WorkflowNodeList{}
   143  	controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   144  		MatchLabels: map[string]string{
   145  			v1alpha1.LabelControlledBy: node.Name,
   146  		},
   147  	})
   148  
   149  	if err != nil {
   150  		it.logger.Error(err, "failed to build label selector with filtering children workflow node controlled by current node",
   151  			"current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   152  		return nil, nil, err
   153  	}
   154  
   155  	err = it.kubeClient.List(ctx, &childNodes, &client.ListOptions{
   156  		Namespace:     node.Namespace,
   157  		LabelSelector: controlledByThisNode,
   158  	})
   159  
   160  	if err != nil {
   161  		it.logger.Error(err, "failed to list children workflow node controlled by current node",
   162  			"current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   163  		return nil, nil, err
   164  	}
   165  
   166  	sortedChildNodes := SortByCreationTimestamp(childNodes.Items)
   167  	sort.Sort(sortedChildNodes)
   168  
   169  	it.logger.V(4).Info("list children node", "current node",
   170  		"current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   171  		len(sortedChildNodes), "children", sortedChildNodes)
   172  
   173  	var activeChildren []v1alpha1.WorkflowNode
   174  	var finishedChildren []v1alpha1.WorkflowNode
   175  
   176  	for _, item := range sortedChildNodes {
   177  		childNode := item
   178  		if WorkflowNodeFinished(childNode.Status) {
   179  			finishedChildren = append(finishedChildren, childNode)
   180  		} else {
   181  			activeChildren = append(activeChildren, childNode)
   182  		}
   183  	}
   184  	return activeChildren, finishedChildren, nil
   185  }
   186  
   187  func getTaskNameFromGeneratedName(generatedNodeName string) string {
   188  	index := strings.LastIndex(generatedNodeName, "-")
   189  	if index < 0 {
   190  		return generatedNodeName
   191  	}
   192  	return generatedNodeName[:index]
   193  }
   194  
   195  // setDifference return the set of elements which contained in former but not in latter
   196  func setDifference(former []string, latter []string) []string {
   197  	var result []string
   198  	formerSet := make(map[string]struct{})
   199  	latterSet := make(map[string]struct{})
   200  
   201  	for _, item := range former {
   202  		formerSet[item] = struct{}{}
   203  	}
   204  	for _, item := range latter {
   205  		latterSet[item] = struct{}{}
   206  	}
   207  	for k := range formerSet {
   208  		if _, ok := latterSet[k]; !ok {
   209  			result = append(result, k)
   210  		}
   211  	}
   212  	return result
   213  }
   214