...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/workflow_entry_reconciler.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"sort"
    22  	"time"
    23  
    24  	"github.com/go-logr/logr"
    25  	corev1 "k8s.io/api/core/v1"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/client-go/util/retry"
    28  	"k8s.io/utils/pointer"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    31  
    32  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    34  )
    35  
// WorkflowEntryReconciler watches on Workflow, creates new Entry Node for created Workflow.
//
// Besides spawning the entry node, its Reconcile method removes surplus entry
// nodes and keeps the Workflow status in sync with the state of the entry node.
type WorkflowEntryReconciler struct {
	// kubeClient reads Workflows, lists/creates/deletes WorkflowNodes and
	// updates the Workflow status sub-resource.
	kubeClient client.Client
	// eventRecorder publishes events about the Workflow (InvalidEntry,
	// EntryCreated, WorkflowAccomplished).
	eventRecorder recorder.ChaosRecorder
	// logger receives the diagnostic output of the reconciler.
	logger logr.Logger
}
    42  
    43  func NewWorkflowEntryReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *WorkflowEntryReconciler {
    44  	return &WorkflowEntryReconciler{kubeClient: kubeClient, eventRecorder: eventRecorder, logger: logger}
    45  }
    46  
    47  func (it *WorkflowEntryReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    48  	startTime := time.Now()
    49  	defer func() {
    50  		it.logger.V(4).Info("Finished syncing for workflow",
    51  			"node", request.NamespacedName,
    52  			"duration", time.Since(startTime),
    53  		)
    54  	}()
    55  
    56  	workflow := v1alpha1.Workflow{}
    57  	err := it.kubeClient.Get(ctx, request.NamespacedName, &workflow)
    58  	if err != nil {
    59  		return reconcile.Result{}, client.IgnoreNotFound(err)
    60  	}
    61  
    62  	entryNodes, err := it.fetchEntryNode(ctx, workflow)
    63  	if err != nil {
    64  		it.logger.Error(err, "failed to list entry nodes of workflow",
    65  			"workflow", request.NamespacedName)
    66  		return reconcile.Result{}, err
    67  	}
    68  
    69  	if len(entryNodes) == 0 {
    70  		func() {
    71  			// Not scheduled yet, spawn the entry workflow node
    72  			spawnedEntryNode, err := it.spawnEntryNode(ctx, workflow)
    73  			if err != nil {
    74  				it.eventRecorder.Event(&workflow, recorder.InvalidEntry{
    75  					EntryTemplate: workflow.Spec.Entry,
    76  				})
    77  				it.logger.Error(err, "failed to spawn new entry node of workflow",
    78  					"workflow", request.NamespacedName,
    79  					"entry", workflow.Spec.Entry)
    80  				// failed to spawn new entry, but will not break the reconcile, continue to sync status
    81  				return
    82  			}
    83  			it.logger.Info(
    84  				"entry node for workflow created",
    85  				"workflow", request.NamespacedName,
    86  				"entry node", fmt.Sprintf("%s/%s", spawnedEntryNode.Namespace, spawnedEntryNode.Name),
    87  			)
    88  			it.eventRecorder.Event(&workflow, recorder.EntryCreated{Entry: spawnedEntryNode.Name})
    89  		}()
    90  	}
    91  
    92  	if len(entryNodes) > 1 {
    93  		var nodeNames []string
    94  		for _, node := range entryNodes {
    95  			nodeNames = append(nodeNames, node.GetName())
    96  		}
    97  		it.logger.Info("there are more than 1 entry nodes of workflow, cleaning up except first one",
    98  			"workflow", request.NamespacedName,
    99  			"entry nodes", nodeNames,
   100  		)
   101  		for _, redundantEntryNode := range entryNodes[1:] {
   102  			redundantEntryNode := redundantEntryNode
   103  			// best effort deletion
   104  			err := it.kubeClient.Delete(ctx, &redundantEntryNode)
   105  			if err != nil {
   106  				it.logger.Error(err,
   107  					"failed to delete redundant entry node",
   108  					"workflow", request.NamespacedName,
   109  					"redundant entry node", fmt.Sprintf("%s/%s", redundantEntryNode.Namespace, redundantEntryNode.Name),
   110  				)
   111  			}
   112  		}
   113  	}
   114  
   115  	// sync the status
   116  	updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
   117  		workflowNeedUpdate := v1alpha1.Workflow{}
   118  		err := it.kubeClient.Get(ctx, request.NamespacedName, &workflowNeedUpdate)
   119  		if err != nil {
   120  			it.logger.Error(err,
   121  				"failed to fetch the latest state of workflow",
   122  				"workflow", request.NamespacedName,
   123  			)
   124  			return err
   125  		}
   126  
   127  		entryNodes, err := it.fetchEntryNode(ctx, workflowNeedUpdate)
   128  		if err != nil {
   129  			it.logger.Error(err,
   130  				"failed to list entry nodes of workflow",
   131  				"workflow", request.NamespacedName,
   132  			)
   133  			return err
   134  		}
   135  
   136  		if len(entryNodes) > 0 {
   137  			if len(entryNodes) > 1 {
   138  				var nodeNames []string
   139  				for _, node := range entryNodes {
   140  					nodeNames = append(nodeNames, node.GetName())
   141  				}
   142  				it.logger.Info("there are more than 1 entry nodes of workflow",
   143  					"workflow", request.NamespacedName,
   144  					"entry nodes", nodeNames,
   145  				)
   146  			}
   147  
   148  			workflowNeedUpdate.Status.EntryNode = pointer.StringPtr(entryNodes[0].Name)
   149  			SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
   150  				Type:   v1alpha1.WorkflowConditionScheduled,
   151  				Status: corev1.ConditionTrue,
   152  				Reason: "",
   153  			})
   154  
   155  			if WorkflowNodeFinished(entryNodes[0].Status) {
   156  				SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
   157  					Type:   v1alpha1.WorkflowConditionAccomplished,
   158  					Status: corev1.ConditionTrue,
   159  					Reason: "",
   160  				})
   161  				if workflowNeedUpdate.Status.EndTime == nil {
   162  					now := metav1.NewTime(time.Now())
   163  					workflowNeedUpdate.Status.EndTime = &now
   164  					it.eventRecorder.Event(&workflow, recorder.WorkflowAccomplished{})
   165  				}
   166  			} else {
   167  				SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
   168  					Type:   v1alpha1.WorkflowConditionAccomplished,
   169  					Status: corev1.ConditionFalse,
   170  					Reason: "",
   171  				})
   172  				workflowNeedUpdate.Status.EndTime = nil
   173  			}
   174  		} else {
   175  			SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
   176  				Type:   v1alpha1.WorkflowConditionScheduled,
   177  				Status: corev1.ConditionFalse,
   178  				Reason: "",
   179  			})
   180  			SetWorkflowCondition(&workflowNeedUpdate.Status, v1alpha1.WorkflowCondition{
   181  				Type:   v1alpha1.WorkflowConditionAccomplished,
   182  				Status: corev1.ConditionFalse,
   183  				Reason: "",
   184  			})
   185  			workflowNeedUpdate.Status.EndTime = nil
   186  		}
   187  
   188  		if workflowNeedUpdate.Status.StartTime == nil {
   189  			tmp := metav1.NewTime(startTime)
   190  			workflowNeedUpdate.Status.StartTime = &tmp
   191  		}
   192  
   193  		err = it.kubeClient.Status().Update(ctx, &workflowNeedUpdate)
   194  		if err != nil {
   195  			it.logger.Error(err, "failed to update workflowNeedUpdate status")
   196  			return err
   197  		}
   198  		return nil
   199  	})
   200  
   201  	return reconcile.Result{}, client.IgnoreNotFound(updateError)
   202  }
   203  
   204  // fetchEntryNode will return the entry workflow node(s) of that workflow, return nil if not exists.
   205  //
   206  // The expected length of result is 1, but due to the reconcile and the inconsistent cache, there might be more than one
   207  // entry nodes created, if should be reported to the upper logic.
   208  func (it *WorkflowEntryReconciler) fetchEntryNode(ctx context.Context, workflow v1alpha1.Workflow) ([]v1alpha1.WorkflowNode, error) {
   209  	entryNodesList := v1alpha1.WorkflowNodeList{}
   210  	controlledByWorkflow, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   211  		MatchLabels: map[string]string{
   212  			v1alpha1.LabelControlledBy: workflow.Name,
   213  		},
   214  	})
   215  	if err != nil {
   216  		it.logger.Error(err, "failed to build label selector with filtering entry workflow node controlled by current workflow",
   217  			"workflow", fmt.Sprintf("%s/%s", workflow.Namespace, workflow.Name))
   218  		return nil, err
   219  	}
   220  
   221  	err = it.kubeClient.List(ctx, &entryNodesList, &client.ListOptions{
   222  		Namespace:     workflow.Namespace,
   223  		LabelSelector: controlledByWorkflow,
   224  	})
   225  	if err != nil {
   226  		it.logger.Error(err, "failed to list entry workflow node controlled by workflow",
   227  			"workflow", fmt.Sprintf("%s/%s", workflow.Namespace, workflow.Name))
   228  		return nil, err
   229  	}
   230  
   231  	sortedEntryNodes := SortByCreationTimestamp(entryNodesList.Items)
   232  	sort.Sort(sortedEntryNodes)
   233  
   234  	return sortedEntryNodes, nil
   235  }
   236  
   237  // spawnEntryNode will create **one** entry workflow node for current workflow
   238  func (it *WorkflowEntryReconciler) spawnEntryNode(ctx context.Context, workflow v1alpha1.Workflow) (*v1alpha1.WorkflowNode, error) {
   239  	// This workflow is just created, create entry node
   240  	nodes, err := renderNodesByTemplates(&workflow, nil, workflow.Spec.Entry)
   241  	if err != nil {
   242  		it.logger.Error(err, "failed create entry node", "workflow", workflow.Name, "entry", workflow.Spec.Entry)
   243  		return nil, err
   244  	}
   245  
   246  	if len(nodes) > 1 {
   247  		it.logger.Info("the results of entry nodes are more than 1, will only pick the first one",
   248  			"workflow", fmt.Sprintf("%s/%s", workflow.Namespace, workflow.Name),
   249  			"nodes", nodes,
   250  		)
   251  	}
   252  
   253  	entryNode := nodes[0]
   254  	err = it.kubeClient.Create(ctx, entryNode)
   255  	if err != nil {
   256  		it.logger.Info("failed to create workflow nodes")
   257  		return nil, err
   258  	}
   259  	it.logger.Info("entry workflow node created",
   260  		"workflow", fmt.Sprintf("%s/%s", workflow.Namespace, workflow.Name),
   261  		"entry node", entryNode.Name,
   262  	)
   263  
   264  	return entryNode, nil
   265  }
   266