...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/tasks/task_manager.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/tasks

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package tasks
    17  
    18  import (
    19  	"github.com/go-logr/logr"
    20  	"github.com/pkg/errors"
    21  
    22  	"github.com/chaos-mesh/chaos-mesh/pkg/cerr"
    23  )
    24  
    25  // Injectable stand for the base behavior of task : inject a process with IsID.
    26  type Injectable interface {
    27  	Inject(pid IsID) error
    28  }
    29  
    30  // Recoverable introduce the task recovering ability.
    31  // Used in Recover.
    32  type Recoverable interface {
    33  	Recover(pid IsID) error
    34  }
    35  
    36  // Creator init an Injectable with values.
    37  // We use it in a case that TaskConfig.Data init an Injectable task here.
    38  type Creator interface {
    39  	New(values interface{}) (Injectable, error)
    40  }
    41  
    42  // Assign change some of an Injectable task with its own values.
    43  // We use it in a case that we use TaskConfig.Data
    44  // to update an Injectable task.
    45  type Assign interface {
    46  	Assign(Injectable) error
    47  }
    48  
    49  // TaskExecutor indicate that the type can be used
    50  // for execute task here as a task config.
    51  // Mergeable means we can sum many task config in to one for apply.
    52  // Creator means we can use the task config to create a running task
    53  // which can Inject on IsID.
    54  // Assign means we can use the task config to update an existing running task.
    55  type TaskExecutor interface {
    56  	Object
    57  	Mergeable
    58  	Creator
    59  	Assign
    60  }
    61  
    62  // TaskManager is a Manager for chaos tasks.
    63  // A task base on a target marked with its IsID.
    64  // We assume task should implement Injectable.
    65  // We use TaskConfig.Data which implement TaskExecutor to:
    66  //
    67  //	Sum task configs on same IsID into one.
    68  //	Create task.
    69  //	Assign or update task.
    70  //
    71  // SO if developers wants to use functions in TaskManager ,
    72  // their imported TaskConfig need to implement interface TaskExecutor.
    73  // If developers wants to recover task successfully,
    74  // the task must implement Recoverable.
    75  // If not implement ,
    76  // the Recover function will return a ErrNotImplement("Recoverable") error.
    77  // IMPORTANT: We assume task config obey that TaskConfig.Data A,B. A.Merge(B)
    78  // is approximately equal to B.Merge(A)
    79  type TaskManager struct {
    80  	taskConfigManager TaskConfigManager
    81  	taskMap           map[IsID]Injectable
    82  
    83  	logger logr.Logger
    84  }
    85  
    86  func NewTaskManager(logger logr.Logger) TaskManager {
    87  	return TaskManager{
    88  		NewTaskConfigManager(),
    89  		make(map[IsID]Injectable),
    90  		logger,
    91  	}
    92  }
    93  
    94  func (cm TaskManager) CopyTaskConfigManager() TaskConfigManager {
    95  	tm := NewTaskConfigManager()
    96  	for uid, task := range cm.taskConfigManager.TaskConfigMap {
    97  		tm.TaskConfigMap[uid] = task
    98  	}
    99  	return tm
   100  }
   101  
   102  func (cm TaskManager) CopyTaskMap() map[IsID]Injectable {
   103  	pm := make(map[IsID]Injectable)
   104  	for pid, chaosOnProcess := range cm.taskMap {
   105  		cm.taskMap[pid] = chaosOnProcess
   106  	}
   107  	return pm
   108  }
   109  
   110  func (cm TaskManager) GetConfigWithUID(id TaskID) (TaskConfig, error) {
   111  	return cm.taskConfigManager.GetConfigWithUID(id)
   112  }
   113  
   114  func (cm TaskManager) GetTaskWithPID(pid IsID) (Injectable, error) {
   115  	p, ok := cm.taskMap[pid]
   116  	if !ok {
   117  		return nil, ErrNotFoundID.WrapInput(pid).Err()
   118  	}
   119  	return p, nil
   120  }
   121  
   122  func (cm TaskManager) GetUIDsWithPID(pid IsID) []TaskID {
   123  	return cm.taskConfigManager.GetUIDsWithPID(pid)
   124  }
   125  
   126  func (cm TaskManager) CheckTasks(uid TaskID, pid IsID) error {
   127  	config, err := cm.GetConfigWithUID(uid)
   128  	if err != nil {
   129  		return err
   130  	}
   131  	if config.Id != pid {
   132  		return ErrDiffID.Wrapf("expected: %v, input: %v", config.Id, pid).Err()
   133  	}
   134  	return nil
   135  }
   136  
   137  // Create the first task,
   138  // the New function of TaskExecutor:Creator will only be used here.
   139  // values is only the import parameter of New function in TaskExecutor:Creator.
   140  // If it comes a task are already be injected on the IsID,
   141  // Create will return ChaosErr.ErrDuplicateEntity.
   142  func (cm TaskManager) Create(uid TaskID, pid IsID, config TaskExecutor, values interface{}) error {
   143  	if _, ok := cm.taskMap[pid]; ok {
   144  		return errors.Wrapf(cerr.ErrDuplicateEntity, "create")
   145  	}
   146  
   147  	err := cm.taskConfigManager.AddTaskConfig(uid, NewTaskConfig(pid, config))
   148  	if err != nil {
   149  		return err
   150  	}
   151  
   152  	processTask, err := config.New(values)
   153  	if err != nil {
   154  		_ = cm.taskConfigManager.DeleteTaskConfig(uid)
   155  		return errors.Wrapf(err, "New task: %v", config)
   156  	}
   157  
   158  	cm.taskMap[pid] = processTask
   159  	err = cm.commit(uid, pid)
   160  	if err != nil {
   161  		_ = cm.taskConfigManager.DeleteTaskConfig(uid)
   162  		delete(cm.taskMap, pid)
   163  		return errors.Wrapf(err, "update new task")
   164  	}
   165  	return nil
   166  }
   167  
   168  // Apply the task when the target pid of task is already be Created.
   169  // If it comes a TaskID injected , Apply will return ChaosErr.ErrDuplicateEntity.
   170  // If the Process has not been Created , Apply will return ChaosErr.NotFound("IsID").
   171  func (cm TaskManager) Apply(uid TaskID, pid IsID, config TaskExecutor) error {
   172  	err := cm.taskConfigManager.AddTaskConfig(uid, NewTaskConfig(pid, config))
   173  	if err != nil {
   174  		return err
   175  	}
   176  	err = cm.commit(uid, pid)
   177  	if err != nil {
   178  		_ = cm.taskConfigManager.DeleteTaskConfig(uid)
   179  		return err
   180  	}
   181  	return nil
   182  }
   183  
   184  // Update the task with a same TaskID, IsID and new task config.
   185  // If it comes a TaskID not injected , Update will return ChaosErr.NotFound("TaskID").
   186  // If it comes the import IsID of task do not equal to the last one,
   187  // Update will return ErrDiffID.
   188  func (cm TaskManager) Update(uid TaskID, pid IsID, config TaskExecutor) error {
   189  	oldTask, err := cm.taskConfigManager.UpdateTaskConfig(uid, NewTaskConfig(pid, config))
   190  	if err != nil {
   191  		return err
   192  	}
   193  	err = cm.commit(uid, pid)
   194  	if err != nil {
   195  		_, _ = cm.taskConfigManager.UpdateTaskConfig(uid, oldTask)
   196  		return err
   197  	}
   198  	return nil
   199  }
   200  
   201  // Recover the task when there is no task config on IsID or
   202  // recovering the task with last task config on IsID.
   203  // Recover in Recoverable will be used here,
   204  // if it runs failed it will just return the error.
   205  // If Recover is failed but developer wants to clear it,
   206  // just run : TaskManager.ClearTask(pid, true).
   207  // If IsID is already recovered successfully, Recover will return ChaosErr.NotFound("IsID").
   208  // If TaskID is not Applied or Created or the target IsID of TaskID is not the import pid,
   209  // Recover will return ChaosErr.NotFound("TaskID").
   210  func (cm TaskManager) Recover(uid TaskID, pid IsID) error {
   211  	uIDs := cm.taskConfigManager.GetUIDsWithPID(pid)
   212  	if len(uIDs) == 0 {
   213  		return ErrNotFoundTaskID.WrapInput(pid).Err()
   214  	}
   215  	if len(uIDs) == 1 {
   216  		if uIDs[0] != uid {
   217  			return ErrNotFoundTaskID.WrapInput(uid).Err()
   218  		}
   219  		err := cm.ClearTask(pid, false)
   220  		if err != nil {
   221  			return err
   222  		}
   223  		err = cm.taskConfigManager.DeleteTaskConfig(uid)
   224  		if err != nil {
   225  			cm.logger.Error(err, "recover task with error")
   226  		}
   227  		return nil
   228  	}
   229  
   230  	err := cm.taskConfigManager.DeleteTaskConfig(uid)
   231  	if err != nil {
   232  		cm.logger.Error(err, "recover task with error")
   233  		return nil
   234  	}
   235  
   236  	uIDs = cm.taskConfigManager.GetUIDsWithPID(pid)
   237  
   238  	err = cm.commit(uIDs[0], pid)
   239  	if err != nil {
   240  		return errors.Wrapf(err, "update new task")
   241  	}
   242  	return nil
   243  }
   244  
   245  func (cm TaskManager) commit(uid TaskID, pid IsID) error {
   246  	task, err := cm.taskConfigManager.MergeTaskConfig(uid)
   247  	if err != nil {
   248  		return errors.Wrapf(err, "unknown recovering in the taskConfigManager, TaskID: %v", uid)
   249  	}
   250  	process, ok := cm.taskMap[pid]
   251  	if !ok {
   252  		return ErrNotFoundID.WrapInput(pid).Err()
   253  	}
   254  	tasker, ok := task.Data.(TaskExecutor)
   255  	if !ok {
   256  		return errors.New("task.Data here must implement TaskExecutor")
   257  	}
   258  	err = tasker.Assign(process)
   259  	if err != nil {
   260  		return err
   261  	}
   262  	err = process.Inject(pid)
   263  	if err != nil {
   264  		return errors.Wrapf(err, "inject existing process IsID : %v", pid)
   265  	}
   266  	return nil
   267  }
   268  
   269  // ClearTask clear the task totally.
   270  // IMPORTANT: Developer should only use this function
   271  // when want to force clear task with ignoreRecoverErr==true.
   272  func (cm TaskManager) ClearTask(pid IsID, ignoreRecoverErr bool) error {
   273  	if process, ok := cm.taskMap[pid]; ok {
   274  		pRecover, ok := process.(Recoverable)
   275  		if !ok {
   276  			return cerr.NotImpl[Recoverable]().WrapInput(process).Err()
   277  		}
   278  		err := pRecover.Recover(pid)
   279  		if err != nil {
   280  			if ignoreRecoverErr {
   281  				cm.logger.Error(errors.Wrapf(err, "recover chaos"), "ERR IGNORED")
   282  			} else {
   283  				return errors.Wrapf(err, "recover chaos")
   284  			}
   285  
   286  		}
   287  		delete(cm.taskMap, pid)
   288  		return nil
   289  	}
   290  	return ErrNotFoundID.WrapInput(pid).Err()
   291  }
   292