...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/task_reconciler.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"encoding/json"
    21  	"fmt"
    22  	"time"
    23  
    24  	"github.com/go-logr/logr"
    25  	"github.com/pkg/errors"
    26  	corev1 "k8s.io/api/core/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/types"
    29  	"k8s.io/client-go/rest"
    30  	"k8s.io/client-go/util/retry"
    31  	"sigs.k8s.io/controller-runtime/pkg/client"
    32  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    33  
    34  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    36  	"github.com/chaos-mesh/chaos-mesh/pkg/workflow/task"
    37  	"github.com/chaos-mesh/chaos-mesh/pkg/workflow/task/collector"
    38  )
    39  
    40  type TaskReconciler struct {
    41  	*ChildNodesFetcher
    42  	kubeClient    client.Client
    43  	restConfig    *rest.Config
    44  	eventRecorder recorder.ChaosRecorder
    45  	logger        logr.Logger
    46  }
    47  
    48  func NewTaskReconciler(kubeClient client.Client, restConfig *rest.Config, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *TaskReconciler {
    49  	return &TaskReconciler{
    50  		ChildNodesFetcher: NewChildNodesFetcher(kubeClient, logger),
    51  		kubeClient:        kubeClient,
    52  		restConfig:        restConfig,
    53  		eventRecorder:     eventRecorder,
    54  		logger:            logger,
    55  	}
    56  }
    57  
    58  func (it *TaskReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    59  
    60  	startTime := time.Now()
    61  	defer func() {
    62  		it.logger.V(4).Info("Finished syncing for task node",
    63  			"node", request.NamespacedName,
    64  			"duration", time.Since(startTime),
    65  		)
    66  	}()
    67  
    68  	node := v1alpha1.WorkflowNode{}
    69  	err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
    70  	if err != nil {
    71  		return reconcile.Result{}, client.IgnoreNotFound(err)
    72  	}
    73  
    74  	// only resolve task nodes
    75  	if node.Spec.Type != v1alpha1.TypeTask {
    76  		return reconcile.Result{}, nil
    77  	}
    78  
    79  	it.logger.V(4).Info("resolve task node", "node", request)
    80  
    81  	pods, err := it.FetchPodControlledByThisWorkflowNode(ctx, node)
    82  	if err != nil {
    83  		return reconcile.Result{}, err
    84  	}
    85  
    86  	if len(pods) == 0 {
    87  		if workflowName, ok := node.Labels[v1alpha1.LabelWorkflow]; ok {
    88  			parentWorkflow := v1alpha1.Workflow{}
    89  			err := it.kubeClient.Get(ctx, types.NamespacedName{
    90  				Namespace: node.Namespace,
    91  				Name:      workflowName,
    92  			}, &parentWorkflow)
    93  			if err != nil {
    94  				return reconcile.Result{}, err
    95  			}
    96  			spawnedPod, err := it.SpawnTaskPod(ctx, &node, &parentWorkflow)
    97  			if err != nil {
    98  				it.logger.Error(err, "failed to spawn pod for Task Node", "node", request)
    99  				it.eventRecorder.Event(&node, recorder.TaskPodSpawnFailed{})
   100  				return reconcile.Result{}, err
   101  			}
   102  			it.eventRecorder.Event(&node, recorder.TaskPodSpawned{PodName: spawnedPod.Name})
   103  		} else {
   104  			return reconcile.Result{}, errors.Errorf("node %s/%s does not contains label %s", node.Namespace, node.Name, v1alpha1.LabelWorkflow)
   105  		}
   106  
   107  	}
   108  
   109  	if len(pods) > 1 {
   110  		var podNames []string
   111  		for _, pod := range pods {
   112  			podNames = append(podNames, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
   113  		}
   114  		it.logger.Info("unexpected more than 1 pod created by task node, it will pick random one",
   115  			"node", request,
   116  			"pods", podNames,
   117  			"picked", fmt.Sprintf("%s/%s", pods[0].Namespace, pods[0].Name),
   118  		)
   119  	}
   120  
   121  	// update the status about conditional tasks
   122  	if len(pods) > 0 && (pods[0].Status.Phase == corev1.PodFailed || pods[0].Status.Phase == corev1.PodSucceeded) {
   123  		evaluated, err := it.conditionalBranchesEvaluated(ctx, node)
   124  		if err != nil {
   125  			return reconcile.Result{}, err
   126  		}
   127  		if !evaluated {
   128  			it.eventRecorder.Event(&node, recorder.TaskPodPodCompleted{PodName: pods[0].Name})
   129  			// task pod is terminated
   130  			updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
   131  				nodeNeedUpdate := v1alpha1.WorkflowNode{}
   132  				err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
   133  				if err != nil {
   134  					return err
   135  				}
   136  
   137  				if nodeNeedUpdate.Status.ConditionalBranchesStatus == nil {
   138  					nodeNeedUpdate.Status.ConditionalBranchesStatus = &v1alpha1.ConditionalBranchesStatus{}
   139  				}
   140  
   141  				// TODO: update related condition
   142  				defaultCollector := collector.DefaultCollector(it.kubeClient, it.restConfig, pods[0].Namespace, pods[0].Name, nodeNeedUpdate.Spec.Task.Container.Name)
   143  				env, err := defaultCollector.CollectContext(ctx)
   144  				if err != nil {
   145  					it.logger.Error(err, "failed to fetch env from task",
   146  						"task", fmt.Sprintf("%s/%s", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name),
   147  					)
   148  					return err
   149  				}
   150  				if env != nil {
   151  					jsonString, err := json.Marshal(env)
   152  					if err != nil {
   153  						it.logger.Error(err, "failed to convert env to json",
   154  							"task", fmt.Sprintf("%s/%s", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name),
   155  							"env", env)
   156  					} else {
   157  						nodeNeedUpdate.Status.ConditionalBranchesStatus.Context = []string{string(jsonString)}
   158  					}
   159  				}
   160  
   161  				evaluator := task.NewEvaluator(it.logger, it.kubeClient)
   162  				evaluateConditionBranches, err := evaluator.EvaluateConditionBranches(nodeNeedUpdate.Spec.ConditionalBranches, env)
   163  				if err != nil {
   164  					it.logger.Error(err, "failed to evaluate expression",
   165  						"task", fmt.Sprintf("%s/%s", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name),
   166  					)
   167  					return err
   168  				}
   169  
   170  				nodeNeedUpdate.Status.ConditionalBranchesStatus.Branches = evaluateConditionBranches
   171  
   172  				var selectedBranches []string
   173  				for _, item := range evaluateConditionBranches {
   174  					if item.EvaluationResult == corev1.ConditionTrue {
   175  						selectedBranches = append(selectedBranches, item.Target)
   176  					}
   177  				}
   178  				it.eventRecorder.Event(&nodeNeedUpdate, recorder.ConditionalBranchesSelected{SelectedBranches: selectedBranches})
   179  
   180  				err = it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
   181  				return err
   182  			})
   183  			if client.IgnoreNotFound(updateError) != nil {
   184  				it.logger.Error(updateError, "failed to update the condition status of task",
   185  					"task", request)
   186  			}
   187  		}
   188  	} else {
   189  		// task pod is still running or not exists
   190  		updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
   191  			nodeNeedUpdate := v1alpha1.WorkflowNode{}
   192  			err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
   193  			if err != nil {
   194  				return err
   195  			}
   196  			// TODO: update related condition
   197  			var branches []v1alpha1.ConditionalBranchStatus
   198  
   199  			if nodeNeedUpdate.Status.ConditionalBranchesStatus == nil {
   200  				nodeNeedUpdate.Status.ConditionalBranchesStatus = &v1alpha1.ConditionalBranchesStatus{}
   201  			}
   202  
   203  			for _, conditionalTask := range nodeNeedUpdate.Spec.ConditionalBranches {
   204  				branch := v1alpha1.ConditionalBranchStatus{
   205  					Target:           conditionalTask.Target,
   206  					EvaluationResult: corev1.ConditionUnknown,
   207  				}
   208  				branches = append(branches, branch)
   209  			}
   210  
   211  			nodeNeedUpdate.Status.ConditionalBranchesStatus.Branches = branches
   212  
   213  			err = it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
   214  			return err
   215  		})
   216  
   217  		if client.IgnoreNotFound(updateError) != nil {
   218  			it.logger.Error(updateError, "k failed to update the condition status of task",
   219  				"task", request)
   220  		}
   221  
   222  	}
   223  
   224  	// update the status about children nodes
   225  	var evaluatedNode v1alpha1.WorkflowNode
   226  
   227  	err = it.kubeClient.Get(ctx, request.NamespacedName, &evaluatedNode)
   228  	if err != nil {
   229  		return reconcile.Result{}, err
   230  	}
   231  	evaluated, err := it.conditionalBranchesEvaluated(ctx, evaluatedNode)
   232  	if err != nil {
   233  		return reconcile.Result{}, err
   234  	}
   235  	if evaluated {
   236  		err = it.syncChildNodes(ctx, evaluatedNode)
   237  		if err != nil {
   238  			return reconcile.Result{}, err
   239  		}
   240  
   241  		// update the status of children workflow nodes
   242  		updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
   243  			nodeNeedUpdate := v1alpha1.WorkflowNode{}
   244  			err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
   245  			if err != nil {
   246  				return err
   247  			}
   248  			var tasks []string
   249  			for _, branch := range evaluatedNode.Status.ConditionalBranchesStatus.Branches {
   250  				if branch.EvaluationResult == corev1.ConditionTrue {
   251  					tasks = append(tasks, branch.Target)
   252  				}
   253  			}
   254  
   255  			activeChildren, finishedChildren, err := it.fetchChildNodes(ctx, nodeNeedUpdate)
   256  			if err != nil {
   257  				return err
   258  			}
   259  
   260  			nodeNeedUpdate.Status.FinishedChildren = nil
   261  			for _, finishedChild := range finishedChildren {
   262  				nodeNeedUpdate.Status.FinishedChildren = append(nodeNeedUpdate.Status.FinishedChildren,
   263  					corev1.LocalObjectReference{
   264  						Name: finishedChild.Name,
   265  					})
   266  			}
   267  
   268  			nodeNeedUpdate.Status.ActiveChildren = nil
   269  			for _, activeChild := range activeChildren {
   270  				nodeNeedUpdate.Status.ActiveChildren = append(nodeNeedUpdate.Status.ActiveChildren,
   271  					corev1.LocalObjectReference{
   272  						Name: activeChild.Name,
   273  					})
   274  			}
   275  
   276  			// TODO: also check the consistent between spec in task and the spec in child node
   277  			evaluated, err := it.conditionalBranchesEvaluated(ctx, nodeNeedUpdate)
   278  			if err != nil {
   279  				return err
   280  			}
   281  			if evaluated && len(finishedChildren) == len(tasks) {
   282  				if !WorkflowNodeFinished(nodeNeedUpdate.Status) {
   283  					it.eventRecorder.Event(&nodeNeedUpdate, recorder.NodeAccomplished{})
   284  				}
   285  				SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   286  					Type:   v1alpha1.ConditionAccomplished,
   287  					Status: corev1.ConditionTrue,
   288  					Reason: "",
   289  				})
   290  			} else {
   291  				SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   292  					Type:   v1alpha1.ConditionAccomplished,
   293  					Status: corev1.ConditionFalse,
   294  					Reason: "",
   295  				})
   296  			}
   297  
   298  			return it.kubeClient.Status().Update(ctx, &nodeNeedUpdate)
   299  		})
   300  
   301  		return reconcile.Result{}, client.IgnoreNotFound(updateError)
   302  	}
   303  
   304  	return reconcile.Result{}, nil
   305  
   306  }
   307  
   308  func (it *TaskReconciler) syncChildNodes(ctx context.Context, evaluatedNode v1alpha1.WorkflowNode) error {
   309  
   310  	var tasks []string
   311  	for _, branch := range evaluatedNode.Status.ConditionalBranchesStatus.Branches {
   312  		if branch.EvaluationResult == corev1.ConditionTrue {
   313  			tasks = append(tasks, branch.Target)
   314  		}
   315  	}
   316  
   317  	if len(tasks) == 0 {
   318  		it.logger.V(4).Info("0 condition of branch in task node is True, Noop",
   319  			"node", fmt.Sprintf("%s/%s", evaluatedNode.Namespace, evaluatedNode.Name),
   320  		)
   321  		return nil
   322  	}
   323  
   324  	activeChildNodes, finishedChildNodes, err := it.fetchChildNodes(ctx, evaluatedNode)
   325  	if err != nil {
   326  		return err
   327  	}
   328  	existsChildNodes := append(activeChildNodes, finishedChildNodes...)
   329  
   330  	var taskNamesOfNodes []string
   331  	for _, childNode := range existsChildNodes {
   332  		taskNamesOfNodes = append(taskNamesOfNodes, getTaskNameFromGeneratedName(childNode.GetName()))
   333  	}
   334  
   335  	if len(existsChildNodes) > 0 {
   336  		// TODO: check the specific of task and workflow nodes
   337  		// the definition of tasks changed, remove all the existed nodes
   338  		if len(setDifference(taskNamesOfNodes, tasks)) > 0 || len(setDifference(tasks, taskNamesOfNodes)) > 0 {
   339  			// nodesToCleanup is just a vanilla string array
   340  			var nodesToCleanup []string
   341  			for _, item := range existsChildNodes {
   342  				nodesToCleanup = append(nodesToCleanup, item.Name)
   343  			}
   344  			it.eventRecorder.Event(&evaluatedNode, recorder.RerunBySpecChanged{CleanedChildrenNode: nodesToCleanup})
   345  
   346  			for _, childNode := range existsChildNodes {
   347  				// best effort deletion
   348  				err := it.kubeClient.Delete(ctx, &childNode)
   349  				if err != nil {
   350  					it.logger.Error(err, "failed to delete outdated child node",
   351  						"node", fmt.Sprintf("%s/%s", evaluatedNode.Namespace, evaluatedNode.Name),
   352  						"child node", fmt.Sprintf("%s/%s", childNode.Namespace, childNode.Name),
   353  					)
   354  				}
   355  			}
   356  		} else {
   357  			// exactly same, NOOP
   358  			return nil
   359  		}
   360  	}
   361  
   362  	parentWorkflow := v1alpha1.Workflow{}
   363  	err = it.kubeClient.Get(ctx, types.NamespacedName{
   364  		Namespace: evaluatedNode.Namespace,
   365  		Name:      evaluatedNode.Spec.WorkflowName,
   366  	}, &parentWorkflow)
   367  	if err != nil {
   368  		it.logger.Error(err, "failed to fetch parent workflow",
   369  			"node", fmt.Sprintf("%s/%s", evaluatedNode.Namespace, evaluatedNode.Name),
   370  			"workflow name", evaluatedNode.Spec.WorkflowName)
   371  		return err
   372  	}
   373  
   374  	childNodes, err := renderNodesByTemplates(&parentWorkflow, &evaluatedNode, tasks...)
   375  	if err != nil {
   376  		it.logger.Error(err, "failed to render children childNodes",
   377  			"node", fmt.Sprintf("%s/%s", evaluatedNode.Namespace, evaluatedNode.Name))
   378  		return err
   379  	}
   380  
   381  	// TODO: emit event
   382  	var childrenNames []string
   383  	for _, childNode := range childNodes {
   384  		err := it.kubeClient.Create(ctx, childNode)
   385  		if err != nil {
   386  			it.logger.Error(err, "failed to create child node",
   387  				"node", fmt.Sprintf("%s/%s", evaluatedNode.Namespace, evaluatedNode.Name),
   388  				"child node", childNode)
   389  			return err
   390  		}
   391  		childrenNames = append(childrenNames, childNode.Name)
   392  	}
   393  	it.eventRecorder.Event(&evaluatedNode, recorder.NodesCreated{ChildNodes: childrenNames})
   394  	it.logger.Info("task node spawn new child node",
   395  		"node", fmt.Sprintf("%s/%s", evaluatedNode.Namespace, evaluatedNode.Name),
   396  		"child node", childrenNames)
   397  
   398  	return nil
   399  }
   400  
   401  func (it *TaskReconciler) FetchPodControlledByThisWorkflowNode(ctx context.Context, node v1alpha1.WorkflowNode) ([]corev1.Pod, error) {
   402  	controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   403  		MatchLabels: map[string]string{
   404  			v1alpha1.LabelControlledBy: node.Name,
   405  		},
   406  	})
   407  
   408  	if err != nil {
   409  		it.logger.Error(err, "failed to build label selector with filtering children workflow node controlled by current node",
   410  			"current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   411  		return nil, err
   412  	}
   413  
   414  	var childPods corev1.PodList
   415  
   416  	err = it.kubeClient.List(ctx, &childPods, &client.ListOptions{
   417  		LabelSelector: controlledByThisNode,
   418  	})
   419  	if err != nil {
   420  		return nil, err
   421  	}
   422  	return childPods.Items, nil
   423  }
   424  
   425  func (it *TaskReconciler) SpawnTaskPod(ctx context.Context, node *v1alpha1.WorkflowNode, workflow *v1alpha1.Workflow) (*corev1.Pod, error) {
   426  	if node.Spec.Task == nil {
   427  		return nil, errors.Errorf("node %s/%s does not contains spec of Target", node.Namespace, node.Name)
   428  	}
   429  	podSpec, err := task.SpawnPodForTask(*node.Spec.Task)
   430  	if err != nil {
   431  		return nil, err
   432  	}
   433  	taskPod := corev1.Pod{
   434  		TypeMeta: metav1.TypeMeta{},
   435  		ObjectMeta: metav1.ObjectMeta{
   436  			GenerateName: fmt.Sprintf("%s-", node.Name),
   437  			Namespace:    node.Namespace,
   438  			Labels: map[string]string{
   439  				v1alpha1.LabelControlledBy: node.Name,
   440  				v1alpha1.LabelWorkflow:     workflow.Name,
   441  			},
   442  			OwnerReferences: []metav1.OwnerReference{
   443  				{
   444  					APIVersion:         ApiVersion,
   445  					Kind:               KindWorkflowNode,
   446  					Name:               node.Name,
   447  					UID:                node.UID,
   448  					Controller:         &isController,
   449  					BlockOwnerDeletion: &blockOwnerDeletion,
   450  				},
   451  			},
   452  			Finalizers: []string{metav1.FinalizerDeleteDependents},
   453  		},
   454  		Spec: podSpec,
   455  	}
   456  	err = it.kubeClient.Create(ctx, &taskPod)
   457  	if err != nil {
   458  		return nil, err
   459  	}
   460  	return &taskPod, nil
   461  }
   462  
   463  func (it *TaskReconciler) conditionalBranchesEvaluated(ctx context.Context, node v1alpha1.WorkflowNode) (bool, error) {
   464  	// task pod should be completed, it's phase should be PodSucceeded or PodFailed
   465  	pods, err := it.FetchPodControlledByThisWorkflowNode(ctx, node)
   466  	if err != nil {
   467  		return false, err
   468  	}
   469  	if len(pods) == 0 {
   470  		return false, nil
   471  	}
   472  	if !(pods[0].Status.Phase == corev1.PodFailed || pods[0].Status.Phase == corev1.PodSucceeded) {
   473  		return false, nil
   474  	}
   475  
   476  	// conditional branches status should not be nil
   477  	if node.Status.ConditionalBranchesStatus == nil {
   478  		return false, nil
   479  	}
   480  
   481  	// count of status should be equals to branches
   482  	if len(node.Spec.ConditionalBranches) != len(node.Status.ConditionalBranchesStatus.Branches) {
   483  		return false, nil
   484  	}
   485  
   486  	// each status should be evaluated
   487  	for _, branch := range node.Status.ConditionalBranchesStatus.Branches {
   488  		if branch.EvaluationResult == corev1.ConditionUnknown {
   489  			return false, nil
   490  		}
   491  	}
   492  	return true, nil
   493  }
   494