...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/serial_node_reconciler.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"strings"
    22  	"time"
    23  
    24  	"github.com/go-logr/logr"
    25  	corev1 "k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/types"
    27  	"k8s.io/client-go/util/retry"
    28  	"sigs.k8s.io/controller-runtime/pkg/client"
    29  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    30  
    31  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    33  )
    34  
    35  // SerialNodeReconciler watches on nodes which type is Serial
    36  type SerialNodeReconciler struct {
    37  	*ChildNodesFetcher
    38  	kubeClient    client.Client
    39  	eventRecorder recorder.ChaosRecorder
    40  	logger        logr.Logger
    41  }
    42  
    43  func NewSerialNodeReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *SerialNodeReconciler {
    44  	return &SerialNodeReconciler{
    45  		ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
    46  		kubeClient:        kubeClient,
    47  		eventRecorder:     eventRecorder,
    48  		logger:            logger,
    49  	}
    50  }
    51  
    52  // Reconcile should be invoked by: changes on a serial node, or changes on a node which controlled by serial node.
    53  // So we need to setup EnqueueRequestForOwner while setting up this reconciler.
    54  //
    55  // Reconcile does these things:
    56  // 1. walk through on tasks in spec, compare them with the node instances (listed with v1alpha1.LabelControlledBy),
    57  // remove the outdated instance;
    58  // 2. find out the node needs to be created, then create one if exists;
    59  // 3. update the status of serial node;
    60  //
    61  // In this reconciler, we SHOULD NOT use v1alpha1.WorkflowNodeStatus as the state.
    62  // Because v1alpha1.WorkflowNodeStatus is generated by this reconciler, if that itself also depends on that state,
    63  // it will be complex to decide when to update the status, and even require to update status more than one time,
    64  // that sounds not good.
    65  // And We MUST update v1alpha1.WorkflowNodeStatus by "observing real world" at EACH TIME, such as listing controlled
    66  // children nodes.
    67  // We only update v1alpha1.WorkflowNodeStatus once(wrapped with retry on conflict), at the end of this method.
    68  func (it *SerialNodeReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    69  	startTime := time.Now()
    70  	defer func() {
    71  		it.logger.V(4).Info("Finished syncing for serial node",
    72  			"node", request.NamespacedName,
    73  			"duration", time.Since(startTime),
    74  		)
    75  	}()
    76  
    77  	node := v1alpha1.WorkflowNode{}
    78  	err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
    79  	if err != nil {
    80  		return reconcile.Result{}, client.IgnoreNotFound(err)
    81  	}
    82  
    83  	// only resolve serial nodes
    84  	if node.Spec.Type != v1alpha1.TypeSerial {
    85  		return reconcile.Result{}, nil
    86  	}
    87  
    88  	it.logger.V(4).Info("resolve serial node", "node", request)
    89  
    90  	// make effects, create/remove children nodes
    91  	err = it.syncChildNodes(ctx, node)
    92  	if err != nil {
    93  		return reconcile.Result{}, err
    94  	}
    95  
    96  	// update status
    97  	updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
    98  		nodeNeedUpdate := v1alpha1.WorkflowNode{}
    99  		err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
   100  		if err != nil {
   101  			return err
   102  		}
   103  
   104  		activeChildren, finishedChildren, err := it.fetchChildNodes(ctx, nodeNeedUpdate)
   105  		if err != nil {
   106  			return err
   107  		}
   108  
   109  		nodeNeedUpdate.Status.FinishedChildren = nil
   110  		for _, finishedChild := range finishedChildren {
   111  			nodeNeedUpdate.Status.FinishedChildren = append(nodeNeedUpdate.Status.FinishedChildren,
   112  				corev1.LocalObjectReference{
   113  					Name: finishedChild.Name,
   114  				})
   115  		}
   116  
   117  		nodeNeedUpdate.Status.ActiveChildren = nil
   118  		for _, activeChild := range activeChildren {
   119  			nodeNeedUpdate.Status.ActiveChildren = append(nodeNeedUpdate.Status.ActiveChildren,
   120  				corev1.LocalObjectReference{
   121  					Name: activeChild.Name,
   122  				})
   123  		}
   124  
   125  		if len(activeChildren) > 1 {
   126  			it.logger.Info("warning: serial node has more than 1 active children", "namespace", nodeNeedUpdate.Namespace, "name", nodeNeedUpdate.Name, "children", nodeNeedUpdate.Status.ActiveChildren)
   127  		}
   128  
   129  		// TODO: also check the consistent between spec in task and the spec in child node
   130  		if len(finishedChildren) == len(nodeNeedUpdate.Spec.Children) {
   131  			if !WorkflowNodeFinished(nodeNeedUpdate.Status) {
   132  				it.eventRecorder.Event(&nodeNeedUpdate, recorder.NodeAccomplished{})
   133  			}
   134  			SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   135  				Type:   v1alpha1.ConditionAccomplished,
   136  				Status: corev1.ConditionTrue,
   137  				Reason: "",
   138  			})
   139  		} else {
   140  			SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   141  				Type:   v1alpha1.ConditionAccomplished,
   142  				Status: corev1.ConditionFalse,
   143  				Reason: "",
   144  			})
   145  		}
   146  
   147  		return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
   148  	})
   149  
   150  	if updateError != nil {
   151  		it.logger.Error(err, "failed to update the status of node", "node", request)
   152  		return reconcile.Result{}, updateError
   153  	}
   154  
   155  	return reconcile.Result{}, nil
   156  }
   157  
   158  // syncChildNodes reconciles the children nodes to following the desired states.
   159  // It does the first 2 steps mentioned in Reconcile.
   160  //
   161  // Notice again: we SHOULD NOT decide the operation based on v1alpha1.WorkflowNodeStatus, please
   162  // use kubeClient to fetch information from real world.
   163  func (it *SerialNodeReconciler) syncChildNodes(ctx context.Context, node v1alpha1.WorkflowNode) error {
   164  
   165  	// empty serial node
   166  	if len(node.Spec.Children) == 0 {
   167  		it.logger.V(4).Info("empty serial node, NOOP",
   168  			"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   169  		)
   170  		return nil
   171  	}
   172  
   173  	if WorkflowNodeFinished(node.Status) {
   174  		return nil
   175  	}
   176  
   177  	activeChildNodes, finishedChildNodes, err := it.fetchChildNodes(ctx, node)
   178  	if err != nil {
   179  		return err
   180  	}
   181  	var taskToStartup string
   182  	if len(activeChildNodes) == 0 {
   183  		// no active children, trying to spawn a new one
   184  		for index, task := range node.Spec.Children {
   185  			// Walking through on the Spec.Children, each one of task SHOULD has one corresponding workflow node;
   186  			// If the spec of one task has been changed, the corresponding workflow node and other
   187  			// workflow nodes **behinds** that workflow node will be deleted.
   188  			// That's so called "partial rerun" feature.
   189  			// For example:
   190  			// One serial node have three children nodes: A, B, C, and all of them have finished.
   191  			// Then user updates the Spec.Children[B], the expected behavior is workflow node B and C will be
   192  			// deleted, then create a new node that refs to B, no effects on A.
   193  			if index < len(finishedChildNodes) {
   194  				// TODO: if the definition/spec of task changed, we should also respawn the node
   195  				// child node start with task name
   196  
   197  				// TODO: maybe the changes on Spec.Children should be concerned each time, not only during spawning
   198  				// new instances, for shutdown outdated nodes **instantly**
   199  
   200  				if strings.HasPrefix(task, finishedChildNodes[index].Name) {
   201  					// TODO: emit event
   202  					taskToStartup = task
   203  
   204  					// TODO: nodes to delete should be all other unrecognized children nodes, include not contained in finishedChildNodes
   205  					// delete that related nodes with best-effort pattern
   206  					nodesToDelete := finishedChildNodes[index:]
   207  
   208  					if len(nodesToDelete) > 0 {
   209  						var nodesToCleanup []string
   210  						for _, item := range nodesToDelete {
   211  							nodesToCleanup = append(nodesToCleanup, item.Name)
   212  						}
   213  						it.eventRecorder.Event(&node, recorder.RerunBySpecChanged{CleanedChildrenNode: nodesToCleanup})
   214  
   215  						for _, refToDelete := range nodesToDelete {
   216  							nodeToDelete := v1alpha1.WorkflowNode{}
   217  							err := it.kubeClient.Get(ctx, types.NamespacedName{
   218  								Namespace: node.Namespace,
   219  								Name:      refToDelete.Name,
   220  							}, &nodeToDelete)
   221  							if client.IgnoreNotFound(err) != nil {
   222  								it.logger.Error(err, "failed to fetch outdated child node",
   223  									"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   224  									"child node", fmt.Sprintf("%s/%s", node.Namespace, nodeToDelete.Name))
   225  							}
   226  							err = it.kubeClient.Delete(ctx, &nodeToDelete)
   227  							if client.IgnoreNotFound(err) != nil {
   228  								it.logger.Error(err, "failed to fetch outdated child node",
   229  									"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   230  									"child node", fmt.Sprintf("%s/%s", node.Namespace, nodeToDelete.Name))
   231  							}
   232  						}
   233  					}
   234  					break
   235  				}
   236  			} else {
   237  				// spawn child node
   238  				taskToStartup = task
   239  				break
   240  			}
   241  		}
   242  	} else {
   243  		it.logger.V(4).Info("serial node has active child/children, skip scheduling",
   244  			"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   245  			"active children", activeChildNodes)
   246  	}
   247  
   248  	if len(taskToStartup) == 0 {
   249  		it.logger.Info("no need to spawn new child node", "node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   250  		return nil
   251  	}
   252  
   253  	parentWorkflow := v1alpha1.Workflow{}
   254  	err = it.kubeClient.Get(ctx, types.NamespacedName{
   255  		Namespace: node.Namespace,
   256  		Name:      node.Spec.WorkflowName,
   257  	}, &parentWorkflow)
   258  	if err != nil {
   259  		it.logger.Error(err, "failed to fetch parent workflow",
   260  			"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   261  			"workflow name", node.Spec.WorkflowName)
   262  		return err
   263  	}
   264  	// TODO: using ordered id instead of random suffix is better, like StatefulSet, also related to the sorting
   265  	childNodes, err := renderNodesByTemplates(&parentWorkflow, &node, taskToStartup)
   266  	if err != nil {
   267  		it.logger.Error(err, "failed to render children childNodes",
   268  			"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   269  		return err
   270  	}
   271  
   272  	var childrenNames []string
   273  	for _, childNode := range childNodes {
   274  		err := it.kubeClient.Create(ctx, childNode)
   275  		if err != nil {
   276  			it.logger.Error(err, "failed to create child node",
   277  				"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   278  				"child node", childNode)
   279  			return err
   280  		}
   281  		childrenNames = append(childrenNames, childNode.Name)
   282  	}
   283  	it.eventRecorder.Event(&node, recorder.NodesCreated{ChildNodes: childrenNames})
   284  	it.logger.Info("serial node spawn new child node",
   285  		"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   286  		"child node", childrenNames)
   287  
   288  	return nil
   289  }
   290