...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/common/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/common

     1  // Copyright 2019 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package common
    15  
    16  import (
    17  	"context"
    18  	"reflect"
    19  	"strings"
    20  
    21  	"github.com/go-logr/logr"
    22  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    23  	"k8s.io/client-go/util/retry"
    24  	ctrl "sigs.k8s.io/controller-runtime"
    25  	"sigs.k8s.io/controller-runtime/pkg/client"
    26  
    27  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    28  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    29  	"github.com/chaos-mesh/chaos-mesh/pkg/selector"
    30  )
    31  
// InnerObjectWithCustomStatus is a v1alpha1.InnerObject that also carries a
// chaos-specific status next to the common one.
//
// Reconcile treats the value returned by GetCustomStatus as a pointer: it
// dereferences it with reflect.Indirect and copies the pointed-to value onto
// the freshly fetched object before that object is updated.
type InnerObjectWithCustomStatus interface {
	v1alpha1.InnerObject

	// GetCustomStatus returns the custom status of the object.
	// NOTE(review): presumably a pointer into the object itself — confirm
	// against the implementations.
	GetCustomStatus() interface{}
}
    37  
// InnerObjectWithSelector is a v1alpha1.InnerObject that describes its targets
// through one or more selector specs.
type InnerObjectWithSelector interface {
	v1alpha1.InnerObject

	// GetSelectorSpecs returns the selector specs of the object keyed by name.
	// Reconcile hands every value to Selector.Select and stores the key as the
	// SelectorKey of each record created from the selected targets.
	GetSelectorSpecs() map[string]interface{}
}
    43  
// ChaosImpl is the chaos-specific part of a reconciliation. Reconcile calls it
// once per record that has to move; index identifies that record inside
// records, and obj is the chaos object being reconciled.
//
// The returned phase is stored in records[index].Phase by Reconcile whether or
// not an error is returned. A non-nil error is logged, recorded as a Failed
// event and makes Reconcile requeue the request.
type ChaosImpl interface {
	// Apply injects the chaos into the target described by records[index].
	Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error)
	// Recover removes the chaos from the target described by records[index].
	Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error)
}
    48  
// Reconciler is the reconciler shared by the common chaos types. It selects
// the targets of a chaos object, keeps one record per target in the object's
// status, and delegates the actual injection and recovery to Impl.
type Reconciler struct {
	// Impl applies and recovers the chaos for a single record.
	Impl ChaosImpl

	// Object is used to mark the target type of this Reconciler.
	// Reconcile deep-copies it to obtain an object to fetch the chaos into.
	Object InnerObjectWithSelector

	// Client is used to operate on the Kubernetes cluster.
	// Reconcile addresses it explicitly as r.Client for Get and Update.
	client.Client
	// Reader is embedded next to Client; Reconcile does not use it.
	// NOTE(review): presumably a non-cached reader — confirm with the code
	// that constructs the Reconciler.
	client.Reader

	// Recorder receives the Failed, Applied, Recovered and Updated events
	// emitted during reconciliation.
	Recorder recorder.ChaosRecorder

	// Selector resolves a selector spec into the targets of the chaos.
	Selector *selector.Selector

	// Log is the logger used by Reconcile.
	Log logr.Logger
}
    66  
// Operation is the action Reconcile decides to take on a single record.
type Operation string

const (
	// Apply means Impl.Apply is called for the record.
	Apply   Operation = "apply"
	// Recover means Impl.Recover is called for the record.
	Recover Operation = "recover"
	// Nothing means the record is left as it is.
	Nothing Operation = ""
)
    74  
    75  // Reconcile the common chaos
    76  func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    77  	obj := r.Object.DeepCopyObject().(InnerObjectWithSelector)
    78  
    79  	if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
    80  		if apierrors.IsNotFound(err) {
    81  			r.Log.Info("chaos not found")
    82  		} else {
    83  			// TODO: handle this error
    84  			r.Log.Error(err, "unable to get chaos")
    85  		}
    86  		return ctrl.Result{}, nil
    87  	}
    88  
    89  	shouldUpdate := false
    90  
    91  	desiredPhase := obj.GetStatus().Experiment.DesiredPhase
    92  	records := obj.GetStatus().Experiment.Records
    93  	selectors := obj.GetSelectorSpecs()
    94  
    95  	if records == nil {
    96  		for name, sel := range selectors {
    97  			targets, err := r.Selector.Select(context.TODO(), sel)
    98  			if err != nil {
    99  				r.Log.Error(err, "fail to select")
   100  				r.Recorder.Event(obj, recorder.Failed{
   101  					Activity: "select targets",
   102  					Err:      err.Error(),
   103  				})
   104  				return ctrl.Result{}, nil
   105  			}
   106  
   107  			for _, target := range targets {
   108  				records = append(records, &v1alpha1.Record{
   109  					Id:          target.Id(),
   110  					SelectorKey: name,
   111  					Phase:       v1alpha1.NotInjected,
   112  				})
   113  				shouldUpdate = true
   114  			}
   115  		}
   116  		// TODO: dynamic upgrade the records when some of these pods/containers stopped
   117  	}
   118  
   119  	if len(records) == 0 {
   120  		r.Log.Info("no record has been selected")
   121  		r.Recorder.Event(obj, recorder.Failed{
   122  			Activity: "select targets",
   123  			Err:      "no record has been selected",
   124  		})
   125  		return ctrl.Result{}, nil
   126  	}
   127  
   128  	needRetry := false
   129  	for index, record := range records {
   130  		var err error
   131  		r.Log.Info("iterating record", "record", record, "desiredPhase", desiredPhase)
   132  
   133  		// The whole running logic is a cycle:
   134  		// Not Injected -> Not Injected/* -> Injected -> Injected/* -> Not Injected
   135  		// Every steps should follow the cycle. For example, if it's in "Not Injected/*" status, and it wants to recover
   136  		// then it has to apply and then recover, but not recover directly.
   137  
   138  		originalPhase := record.Phase
   139  		operation := Nothing
   140  		if desiredPhase == v1alpha1.RunningPhase && originalPhase != v1alpha1.Injected {
   141  			// The originalPhase has three possible situations: Not Injected, Not Injedcted/* or Injected/*
   142  			// In the first two situations, it should apply, in the last situation, it should recover
   143  
   144  			if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
   145  				operation = Apply
   146  			} else {
   147  				operation = Recover
   148  			}
   149  		}
   150  		if desiredPhase == v1alpha1.StoppedPhase && originalPhase != v1alpha1.NotInjected {
   151  			// The originalPhase has three possible situations: Not Injedcted/*, Injected, or Injected/*
   152  			// In the first one situations, it should apply, in the last two situations, it should recover
   153  
   154  			if strings.HasPrefix(string(originalPhase), string(v1alpha1.NotInjected)) {
   155  				operation = Apply
   156  			} else {
   157  				operation = Recover
   158  			}
   159  		}
   160  
   161  		if operation == Apply {
   162  			r.Log.Info("apply chaos", "id", records[index].Id)
   163  			record.Phase, err = r.Impl.Apply(context.TODO(), index, records, obj)
   164  			if record.Phase != originalPhase {
   165  				shouldUpdate = true
   166  			}
   167  			if err != nil {
   168  				// TODO: add backoff and retry mechanism
   169  				// but the retry shouldn't block other resource process
   170  				r.Log.Error(err, "fail to apply chaos")
   171  				r.Recorder.Event(obj, recorder.Failed{
   172  					Activity: "apply chaos",
   173  					Err:      err.Error(),
   174  				})
   175  				needRetry = true
   176  				continue
   177  			}
   178  
   179  			if record.Phase == v1alpha1.Injected {
   180  				r.Recorder.Event(obj, recorder.Applied{
   181  					Id: records[index].Id,
   182  				})
   183  			}
   184  		} else if operation == Recover {
   185  			r.Log.Info("recover chaos", "id", records[index].Id)
   186  			record.Phase, err = r.Impl.Recover(context.TODO(), index, records, obj)
   187  			if record.Phase != originalPhase {
   188  				shouldUpdate = true
   189  			}
   190  			if err != nil {
   191  				// TODO: add backoff and retry mechanism
   192  				// but the retry shouldn't block other resource process
   193  				r.Log.Error(err, "fail to recover chaos")
   194  				r.Recorder.Event(obj, recorder.Failed{
   195  					Activity: "recover chaos",
   196  					Err:      err.Error(),
   197  				})
   198  				needRetry = true
   199  				continue
   200  			}
   201  
   202  			if record.Phase == v1alpha1.NotInjected {
   203  				r.Recorder.Event(obj, recorder.Recovered{
   204  					Id: records[index].Id,
   205  				})
   206  			}
   207  		}
   208  	}
   209  
   210  	// TODO: auto generate SetCustomStatus rather than reflect
   211  	var customStatus reflect.Value
   212  	if objWithStatus, ok := obj.(InnerObjectWithCustomStatus); ok {
   213  		customStatus = reflect.Indirect(reflect.ValueOf(objWithStatus.GetCustomStatus()))
   214  	}
   215  	if shouldUpdate {
   216  		updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
   217  			r.Log.Info("updating records", "records", records)
   218  			obj := r.Object.DeepCopyObject().(InnerObjectWithSelector)
   219  
   220  			if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
   221  				r.Log.Error(err, "unable to get chaos")
   222  				return err
   223  			}
   224  
   225  			obj.GetStatus().Experiment.Records = records
   226  			if objWithStatus, ok := obj.(InnerObjectWithCustomStatus); ok {
   227  				ptrToCustomStatus := objWithStatus.GetCustomStatus()
   228  				// TODO: auto generate SetCustomStatus rather than reflect
   229  				reflect.Indirect(reflect.ValueOf(ptrToCustomStatus)).Set(reflect.Indirect(customStatus))
   230  			}
   231  			return r.Client.Update(context.TODO(), obj)
   232  		})
   233  		if updateError != nil {
   234  			r.Log.Error(updateError, "fail to update")
   235  			r.Recorder.Event(obj, recorder.Failed{
   236  				Activity: "update records",
   237  				Err:      updateError.Error(),
   238  			})
   239  			return ctrl.Result{Requeue: true}, nil
   240  		}
   241  
   242  		r.Recorder.Event(obj, recorder.Updated{
   243  			Field: "records",
   244  		})
   245  	}
   246  	return ctrl.Result{Requeue: needRetry}, nil
   247  }
   248