...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/twophase/state_machine.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/twophase

     1  // Copyright 2020 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package twophase
    15  
    16  import (
    17  	"context"
    18  	"reflect"
    19  	"time"
    20  
    21  	"github.com/pkg/errors"
    22  	k8sError "k8s.io/apimachinery/pkg/api/errors"
    23  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    24  	"k8s.io/apimachinery/pkg/types"
    25  	"k8s.io/client-go/util/retry"
    26  	ctrl "sigs.k8s.io/controller-runtime"
    27  
    28  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    29  )
    30  
// iterMax bounds the number of schedule iterations performed by the loops in
// resume and IterateNextTime, so neither of them can spin forever.
const iterMax = 1e4
    32  
// chaosStateMachine couples one chaos object with the request being reconciled
// and the Reconciler that provides the operations the phase-transition
// handlers in this file call (Log, Client, Apply, Recover, Update, Object).
type chaosStateMachine struct {
	// Chaos is the object whose experiment phase, status and schedule are
	// read and mutated by the transition handlers.
	Chaos v1alpha1.InnerSchedulerObject
	// Req is forwarded unchanged to m.Apply and m.Recover.
	Req ctrl.Request
	*Reconciler
}
    38  
    39  func unexpected(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
    40  	currentPhase := m.Chaos.GetStatus().Experiment.Phase
    41  
    42  	return true, errors.Errorf("turn from %s into %s is unexpected", currentPhase, targetPhase)
    43  }
    44  
    45  func noop(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
    46  	updated := false
    47  	currentPhase := m.Chaos.GetStatus().Experiment.Phase
    48  
    49  	if currentPhase != targetPhase {
    50  		m.Chaos.GetStatus().Experiment.Phase = targetPhase
    51  		updated = true
    52  	}
    53  	return updated, nil
    54  }
    55  
    56  func apply(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, startTime time.Time) (bool, error) {
    57  	duration, err := m.Chaos.GetDuration()
    58  	if err != nil {
    59  		m.Log.Error(err, "failed to get chaos duration")
    60  		return false, err
    61  	}
    62  	if duration == nil {
    63  		zero := time.Duration(0)
    64  		duration = &zero
    65  	}
    66  
    67  	currentPhase := m.Chaos.GetStatus().Experiment.Phase
    68  	status := m.Chaos.GetStatus()
    69  
    70  	m.Log.Info("applying", "current phase", currentPhase, "target phase", targetPhase)
    71  	err = m.Apply(ctx, m.Req, m.Chaos)
    72  	if err != nil {
    73  		m.Log.Error(err, "fail to apply")
    74  
    75  		status.Experiment.Phase = v1alpha1.ExperimentPhaseFailed
    76  		status.FailedMessage = err.Error()
    77  
    78  		return true, err
    79  	}
    80  	// reset failed message
    81  	status.FailedMessage = emptyString
    82  	status.Experiment.Phase = targetPhase
    83  
    84  	nextStart, nextRecover, err := m.IterateNextTime(startTime, *duration)
    85  	if err != nil {
    86  		m.Log.Error(err, "failed to get the next start time and recover time")
    87  		return true, err
    88  	}
    89  
    90  	m.Chaos.SetNextStart(*nextStart)
    91  	m.Chaos.SetNextRecover(*nextRecover)
    92  
    93  	status.Experiment.StartTime = &metav1.Time{Time: startTime}
    94  	status.Experiment.EndTime = nil
    95  	status.Experiment.Duration = duration.String()
    96  
    97  	return true, nil
    98  }
    99  
   100  func recover(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
   101  	duration, err := m.Chaos.GetDuration()
   102  	if err != nil {
   103  		m.Log.Error(err, "failed to get chaos duration")
   104  		return false, err
   105  	}
   106  	if duration == nil {
   107  		zero := time.Duration(0)
   108  		duration = &zero
   109  	}
   110  
   111  	currentPhase := m.Chaos.GetStatus().Experiment.Phase
   112  	status := m.Chaos.GetStatus()
   113  
   114  	m.Log.Info("recovering", "current phase", currentPhase, "target phase", targetPhase)
   115  	if err := m.Recover(ctx, m.Req, m.Chaos); err != nil {
   116  		status.FailedMessage = err.Error()
   117  
   118  		m.Log.Error(err, "fail to recover")
   119  		return true, err
   120  	}
   121  
   122  	status.Experiment.Phase = targetPhase
   123  	status.Experiment.EndTime = &metav1.Time{
   124  		Time: now,
   125  	}
   126  
   127  	if status.Experiment.StartTime != nil {
   128  		status.Experiment.Duration = now.Sub(status.Experiment.StartTime.Time).String()
   129  	}
   130  
   131  	// If this recover action is not called by pause action, reset recover time
   132  	if !now.Before(m.Chaos.GetNextRecover()) {
   133  		m.Chaos.SetNextRecover(m.Chaos.GetNextStart().Add(*duration))
   134  	}
   135  
   136  	return true, nil
   137  }
   138  
   139  func resume(ctx context.Context, m *chaosStateMachine, _ v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
   140  	startTime := now
   141  	duration, err := m.Chaos.GetDuration()
   142  	if err != nil {
   143  		m.Log.Error(err, "failed to get chaos duration")
   144  		return false, err
   145  	}
   146  	if duration == nil {
   147  		zero := time.Duration(0)
   148  		duration = &zero
   149  	}
   150  	status := m.Chaos.GetStatus()
   151  
   152  	nextStart := m.Chaos.GetNextStart()
   153  	nextRecover := m.Chaos.GetNextRecover()
   154  	var lastStart time.Time
   155  	if status.Experiment.StartTime == nil {
   156  		// in this condition, the experiment has never executed
   157  		nextStart = now
   158  		lastStart = now
   159  	} else {
   160  		lastStart = status.Experiment.StartTime.Time
   161  	}
   162  
   163  	defer func() {
   164  		m.Chaos.SetNextStart(nextStart)
   165  		m.Chaos.SetNextRecover(nextRecover)
   166  	}()
   167  
   168  	counter := 0
   169  	for {
   170  		if nextRecover.After(now) && nextRecover.Before(nextStart) {
   171  			startTime = lastStart
   172  
   173  			return apply(ctx, m, v1alpha1.ExperimentPhaseRunning, startTime)
   174  		}
   175  
   176  		if nextStart.After(now) {
   177  			return noop(ctx, m, v1alpha1.ExperimentPhaseWaiting, now)
   178  		}
   179  
   180  		lastStart = nextStart
   181  		start, recover, err := m.IterateNextTime(nextStart, *duration)
   182  		if err != nil {
   183  			m.Log.Error(err, "failed to get the next start time and recover time")
   184  
   185  			return false, err
   186  		}
   187  
   188  		nextStart = *start
   189  		nextRecover = *recover
   190  
   191  		counter++
   192  		if counter > iterMax {
   193  			// If counter > iterMax, it means that chaos has been suspended for a long time,
   194  			// then directly restart chaos and set startTime to now.
   195  			startTime = now
   196  			start, recover, err = m.IterateNextTime(startTime, *duration)
   197  			if err != nil {
   198  				m.Log.Error(err, "failed to get the next start time and recover time")
   199  				return false, err
   200  			}
   201  			nextStart = *start
   202  			nextRecover = *recover
   203  
   204  			return apply(ctx, m, v1alpha1.ExperimentPhaseRunning, startTime)
   205  		}
   206  	}
   207  }
   208  
   209  // This method changes the phase of an object and do some side effects
   210  // There are 6 different phases, so there could be 6 * 6 = 36 branches
   211  func (m *chaosStateMachine) run(ctx context.Context, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
   212  	currentPhase := m.Chaos.GetStatus().Experiment.Phase
   213  	m.Log.Info("change phase", "current phase", currentPhase, "target phase", targetPhase)
   214  
   215  	if phaseTransitionMap[currentPhase] == nil {
   216  		err := errors.Errorf("unexpected current phase '%s'", currentPhase)
   217  		return false, err
   218  	}
   219  
   220  	if phaseTransitionMap[currentPhase][targetPhase] == nil {
   221  		err := errors.Errorf("unexpected target phase '%s'", targetPhase)
   222  		return false, err
   223  	}
   224  
   225  	return phaseTransitionMap[currentPhase][targetPhase](ctx, m, targetPhase, now)
   226  }
   227  
   228  func (m *chaosStateMachine) Into(ctx context.Context, targetPhase v1alpha1.ExperimentPhase, now time.Time) error {
   229  	originFinalizers := m.Chaos.DeepCopyObject().(v1alpha1.InnerSchedulerObject).GetMeta().GetFinalizers()
   230  	updated, err := m.run(ctx, targetPhase, now)
   231  	if err != nil {
   232  		m.Log.Error(err, "error while executing state machine")
   233  	}
   234  
   235  	if updated {
   236  
   237  		updateError := m.Update(ctx, m.Chaos)
   238  		if updateError == nil {
   239  			return err
   240  		}
   241  
   242  		if !k8sError.IsConflict(updateError) {
   243  			return updateError
   244  		}
   245  
   246  		m.Log.Info("fail to update, and will retry on conflict")
   247  
   248  		// avoid panic
   249  		if m.Chaos.GetChaos() == nil || m.Chaos.GetStatus() == nil {
   250  			return updateError
   251  		}
   252  
   253  		namespacedName := types.NamespacedName{
   254  			Namespace: m.Chaos.GetChaos().Namespace,
   255  			Name:      m.Chaos.GetChaos().Name,
   256  		}
   257  
   258  		updateError = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
   259  			// Fetch the resource
   260  			_chaos := m.Object()
   261  			if err := m.Client.Get(ctx, namespacedName, _chaos); err != nil {
   262  				m.Log.Error(err, "unable to get chaos")
   263  				return err
   264  			}
   265  			chaos := _chaos.(v1alpha1.InnerSchedulerObject)
   266  
   267  			// Make updates to the resource
   268  			status := chaos.GetStatus()
   269  			status.FailedMessage = m.Chaos.GetStatus().FailedMessage
   270  			status.Scheduler = m.Chaos.GetStatus().Scheduler
   271  			status.Experiment = m.Chaos.GetStatus().Experiment
   272  
   273  			newFinalizers := m.Chaos.GetMeta().GetFinalizers()
   274  			if !reflect.DeepEqual(originFinalizers, newFinalizers) {
   275  				chaos.GetMeta().SetFinalizers(newFinalizers)
   276  			}
   277  
   278  			// Try to update
   279  			return m.Update(ctx, chaos)
   280  		})
   281  
   282  		if updateError != nil {
   283  			return updateError
   284  		}
   285  	}
   286  
   287  	return err
   288  }
   289  
   290  func (m *chaosStateMachine) IterateNextTime(startTime time.Time, duration time.Duration) (*time.Time, *time.Time, error) {
   291  	scheduler := m.Chaos.GetScheduler()
   292  	if scheduler == nil {
   293  		return nil, nil, errors.Errorf("misdefined scheduler")
   294  	}
   295  	m.Log.Info("iterate nextStart and nextRecover", "startTime", startTime, "duration", duration, "scheduler", scheduler)
   296  	nextStart, err := nextTime(*scheduler, startTime)
   297  
   298  	if err != nil {
   299  		m.Log.Error(err, "failed to get the next start time")
   300  		return nil, nil, err
   301  	}
   302  	nextRecover := startTime.Add(duration)
   303  
   304  	counter := 0
   305  	// if the duration is too long, `nextRecover` could be after `nextStart`
   306  	// we can jump over a start to make sure `nextRecover` is before `nextStart`
   307  	for nextRecover.After(*nextStart) {
   308  		nextStart, err = nextTime(*scheduler, *nextStart)
   309  		if err != nil {
   310  			m.Log.Error(err, "failed to get the next start time")
   311  			return nil, nil, err
   312  		}
   313  
   314  		counter++
   315  		if counter > iterMax {
   316  			err = errors.Errorf("the number of iterations exceeded with nextRecover(%s) nextStart(%s)", nextRecover, nextStart)
   317  			return nil, nil, err
   318  		}
   319  	}
   320  
   321  	return nextStart, &nextRecover, nil
   322  }
   323  
   324  var phaseTransitionMap = map[v1alpha1.ExperimentPhase]map[v1alpha1.ExperimentPhase]func(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error){
   325  	v1alpha1.ExperimentPhaseUninitialized: {
   326  		v1alpha1.ExperimentPhaseUninitialized: noop,
   327  		v1alpha1.ExperimentPhaseRunning:       apply,
   328  		v1alpha1.ExperimentPhaseWaiting:       noop,
   329  		v1alpha1.ExperimentPhasePaused:        noop,
   330  		v1alpha1.ExperimentPhaseFailed:        unexpected,
   331  		v1alpha1.ExperimentPhaseFinished:      noop,
   332  	},
   333  	v1alpha1.ExperimentPhaseRunning: {
   334  		v1alpha1.ExperimentPhaseUninitialized: unexpected,
   335  		v1alpha1.ExperimentPhaseRunning:       noop,
   336  		v1alpha1.ExperimentPhaseWaiting:       recover,
   337  		v1alpha1.ExperimentPhasePaused:        recover,
   338  		v1alpha1.ExperimentPhaseFailed:        unexpected,
   339  		v1alpha1.ExperimentPhaseFinished:      recover,
   340  	},
   341  	v1alpha1.ExperimentPhaseWaiting: {
   342  		v1alpha1.ExperimentPhaseUninitialized: unexpected,
   343  		v1alpha1.ExperimentPhaseRunning:       apply,
   344  		v1alpha1.ExperimentPhaseWaiting:       noop,
   345  		v1alpha1.ExperimentPhasePaused:        noop,
   346  		v1alpha1.ExperimentPhaseFailed:        unexpected,
   347  		v1alpha1.ExperimentPhaseFinished:      noop,
   348  	},
   349  	v1alpha1.ExperimentPhasePaused: {
   350  		v1alpha1.ExperimentPhaseUninitialized: unexpected,
   351  		v1alpha1.ExperimentPhaseRunning:       resume,
   352  		v1alpha1.ExperimentPhaseWaiting:       resume,
   353  		v1alpha1.ExperimentPhasePaused:        noop,
   354  		v1alpha1.ExperimentPhaseFailed:        unexpected,
   355  		v1alpha1.ExperimentPhaseFinished:      noop,
   356  	},
   357  	v1alpha1.ExperimentPhaseFailed: {
   358  		v1alpha1.ExperimentPhaseUninitialized: unexpected,
   359  		v1alpha1.ExperimentPhaseRunning:       apply,
   360  		v1alpha1.ExperimentPhaseWaiting:       noop,
   361  		v1alpha1.ExperimentPhasePaused:        noop,
   362  		v1alpha1.ExperimentPhaseFailed:        noop,
   363  		v1alpha1.ExperimentPhaseFinished:      recover,
   364  	},
   365  	v1alpha1.ExperimentPhaseFinished: {
   366  		v1alpha1.ExperimentPhaseUninitialized: unexpected,
   367  		v1alpha1.ExperimentPhaseRunning:       unexpected,
   368  		v1alpha1.ExperimentPhaseWaiting:       unexpected,
   369  		v1alpha1.ExperimentPhasePaused:        unexpected,
   370  		v1alpha1.ExperimentPhaseFailed:        unexpected,
   371  		v1alpha1.ExperimentPhaseFinished:      noop,
   372  	},
   373  }
   374