...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/abort_node_reconciler.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  
    22  	"github.com/go-logr/logr"
    23  	"github.com/pkg/errors"
    24  	corev1 "k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	"k8s.io/client-go/util/retry"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    29  
    30  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    31  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    32  )
    33  
    34  type AbortNodeReconciler struct {
    35  	*ChildNodesFetcher
    36  	kubeClient    client.Client
    37  	eventRecorder recorder.ChaosRecorder
    38  	logger        logr.Logger
    39  }
    40  
    41  func NewAbortNodeReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *AbortNodeReconciler {
    42  	return &AbortNodeReconciler{
    43  		ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
    44  		kubeClient:        kubeClient,
    45  		eventRecorder:     eventRecorder,
    46  		logger:            logger,
    47  	}
    48  }
    49  
    50  // Reconcile watches `WorkflowNodes`, if:
    51  // 1. the abort condition is `False`, just return.
    52  // 2. the abort condition is `True`, the node is not `TypeStatusCheck`, it will propagate abort condition to children nodes.
    53  // 3. the abort condition is `True`, the node is `TypeStatusCheck`, it will add abort annotation to the parent workflow.
    54  func (it *AbortNodeReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    55  	node := v1alpha1.WorkflowNode{}
    56  	err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
    57  	if err != nil {
    58  		return reconcile.Result{}, client.IgnoreNotFound(err)
    59  	}
    60  
    61  	if !ConditionEqualsTo(node.Status, v1alpha1.ConditionAborted, corev1.ConditionTrue) {
    62  		return reconcile.Result{}, nil
    63  	}
    64  
    65  	if node.Spec.Type != v1alpha1.TypeStatusCheck {
    66  		// if this node is aborted, try propagating to children node
    67  		return reconcile.Result{}, it.propagateAbortToChildren(ctx, &node)
    68  	}
    69  
    70  	updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
    71  		if err := it.abortWorkflow(ctx, node); client.IgnoreNotFound(err) != nil {
    72  			return errors.Wrapf(err, "abort parent workflow")
    73  		}
    74  		return nil
    75  	})
    76  
    77  	return reconcile.Result{}, client.IgnoreNotFound(updateError)
    78  }
    79  
    80  func (it *AbortNodeReconciler) propagateAbortToChildren(ctx context.Context, parent *v1alpha1.WorkflowNode) error {
    81  	switch parent.Spec.Type {
    82  	case v1alpha1.TypeSerial, v1alpha1.TypeParallel, v1alpha1.TypeTask:
    83  		activeChildNodes, _, err := it.ChildNodesFetcher.fetchChildNodes(ctx, *parent)
    84  		if err != nil {
    85  			return errors.Wrap(err, "fetch children nodes")
    86  		}
    87  		for _, childNode := range activeChildNodes {
    88  			childNode := childNode
    89  
    90  			if WorkflowNodeFinished(childNode.Status) {
    91  				it.logger.Info("child node already finished, skip for propagate abort", "node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name))
    92  				continue
    93  			}
    94  
    95  			err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
    96  				nodeNeedUpdate := v1alpha1.WorkflowNode{}
    97  				err := it.kubeClient.Get(ctx, types.NamespacedName{
    98  					Namespace: childNode.Namespace,
    99  					Name:      childNode.Name,
   100  				}, &nodeNeedUpdate)
   101  				if err != nil {
   102  					return errors.Wrap(err, "get child workflow node")
   103  				}
   104  				if ConditionEqualsTo(nodeNeedUpdate.Status, v1alpha1.ConditionAborted, corev1.ConditionTrue) {
   105  					it.logger.Info("omit propagate abort to children, child already aborted",
   106  						"node", fmt.Sprintf("%s/%s", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name),
   107  						"parent node", fmt.Sprintf("%s/%s", parent.Namespace, parent.Name),
   108  					)
   109  					return nil
   110  				}
   111  				SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   112  					Type:   v1alpha1.ConditionAborted,
   113  					Status: corev1.ConditionTrue,
   114  					Reason: v1alpha1.ParentNodeAborted,
   115  				})
   116  				it.eventRecorder.Event(&nodeNeedUpdate, recorder.ParentNodeAborted{ParentNodeName: parent.Name})
   117  				return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
   118  			})
   119  			if err != nil {
   120  				return errors.Wrap(err, "update status of child workflow node")
   121  			}
   122  			it.logger.Info("propagate abort for child node",
   123  				"child node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name),
   124  				"parent node", fmt.Sprintf("%s/%s", parent.Namespace, parent.Name),
   125  			)
   126  		}
   127  		return nil
   128  	default:
   129  		it.logger.V(4).Info("no need to propagate with this type of workflow node", "type", parent.Spec.Type)
   130  		return nil
   131  	}
   132  }
   133  
   134  func (it *AbortNodeReconciler) abortWorkflow(ctx context.Context, node v1alpha1.WorkflowNode) error {
   135  	parentWorkflow, err := getParentWorkflow(ctx, it.kubeClient, node)
   136  	if err != nil {
   137  		return errors.WithStack(err)
   138  	}
   139  	if WorkflowAborted(*parentWorkflow) {
   140  		return nil
   141  	}
   142  
   143  	it.logger.Info("add abort annotation to parent workflow",
   144  		"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   145  		"workflow", fmt.Sprintf("%s/%s", parentWorkflow.Namespace, parentWorkflow.Name))
   146  	parentWorkflow.Annotations[v1alpha1.WorkflowAnnotationAbort] = "true"
   147  	return it.kubeClient.Update(ctx, parentWorkflow)
   148  }
   149