...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/common/records/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/common/records

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package records
    17  
    18  import (
    19  	"context"
    20  	"reflect"
    21  	"strings"
    22  
    23  	"github.com/go-logr/logr"
    24  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    25  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    26  	"k8s.io/client-go/util/retry"
    27  	ctrl "sigs.k8s.io/controller-runtime"
    28  	"sigs.k8s.io/controller-runtime/pkg/client"
    29  
    30  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    31  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    34  	"github.com/chaos-mesh/chaos-mesh/pkg/selector"
    35  )
    36  
// Reconciler for chaos records
type Reconciler struct {
	// Impl performs the actual chaos operations: Apply injects the chaos
	// onto a record's target and Recover undoes it.
	Impl types.ChaosImpl

	// Object is used to mark the target type of this Reconciler
	Object v1alpha1.InnerObject

	// Client is used to operate on the Kubernetes cluster
	client.Client
	client.Reader

	// Recorder emits chaos events (e.g. Applied, Recovered, Failed, Updated)
	// for the reconciled object.
	Recorder recorder.ChaosRecorder

	// Selector resolves the object's selector specs into concrete targets.
	Selector *selector.Selector

	// Log is the base logger; Reconcile enriches it with the object's
	// name/namespace/kind and the record id.
	Log logr.Logger
}
    54  
// Operation is the action the reconciler decides to take on a single record
// during one reconcile pass.
type Operation string

const (
	// Apply drives a record towards the Injected phase via Impl.Apply.
	Apply   Operation = "apply"
	// Recover drives a record back towards the NotInjected phase via Impl.Recover.
	Recover Operation = "recover"
	// Nothing means the record already matches the desired phase.
	Nothing Operation = ""
)
    62  
    63  // Reconcile the chaos records
    64  func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    65  	obj := r.Object.DeepCopyObject().(v1alpha1.InnerObjectWithSelector)
    66  	if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
    67  		if apierrors.IsNotFound(err) {
    68  			r.Log.Info("chaos not found")
    69  		} else {
    70  			// TODO: handle this error
    71  			r.Log.Error(err, "unable to get chaos")
    72  		}
    73  		return ctrl.Result{}, nil
    74  	}
    75  
    76  	shouldUpdate := false
    77  
    78  	desiredPhase := obj.GetStatus().Experiment.DesiredPhase
    79  	records := obj.GetStatus().Experiment.Records
    80  	selectors := obj.GetSelectorSpecs()
    81  
    82  	logger := r.Log.WithValues("name", obj.GetName(), "namespace", obj.GetNamespace(), "kind", obj.GetObjectKind().GroupVersionKind().Kind)
    83  
    84  	if records == nil {
    85  		for name, sel := range selectors {
    86  			targets, err := r.Selector.Select(context.TODO(), sel)
    87  			if err != nil {
    88  				logger.Error(err, "fail to select")
    89  				r.Recorder.Event(obj, recorder.Failed{
    90  					Activity: "select targets",
    91  					Err:      err.Error(),
    92  				})
    93  				return ctrl.Result{}, nil
    94  			}
    95  
    96  			if len(targets) == 0 {
    97  				logger.Info("no target has been selected")
    98  				r.Recorder.Event(obj, recorder.Failed{
    99  					Activity: "select targets",
   100  					Err:      "no target has been selected",
   101  				})
   102  				return ctrl.Result{}, nil
   103  			}
   104  
   105  			for _, target := range targets {
   106  				records = append(records, &v1alpha1.Record{
   107  					Id:          target.Id(),
   108  					SelectorKey: name,
   109  					Phase:       v1alpha1.NotInjected,
   110  				})
   111  				shouldUpdate = true
   112  			}
   113  		}
   114  		// TODO: dynamic upgrade the records when some of these pods/containers stopped
   115  	}
   116  
   117  	needRetry := false
   118  	for index, record := range records {
   119  		var err error
   120  		idLogger := logger.WithValues("id", records[index].Id)
   121  		idLogger.Info("iterating record", "record", record, "desiredPhase", desiredPhase)
   122  
   123  		// The whole running logic is a cycle:
   124  		// Not Injected -> Not Injected/* -> Injected -> Injected/* -> Not Injected
   125  		// Every step should follow the cycle. For example, if it's in "Not Injected/*" status, and it wants to recover
   126  		// then it has to apply and then recover, but not recover directly.
   127  
   128  		originalPhase := record.Phase
   129  		operation := Nothing
   130  		if desiredPhase == v1alpha1.RunningPhase && originalPhase != v1alpha1.Injected {
   131  			// The originalPhase has three possible situations: Not Injected, Not Injected/* or Injected/*
   132  			// In the first two situations, it should apply, in the last situation, it should recover
   133  
   134  			if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
   135  				operation = Apply
   136  			} else {
   137  				operation = Recover
   138  			}
   139  		}
   140  		if desiredPhase == v1alpha1.StoppedPhase && originalPhase != v1alpha1.NotInjected {
   141  			// The originalPhase has three possible situations: Not Injected/*, Injected, or Injected/*
   142  			// In the first one situation, it should apply, in the last two situations, it should recover
   143  
   144  			if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
   145  				operation = Apply
   146  			} else {
   147  				operation = Recover
   148  			}
   149  		}
   150  
   151  		if operation == Apply {
   152  			idLogger.Info("apply chaos")
   153  			record.Phase, err = r.Impl.Apply(context.TODO(), index, records, obj)
   154  			if record.Phase != originalPhase {
   155  				shouldUpdate = true
   156  			}
   157  			if err != nil {
   158  				// TODO: add backoff and retry mechanism
   159  				// but the retry shouldn't block other resource process
   160  				idLogger.Error(err, "fail to apply chaos")
   161  				applyFailedEvent := newRecordEvent(v1alpha1.TypeFailed, v1alpha1.Apply, err.Error())
   162  				if len(records[index].Events) >= config.ControllerCfg.MaxEvents {
   163  					records[index].Events = records[index].Events[1:]
   164  				}
   165  				records[index].Events = append(records[index].Events, *applyFailedEvent)
   166  				r.Recorder.Event(obj, recorder.Failed{
   167  					Activity: "apply chaos",
   168  					Err:      err.Error(),
   169  				})
   170  				needRetry = true
   171  				// if the impl.Apply() failed, we need to update the status to update the records[index].Events
   172  				shouldUpdate = true
   173  				continue
   174  			}
   175  
   176  			if record.Phase == v1alpha1.Injected {
   177  				records[index].InjectedCount++
   178  				applySucceedEvent := newRecordEvent(v1alpha1.TypeSucceeded, v1alpha1.Apply, "")
   179  				if len(records[index].Events) >= config.ControllerCfg.MaxEvents {
   180  					records[index].Events = records[index].Events[1:]
   181  				}
   182  				records[index].Events = append(records[index].Events, *applySucceedEvent)
   183  				r.Recorder.Event(obj, recorder.Applied{
   184  					Id: records[index].Id,
   185  				})
   186  			}
   187  		} else if operation == Recover {
   188  			idLogger.Info("recover chaos")
   189  			record.Phase, err = r.Impl.Recover(context.TODO(), index, records, obj)
   190  			if record.Phase != originalPhase {
   191  				shouldUpdate = true
   192  			}
   193  			if err != nil {
   194  				// TODO: add backoff and retry mechanism
   195  				// but the retry shouldn't block other resource process
   196  				idLogger.Error(err, "fail to recover chaos")
   197  				recoverFailedEvent := newRecordEvent(v1alpha1.TypeFailed, v1alpha1.Recover, err.Error())
   198  				if len(records[index].Events) >= config.ControllerCfg.MaxEvents {
   199  					records[index].Events = records[index].Events[1:]
   200  				}
   201  				records[index].Events = append(records[index].Events, *recoverFailedEvent)
   202  				r.Recorder.Event(obj, recorder.Failed{
   203  					Activity: "recover chaos",
   204  					Err:      err.Error(),
   205  				})
   206  				needRetry = true
   207  				// if the impl.Recover() failed, we need to update the status to update the records[index].Events
   208  				shouldUpdate = true
   209  				continue
   210  			}
   211  
   212  			if record.Phase == v1alpha1.NotInjected {
   213  				records[index].RecoveredCount++
   214  				recoverSucceedEvent := newRecordEvent(v1alpha1.TypeSucceeded, v1alpha1.Recover, "")
   215  				if len(records[index].Events) >= config.ControllerCfg.MaxEvents {
   216  					records[index].Events = records[index].Events[1:]
   217  				}
   218  				records[index].Events = append(records[index].Events, *recoverSucceedEvent)
   219  				r.Recorder.Event(obj, recorder.Recovered{
   220  					Id: records[index].Id,
   221  				})
   222  			}
   223  		}
   224  	}
   225  
   226  	// TODO: auto generate SetCustomStatus rather than reflect
   227  	var customStatus reflect.Value
   228  	if objWithStatus, ok := obj.(v1alpha1.InnerObjectWithCustomStatus); ok {
   229  		customStatus = reflect.Indirect(reflect.ValueOf(objWithStatus.GetCustomStatus()))
   230  	}
   231  	if shouldUpdate {
   232  		updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
   233  			logger.Info("updating records", "records", records)
   234  			obj := r.Object.DeepCopyObject().(v1alpha1.InnerObjectWithSelector)
   235  
   236  			if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
   237  				logger.Error(err, "unable to get chaos")
   238  				return err
   239  			}
   240  
   241  			obj.GetStatus().Experiment.Records = records
   242  			if objWithStatus, ok := obj.(v1alpha1.InnerObjectWithCustomStatus); ok {
   243  				ptrToCustomStatus := objWithStatus.GetCustomStatus()
   244  				// TODO: auto generate SetCustomStatus rather than reflect
   245  				reflect.Indirect(reflect.ValueOf(ptrToCustomStatus)).Set(reflect.Indirect(customStatus))
   246  			}
   247  			return r.Client.Update(context.TODO(), obj)
   248  		})
   249  		if updateError != nil {
   250  			logger.Error(updateError, "fail to update")
   251  			r.Recorder.Event(obj, recorder.Failed{
   252  				Activity: "update records",
   253  				Err:      updateError.Error(),
   254  			})
   255  			return ctrl.Result{Requeue: true}, nil
   256  		}
   257  
   258  		r.Recorder.Event(obj, recorder.Updated{
   259  			Field: "records",
   260  		})
   261  	}
   262  	return ctrl.Result{Requeue: needRetry}, nil
   263  }
   264  
   265  func newRecordEvent(eventType v1alpha1.RecordEventType, eventStage v1alpha1.RecordEventOperation, msg string) *v1alpha1.RecordEvent {
   266  	return v1alpha1.NewRecordEvent(eventType, eventStage, msg, metav1.Now())
   267  }
   268