...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/utils.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"sort"
    22  	"strings"
    23  
    24  	"github.com/go-logr/logr"
    25  	corev1 "k8s.io/api/core/v1"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  
    29  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    30  )
    31  
    32  func SetCondition(status *v1alpha1.WorkflowNodeStatus, condition v1alpha1.WorkflowNodeCondition) {
    33  	currentCond := GetCondition(*status, condition.Type)
    34  	if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
    35  		return
    36  	}
    37  	newConditions := filterOutCondition(status.Conditions, condition.Type)
    38  	status.Conditions = append(newConditions, condition)
    39  }
    40  
    41  func GetCondition(status v1alpha1.WorkflowNodeStatus, conditionType v1alpha1.WorkflowNodeConditionType) *v1alpha1.WorkflowNodeCondition {
    42  	for _, item := range status.Conditions {
    43  		if item.Type == conditionType {
    44  			return &item
    45  		}
    46  	}
    47  	return nil
    48  }
    49  
    50  func ConditionEqualsTo(status v1alpha1.WorkflowNodeStatus, conditionType v1alpha1.WorkflowNodeConditionType, expected corev1.ConditionStatus) bool {
    51  	condition := GetCondition(status, conditionType)
    52  	if condition == nil {
    53  		return false
    54  	}
    55  	return condition.Status == expected
    56  }
    57  
    58  func filterOutCondition(conditions []v1alpha1.WorkflowNodeCondition, except v1alpha1.WorkflowNodeConditionType) []v1alpha1.WorkflowNodeCondition {
    59  	var newConditions []v1alpha1.WorkflowNodeCondition
    60  	for _, c := range conditions {
    61  		if c.Type == except {
    62  			continue
    63  		}
    64  		newConditions = append(newConditions, c)
    65  	}
    66  	return newConditions
    67  }
    68  
    69  func WorkflowNodeFinished(status v1alpha1.WorkflowNodeStatus) bool {
    70  	return ConditionEqualsTo(status, v1alpha1.ConditionAccomplished, corev1.ConditionTrue) ||
    71  		ConditionEqualsTo(status, v1alpha1.ConditionDeadlineExceed, corev1.ConditionTrue)
    72  }
    73  
    74  func SetWorkflowCondition(status *v1alpha1.WorkflowStatus, condition v1alpha1.WorkflowCondition) {
    75  	currentCond := GetWorkflowCondition(*status, condition.Type)
    76  	if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
    77  		return
    78  	}
    79  	newConditions := filterOutWorkflowCondition(status.Conditions, condition.Type)
    80  	status.Conditions = append(newConditions, condition)
    81  }
    82  
    83  func GetWorkflowCondition(status v1alpha1.WorkflowStatus, conditionType v1alpha1.WorkflowConditionType) *v1alpha1.WorkflowCondition {
    84  	for _, item := range status.Conditions {
    85  		if item.Type == conditionType {
    86  			return &item
    87  		}
    88  	}
    89  	return nil
    90  }
    91  
    92  func WorkflowConditionEqualsTo(status v1alpha1.WorkflowStatus, conditionType v1alpha1.WorkflowConditionType, expected corev1.ConditionStatus) bool {
    93  	condition := GetWorkflowCondition(status, conditionType)
    94  	if condition == nil {
    95  		return false
    96  	}
    97  	return condition.Status == expected
    98  }
    99  
   100  func filterOutWorkflowCondition(conditions []v1alpha1.WorkflowCondition, except v1alpha1.WorkflowConditionType) []v1alpha1.WorkflowCondition {
   101  	var newConditions []v1alpha1.WorkflowCondition
   102  	for _, c := range conditions {
   103  		if c.Type == except {
   104  			continue
   105  		}
   106  		newConditions = append(newConditions, c)
   107  	}
   108  	return newConditions
   109  }
   110  
   111  type SortByCreationTimestamp []v1alpha1.WorkflowNode
   112  
   113  func (it SortByCreationTimestamp) Len() int {
   114  	return len(it)
   115  }
   116  
   117  func (it SortByCreationTimestamp) Less(i, j int) bool {
   118  	return it[j].GetCreationTimestamp().After(it[i].GetCreationTimestamp().Time)
   119  }
   120  
   121  func (it SortByCreationTimestamp) Swap(i, j int) {
   122  	it[i], it[j] = it[j], it[i]
   123  }
   124  
   125  type ChildNodesFetcher struct {
   126  	kubeClient client.Client
   127  	logger     logr.Logger
   128  }
   129  
   130  func NewChildNodesFetcher(kubeClient client.Client, logger logr.Logger) *ChildNodesFetcher {
   131  	return &ChildNodesFetcher{kubeClient: kubeClient, logger: logger}
   132  }
   133  
   134  // fetchChildNodes will return children workflow nodes controlled by given node
   135  // Should only be used with Parallel and Serial Node
   136  func (it *ChildNodesFetcher) fetchChildNodes(ctx context.Context, node v1alpha1.WorkflowNode) (activeChildNodes []v1alpha1.WorkflowNode, finishedChildNodes []v1alpha1.WorkflowNode, err error) {
   137  	childNodes := v1alpha1.WorkflowNodeList{}
   138  	controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   139  		MatchLabels: map[string]string{
   140  			v1alpha1.LabelControlledBy: node.Name,
   141  		},
   142  	})
   143  
   144  	if err != nil {
   145  		it.logger.Error(err, "failed to build label selector with filtering children workflow node controlled by current node",
   146  			"current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   147  		return nil, nil, err
   148  	}
   149  
   150  	err = it.kubeClient.List(ctx, &childNodes, &client.ListOptions{
   151  		Namespace:     node.Namespace,
   152  		LabelSelector: controlledByThisNode,
   153  	})
   154  
   155  	if err != nil {
   156  		it.logger.Error(err, "failed to list children workflow node controlled by current node",
   157  			"current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   158  		return nil, nil, err
   159  	}
   160  
   161  	sortedChildNodes := SortByCreationTimestamp(childNodes.Items)
   162  	sort.Sort(sortedChildNodes)
   163  
   164  	it.logger.V(4).Info("list children node", "current node",
   165  		"current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   166  		len(sortedChildNodes), "children", sortedChildNodes)
   167  
   168  	var activeChildren []v1alpha1.WorkflowNode
   169  	var finishedChildren []v1alpha1.WorkflowNode
   170  
   171  	for _, item := range sortedChildNodes {
   172  		childNode := item
   173  		if WorkflowNodeFinished(childNode.Status) {
   174  			finishedChildren = append(finishedChildren, childNode)
   175  		} else {
   176  			activeChildren = append(activeChildren, childNode)
   177  		}
   178  	}
   179  	return activeChildren, finishedChildren, nil
   180  }
   181  
   182  func getTaskNameFromGeneratedName(generatedNodeName string) string {
   183  	index := strings.LastIndex(generatedNodeName, "-")
   184  	if index < 0 {
   185  		return generatedNodeName
   186  	}
   187  	return generatedNodeName[:index]
   188  }
   189  
   190  // setDifference return the set of elements which contained in former but not in latter
   191  func setDifference(former []string, latter []string) []string {
   192  	var result []string
   193  	formerSet := make(map[string]struct{})
   194  	latterSet := make(map[string]struct{})
   195  
   196  	for _, item := range former {
   197  		formerSet[item] = struct{}{}
   198  	}
   199  	for _, item := range latter {
   200  		latterSet[item] = struct{}{}
   201  	}
   202  	for k := range formerSet {
   203  		if _, ok := latterSet[k]; !ok {
   204  			result = append(result, k)
   205  		}
   206  	}
   207  	return result
   208  }
   209