...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/twophase/state_machine.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/twophase

     1  // Copyright 2020 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package twophase
    15  
    16  import (
    17  	"context"
    18  	"strings"
    19  	"time"
    20  
    21  	"github.com/pkg/errors"
    22  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    23  	"k8s.io/apimachinery/pkg/types"
    24  	"k8s.io/client-go/util/retry"
    25  	ctrl "sigs.k8s.io/controller-runtime"
    26  
    27  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    28  )
    29  
// iterMax caps the number of iterations of the schedule-advancing loops in
// resume and IterateNextTime, so a schedule that is far behind (or never
// converges) cannot make them loop forever.
const iterMax = 1e4
    31  
    32  type chaosStateMachine struct {
    33  	Chaos v1alpha1.InnerSchedulerObject
    34  	Req   ctrl.Request
    35  	*Reconciler
    36  }
    37  
    38  func unexpected(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
    39  	currentPhase := m.Chaos.GetStatus().Experiment.Phase
    40  
    41  	return true, errors.Errorf("turn from %s into %s is unexpected", currentPhase, targetPhase)
    42  }
    43  
    44  func noop(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
    45  	updated := false
    46  	currentPhase := m.Chaos.GetStatus().Experiment.Phase
    47  
    48  	if currentPhase != targetPhase {
    49  		m.Chaos.GetStatus().Experiment.Phase = targetPhase
    50  		updated = true
    51  	}
    52  	return updated, nil
    53  }
    54  
    55  func apply(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, startTime time.Time) (bool, error) {
    56  	duration, err := m.Chaos.GetDuration()
    57  	if err != nil {
    58  		m.Log.Error(err, "failed to get chaos duration")
    59  		return false, err
    60  	}
    61  	if duration == nil {
    62  		zero := time.Duration(0)
    63  		duration = &zero
    64  	}
    65  
    66  	currentPhase := m.Chaos.GetStatus().Experiment.Phase
    67  	status := m.Chaos.GetStatus()
    68  
    69  	m.Log.Info("applying", "current phase", currentPhase, "target phase", targetPhase)
    70  	err = m.Apply(ctx, m.Req, m.Chaos)
    71  	if err != nil {
    72  		m.Log.Error(err, "fail to apply")
    73  
    74  		status.Experiment.Phase = v1alpha1.ExperimentPhaseFailed
    75  		status.FailedMessage = err.Error()
    76  
    77  		return true, err
    78  	}
    79  	// reset failed message
    80  	status.FailedMessage = emptyString
    81  	status.Experiment.Phase = targetPhase
    82  
    83  	nextStart, nextRecover, err := m.IterateNextTime(startTime, *duration)
    84  	if err != nil {
    85  		m.Log.Error(err, "failed to get the next start time and recover time")
    86  		return true, err
    87  	}
    88  
    89  	m.Chaos.SetNextStart(*nextStart)
    90  	m.Chaos.SetNextRecover(*nextRecover)
    91  
    92  	status.Experiment.StartTime = &metav1.Time{Time: startTime}
    93  	status.Experiment.EndTime = nil
    94  	status.Experiment.Duration = duration.String()
    95  
    96  	return true, nil
    97  }
    98  
    99  func recover(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
   100  	duration, err := m.Chaos.GetDuration()
   101  	if err != nil {
   102  		m.Log.Error(err, "failed to get chaos duration")
   103  		return false, err
   104  	}
   105  	if duration == nil {
   106  		zero := time.Duration(0)
   107  		duration = &zero
   108  	}
   109  
   110  	currentPhase := m.Chaos.GetStatus().Experiment.Phase
   111  	status := m.Chaos.GetStatus()
   112  
   113  	m.Log.Info("recovering", "current phase", currentPhase, "target phase", targetPhase)
   114  	if err := m.Recover(ctx, m.Req, m.Chaos); err != nil {
   115  		status.FailedMessage = err.Error()
   116  
   117  		m.Log.Error(err, "fail to recover")
   118  		return true, err
   119  	}
   120  
   121  	status.Experiment.Phase = targetPhase
   122  	status.Experiment.EndTime = &metav1.Time{
   123  		Time: now,
   124  	}
   125  
   126  	if status.Experiment.StartTime != nil {
   127  		status.Experiment.Duration = now.Sub(status.Experiment.StartTime.Time).String()
   128  	}
   129  
   130  	// If this recover action is not called by pause action, reset recover time
   131  	if !now.Before(m.Chaos.GetNextRecover()) {
   132  		m.Chaos.SetNextRecover(m.Chaos.GetNextStart().Add(*duration))
   133  	}
   134  
   135  	return true, nil
   136  }
   137  
// resume brings a paused experiment back onto its schedule at time now. The
// requested target phase is ignored; the resulting phase is decided here:
//
//   - if nextRecover is after now and before nextStart (now lies inside a
//     scheduled window), the chaos is re-applied via apply with lastStart as
//     the start time and the phase becomes Running;
//   - if nextStart is still in the future, noop sets the phase to Waiting;
//   - otherwise the schedule is advanced one step with IterateNextTime and
//     both checks are repeated. After more than iterMax steps the schedule is
//     recomputed from now and the chaos is re-applied starting now.
//
// Once registered, the deferred function writes the locally tracked
// nextStart/nextRecover back to the object on every return, including the
// error return inside the loop. It runs after the apply/noop call in the
// return statement, so these values override whatever apply itself set.
func resume(ctx context.Context, m *chaosStateMachine, _ v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
	startTime := now
	duration, err := m.Chaos.GetDuration()
	if err != nil {
		m.Log.Error(err, "failed to get chaos duration")
		return false, err
	}
	if duration == nil {
		zero := time.Duration(0)
		duration = &zero
	}
	status := m.Chaos.GetStatus()

	nextStart := m.Chaos.GetNextStart()
	nextRecover := m.Chaos.GetNextRecover()
	// lastStart is the start time handed to apply when now falls inside a window.
	var lastStart time.Time
	if status.Experiment.StartTime == nil {
		// in this condition, the experiment has never executed
		nextStart = now
		lastStart = now
	} else {
		lastStart = status.Experiment.StartTime.Time
	}

	defer func() {
		m.Chaos.SetNextStart(nextStart)
		m.Chaos.SetNextRecover(nextRecover)
	}()

	counter := 0
	for {
		// now lies inside a scheduled window: re-apply, keeping its start time.
		if nextRecover.After(now) && nextRecover.Before(nextStart) {
			startTime = lastStart

			return apply(ctx, m, v1alpha1.ExperimentPhaseRunning, startTime)
		}

		// the next window has not started yet: just wait for it.
		if nextStart.After(now) {
			return noop(ctx, m, v1alpha1.ExperimentPhaseWaiting, now)
		}

		// nextStart is not after now, so step past the window starting there.
		// Note: the local `recover` shadows the package-level recover handler.
		lastStart = nextStart
		start, recover, err := m.IterateNextTime(nextStart, *duration)
		if err != nil {
			m.Log.Error(err, "failed to get the next start time and recover time")

			return false, err
		}

		nextStart = *start
		nextRecover = *recover

		counter++
		if counter > iterMax {
			// If counter > iterMax, it means that chaos has been suspended for a long time,
			// then directly restart chaos and set startTime to now.
			startTime = now
			start, recover, err = m.IterateNextTime(startTime, *duration)
			if err != nil {
				m.Log.Error(err, "failed to get the next start time and recover time")
				return false, err
			}
			nextStart = *start
			nextRecover = *recover

			return apply(ctx, m, v1alpha1.ExperimentPhaseRunning, startTime)
		}
	}
}
   207  
   208  // This method changes the phase of an object and do some side effects
   209  // There are 6 different phases, so there could be 6 * 6 = 36 branches
   210  func (m *chaosStateMachine) run(ctx context.Context, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error) {
   211  	currentPhase := m.Chaos.GetStatus().Experiment.Phase
   212  	m.Log.Info("change phase", "current phase", currentPhase, "target phase", targetPhase)
   213  
   214  	if phaseTransitionMap[currentPhase] == nil {
   215  		err := errors.Errorf("unexpected current phase '%s'", currentPhase)
   216  		return false, err
   217  	}
   218  
   219  	if phaseTransitionMap[currentPhase][targetPhase] == nil {
   220  		err := errors.Errorf("unexpected target phase '%s'", targetPhase)
   221  		return false, err
   222  	}
   223  
   224  	return phaseTransitionMap[currentPhase][targetPhase](ctx, m, targetPhase, now)
   225  }
   226  
   227  func (m *chaosStateMachine) Into(ctx context.Context, targetPhase v1alpha1.ExperimentPhase, now time.Time) error {
   228  	updated, err := m.run(ctx, targetPhase, now)
   229  	if err != nil {
   230  		m.Log.Error(err, "error while executing state machine")
   231  	}
   232  
   233  	if updated {
   234  
   235  		updateError := m.Update(ctx, m.Chaos)
   236  		if updateError == nil {
   237  			return err
   238  		}
   239  
   240  		if !strings.Contains(updateError.Error(), "the object has been modified; please apply your changes to the latest version and try again") {
   241  			return updateError
   242  		}
   243  
   244  		m.Log.Error(updateError, "fail to update, and will retry on conflict")
   245  
   246  		// avoid panic
   247  		if m.Chaos.GetChaos() == nil || m.Chaos.GetStatus() == nil {
   248  			return updateError
   249  		}
   250  
   251  		namespacedName := types.NamespacedName{
   252  			Namespace: m.Chaos.GetChaos().Namespace,
   253  			Name:      m.Chaos.GetChaos().Name,
   254  		}
   255  
   256  		updateError = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
   257  			// Fetch the resource
   258  			_chaos := m.Object()
   259  			if err := m.Client.Get(ctx, namespacedName, _chaos); err != nil {
   260  				m.Log.Error(err, "unable to get chaos")
   261  				return err
   262  			}
   263  			chaos := _chaos.(v1alpha1.InnerSchedulerObject)
   264  
   265  			// Make updates to the resource
   266  			status := chaos.GetStatus()
   267  			status.FailedMessage = m.Chaos.GetStatus().FailedMessage
   268  			status.Scheduler = m.Chaos.GetStatus().Scheduler
   269  			status.Experiment = m.Chaos.GetStatus().Experiment
   270  
   271  			// Try to update
   272  			return m.Update(ctx, chaos)
   273  		})
   274  
   275  		if updateError != nil {
   276  			return updateError
   277  		}
   278  	}
   279  
   280  	return err
   281  }
   282  
   283  func (m *chaosStateMachine) IterateNextTime(startTime time.Time, duration time.Duration) (*time.Time, *time.Time, error) {
   284  	scheduler := m.Chaos.GetScheduler()
   285  	if scheduler == nil {
   286  		return nil, nil, errors.Errorf("misdefined scheduler")
   287  	}
   288  	m.Log.Info("iterate nextStart and nextRecover", "startTime", startTime, "duration", duration, "scheduler", scheduler)
   289  	nextStart, err := nextTime(*scheduler, startTime)
   290  
   291  	if err != nil {
   292  		m.Log.Error(err, "failed to get the next start time")
   293  		return nil, nil, err
   294  	}
   295  	nextRecover := startTime.Add(duration)
   296  
   297  	counter := 0
   298  	// if the duration is too long, `nextRecover` could be after `nextStart`
   299  	// we can jump over a start to make sure `nextRecover` is before `nextStart`
   300  	for nextRecover.After(*nextStart) {
   301  		nextStart, err = nextTime(*scheduler, *nextStart)
   302  		if err != nil {
   303  			m.Log.Error(err, "failed to get the next start time")
   304  			return nil, nil, err
   305  		}
   306  
   307  		counter++
   308  		if counter > iterMax {
   309  			err = errors.Errorf("the number of iterations exceeded with nextRecover(%s) nextStart(%s)", nextRecover, nextStart)
   310  			return nil, nil, err
   311  		}
   312  	}
   313  
   314  	return nextStart, &nextRecover, nil
   315  }
   316  
// phaseTransitionMap is the transition table consulted by run. The outer key
// is the experiment's current phase, the inner key is the requested target
// phase, and the value is the handler that performs the transition:
//
//   - noop: only records the target phase;
//   - apply: injects the chaos and schedules the next start/recover;
//   - recover: removes the chaos and stamps the end time;
//   - resume: re-synchronizes a paused experiment with its schedule;
//   - unexpected: rejects the transition with an error.
var phaseTransitionMap = map[v1alpha1.ExperimentPhase]map[v1alpha1.ExperimentPhase]func(ctx context.Context, m *chaosStateMachine, targetPhase v1alpha1.ExperimentPhase, now time.Time) (bool, error){
	v1alpha1.ExperimentPhaseUninitialized: {
		v1alpha1.ExperimentPhaseUninitialized: noop,
		v1alpha1.ExperimentPhaseRunning:       apply,
		v1alpha1.ExperimentPhaseWaiting:       noop,
		v1alpha1.ExperimentPhasePaused:        noop,
		v1alpha1.ExperimentPhaseFailed:        unexpected,
		v1alpha1.ExperimentPhaseFinished:      noop,
	},
	// Leaving Running (to Waiting, Paused or Finished) removes the chaos.
	v1alpha1.ExperimentPhaseRunning: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning:       noop,
		v1alpha1.ExperimentPhaseWaiting:       recover,
		v1alpha1.ExperimentPhasePaused:        recover,
		v1alpha1.ExperimentPhaseFailed:        unexpected,
		v1alpha1.ExperimentPhaseFinished:      recover,
	},
	v1alpha1.ExperimentPhaseWaiting: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning:       apply,
		v1alpha1.ExperimentPhaseWaiting:       noop,
		v1alpha1.ExperimentPhasePaused:        noop,
		v1alpha1.ExperimentPhaseFailed:        unexpected,
		v1alpha1.ExperimentPhaseFinished:      noop,
	},
	// Leaving Paused for Running or Waiting goes through resume, which decides
	// the actual resulting phase from the schedule.
	v1alpha1.ExperimentPhasePaused: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning:       resume,
		v1alpha1.ExperimentPhaseWaiting:       resume,
		v1alpha1.ExperimentPhasePaused:        noop,
		v1alpha1.ExperimentPhaseFailed:        unexpected,
		v1alpha1.ExperimentPhaseFinished:      noop,
	},
	// From Failed, Running re-applies the chaos and Finished recovers it.
	v1alpha1.ExperimentPhaseFailed: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning:       apply,
		v1alpha1.ExperimentPhaseWaiting:       noop,
		v1alpha1.ExperimentPhasePaused:        noop,
		v1alpha1.ExperimentPhaseFailed:        noop,
		v1alpha1.ExperimentPhaseFinished:      recover,
	},
	// Finished is terminal: every transition out of it is rejected.
	v1alpha1.ExperimentPhaseFinished: {
		v1alpha1.ExperimentPhaseUninitialized: unexpected,
		v1alpha1.ExperimentPhaseRunning:       unexpected,
		v1alpha1.ExperimentPhaseWaiting:       unexpected,
		v1alpha1.ExperimentPhasePaused:        unexpected,
		v1alpha1.ExperimentPhaseFailed:        unexpected,
		v1alpha1.ExperimentPhaseFinished:      noop,
	},
}
   367