...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/common/records/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/common/records

// Copyright 2021 Chaos Mesh Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package records

import (
	"context"
	"reflect"
	"strings"

	"github.com/go-logr/logr"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
	"github.com/chaos-mesh/chaos-mesh/pkg/selector"
)

// Reconciler for chaos records
type Reconciler struct {
	Impl types.ChaosImpl

	// Object is used to mark the target type of this Reconciler
	Object v1alpha1.InnerObject

	// Client is used to operate on the Kubernetes cluster
	client.Client
	client.Reader

	Recorder recorder.ChaosRecorder

	Selector *selector.Selector

	Log logr.Logger
}
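
// A minimal sketch of wiring a Reconciler into a controller-runtime manager.
// The impl, chaosRecorder, sel, and mgr values below are hypothetical
// placeholders, not names exported by this package:
//
//	r := &Reconciler{
//		Impl:     impl,                 // a types.ChaosImpl for the chaos kind
//		Object:   &v1alpha1.PodChaos{},
//		Client:   mgr.GetClient(),
//		Reader:   mgr.GetAPIReader(),
//		Recorder: chaosRecorder,        // a recorder.ChaosRecorder
//		Selector: sel,                  // a *selector.Selector
//		Log:      ctrl.Log.WithName("records"),
//	}
//	err := ctrl.NewControllerManagedBy(mgr).
//		For(&v1alpha1.PodChaos{}).
//		Complete(r)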

type Operation string

const (
	Apply   Operation = "apply"
	Recover Operation = "recover"
	Nothing Operation = ""
)
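
// Which operation a record needs depends on the experiment's desired phase
// and the record's current phase. Summarizing the decision logic inside
// Reconcile below:
//
//	desired phase   record phase       operation
//	Running         Injected           Nothing
//	Running         Not Injected(/*)   Apply
//	Running         Injected/*         Recover
//	Stopped         Not Injected       Nothing
//	Stopped         Not Injected/*     Apply
//	Stopped         Injected(/*)       Recover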

// Reconcile the chaos records
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	obj := r.Object.DeepCopyObject().(v1alpha1.InnerObjectWithSelector)
	if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
		if apierrors.IsNotFound(err) {
			r.Log.Info("chaos not found")
		} else {
			// TODO: handle this error
			r.Log.Error(err, "unable to get chaos")
		}
		return ctrl.Result{}, nil
	}

	shouldUpdate := false

	desiredPhase := obj.GetStatus().Experiment.DesiredPhase
	records := obj.GetStatus().Experiment.Records
	selectors := obj.GetSelectorSpecs()

	logger := r.Log.WithValues("name", obj.GetName(), "namespace", obj.GetNamespace(), "kind", obj.GetObjectKind().GroupVersionKind().Kind)

	if records == nil {
		for name, sel := range selectors {
			targets, err := r.Selector.Select(context.TODO(), sel)
			if err != nil {
				logger.Error(err, "fail to select")
				r.Recorder.Event(obj, recorder.Failed{
					Activity: "select targets",
					Err:      err.Error(),
				})
				return ctrl.Result{}, nil
			}

			if len(targets) == 0 {
				logger.Info("no target has been selected")
				r.Recorder.Event(obj, recorder.Failed{
					Activity: "select targets",
					Err:      "no target has been selected",
				})
				return ctrl.Result{}, nil
			}

			for _, target := range targets {
				records = append(records, &v1alpha1.Record{
					Id:          target.Id(),
					SelectorKey: name,
					Phase:       v1alpha1.NotInjected,
				})
				shouldUpdate = true
			}
		}
		// TODO: dynamically update the records when some of the selected pods/containers have stopped
	}
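	// For illustration: selecting two pods under a selector key named "pods"
	// (the key name and id format here are hypothetical) seeds records like
	//
	//	{Id: "ns/pod-0", SelectorKey: "pods", Phase: NotInjected}
	//	{Id: "ns/pod-1", SelectorKey: "pods", Phase: NotInjected}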

	needRetry := false
	for index, record := range records {
		var err error
		idLogger := logger.WithValues("id", records[index].Id)
		idLogger.Info("iterating record", "record", record, "desiredPhase", desiredPhase)

		// The running logic is a cycle:
		// Not Injected -> Not Injected/* -> Injected -> Injected/* -> Not Injected
		// Every transition must follow the cycle. For example, a record in the
		// "Not Injected/*" phase that needs to recover must first finish applying
		// and then recover; it cannot recover directly.

		originalPhase := record.Phase
		operation := Nothing
		if desiredPhase == v1alpha1.RunningPhase && originalPhase != v1alpha1.Injected {
			// Here originalPhase can be Not Injected, Not Injected/*, or Injected/*.
			// In the first two cases the record should apply; in the last it should recover.

			if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
				operation = Apply
			} else {
				operation = Recover
			}
		}
		if desiredPhase == v1alpha1.StoppedPhase && originalPhase != v1alpha1.NotInjected {
			// Here originalPhase can be Not Injected/*, Injected, or Injected/*.
			// In the first case the record should apply; in the last two it should recover.

			if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
				operation = Apply
			} else {
				operation = Recover
			}
		}

		if operation == Apply {
			idLogger.Info("apply chaos")
			record.Phase, err = r.Impl.Apply(context.TODO(), index, records, obj)
			if record.Phase != originalPhase {
				shouldUpdate = true
			}
			if err != nil {
				// TODO: add a backoff and retry mechanism,
				// but the retry shouldn't block processing of other resources
				idLogger.Error(err, "fail to apply chaos")
				applyFailedEvent := newRecordEvent(v1alpha1.TypeFailed, v1alpha1.Apply, err.Error())
				records[index].Events = append(records[index].Events, *applyFailedEvent)
				r.Recorder.Event(obj, recorder.Failed{
					Activity: "apply chaos",
					Err:      err.Error(),
				})
				needRetry = true
				// even if impl.Apply() failed, update the status so that the new records[index].Events entry is persisted
				shouldUpdate = true
				continue
			}

			if record.Phase == v1alpha1.Injected {
				records[index].InjectedCount++
				applySucceedEvent := newRecordEvent(v1alpha1.TypeSucceeded, v1alpha1.Apply, "")
				records[index].Events = append(records[index].Events, *applySucceedEvent)
				r.Recorder.Event(obj, recorder.Applied{
					Id: records[index].Id,
				})
			}
		} else if operation == Recover {
			idLogger.Info("recover chaos")
			record.Phase, err = r.Impl.Recover(context.TODO(), index, records, obj)
			if record.Phase != originalPhase {
				shouldUpdate = true
			}
			if err != nil {
				// TODO: add a backoff and retry mechanism,
				// but the retry shouldn't block processing of other resources
				idLogger.Error(err, "fail to recover chaos")
				recoverFailedEvent := newRecordEvent(v1alpha1.TypeFailed, v1alpha1.Recover, err.Error())
				records[index].Events = append(records[index].Events, *recoverFailedEvent)
				r.Recorder.Event(obj, recorder.Failed{
					Activity: "recover chaos",
					Err:      err.Error(),
				})
				needRetry = true
				// even if impl.Recover() failed, update the status so that the new records[index].Events entry is persisted
				shouldUpdate = true
				continue
			}

			if record.Phase == v1alpha1.NotInjected {
				records[index].RecoveredCount++
				recoverSucceedEvent := newRecordEvent(v1alpha1.TypeSucceeded, v1alpha1.Recover, "")
				records[index].Events = append(records[index].Events, *recoverSucceedEvent)
				r.Recorder.Event(obj, recorder.Recovered{
					Id: records[index].Id,
				})
			}
		}
	}

	// TODO: auto-generate a SetCustomStatus method rather than using reflection
	var customStatus reflect.Value
	if objWithStatus, ok := obj.(v1alpha1.InnerObjectWithCustomStatus); ok {
		customStatus = reflect.Indirect(reflect.ValueOf(objWithStatus.GetCustomStatus()))
	}
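	// The snapshot above preserves any custom status the Impl calls mutated
	// in memory: the retry closure below re-fetches the object from the API
	// server, which would otherwise discard those changes.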
	if shouldUpdate {
		updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
			logger.Info("updating records", "records", records)
			obj := r.Object.DeepCopyObject().(v1alpha1.InnerObjectWithSelector)

			if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
				logger.Error(err, "unable to get chaos")
				return err
			}

			obj.GetStatus().Experiment.Records = records
			if objWithStatus, ok := obj.(v1alpha1.InnerObjectWithCustomStatus); ok {
				ptrToCustomStatus := objWithStatus.GetCustomStatus()
				// TODO: auto-generate a SetCustomStatus method rather than using reflection
				reflect.Indirect(reflect.ValueOf(ptrToCustomStatus)).Set(reflect.Indirect(customStatus))
			}
			return r.Client.Update(context.TODO(), obj)
		})
		if updateError != nil {
			logger.Error(updateError, "fail to update")
			r.Recorder.Event(obj, recorder.Failed{
				Activity: "update records",
				Err:      updateError.Error(),
			})
			return ctrl.Result{Requeue: true}, nil
		}

		r.Recorder.Event(obj, recorder.Updated{
			Field: "records",
		})
	}
	return ctrl.Result{Requeue: needRetry}, nil
}

func newRecordEvent(eventType v1alpha1.RecordEventType, eventStage v1alpha1.RecordEventOperation, msg string) *v1alpha1.RecordEvent {
	return v1alpha1.NewRecordEvent(eventType, eventStage, msg, metav1.Now())
}
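
// noopImpl is a minimal sketch of a types.ChaosImpl, assuming the Apply and
// Recover signatures implied by the calls in Reconcile above. It is a
// hypothetical starting point for a new chaos type, not part of Chaos Mesh.
type noopImpl struct{}

// Apply would inject the fault for records[index] and report the phase the
// record reached; here it reports success immediately.
func (noopImpl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
	return v1alpha1.Injected, nil
}

// Recover would undo the fault for records[index]; here it likewise reports
// success immediately.
func (noopImpl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
	return v1alpha1.NotInjected, nil
}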