...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers/chaos_node_reconciler.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/workflow/controllers

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package controllers
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"sort"
    22  	"time"
    23  
    24  	"github.com/go-logr/logr"
    25  	"github.com/pkg/errors"
    26  	corev1 "k8s.io/api/core/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/client-go/util/retry"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    31  
    32  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    34  )
    35  
    36  type ChaosNodeReconciler struct {
    37  	kubeClient    client.Client
    38  	eventRecorder recorder.ChaosRecorder
    39  	logger        logr.Logger
    40  }
    41  
    42  func NewChaosNodeReconciler(kubeClient client.Client, eventRecorder recorder.ChaosRecorder, logger logr.Logger) *ChaosNodeReconciler {
    43  	return &ChaosNodeReconciler{kubeClient: kubeClient, eventRecorder: eventRecorder, logger: logger}
    44  }
    45  
    46  func (it *ChaosNodeReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
    47  
    48  	startTime := time.Now()
    49  	defer func() {
    50  		it.logger.V(4).Info("Finished syncing for chaos node",
    51  			"node", request.NamespacedName,
    52  			"duration", time.Since(startTime),
    53  		)
    54  	}()
    55  
    56  	node := v1alpha1.WorkflowNode{}
    57  
    58  	err := it.kubeClient.Get(ctx, request.NamespacedName, &node)
    59  	if err != nil {
    60  		return reconcile.Result{}, client.IgnoreNotFound(err)
    61  	}
    62  
    63  	if !v1alpha1.IsChaosTemplateType(node.Spec.Type) {
    64  		return reconcile.Result{}, nil
    65  	}
    66  
    67  	it.logger.V(4).Info("resolve chaos node", "node", request)
    68  
    69  	if node.Spec.Type == v1alpha1.TypeSchedule {
    70  		err := it.syncSchedule(ctx, node)
    71  		if err != nil {
    72  			return reconcile.Result{}, err
    73  		}
    74  	} else {
    75  		err = it.syncChaosResources(ctx, node)
    76  		if err != nil {
    77  			return reconcile.Result{}, err
    78  		}
    79  	}
    80  
    81  	updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
    82  		nodeNeedUpdate := v1alpha1.WorkflowNode{}
    83  		err := it.kubeClient.Get(ctx, request.NamespacedName, &nodeNeedUpdate)
    84  		if err != nil {
    85  			return client.IgnoreNotFound(err)
    86  		}
    87  
    88  		if nodeNeedUpdate.Spec.Type == v1alpha1.TypeSchedule {
    89  			// sync status with schedule
    90  			scheduleList, err := it.fetchChildrenSchedule(ctx, nodeNeedUpdate)
    91  			if err != nil {
    92  				return client.IgnoreNotFound(err)
    93  			}
    94  			if len(scheduleList) > 1 {
    95  				it.logger.Info("the number of schedule custom resource affected by chaos node is more than 1",
    96  					"chaos node", fmt.Sprintf("%s/%s", nodeNeedUpdate.Namespace, nodeNeedUpdate.Name),
    97  					"schedule custom resources", scheduleList,
    98  				)
    99  			}
   100  			if len(scheduleList) > 0 {
   101  				scheduleObject := scheduleList[0]
   102  				group := scheduleObject.GetObjectKind().GroupVersionKind().Group
   103  				chaosRef := corev1.TypedLocalObjectReference{
   104  					APIGroup: &group,
   105  					Kind:     scheduleObject.GetObjectKind().GroupVersionKind().Kind,
   106  					Name:     scheduleObject.GetName(),
   107  				}
   108  				nodeNeedUpdate.Status.ChaosResource = &chaosRef
   109  				SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   110  					Type:   v1alpha1.ConditionChaosInjected,
   111  					Status: corev1.ConditionTrue,
   112  					Reason: v1alpha1.ChaosCRCreated,
   113  				})
   114  			} else {
   115  				nodeNeedUpdate.Status.ChaosResource = nil
   116  				SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   117  					Type:   v1alpha1.ConditionChaosInjected,
   118  					Status: corev1.ConditionFalse,
   119  					Reason: v1alpha1.ChaosCRNotExists,
   120  				})
   121  			}
   122  
   123  			return client.IgnoreNotFound(it.kubeClient.Status().Update(ctx, &nodeNeedUpdate))
   124  		}
   125  
   126  		// sync status with chaos CustomResource
   127  		chaosList, err := it.fetchChildrenChaosCustomResource(ctx, nodeNeedUpdate)
   128  		if err != nil {
   129  			return client.IgnoreNotFound(err)
   130  		}
   131  		if len(chaosList) > 1 {
   132  			it.logger.Info("the number of chaos custom resource affected by chaos node is more than 1",
   133  				"chaos node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   134  				"chaos custom resources", chaosList,
   135  			)
   136  		}
   137  
   138  		if len(chaosList) > 0 {
   139  			chaosObject := chaosList[0]
   140  			group := chaosObject.GetObjectKind().GroupVersionKind().Group
   141  			chaosRef := corev1.TypedLocalObjectReference{
   142  				APIGroup: &group,
   143  				Kind:     chaosObject.GetObjectKind().GroupVersionKind().Kind,
   144  				Name:     chaosObject.GetName(),
   145  			}
   146  			nodeNeedUpdate.Status.ChaosResource = &chaosRef
   147  			SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   148  				Type:   v1alpha1.ConditionChaosInjected,
   149  				Status: corev1.ConditionTrue,
   150  				Reason: v1alpha1.ChaosCRCreated,
   151  			})
   152  		} else {
   153  			nodeNeedUpdate.Status.ChaosResource = nil
   154  			SetCondition(&nodeNeedUpdate.Status, v1alpha1.WorkflowNodeCondition{
   155  				Type:   v1alpha1.ConditionChaosInjected,
   156  				Status: corev1.ConditionFalse,
   157  				Reason: v1alpha1.ChaosCRNotExists,
   158  			})
   159  		}
   160  
   161  		return client.IgnoreNotFound(it.kubeClient.Status().Update(ctx, &nodeNeedUpdate))
   162  	})
   163  
   164  	return reconcile.Result{}, updateError
   165  }
   166  
   167  func (it *ChaosNodeReconciler) syncSchedule(ctx context.Context, node v1alpha1.WorkflowNode) error {
   168  	scheduleList, err := it.fetchChildrenSchedule(ctx, node)
   169  	if err != nil {
   170  		return err
   171  	}
   172  	if WorkflowNodeFinished(node.Status) {
   173  		// make the number of schedule to 0
   174  		for _, item := range scheduleList {
   175  			item := item
   176  			err := it.kubeClient.Delete(ctx, &item)
   177  			if client.IgnoreNotFound(err) != nil {
   178  				it.logger.Error(err, "failed to delete schedule CR for workflow chaos node",
   179  					"namespace", node.Namespace,
   180  					"chaos node", node.Name,
   181  					"schedule CR name", item.GetName(),
   182  				)
   183  				it.eventRecorder.Event(&node, recorder.ChaosCustomResourceDeleteFailed{
   184  					Name: item.GetName(),
   185  					Kind: item.GetObjectKind().GroupVersionKind().Kind,
   186  				})
   187  			} else {
   188  				it.eventRecorder.Event(&node, recorder.ChaosCustomResourceDeleted{
   189  					Name: item.GetName(),
   190  					Kind: item.GetObjectKind().GroupVersionKind().Kind,
   191  				})
   192  			}
   193  		}
   194  		return nil
   195  	}
   196  	if len(scheduleList) == 0 {
   197  		return it.createSchedule(ctx, node)
   198  	} else if len(scheduleList) > 1 {
   199  		// need cleanup
   200  
   201  		var scheduleCrToRemove []string
   202  		for _, item := range scheduleList[1:] {
   203  			scheduleCrToRemove = append(scheduleCrToRemove, item.GetName())
   204  		}
   205  
   206  		it.logger.Info("removing duplicated schedule custom resource",
   207  			"chaos node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   208  			"schedule cr to remove", scheduleCrToRemove,
   209  		)
   210  
   211  		for _, item := range scheduleList[1:] {
   212  			// best efforts deletion
   213  			item := item
   214  			err := it.kubeClient.Delete(ctx, &item)
   215  			if client.IgnoreNotFound(err) != nil {
   216  				it.logger.Error(err, "failed to delete schedule CR for workflow chaos node",
   217  					"namespace", node.Namespace,
   218  					"chaos node", node.Name,
   219  					"schedule CR name", item.GetName(),
   220  				)
   221  			}
   222  		}
   223  	} else {
   224  		it.logger.V(4).Info("do not need spawn or remove schedule CR")
   225  	}
   226  	return nil
   227  
   228  }
   229  
   230  func (it *ChaosNodeReconciler) syncChaosResources(ctx context.Context, node v1alpha1.WorkflowNode) error {
   231  
   232  	chaosList, err := it.fetchChildrenChaosCustomResource(ctx, node)
   233  	if err != nil {
   234  		return err
   235  	}
   236  
   237  	if WorkflowNodeFinished(node.Status) {
   238  		// make the number of chaos resource to 0
   239  		for _, item := range chaosList {
   240  			// best efforts deletion
   241  			item := item
   242  			// TODO: it should not be delete directly with the new implementation of *Chaos controller in branch nirvana
   243  			err := it.kubeClient.Delete(ctx, item)
   244  			if client.IgnoreNotFound(err) != nil {
   245  				it.logger.Error(err, "failed to delete chaos CR for workflow chaos node",
   246  					"namespace", node.Namespace,
   247  					"chaos node", node.Name,
   248  					"chaos CR name", item.GetName(),
   249  				)
   250  				it.eventRecorder.Event(&node, recorder.ChaosCustomResourceDeleteFailed{
   251  					Name: item.GetName(),
   252  					Kind: item.GetObjectKind().GroupVersionKind().Kind,
   253  				})
   254  			} else {
   255  				it.eventRecorder.Event(&node, recorder.ChaosCustomResourceDeleted{
   256  					Name: item.GetName(),
   257  					Kind: item.GetObjectKind().GroupVersionKind().Kind,
   258  				})
   259  			}
   260  		}
   261  		return nil
   262  	}
   263  	// make the number of chaos resource to 1
   264  	if len(chaosList) == 0 {
   265  		return it.createChaos(ctx, node)
   266  	} else if len(chaosList) > 1 {
   267  
   268  		var chaosCrToRemove []string
   269  		for _, item := range chaosList[1:] {
   270  			chaosCrToRemove = append(chaosCrToRemove, item.GetName())
   271  		}
   272  
   273  		it.logger.Info("removing duplicated chaos custom resource",
   274  			"chaos node", fmt.Sprintf("%s/%s", node.Namespace, node.Name),
   275  			"chaos cr to remove", chaosCrToRemove,
   276  		)
   277  
   278  		for _, item := range chaosList[1:] {
   279  			// best efforts deletion
   280  			item := item
   281  			err := it.kubeClient.Delete(ctx, item)
   282  			if client.IgnoreNotFound(err) != nil {
   283  				it.logger.Error(err, "failed to delete chaos CR for workflow chaos node",
   284  					"namespace", node.Namespace,
   285  					"chaos node", node.Name,
   286  					"chaos CR name", item.GetName(),
   287  				)
   288  			}
   289  		}
   290  	} else {
   291  		it.logger.V(4).Info("do not need spawn or remove chaos CR")
   292  	}
   293  
   294  	// TODO: also respawn the chaos resource if Spec changed in workflow
   295  
   296  	return nil
   297  }
   298  
   299  // inject Chaos will create one instance of chaos CR
   300  func (it *ChaosNodeReconciler) createChaos(ctx context.Context, node v1alpha1.WorkflowNode) error {
   301  
   302  	chaosObject, err := node.Spec.EmbedChaos.SpawnNewObject(node.Spec.Type)
   303  	if err != nil {
   304  		return err
   305  	}
   306  
   307  	chaosObject.SetGenerateName(fmt.Sprintf("%s-", node.Name))
   308  	chaosObject.SetNamespace(node.Namespace)
   309  	chaosObject.SetOwnerReferences(append(chaosObject.GetOwnerReferences(), metav1.OwnerReference{
   310  		APIVersion:         node.APIVersion,
   311  		Kind:               node.Kind,
   312  		Name:               node.Name,
   313  		UID:                node.UID,
   314  		Controller:         &isController,
   315  		BlockOwnerDeletion: &blockOwnerDeletion,
   316  	}))
   317  	chaosObject.SetLabels(map[string]string{
   318  		v1alpha1.LabelControlledBy: node.Name,
   319  		v1alpha1.LabelWorkflow:     node.Spec.WorkflowName,
   320  	})
   321  
   322  	err = it.kubeClient.Create(ctx, chaosObject)
   323  	if err != nil {
   324  		it.eventRecorder.Event(&node, recorder.ChaosCustomResourceCreateFailed{})
   325  		it.logger.Error(err, "failed to create chaos")
   326  		return nil
   327  	}
   328  	it.logger.Info("chaos object created", "namespace", chaosObject.GetNamespace(), "name", chaosObject.GetName(), "parent node", node)
   329  	it.eventRecorder.Event(&node, recorder.ChaosCustomResourceCreated{
   330  		Name: chaosObject.GetName(),
   331  		Kind: chaosObject.GetObjectKind().GroupVersionKind().Kind,
   332  	})
   333  	return nil
   334  }
   335  
   336  func (it *ChaosNodeReconciler) fetchChildrenChaosCustomResource(ctx context.Context, node v1alpha1.WorkflowNode) ([]v1alpha1.GenericChaos, error) {
   337  	genericChaosList, err := node.Spec.EmbedChaos.SpawnNewList(node.Spec.Type)
   338  	if err != nil {
   339  		return nil, err
   340  	}
   341  	controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   342  		MatchLabels: map[string]string{
   343  			v1alpha1.LabelControlledBy: node.Name,
   344  		},
   345  	})
   346  	if err != nil {
   347  		it.logger.Error(err, "failed to build label selector with filtering children workflow node controlled by current node",
   348  			"current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   349  		return nil, err
   350  	}
   351  
   352  	err = it.kubeClient.List(ctx, genericChaosList, &client.ListOptions{
   353  		LabelSelector: controlledByThisNode,
   354  	})
   355  	if err != nil {
   356  		return nil, err
   357  	}
   358  
   359  	var sorted SortGenericChaosByCreationTimestamp = genericChaosList.GetItems()
   360  	sort.Sort(sorted)
   361  	return sorted, err
   362  }
   363  
   364  func (it ChaosNodeReconciler) createSchedule(ctx context.Context, node v1alpha1.WorkflowNode) error {
   365  	if node.Spec.Schedule == nil {
   366  		return errors.New("invalid workfow node, the spec of schedule is nil")
   367  	}
   368  	scheduleToCreate := v1alpha1.Schedule{
   369  		TypeMeta: metav1.TypeMeta{},
   370  		ObjectMeta: metav1.ObjectMeta{
   371  			Namespace:    node.Namespace,
   372  			GenerateName: fmt.Sprintf("%s-", node.Name),
   373  			Labels: map[string]string{
   374  				v1alpha1.LabelControlledBy: node.Name,
   375  				v1alpha1.LabelWorkflow:     node.Spec.WorkflowName,
   376  			},
   377  			OwnerReferences: []metav1.OwnerReference{
   378  				{
   379  					APIVersion:         node.APIVersion,
   380  					Kind:               node.Kind,
   381  					Name:               node.Name,
   382  					UID:                node.UID,
   383  					Controller:         &isController,
   384  					BlockOwnerDeletion: &blockOwnerDeletion,
   385  				},
   386  			},
   387  		},
   388  		Spec: *node.Spec.Schedule,
   389  	}
   390  	err := it.kubeClient.Create(ctx, &scheduleToCreate)
   391  	if err != nil {
   392  		it.eventRecorder.Event(&node, recorder.ChaosCustomResourceCreateFailed{})
   393  		it.logger.Error(err, "failed to create schedule CR")
   394  		return nil
   395  	}
   396  	it.logger.Info("schedule CR created", "namespace", scheduleToCreate.GetNamespace(), "name", scheduleToCreate.GetName())
   397  	it.eventRecorder.Event(&node, recorder.ChaosCustomResourceCreated{
   398  		Name: scheduleToCreate.GetName(),
   399  		Kind: scheduleToCreate.GetObjectKind().GroupVersionKind().Kind,
   400  	})
   401  	return nil
   402  
   403  }
   404  
   405  func (it *ChaosNodeReconciler) fetchChildrenSchedule(ctx context.Context, node v1alpha1.WorkflowNode) ([]v1alpha1.Schedule, error) {
   406  	var scheduleList v1alpha1.ScheduleList
   407  	controlledByThisNode, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   408  		MatchLabels: map[string]string{
   409  			v1alpha1.LabelControlledBy: node.Name,
   410  		},
   411  	})
   412  	if err != nil {
   413  		it.logger.Error(err, "failed to build label selector with filtering children workflow node controlled by current node",
   414  			"current node", fmt.Sprintf("%s/%s", node.Namespace, node.Name))
   415  		return nil, err
   416  	}
   417  	err = it.kubeClient.List(ctx, &scheduleList, &client.ListOptions{
   418  		LabelSelector: controlledByThisNode,
   419  	})
   420  	if err != nil {
   421  		return nil, err
   422  	}
   423  	var sorted SortScheduleByCreationTimestamp = scheduleList.Items
   424  	sort.Sort(sorted)
   425  	return sorted, err
   426  }
   427  
   428  type SortGenericChaosByCreationTimestamp []v1alpha1.GenericChaos
   429  
   430  func (it SortGenericChaosByCreationTimestamp) Len() int {
   431  	return len(it)
   432  }
   433  
   434  func (it SortGenericChaosByCreationTimestamp) Less(i, j int) bool {
   435  	return it[j].GetCreationTimestamp().After(it[i].GetCreationTimestamp().Time)
   436  }
   437  
   438  func (it SortGenericChaosByCreationTimestamp) Swap(i, j int) {
   439  	it[i], it[j] = it[j], it[i]
   440  }
   441  
   442  type SortScheduleByCreationTimestamp []v1alpha1.Schedule
   443  
   444  func (it SortScheduleByCreationTimestamp) Len() int {
   445  	return len(it)
   446  }
   447  
   448  func (it SortScheduleByCreationTimestamp) Less(i, j int) bool {
   449  	return it[j].GetCreationTimestamp().After(it[i].GetCreationTimestamp().Time)
   450  }
   451  
   452  func (it SortScheduleByCreationTimestamp) Swap(i, j int) {
   453  	it[i], it[j] = it[j], it[i]
   454  }
   455