...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/statuscheck_reconciler.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"time"
    22  
    23  	"github.com/go-logr/logr"
    24  	"github.com/pkg/errors"
    25  	corev1 "k8s.io/api/core/v1"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/types"
    28  	"k8s.io/client-go/util/retry"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    31  
    32  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    34  )
    35  
    36  type StatusCheckReconciler struct {
    37  	kubeClient    client.Client
    38  	eventRecorder recorder.ChaosRecorder
    39  	logger        logr.Logger
    40  }
    41  
    42  func NewStatusCheckReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *StatusCheckReconciler {
    43  	return &StatusCheckReconciler{kubeClient: kubeClient, eventRecorder: eventRecorder, logger: logger}
    44  }
    45  
    46  func (it *StatusCheckReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    47  	startTime := time.Now()
    48  	defer func() {
    49  		it.logger.V(4).Info("finished syncing for status check node",
    50  			"node", request.NamespacedName,
    51  			"duration", time.Since(startTime),
    52  		)
    53  	}()
    54  
    55  	node := v1alpha1.WorkflowNode{}
    56  	err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
    57  	if err != nil {
    58  		return reconcile.Result{}, client.IgnoreNotFound(err)
    59  	}
    60  	if node.Spec.Type != v1alpha1.TypeStatusCheck {
    61  		return reconcile.Result{}, nil
    62  	}
    63  
    64  	it.logger.V(4).Info("resolve status check node", "node", request)
    65  	if err := it.syncStatusCheck(ctx, request, node); err != nil {
    66  		return reconcile.Result{}, errors.Wrap(err, "sync status check")
    67  	}
    68  
    69  	updateError := retry.RetryOnConflict(retry.DefaultRetry, it.updateNodeStatus(ctx, request))
    70  
    71  	return reconcile.Result{}, updateError
    72  }
    73  
    74  func (it *StatusCheckReconciler) syncStatusCheck(ctx context.Context, request reconcile.Request, node v1alpha1.WorkflowNode) error {
    75  	statusChecks, err := it.fetchChildrenStatusCheck(ctx, node)
    76  	if err != nil {
    77  		return errors.Wrap(err, "fetch children status check")
    78  	}
    79  
    80  	if WorkflowNodeFinished(node.Status) {
    81  		for _, item := range statusChecks {
    82  			// best efforts deletion
    83  			item := item
    84  			err := it.kubeClient.Delete(ctx, &item)
    85  			if client.IgnoreNotFound(err) != nil {
    86  				it.logger.Error(err, "failed to delete StatusCheck for workflow status check node",
    87  					"namespace", node.Namespace,
    88  					"nodeName", node.Name,
    89  					"statusCheckName", item.GetName(),
    90  				)
    91  				it.eventRecorder.Event(&node, recorder.StatusCheckDeletedFailed{Name: item.GetName()})
    92  			} else {
    93  				it.eventRecorder.Event(&node, recorder.StatusCheckDeleted{Name: item.GetName()})
    94  			}
    95  		}
    96  		return nil
    97  	}
    98  
    99  	if len(statusChecks) == 0 {
   100  		parentWorkflow, err := getParentWorkflow(ctx, it.kubeClient, node)
   101  		if err != nil {
   102  			return errors.WithStack(err)
   103  		}
   104  		spawnedStatusCheck, err := it.spawnStatusCheck(ctx, &node, parentWorkflow)
   105  		if err != nil {
   106  			it.eventRecorder.Event(&node, recorder.StatusCheckCreatedFailed{Name: spawnedStatusCheck.GetName()})
   107  			return errors.Wrap(err, "spawn status check")
   108  		}
   109  		it.eventRecorder.Event(&node, recorder.StatusCheckCreated{Name: spawnedStatusCheck.GetName()})
   110  	} else if len(statusChecks) > 1 {
   111  		var statusCheckToRemove []string
   112  		for _, item := range statusChecks[1:] {
   113  			statusCheckToRemove = append(statusCheckToRemove, item.GetName())
   114  		}
   115  		it.logger.Info("removing duplicated StatusCheck",
   116  			"node", request,
   117  			"statusCheckToRemove", statusCheckToRemove)
   118  
   119  		for _, item := range statusChecks[1:] {
   120  			// best efforts deletion
   121  			item := item
   122  			err := it.kubeClient.Delete(ctx, &item)
   123  			if client.IgnoreNotFound(err) != nil {
   124  				it.logger.Error(err, "failed to delete StatusCheck for workflow status check node",
   125  					"namespace", node.Namespace,
   126  					"node", node.Name,
   127  					"statusCheck", item.GetName(),
   128  				)
   129  			}
   130  		}
   131  	} else {
   132  		it.logger.V(4).Info("do not need spawn or remove StatusCheck")
   133  	}
   134  
   135  	return nil
   136  }
   137  
   138  func (it *StatusCheckReconciler) updateNodeStatus(ctx context.Context, request reconcile.Request) func() error {
   139  	return func() error {
   140  		node := v1alpha1.WorkflowNode{}
   141  		if err := it.kubeClient.Get(ctx, request.NamespacedName, &node); err != nil {
   142  			return client.IgnoreNotFound(err)
   143  		}
   144  
   145  		statusChecks, err := it.fetchChildrenStatusCheck(ctx, node)
   146  		if err != nil {
   147  			return client.IgnoreNotFound(err)
   148  		}
   149  		if len(statusChecks) > 1 {
   150  			it.logger.Info("the number of StatusCheck affected by status check node is more than 1",
   151  				"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   152  				"statusCheck", statusChecks,
   153  			)
   154  		} else if len(statusChecks) == 0 {
   155  			it.logger.Info("the number of StatusCheck affected by status check node is 0",
   156  				"node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   157  			)
   158  			return nil
   159  		}
   160  
   161  		statusCheck := statusChecks[0]
   162  		if statusCheck.IsCompleted() {
   163  			SetCondition(&node.Status, v1alpha1.WorkflowNodeCondition{
   164  				Type:   v1alpha1.ConditionAccomplished,
   165  				Status: corev1.ConditionTrue,
   166  				Reason: v1alpha1.StatusCheckCompleted,
   167  			})
   168  		} else {
   169  			SetCondition(&node.Status, v1alpha1.WorkflowNodeCondition{
   170  				Type:   v1alpha1.ConditionAccomplished,
   171  				Status: corev1.ConditionFalse,
   172  				Reason: "",
   173  			})
   174  		}
   175  
   176  		if node.Spec.AbortWithStatusCheck && needToAbort(statusCheck) {
   177  			SetCondition(&node.Status, v1alpha1.WorkflowNodeCondition{
   178  				Type:   v1alpha1.ConditionAborted,
   179  				Status: corev1.ConditionTrue,
   180  				Reason: v1alpha1.StatusCheckNotExceedSuccessThreshold,
   181  			})
   182  		} else {
   183  			SetCondition(&node.Status, v1alpha1.WorkflowNodeCondition{
   184  				Type:   v1alpha1.ConditionAborted,
   185  				Status: corev1.ConditionFalse,
   186  				Reason: "",
   187  			})
   188  		}
   189  
   190  		return client.IgnoreNotFound(it.kubeClient.Status().Update(ctx, &node))
   191  	}
   192  }
   193  
   194  func (it *StatusCheckReconciler) fetchChildrenStatusCheck(ctx context.Context, node v1alpha1.WorkflowNode) ([]v1alpha1.StatusCheck, error) {
   195  	controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   196  		MatchLabels: map[string]string{
   197  			v1alpha1.LabelControlledBy: node.Name,
   198  		},
   199  	})
   200  	if err != nil {
   201  		return nil, errors.Wrap(err, "build label selector")
   202  	}
   203  
   204  	var childStatusChecks v1alpha1.StatusCheckList
   205  	if err = it.kubeClient.List(ctx, &childStatusChecks, &client.ListOptions{LabelSelector: controlledByThisNode}); err != nil {
   206  		return nil, errors.Wrap(err, "list child status checks")
   207  	}
   208  	return childStatusChecks.Items, nil
   209  }
   210  
   211  func (it *StatusCheckReconciler) spawnStatusCheck(ctx context.Context, node *v1alpha1.WorkflowNode, workflow *v1alpha1.Workflow) (*v1alpha1.StatusCheck, error) {
   212  	if node.Spec.StatusCheck == nil {
   213  		return nil, errors.Errorf("node %s/%s does not contains spec of Target", node.Namespace, node.Name)
   214  	}
   215  	statusCheckSpec := node.Spec.StatusCheck.DeepCopy()
   216  	statusCheck := v1alpha1.StatusCheck{
   217  		ObjectMeta: metav1.ObjectMeta{
   218  			GenerateName: fmt.Sprintf("%s-", node.Name),
   219  			Namespace:    node.Namespace,
   220  			Labels: map[string]string{
   221  				v1alpha1.LabelControlledBy: node.Name,
   222  				v1alpha1.LabelWorkflow:     workflow.Name,
   223  			},
   224  			OwnerReferences: []metav1.OwnerReference{
   225  				{
   226  					APIVersion:         ApiVersion,
   227  					Kind:               KindWorkflowNode,
   228  					Name:               node.Name,
   229  					UID:                node.UID,
   230  					Controller:         &isController,
   231  					BlockOwnerDeletion: &blockOwnerDeletion,
   232  				},
   233  			},
   234  			Finalizers: []string{metav1.FinalizerDeleteDependents},
   235  		},
   236  		Spec: *statusCheckSpec,
   237  	}
   238  	if err := it.kubeClient.Create(ctx, &statusCheck); err != nil {
   239  		return nil, errors.Wrap(err, "create status check")
   240  	}
   241  	return &statusCheck, nil
   242  }
   243  
   244  func getParentWorkflow(ctx context.Context, kubeClient client.Client, node v1alpha1.WorkflowNode) (*v1alpha1.Workflow, error) {
   245  	workflowName, ok := node.Labels[v1alpha1.LabelWorkflow]
   246  	if !ok {
   247  		return nil, errors.Errorf("node %s/%s does not contains label %s", node.Namespace, node.Name, v1alpha1.LabelWorkflow)
   248  	}
   249  	parentWorkflow := v1alpha1.Workflow{}
   250  	if err := kubeClient.Get(ctx, types.NamespacedName{
   251  		Namespace: node.Namespace,
   252  		Name:      workflowName,
   253  	}, &parentWorkflow); err != nil {
   254  		return nil, errors.Wrap(err, "get parent workflow")
   255  	}
   256  	return &parentWorkflow, nil
   257  }
   258  
   259  func needToAbort(statusCheck v1alpha1.StatusCheck) bool {
   260  	if !statusCheck.IsCompleted() {
   261  		return false
   262  	}
   263  	for _, condition := range statusCheck.Status.Conditions {
   264  		if condition.Type == v1alpha1.StatusCheckConditionSuccessThresholdExceed &&
   265  			condition.Status != corev1.ConditionTrue {
   266  			return true
   267  		}
   268  	}
   269  	return false
   270  }
   271