...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/desiredphase/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/desiredphase

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package desiredphase
    15  
    16  import (
    17  	"context"
    18  	"time"
    19  
    20  	"github.com/go-logr/logr"
    21  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    22  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    23  	"k8s.io/client-go/util/retry"
    24  	ctrl "sigs.k8s.io/controller-runtime"
    25  	"sigs.k8s.io/controller-runtime/pkg/client"
    26  
    27  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    28  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    29  )
    30  
    31  // Reconciler for common chaos
    32  type Reconciler struct {
    33  	// Object is used to mark the target type of this Reconciler
    34  	Object v1alpha1.InnerObject
    35  
    36  	// Client is used to operate on the Kubernetes cluster
    37  	client.Client
    38  
    39  	Recorder recorder.ChaosRecorder
    40  	Log      logr.Logger
    41  }
    42  
    43  // Reconcile the common chaos
    44  func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    45  	obj := r.Object.DeepCopyObject().(v1alpha1.InnerObject)
    46  
    47  	if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
    48  		if apierrors.IsNotFound(err) {
    49  			r.Log.Info("chaos not found")
    50  		} else {
    51  			// TODO: handle this error
    52  			r.Log.Error(err, "unable to get chaos")
    53  		}
    54  		return ctrl.Result{}, nil
    55  	}
    56  
    57  	ctx := &reconcileContext{
    58  		obj:          obj,
    59  		Reconciler:   r,
    60  		shouldUpdate: false,
    61  	}
    62  	return ctx.Reconcile(req)
    63  }
    64  
    65  type reconcileContext struct {
    66  	obj v1alpha1.InnerObject
    67  
    68  	*Reconciler
    69  	shouldUpdate bool
    70  	requeueAfter time.Duration
    71  }
    72  
    73  func (ctx *reconcileContext) GetCreationTimestamp() metav1.Time {
    74  	return ctx.obj.GetObjectMeta().CreationTimestamp
    75  }
    76  
    77  func (ctx *reconcileContext) CalcDesiredPhase() (v1alpha1.DesiredPhase, []recorder.ChaosEvent) {
    78  	events := []recorder.ChaosEvent{}
    79  
    80  	// Consider the finalizers
    81  	if ctx.obj.IsDeleted() {
    82  		if ctx.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.StoppedPhase {
    83  			events = append(events, recorder.Deleted{})
    84  		}
    85  		return v1alpha1.StoppedPhase, events
    86  	}
    87  
    88  	if ctx.obj.IsOneShot() {
    89  		// An oneshot chaos should always be in running phase, so that it cannot
    90  		// be applied multiple times or cause other bugs :(
    91  		return v1alpha1.RunningPhase, events
    92  	}
    93  
    94  	// Consider the duration
    95  	now := time.Now()
    96  
    97  	durationExceeded, untilStop, err := ctx.obj.DurationExceeded(now)
    98  	if err != nil {
    99  		ctx.Log.Error(err, "failed to parse duration")
   100  	}
   101  	if durationExceeded {
   102  		if ctx.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.StoppedPhase {
   103  			events = append(events, recorder.TimeUp{})
   104  		}
   105  		return v1alpha1.StoppedPhase, events
   106  	}
   107  
   108  	ctx.requeueAfter = untilStop
   109  
   110  	// Then decide the pause logic
   111  	if ctx.obj.IsPaused() {
   112  		if ctx.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.StoppedPhase {
   113  			events = append(events, recorder.Paused{})
   114  		}
   115  		return v1alpha1.StoppedPhase, events
   116  	}
   117  
   118  	if ctx.obj.GetStatus().Experiment.DesiredPhase != v1alpha1.RunningPhase {
   119  		events = append(events, recorder.Started{})
   120  	}
   121  	return v1alpha1.RunningPhase, events
   122  }
   123  
   124  func (ctx *reconcileContext) Reconcile(req ctrl.Request) (ctrl.Result, error) {
   125  	desiredPhase, events := ctx.CalcDesiredPhase()
   126  
   127  	ctx.Log.Info("modify desiredPhase", "desiredPhase", desiredPhase)
   128  	if ctx.obj.GetStatus().Experiment.DesiredPhase != desiredPhase {
   129  		for _, ev := range events {
   130  			ctx.Recorder.Event(ctx.obj, ev)
   131  		}
   132  
   133  		updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
   134  			obj := ctx.Object.DeepCopyObject().(v1alpha1.InnerObject)
   135  
   136  			if err := ctx.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
   137  				ctx.Log.Error(err, "unable to get chaos")
   138  				return err
   139  			}
   140  
   141  			if obj.GetStatus().Experiment.DesiredPhase != desiredPhase {
   142  				obj.GetStatus().Experiment.DesiredPhase = desiredPhase
   143  				ctx.Log.Info("update object", "namespace", obj.GetObjectMeta().GetNamespace(), "name", obj.GetObjectMeta().GetName())
   144  				return ctx.Client.Update(context.TODO(), obj)
   145  			}
   146  
   147  			return nil
   148  		})
   149  		if updateError != nil {
   150  			ctx.Log.Error(updateError, "fail to update")
   151  			ctx.Recorder.Event(ctx.obj, recorder.Failed{
   152  				Activity: "update desiredphase",
   153  				Err:      updateError.Error(),
   154  			})
   155  			return ctrl.Result{}, nil
   156  		}
   157  
   158  		ctx.Recorder.Event(ctx.obj, recorder.Updated{
   159  			Field: "desiredPhase",
   160  		})
   161  	}
   162  	return ctrl.Result{RequeueAfter: ctx.requeueAfter}, nil
   163  }
   164