...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/statuscheck/worker.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/statuscheck

     1  // Copyright Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package statuscheck
    17  
    18  import (
    19  	"sync"
    20  	"time"
    21  
    22  	"github.com/go-logr/logr"
    23  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    24  	"k8s.io/apimachinery/pkg/types"
    25  
    26  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    27  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    28  )
    29  
    30  type Executor interface {
    31  	// Do will execute according to the status check configuration,
    32  	// returns:
    33  	// 1. the result status (true for success, false for failure).
    34  	// 2. output of execution.
    35  	// 3. errors if any, it will lead to throw away the result of the execution.
    36  	Do() (bool, string, error)
    37  	// Type provides the type of executor
    38  	Type() string
    39  }
    40  
// worker periodically runs a single status check through its executor and
// reports every outcome to the manager and the event recorder.
type worker struct {
	logger        logr.Logger
	eventRecorder recorder.ChaosRecorder

	// stopCh is a channel for stopping the worker; stop closes it and the
	// run loop returns once it observes the close.
	stopCh chan struct{}
	// once guards the close of stopCh so that stop can be called repeatedly.
	once sync.Once

	// manager owns this worker: the worker appends execution records to
	// manager.results and removes itself from manager.workers when run exits.
	manager *manager
	// Describes the status check configuration (read-only)
	statusCheck v1alpha1.StatusCheck
	// executor performs the actual check on every tick.
	executor Executor

	// lastResult is the result of the most recent execution that did not
	// return an error.
	lastResult bool
	// sameResultCount is the number of consecutive executions that produced
	// lastResult; it is compared against the success/failure thresholds.
	sameResultCount int
}
    57  
    58  func newWorker(logger logr.Logger, eventRecorder recorder.ChaosRecorder,
    59  	manager *manager, statusCheck v1alpha1.StatusCheck, executor Executor) *worker {
    60  	return &worker{
    61  		logger:        logger,
    62  		eventRecorder: eventRecorder,
    63  		manager:       manager,
    64  		statusCheck:   statusCheck,
    65  		executor:      executor,
    66  		stopCh:        make(chan struct{}, 1), // non-blocking
    67  	}
    68  }
    69  
    70  // run periodically execute the status check.
    71  func (w *worker) run() {
    72  	w.logger.V(1).Info("worker start")
    73  	interval := time.Duration(w.statusCheck.Spec.IntervalSeconds) * time.Second
    74  	ticker := time.NewTicker(interval)
    75  	defer func() {
    76  		w.logger.V(1).Info("worker stop")
    77  		ticker.Stop()
    78  		key := types.NamespacedName{Namespace: w.statusCheck.Namespace, Name: w.statusCheck.Name}
    79  		// delete worker from manager cache
    80  		w.manager.workers.delete(key)
    81  	}()
    82  
    83  	for {
    84  		select {
    85  		case <-ticker.C:
    86  			if !w.execute() {
    87  				return
    88  			}
    89  		case <-w.stopCh:
    90  			return
    91  		}
    92  	}
    93  }
    94  
    95  // stop stops the worker, it is safe to call stop multiple times.
    96  func (w *worker) stop() {
    97  	w.once.Do(func() {
    98  		close(w.stopCh)
    99  	})
   100  }
   101  
   102  // execute the status check once and records the result.
   103  // Returns whether the worker should continue.
   104  func (w *worker) execute() bool {
   105  	startTime := time.Now()
   106  	result, output, err := w.executor.Do()
   107  	if err != nil {
   108  		// executor error, throw away the result.
   109  		w.logger.Error(err, "executor internal error")
   110  		return true
   111  	}
   112  
   113  	if w.lastResult == result {
   114  		w.sameResultCount++
   115  	} else {
   116  		w.lastResult = result
   117  		w.sameResultCount = 1
   118  	}
   119  
   120  	key := types.NamespacedName{Namespace: w.statusCheck.Namespace, Name: w.statusCheck.Name}
   121  	if result {
   122  		w.logger.V(1).Info("status check execution succeed", "msg", output)
   123  		w.eventRecorder.Event(&w.statusCheck, recorder.StatusCheckExecutionSucceed{ExecutorType: w.executor.Type()})
   124  		w.manager.results.append(key, v1alpha1.StatusCheckRecord{
   125  			StartTime: &metav1.Time{Time: startTime},
   126  			Outcome:   v1alpha1.StatusCheckOutcomeSuccess,
   127  		})
   128  
   129  		// check if the success threshold is exceeded
   130  		// Notice: the function `setSuccessThresholdExceedCondition` in `controllers/statuscheck/conditions.go`
   131  		// also checks the success threshold, so if you want to modify the logic here, don't forget to modify that
   132  		// function as well.
   133  		if w.statusCheck.Spec.Mode == v1alpha1.StatusCheckSynchronous &&
   134  			w.sameResultCount >= w.statusCheck.Spec.SuccessThreshold {
   135  			w.logger.Info("exceed the success threshold")
   136  			// if status check mode is Synchronous, and it exceeds the SuccessThreshold,
   137  			// then stop the worker
   138  			return false
   139  		}
   140  	} else {
   141  		w.logger.Info("status check execution failed", "msg", output)
   142  		w.eventRecorder.Event(&w.statusCheck, recorder.StatusCheckExecutionFailed{ExecutorType: w.executor.Type(), Msg: output})
   143  		w.manager.results.append(key, v1alpha1.StatusCheckRecord{
   144  			StartTime: &metav1.Time{Time: startTime},
   145  			Outcome:   v1alpha1.StatusCheckOutcomeFailure,
   146  		})
   147  
   148  		// check if the failure threshold is exceeded
   149  		// Notice: the function `setFailureThresholdExceedCondition` in `controllers/statuscheck/conditions.go`
   150  		// also checks the failure threshold, so if you want to modify the logic here, don't forget to modify that
   151  		// function as well.
   152  		if w.sameResultCount >= w.statusCheck.Spec.FailureThreshold {
   153  			w.logger.Info("exceed the failure threshold")
   154  			// if it exceeds the FailureThreshold, stop the worker
   155  			return false
   156  		}
   157  	}
   158  
   159  	return true
   160  }
   161