
Source file src/github.com/chaos-mesh/chaos-mesh/controllers/statuscheck/manager.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/statuscheck

     1  // Copyright Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    16  package statuscheck
    18  import (
    19  	"sync"
    21  	"github.com/go-logr/logr"
    22  	"github.com/pkg/errors"
    23  	"k8s.io/apimachinery/pkg/types"
    25  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    26  	"github.com/chaos-mesh/chaos-mesh/controllers/statuscheck/http"
    27  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    28  )
    30  type Manager interface {
    31  	// Add creates new workers for every status check.
    32  	Add(statusCheck v1alpha1.StatusCheck) error
    33  	// Get returns the cached results about the status check.
    34  	Get(statusCheck v1alpha1.StatusCheck) (Result, bool)
    35  	// Delete handles cleaning up the removed status check state, including terminating workers and
    36  	// deleting cached results.
    37  	// This should be called when StatusCheck is deleted.
    38  	Delete(key types.NamespacedName)
    39  	// Complete handles terminating workers, but not deleting cached results.
    40  	// This should be called when StatusCheck is completed.
    41  	Complete(statusCheck v1alpha1.StatusCheck)
    42  }
    44  type manager struct {
    45  	logger        logr.Logger
    46  	eventRecorder recorder.ChaosRecorder
    48  	workers     workerCache
    49  	results     resultCache
    50  	newExecutor newExecutorFunc
    51  }
    53  type newExecutorFunc func(logger logr.Logger, statusCheck v1alpha1.StatusCheck) (Executor, error)
    55  func NewManager(logger logr.Logger, eventRecorder recorder.ChaosRecorder, newExecutorFunc newExecutorFunc) Manager {
    56  	return &manager{
    57  		logger:        logger,
    58  		eventRecorder: eventRecorder,
    59  		workers:       workerCache{workers: sync.Map{}},
    60  		results:       resultCache{results: make(map[types.NamespacedName]Result)},
    61  		newExecutor:   newExecutorFunc,
    62  	}
    63  }
    65  func (m *manager) Add(statusCheck v1alpha1.StatusCheck) error {
    66  	key := types.NamespacedName{Namespace: statusCheck.Namespace, Name: statusCheck.Name}
    67  	if _, ok := m.results.get(key); ok {
    68  		return nil
    69  	}
    70  	m.results.init(key, statusCheck.Status.Records, statusCheck.Status.Count, uint(statusCheck.Spec.RecordsHistoryLimit))
    72  	if statusCheck.IsCompleted() {
    73  		// if status check is completed, there is no need to create a worker
    74  		return errors.New("status check is completed")
    75  	}
    77  	executor, err := m.newExecutor(m.logger, statusCheck)
    78  	if err != nil {
    79  		return errors.Wrap(err, "new executor")
    80  	}
    81  	worker := newWorker(m.logger.WithName("worker").WithValues("statuscheck", key), m.eventRecorder, m, statusCheck, executor)
    82  	m.workers.add(key, worker)
    83  	return nil
    84  }
    86  func (m *manager) Get(statusCheck v1alpha1.StatusCheck) (Result, bool) {
    87  	key := types.NamespacedName{Namespace: statusCheck.Namespace, Name: statusCheck.Name}
    88  	result, ok := m.results.get(key)
    89  	if !ok {
    90  		return Result{}, false
    91  	}
    92  	return result, true
    93  }
    95  func (m *manager) Delete(key types.NamespacedName) {
    96  	m.results.delete(key)
    97  	m.workers.delete(key)
    98  }
   100  func (m *manager) Complete(statusCheck v1alpha1.StatusCheck) {
   101  	key := types.NamespacedName{Namespace: statusCheck.Namespace, Name: statusCheck.Name}
   102  	m.workers.delete(key)
   103  }
   105  // workerCache provides cached workers.
   106  type workerCache struct {
   107  	// Map of NamespacedName of StatusCheck -> *worker
   108  	workers sync.Map
   109  }
   111  func (c *workerCache) add(key types.NamespacedName, worker *worker) {
   112  	_, ok := c.workers.LoadOrStore(key, worker)
   113  	if !ok {
   114  		go worker.run()
   115  	}
   116  }
   118  func (c *workerCache) delete(key types.NamespacedName) {
   119  	obj, ok := c.workers.LoadAndDelete(key)
   120  	if !ok {
   121  		return
   122  	}
   123  	worker := obj.(*worker)
   124  	worker.stop()
   125  }
   127  // resultCache provides cached status check results.
   128  type resultCache struct {
   129  	// Map of NamespacedName of StatusCheck -> *result
   130  	results map[types.NamespacedName]Result
   131  	lock    sync.RWMutex
   132  }
   134  type Result struct {
   135  	Records []v1alpha1.StatusCheckRecord
   136  	Count   int64
   137  	// recordsHistoryLimit defines the number of record to retain.
   138  	recordsHistoryLimit uint
   139  }
   141  // init should only be called when adding a new worker.
   142  func (c *resultCache) init(key types.NamespacedName, obj []v1alpha1.StatusCheckRecord, count int64, limit uint) {
   143  	c.lock.Lock()
   144  	defer c.lock.Unlock()
   146  	if _, ok := c.results[key]; ok {
   147  		return
   148  	}
   149  	if len(obj) == 0 {
   150  		obj = make([]v1alpha1.StatusCheckRecord, 0)
   151  		count = 0
   152  	}
   153  	c.results[key] = Result{
   154  		Records:             limitRecords(obj, limit),
   155  		Count:               count,
   156  		recordsHistoryLimit: limit,
   157  	}
   158  }
   160  // append will append the record to the cache.
   161  // It should be only called by worker
   162  func (c *resultCache) append(key types.NamespacedName, obj v1alpha1.StatusCheckRecord) {
   163  	c.lock.Lock()
   164  	defer c.lock.Unlock()
   166  	result := c.results[key]
   167  	result.Records = append(result.Records, obj)
   168  	result.Records = limitRecords(result.Records, result.recordsHistoryLimit)
   169  	result.Count++
   170  	c.results[key] = result
   171  }
   173  func (c *resultCache) delete(key types.NamespacedName) {
   174  	c.lock.Lock()
   175  	defer c.lock.Unlock()
   177  	delete(c.results, key)
   178  }
   180  func (c *resultCache) get(key types.NamespacedName) (Result, bool) {
   181  	c.lock.RLock()
   182  	defer c.lock.RUnlock()
   183  	result, ok := c.results[key]
   184  	return result, ok
   185  }
   187  func limitRecords(records []v1alpha1.StatusCheckRecord, limit uint) []v1alpha1.StatusCheckRecord {
   188  	length := len(records)
   189  	if length < int(limit) {
   190  		return records
   191  	}
   192  	return records[length-int(limit):]
   193  }
   195  func newExecutor(logger logr.Logger, statusCheck v1alpha1.StatusCheck) (Executor, error) {
   196  	var executor Executor
   197  	switch statusCheck.Spec.Type {
   198  	case v1alpha1.TypeHTTP:
   199  		if statusCheck.Spec.EmbedStatusCheck == nil || statusCheck.Spec.HTTPStatusCheck == nil {
   200  			// this should not happen, if the webhook works as expected
   201  			return nil, errors.New("illegal status check, http should not be empty")
   202  		}
   203  		executor = http.NewExecutor(
   204  			logger.WithName("http-executor").WithValues("url", statusCheck.Spec.HTTPStatusCheck.RequestUrl),
   205  			statusCheck.Spec.TimeoutSeconds, *statusCheck.Spec.HTTPStatusCheck)
   206  	default:
   207  		return nil, errors.Errorf("unsupported type '%s'", statusCheck.Spec.Type)
   208  	}
   209  	return executor, nil
   210  }