...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/parallel_node_reconciler.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"time"
    22  
    23  	"github.com/go-logr/logr"
    24  	corev1 "k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	"k8s.io/client-go/util/retry"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    29  
    30  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    31  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    32  )
    33  
    34  // ParallelNodeReconciler watches on nodes which type is Parallel
    35  type ParallelNodeReconciler struct {
    36  	*ChildNodesFetcher
    37  	kubeClient    client.Client
    38  	eventRecorder recorder.ChaosRecorder
    39  	logger        logr.Logger
    40  }
    41  
    42  func NewParallelNodeReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *ParallelNodeReconciler {
    43  	return &ParallelNodeReconciler{
    44  		ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
    45  		kubeClient:        kubeClient,
    46  		eventRecorder:     eventRecorder,
    47  		logger:            logger,
    48  	}
    49  }
    50  
    51  // Reconcile is extremely like the one in SerialNodeReconciler, only allows the parallel schedule, and respawn **all** the children tasks during retry
    52  func (it *ParallelNodeReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    53  	startTime := time.Now()
    54  	defer func() {
    55  		it.logger.V(4).Info("Finished syncing for parallel node",
    56  			"node", request.NamespacedName,
    57  			"duration", time.Since(startTime),
    58  		)
    59  	}()
    60  
    61  	node := v1alpha1.WorkflowNode{}
    62  	err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
    63  	if err != nil {
    64  		return reconcile.Result{}, client.IgnoreNotFound(err)
    65  	}
    66  
    67  	// only resolve parallel nodes
    68  	if node.Spec.Type != v1alpha1.TypeParallel {
    69  		return reconcile.Result{}, nil
    70  	}
    71  
    72  	it.logger.V(4).Info("resolve parallel node", "node", request)
    73  
    74  	// make effects, create/remove children nodes
    75  	err = it.syncChildNodes(ctx, node)
    76  	if err != nil {
    77  		return reconcile.Result{}, err
    78  	}
    79  
    80  	// update status
    81  	updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
    82  		nodeNeedUpdate := v1alpha1.WorkflowNode{}
    83  		err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
    84  		if err != nil {
    85  			return err
    86  		}
    87  
    88  		activeChildren, finishedChildren, err := it.fetchChildNodes(ctx, nodeNeedUpdate)
    89  		if err != nil {
    90  			return err
    91  		}
    92  
    93  		nodeNeedUpdate.Status.FinishedChildren = nil
    94  		for _, finishedChild := range finishedChildren {
    95  			nodeNeedUpdate.Status.FinishedChildren = append(nodeNeedUpdate.Status.FinishedChildren,
    96  				corev1.LocalObjectReference{
    97  					Name: finishedChild.Name,
    98  				})
    99  		}
   100  
   101  		nodeNeedUpdate.Status.ActiveChildren = nil
   102  		for _, activeChild := range activeChildren {
   103  			nodeNeedUpdate.Status.ActiveChildren = append(nodeNeedUpdate.Status.ActiveChildren,
   104  				corev1.LocalObjectReference{
   105  					Name: activeChild.Name,
   106  				})
   107  		}
   108  
   109  		// TODO: also check the consistent between spec in task and the spec in child node
   110  		if len(finishedChildren) == len(nodeNeedUpdate.Spec.Children) {
   111  			if !WorkflowNodeFinished(nodeNeedUpdate.Status) {
   112  				it.eventRecorder.Event(&nodeNeedUpdate, recorder.NodeAccomplished{})
   113  			}
   114  			SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   115  				Type:   v1alpha1.ConditionAccomplished,
   116  				Status: corev1.ConditionTrue,
   117  				Reason: "",
   118  			})
   119  		} else {
   120  			SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   121  				Type:   v1alpha1.ConditionAccomplished,
   122  				Status: corev1.ConditionFalse,
   123  				Reason: "",
   124  			})
   125  		}
   126  
   127  		return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
   128  	})
   129  
   130  	if updateError != nil {
   131  		it.logger.Error(err, "failed to update the status of node", "node", request)
   132  		return reconcile.Result{}, updateError
   133  	}
   134  
   135  	return reconcile.Result{}, nil
   136  }
   137  
   138  func (it *ParallelNodeReconciler) syncChildNodes(ctx context.Context, node v1alpha1.WorkflowNode) error {
   139  
   140  	// empty parallel node
   141  	if len(node.Spec.Children) == 0 {
   142  		it.logger.V(4).Info("empty parallel node, NOOP",
   143  			"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   144  		)
   145  		return nil
   146  	}
   147  
   148  	if WorkflowNodeFinished(node.Status) {
   149  		return nil
   150  	}
   151  
   152  	activeChildNodes, finishedChildNodes, err := it.fetchChildNodes(ctx, node)
   153  	if err != nil {
   154  		return err
   155  	}
   156  	existsChildNodes := append(activeChildNodes, finishedChildNodes...)
   157  
   158  	var taskNamesOfNodes []string
   159  	for _, childNode := range existsChildNodes {
   160  		taskNamesOfNodes = append(taskNamesOfNodes, getTaskNameFromGeneratedName(childNode.GetName()))
   161  	}
   162  
   163  	var tasksToStartup []string
   164  
   165  	if len(existsChildNodes) == 0 {
   166  		tasksToStartup = node.Spec.Children
   167  	}
   168  	// TODO: check the specific of task and workflow nodes
   169  	// the definition of Spec.Children changed, remove all the existed nodes
   170  	if len(existsChildNodes) > 0 && (len(setDifference(taskNamesOfNodes, node.Spec.Children)) > 0 || len(setDifference(node.Spec.Children, taskNamesOfNodes)) > 0) {
   171  		tasksToStartup = node.Spec.Children
   172  
   173  		var nodesToCleanup []string
   174  		for _, item := range existsChildNodes {
   175  			nodesToCleanup = append(nodesToCleanup, item.Name)
   176  		}
   177  		it.eventRecorder.Event(&node, recorder.RerunBySpecChanged{CleanedChildrenNode: nodesToCleanup})
   178  
   179  		for _, childNode := range existsChildNodes {
   180  			// best effort deletion
   181  			err := it.kubeClient.Delete(ctx, &childNode)
   182  			if err != nil {
   183  				it.logger.Error(err, "failed to delete outdated child node",
   184  					"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   185  					"child node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name),
   186  				)
   187  			}
   188  		}
   189  
   190  	}
   191  
   192  	if len(tasksToStartup) == 0 {
   193  		it.logger.Info("no need to spawn new child node", "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   194  		return nil
   195  	}
   196  
   197  	parentWorkflow := v1alpha1.Workflow{}
   198  	err = it.kubeClient.Get(ctx, types.NamespacedName{
   199  		Namespace: node.Namespace,
   200  		Name:      node.Spec.WorkflowName,
   201  	}, &parentWorkflow)
   202  	if err != nil {
   203  		it.logger.Error(err, "failed to fetch parent workflow",
   204  			"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   205  			"workflow name", node.Spec.WorkflowName)
   206  		return err
   207  	}
   208  
   209  	childNodes, err := renderNodesByTemplates(&parentWorkflow, &node, tasksToStartup...)
   210  	if err != nil {
   211  		it.logger.Error(err, "failed to render children childNodes",
   212  			"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   213  		return err
   214  	}
   215  
   216  	var childrenNames []string
   217  	for _, childNode := range childNodes {
   218  		err := it.kubeClient.Create(ctx, childNode)
   219  		if err != nil {
   220  			it.logger.Error(err, "failed to create child node",
   221  				"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   222  				"child node", childNode)
   223  			return err
   224  		}
   225  		childrenNames = append(childrenNames, childNode.Name)
   226  	}
   227  	it.eventRecorder.Event(&node, recorder.NodesCreated{ChildNodes: childrenNames})
   228  	it.logger.Info("parallel node spawn new child node",
   229  		"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   230  		"child node", childrenNames)
   231  
   232  	return nil
   233  }
   234