...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/parallel_node_reconciler.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"time"
    22  
    23  	"github.com/go-logr/logr"
    24  	corev1 "k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	"k8s.io/client-go/util/retry"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    29  
    30  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    31  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    32  )
    33  
// ParallelNodeReconciler reconciles workflow nodes whose type is Parallel.
type ParallelNodeReconciler struct {
	// ChildNodesFetcher is embedded; presumably it supplies the fetchChildNodes
	// helper called by this reconciler's methods — confirm against its definition.
	*ChildNodesFetcher
	// kubeClient is used to get, create and delete workflow nodes and to update their status.
	kubeClient    client.Client
	// eventRecorder records events about the reconciled node.
	eventRecorder recorder.ChaosRecorder
	// logger is used for diagnostic and error logging.
	logger        logr.Logger
}
    41  
    42  func NewParallelNodeReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *ParallelNodeReconciler {
    43  	return &ParallelNodeReconciler{
    44  		ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
    45  		kubeClient:        kubeClient,
    46  		eventRecorder:     eventRecorder,
    47  		logger:            logger,
    48  	}
    49  }
    50  
    51  // Reconcile is extremely like the one in SerialNodeReconciler, only allows the parallel schedule, and respawn **all** the children tasks during retry
    52  func (it *ParallelNodeReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    53  	startTime := time.Now()
    54  	defer func() {
    55  		it.logger.V(4).Info("Finished syncing for parallel node",
    56  			"node", request.NamespacedName,
    57  			"duration", time.Since(startTime),
    58  		)
    59  	}()
    60  
    61  	node := v1alpha1.WorkflowNode{}
    62  	err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
    63  	if err != nil {
    64  		return reconcile.Result{}, client.IgnoreNotFound(err)
    65  	}
    66  
    67  	// only resolve parallel nodes
    68  	if node.Spec.Type != v1alpha1.TypeParallel {
    69  		return reconcile.Result{}, nil
    70  	}
    71  
    72  	it.logger.V(4).Info("resolve parallel node", "node", request)
    73  
    74  	// make effects, create/remove children nodes
    75  	err = it.syncChildNodes(ctx, node)
    76  	if err != nil {
    77  		return reconcile.Result{}, err
    78  	}
    79  
    80  	// update status
    81  	updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
    82  		nodeNeedUpdate := v1alpha1.WorkflowNode{}
    83  		err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
    84  		if err != nil {
    85  			return err
    86  		}
    87  
    88  		activeChildren, finishedChildren, err := it.fetchChildNodes(ctx, nodeNeedUpdate)
    89  		if err != nil {
    90  			return err
    91  		}
    92  
    93  		nodeNeedUpdate.Status.FinishedChildren = nil
    94  		for _, finishedChild := range finishedChildren {
    95  			nodeNeedUpdate.Status.FinishedChildren = append(nodeNeedUpdate.Status.FinishedChildren,
    96  				corev1.LocalObjectReference{
    97  					Name: finishedChild.Name,
    98  				})
    99  		}
   100  
   101  		nodeNeedUpdate.Status.ActiveChildren = nil
   102  		for _, activeChild := range activeChildren {
   103  			nodeNeedUpdate.Status.ActiveChildren = append(nodeNeedUpdate.Status.ActiveChildren,
   104  				corev1.LocalObjectReference{
   105  					Name: activeChild.Name,
   106  				})
   107  		}
   108  
   109  		// TODO: also check the consistent between spec in task and the spec in child node
   110  		if len(finishedChildren) == len(nodeNeedUpdate.Spec.Children) {
   111  			SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   112  				Type:   v1alpha1.ConditionAccomplished,
   113  				Status: corev1.ConditionTrue,
   114  				Reason: "",
   115  			})
   116  			it.eventRecorder.Event(&nodeNeedUpdate, recorder.NodeAccomplished{})
   117  		} else {
   118  			SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   119  				Type:   v1alpha1.ConditionAccomplished,
   120  				Status: corev1.ConditionFalse,
   121  				Reason: "",
   122  			})
   123  		}
   124  
   125  		return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
   126  	})
   127  
   128  	if updateError != nil {
   129  		it.logger.Error(err, "failed to update the status of node", "node", request)
   130  		return reconcile.Result{}, updateError
   131  	}
   132  
   133  	return reconcile.Result{}, nil
   134  }
   135  
   136  func (it *ParallelNodeReconciler) syncChildNodes(ctx context.Context, node v1alpha1.WorkflowNode) error {
   137  
   138  	// empty parallel node
   139  	if len(node.Spec.Children) == 0 {
   140  		it.logger.V(4).Info("empty parallel node, NOOP",
   141  			"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   142  		)
   143  		return nil
   144  	}
   145  
   146  	if WorkflowNodeFinished(node.Status) {
   147  		return nil
   148  	}
   149  
   150  	activeChildNodes, finishedChildNodes, err := it.fetchChildNodes(ctx, node)
   151  	if err != nil {
   152  		return err
   153  	}
   154  	existsChildNodes := append(activeChildNodes, finishedChildNodes...)
   155  
   156  	var taskNamesOfNodes []string
   157  	for _, childNode := range existsChildNodes {
   158  		taskNamesOfNodes = append(taskNamesOfNodes, getTaskNameFromGeneratedName(childNode.GetName()))
   159  	}
   160  
   161  	var tasksToStartup []string
   162  
   163  	if len(existsChildNodes) == 0 {
   164  		tasksToStartup = node.Spec.Children
   165  	}
   166  	// TODO: check the specific of task and workflow nodes
   167  	// the definition of Spec.Children changed, remove all the existed nodes
   168  	if len(existsChildNodes) > 0 && (len(setDifference(taskNamesOfNodes, node.Spec.Children)) > 0 || len(setDifference(node.Spec.Children, taskNamesOfNodes)) > 0) {
   169  		tasksToStartup = node.Spec.Children
   170  
   171  		var nodesToCleanup []string
   172  		for _, item := range existsChildNodes {
   173  			nodesToCleanup = append(nodesToCleanup, item.Name)
   174  		}
   175  		it.eventRecorder.Event(&node, recorder.RerunBySpecChanged{CleanedChildrenNode: nodesToCleanup})
   176  
   177  		for _, childNode := range existsChildNodes {
   178  			// best effort deletion
   179  			err := it.kubeClient.Delete(ctx, &childNode)
   180  			if err != nil {
   181  				it.logger.Error(err, "failed to delete outdated child node",
   182  					"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   183  					"child node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name),
   184  				)
   185  			}
   186  		}
   187  
   188  	}
   189  
   190  	if len(tasksToStartup) == 0 {
   191  		it.logger.Info("no need to spawn new child node", "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   192  		return nil
   193  	}
   194  
   195  	parentWorkflow := v1alpha1.Workflow{}
   196  	err = it.kubeClient.Get(ctx, types.NamespacedName{
   197  		Namespace: node.Namespace,
   198  		Name:      node.Spec.WorkflowName,
   199  	}, &parentWorkflow)
   200  	if err != nil {
   201  		it.logger.Error(err, "failed to fetch parent workflow",
   202  			"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   203  			"workflow name", node.Spec.WorkflowName)
   204  		return err
   205  	}
   206  
   207  	childNodes, err := renderNodesByTemplates(&parentWorkflow, &node, tasksToStartup...)
   208  	if err != nil {
   209  		it.logger.Error(err, "failed to render children childNodes",
   210  			"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   211  		return err
   212  	}
   213  
   214  	var childrenNames []string
   215  	for _, childNode := range childNodes {
   216  		err := it.kubeClient.Create(ctx, childNode)
   217  		if err != nil {
   218  			it.logger.Error(err, "failed to create child node",
   219  				"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   220  				"child node", childNode)
   221  			return err
   222  		}
   223  		childrenNames = append(childrenNames, childNode.Name)
   224  	}
   225  	it.eventRecorder.Event(&node, recorder.NodesCreated{ChildNodes: childrenNames})
   226  	it.logger.Info("parallel node spawn new child node",
   227  		"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   228  		"child node", childrenNames)
   229  
   230  	return nil
   231  }
   232