...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/workflow_entry_reconciler.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"sort"
    22  	"time"
    23  
    24  	"github.com/go-logr/logr"
    25  	"github.com/pkg/errors"
    26  	corev1 "k8s.io/api/core/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/client-go/util/retry"
    29  	"k8s.io/utils/pointer"
    30  	"sigs.k8s.io/controller-runtime/pkg/client"
    31  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    32  
    33  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    34  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    35  )
    36  
// WorkflowEntryReconciler watches on Workflow, creates new Entry Node for created Workflow.
//
// Its Reconcile method also removes redundant entry nodes and keeps the
// Workflow status (entry node name, conditions, start/end time) in sync.
type WorkflowEntryReconciler struct {
	// kubeClient is used to read Workflows, list/create/delete WorkflowNodes
	// and update the Workflow status.
	kubeClient    client.Client
	// eventRecorder emits events about the reconciled Workflow
	// (InvalidEntry, EntryCreated, WorkflowAccomplished).
	eventRecorder recorder.ChaosRecorder
	// logger receives the diagnostic output of this reconciler.
	logger        logr.Logger
}
    43  
    44  func NewWorkflowEntryReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *WorkflowEntryReconciler {
    45  	return &WorkflowEntryReconciler{kubeClient: kubeClient, eventRecorder: eventRecorder, logger: logger}
    46  }
    47  
    48  func (it *WorkflowEntryReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    49  	startTime := time.Now()
    50  	defer func() {
    51  		it.logger.V(4).Info("Finished syncing for workflow",
    52  			"node", request.NamespacedName,
    53  			"duration", time.Since(startTime),
    54  		)
    55  	}()
    56  
    57  	workflow := v1alpha1.Workflow{}
    58  	err := it.kubeClient.Get(ctx, request.NamespacedName, &workflow)
    59  	if err != nil {
    60  		return reconcile.Result{}, client.IgnoreNotFound(err)
    61  	}
    62  
    63  	entryNodes, err := fetchEntryNode(ctx, it.kubeClient, workflow)
    64  	if err != nil {
    65  		it.logger.Error(err, "failed to list entry nodes of workflow",
    66  			"workflow", request.NamespacedName)
    67  		return reconcile.Result{}, err
    68  	}
    69  
    70  	if len(entryNodes) == 0 {
    71  		func() {
    72  			// Not scheduled yet, spawn the entry workflow node
    73  			spawnedEntryNode, err := it.spawnEntryNode(ctx, workflow)
    74  			if err != nil {
    75  				it.eventRecorder.Event(&workflow, recorder.InvalidEntry{
    76  					EntryTemplate: workflow.Spec.Entry,
    77  				})
    78  				it.logger.Error(err, "failed to spawn new entry node of workflow",
    79  					"workflow", request.NamespacedName,
    80  					"entry", workflow.Spec.Entry)
    81  				// failed to spawn new entry, but will not break the reconcile, continue to sync status
    82  				return
    83  			}
    84  			it.logger.Info(
    85  				"entry node for workflow created",
    86  				"workflow", request.NamespacedName,
    87  				"entry node", fmt.Sprintf("%s/%s", spawnedEntryNode.Namespace, spawnedEntryNode.Name),
    88  			)
    89  			it.eventRecorder.Event(&workflow, recorder.EntryCreated{Entry: spawnedEntryNode.Name})
    90  		}()
    91  	}
    92  
    93  	if len(entryNodes) > 1 {
    94  		var nodeNames []string
    95  		for _, node := range entryNodes {
    96  			nodeNames = append(nodeNames, node.GetName())
    97  		}
    98  		it.logger.Info("there are more than 1 entry nodes of workflow, cleaning up except first one",
    99  			"workflow", request.NamespacedName,
   100  			"entry nodes", nodeNames,
   101  		)
   102  		for _, redundantEntryNode := range entryNodes[1:] {
   103  			redundantEntryNode := redundantEntryNode
   104  			// best effort deletion
   105  			err := it.kubeClient.Delete(ctx, &redundantEntryNode)
   106  			if err != nil {
   107  				it.logger.Error(err,
   108  					"failed to delete redundant entry node",
   109  					"workflow", request.NamespacedName,
   110  					"redundant entry node", fmt.Sprintf("%s/%s", redundantEntryNode.Namespace, redundantEntryNode.Name),
   111  				)
   112  			}
   113  		}
   114  	}
   115  
   116  	// sync the status
   117  	updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
   118  		workflowNeedUpdate := v1alpha1.Workflow{}
   119  		err := it.kubeClient.Get(ctx, request.NamespacedName, &workflowNeedUpdate)
   120  		if err != nil {
   121  			it.logger.Error(err,
   122  				"failed to fetch the latest state of workflow",
   123  				"workflow", request.NamespacedName,
   124  			)
   125  			return err
   126  		}
   127  
   128  		entryNodes, err := fetchEntryNode(ctx, it.kubeClient, workflowNeedUpdate)
   129  		if err != nil {
   130  			it.logger.Error(err,
   131  				"failed to list entry nodes of workflow",
   132  				"workflow", request.NamespacedName,
   133  			)
   134  			return err
   135  		}
   136  
   137  		if len(entryNodes) > 0 {
   138  			if len(entryNodes) > 1 {
   139  				var nodeNames []string
   140  				for _, node := range entryNodes {
   141  					nodeNames = append(nodeNames, node.GetName())
   142  				}
   143  				it.logger.Info("there are more than 1 entry nodes of workflow",
   144  					"workflow", request.NamespacedName,
   145  					"entry nodes", nodeNames,
   146  				)
   147  			}
   148  
   149  			workflowNeedUpdate.Status.EntryNode = pointer.StringPtr(entryNodes[0].Name)
   150  			SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
   151  				Type:   v1alpha1.WorkflowConditionScheduled,
   152  				Status: corev1.ConditionTrue,
   153  				Reason: "",
   154  			})
   155  
   156  			if WorkflowNodeFinished(entryNodes[0].Status) {
   157  				SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
   158  					Type:   v1alpha1.WorkflowConditionAccomplished,
   159  					Status: corev1.ConditionTrue,
   160  					Reason: "",
   161  				})
   162  				if workflowNeedUpdate.Status.EndTime == nil {
   163  					now := metav1.NewTime(time.Now())
   164  					workflowNeedUpdate.Status.EndTime = &now
   165  					it.eventRecorder.Event(&workflow, recorder.WorkflowAccomplished{})
   166  				}
   167  			} else {
   168  				SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
   169  					Type:   v1alpha1.WorkflowConditionAccomplished,
   170  					Status: corev1.ConditionFalse,
   171  					Reason: "",
   172  				})
   173  				workflowNeedUpdate.Status.EndTime = nil
   174  			}
   175  		} else {
   176  			SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
   177  				Type:   v1alpha1.WorkflowConditionScheduled,
   178  				Status: corev1.ConditionFalse,
   179  				Reason: "",
   180  			})
   181  			SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
   182  				Type:   v1alpha1.WorkflowConditionAccomplished,
   183  				Status: corev1.ConditionFalse,
   184  				Reason: "",
   185  			})
   186  			workflowNeedUpdate.Status.EndTime = nil
   187  		}
   188  
   189  		if workflowNeedUpdate.Status.StartTime == nil {
   190  			tmp := metav1.NewTime(startTime)
   191  			workflowNeedUpdate.Status.StartTime = &tmp
   192  		}
   193  
   194  		err = it.kubeClient.Status().Update(ctx, &workflowNeedUpdate)
   195  		if err != nil {
   196  			it.logger.Error(err, "failed to update workflowNeedUpdate status")
   197  			return err
   198  		}
   199  		return nil
   200  	})
   201  
   202  	return reconcile.Result{}, client.IgnoreNotFound(updateError)
   203  }
   204  
   205  // fetchEntryNode will return the entry workflow node(s) of that workflow, return nil if not exists.
   206  //
   207  // The expected length of result is 1, but due to the reconcile and the inconsistent cache, there might be more than one
   208  // entry nodes created, if should be reported to the upper logic.
   209  func fetchEntryNode(ctx context.Context, kubeClient client.Client, workflow v1alpha1.Workflow) ([]v1alpha1.WorkflowNode, error) {
   210  	entryNodesList := v1alpha1.WorkflowNodeList{}
   211  	controlledByWorkflow, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   212  		MatchLabels: map[string]string{
   213  			v1alpha1.LabelControlledBy: workflow.Name,
   214  		},
   215  	})
   216  	if err != nil {
   217  		return nil, errors.Wrap(err, "build label selector")
   218  	}
   219  
   220  	err = kubeClient.List(ctx, &entryNodesList, &client.ListOptions{
   221  		Namespace:     workflow.Namespace,
   222  		LabelSelector: controlledByWorkflow,
   223  	})
   224  	if err != nil {
   225  		return nil, errors.Wrapf(err, "list entry workflow node")
   226  	}
   227  
   228  	sortedEntryNodes := SortByCreationTimestamp(entryNodesList.Items)
   229  	sort.Sort(sortedEntryNodes)
   230  
   231  	return sortedEntryNodes, nil
   232  }
   233  
   234  // spawnEntryNode will create **one** entry workflow node for current workflow
   235  func (it *WorkflowEntryReconciler) spawnEntryNode(ctx context.Context, workflow v1alpha1.Workflow) (*v1alpha1.WorkflowNode, error) {
   236  	// This workflow is just created, create entry node
   237  	nodes, err := renderNodesByTemplates(&workflow, nil, workflow.Spec.Entry)
   238  	if err != nil {
   239  		it.logger.Error(err, "failed create entry node", "workflow", workflow.Name, "entry", workflow.Spec.Entry)
   240  		return nil, err
   241  	}
   242  
   243  	if len(nodes) > 1 {
   244  		it.logger.Info("the results of entry nodes are more than 1, will only pick the first one",
   245  			"workflow", fmt.Sprintf("%s/%s", workflow.Namespace, workflow.Name),
   246  			"nodes", nodes,
   247  		)
   248  	}
   249  
   250  	entryNode := nodes[0]
   251  	err = it.kubeClient.Create(ctx, entryNode)
   252  	if err != nil {
   253  		it.logger.Info("failed to create workflow nodes")
   254  		return nil, err
   255  	}
   256  	it.logger.Info("entry workflow node created",
   257  		"workflow", fmt.Sprintf("%s/%s", workflow.Namespace, workflow.Name),
   258  		"entry node", entryNode.Name,
   259  	)
   260  
   261  	return entryNode, nil
   262  }
   263