1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package tasks
17
18 import (
19 "github.com/go-logr/logr"
20 "github.com/pkg/errors"
21
22 "github.com/chaos-mesh/chaos-mesh/pkg/cerr"
23 )
24
25
26 type Injectable interface {
27 Inject(pid IsID) error
28 }
29
30
31
32 type Recoverable interface {
33 Recover(pid IsID) error
34 }
35
36
37
38 type Creator interface {
39 New(values interface{}) (Injectable, error)
40 }
41
42
43
44
45 type Assign interface {
46 Assign(Injectable) error
47 }
48
49
50
51
52
53
54
55 type TaskExecutor interface {
56 Object
57 Mergeable
58 Creator
59 Assign
60 }
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 type TaskManager struct {
80 taskConfigManager TaskConfigManager
81 taskMap map[IsID]Injectable
82
83 logger logr.Logger
84 }
85
86 func NewTaskManager(logger logr.Logger) TaskManager {
87 return TaskManager{
88 NewTaskConfigManager(),
89 make(map[IsID]Injectable),
90 logger,
91 }
92 }
93
94 func (cm TaskManager) CopyTaskConfigManager() TaskConfigManager {
95 tm := NewTaskConfigManager()
96 for uid, task := range cm.taskConfigManager.TaskConfigMap {
97 tm.TaskConfigMap[uid] = task
98 }
99 return tm
100 }
101
102 func (cm TaskManager) CopyTaskMap() map[IsID]Injectable {
103 pm := make(map[IsID]Injectable)
104 for pid, chaosOnProcess := range cm.taskMap {
105 cm.taskMap[pid] = chaosOnProcess
106 }
107 return pm
108 }
109
110 func (cm TaskManager) GetConfigWithUID(id TaskID) (TaskConfig, error) {
111 return cm.taskConfigManager.GetConfigWithUID(id)
112 }
113
114 func (cm TaskManager) GetTaskWithPID(pid IsID) (Injectable, error) {
115 p, ok := cm.taskMap[pid]
116 if !ok {
117 return nil, ErrNotFoundID.WrapInput(pid).Err()
118 }
119 return p, nil
120 }
121
122 func (cm TaskManager) GetUIDsWithPID(pid IsID) []TaskID {
123 return cm.taskConfigManager.GetUIDsWithPID(pid)
124 }
125
126 func (cm TaskManager) CheckTasks(uid TaskID, pid IsID) error {
127 config, err := cm.GetConfigWithUID(uid)
128 if err != nil {
129 return err
130 }
131 if config.Id != pid {
132 return ErrDiffID.Wrapf("expected: %v, input: %v", config.Id, pid).Err()
133 }
134 return nil
135 }
136
137
138
139
140
141
142 func (cm TaskManager) Create(uid TaskID, pid IsID, config TaskExecutor, values interface{}) error {
143 if _, ok := cm.taskMap[pid]; ok {
144 return errors.Wrapf(cerr.ErrDuplicateEntity, "create")
145 }
146
147 err := cm.taskConfigManager.AddTaskConfig(uid, NewTaskConfig(pid, config))
148 if err != nil {
149 return err
150 }
151
152 processTask, err := config.New(values)
153 if err != nil {
154 _ = cm.taskConfigManager.DeleteTaskConfig(uid)
155 return errors.Wrapf(err, "New task: %v", config)
156 }
157
158 cm.taskMap[pid] = processTask
159 err = cm.commit(uid, pid)
160 if err != nil {
161 _ = cm.taskConfigManager.DeleteTaskConfig(uid)
162 delete(cm.taskMap, pid)
163 return errors.Wrapf(err, "update new task")
164 }
165 return nil
166 }
167
168
169
170
171 func (cm TaskManager) Apply(uid TaskID, pid IsID, config TaskExecutor) error {
172 err := cm.taskConfigManager.AddTaskConfig(uid, NewTaskConfig(pid, config))
173 if err != nil {
174 return err
175 }
176 err = cm.commit(uid, pid)
177 if err != nil {
178 _ = cm.taskConfigManager.DeleteTaskConfig(uid)
179 return err
180 }
181 return nil
182 }
183
184
185
186
187
188 func (cm TaskManager) Update(uid TaskID, pid IsID, config TaskExecutor) error {
189 oldTask, err := cm.taskConfigManager.UpdateTaskConfig(uid, NewTaskConfig(pid, config))
190 if err != nil {
191 return err
192 }
193 err = cm.commit(uid, pid)
194 if err != nil {
195 _, _ = cm.taskConfigManager.UpdateTaskConfig(uid, oldTask)
196 return err
197 }
198 return nil
199 }
200
201
202
203
204
205
206
207
208
209
210 func (cm TaskManager) Recover(uid TaskID, pid IsID) error {
211 uIDs := cm.taskConfigManager.GetUIDsWithPID(pid)
212 if len(uIDs) == 0 {
213 return ErrNotFoundTaskID.WrapInput(pid).Err()
214 }
215 if len(uIDs) == 1 {
216 if uIDs[0] != uid {
217 return ErrNotFoundTaskID.WrapInput(uid).Err()
218 }
219 err := cm.ClearTask(pid, false)
220 if err != nil {
221 return err
222 }
223 err = cm.taskConfigManager.DeleteTaskConfig(uid)
224 if err != nil {
225 cm.logger.Error(err, "recover task with error")
226 }
227 return nil
228 }
229
230 err := cm.taskConfigManager.DeleteTaskConfig(uid)
231 if err != nil {
232 cm.logger.Error(err, "recover task with error")
233 return nil
234 }
235
236 uIDs = cm.taskConfigManager.GetUIDsWithPID(pid)
237
238 err = cm.commit(uIDs[0], pid)
239 if err != nil {
240 return errors.Wrapf(err, "update new task")
241 }
242 return nil
243 }
244
245 func (cm TaskManager) commit(uid TaskID, pid IsID) error {
246 task, err := cm.taskConfigManager.MergeTaskConfig(uid)
247 if err != nil {
248 return errors.Wrapf(err, "unknown recovering in the taskConfigManager, TaskID: %v", uid)
249 }
250 process, ok := cm.taskMap[pid]
251 if !ok {
252 return ErrNotFoundID.WrapInput(pid).Err()
253 }
254 tasker, ok := task.Data.(TaskExecutor)
255 if !ok {
256 return errors.New("task.Data here must implement TaskExecutor")
257 }
258 err = tasker.Assign(process)
259 if err != nil {
260 return err
261 }
262 err = process.Inject(pid)
263 if err != nil {
264 return errors.Wrapf(err, "inject existing process IsID : %v", pid)
265 }
266 return nil
267 }
268
269
270
271
272 func (cm TaskManager) ClearTask(pid IsID, ignoreRecoverErr bool) error {
273 if process, ok := cm.taskMap[pid]; ok {
274 pRecover, ok := process.(Recoverable)
275 if !ok {
276 return cerr.NotImpl[Recoverable]().WrapInput(process).Err()
277 }
278 err := pRecover.Recover(pid)
279 if err != nil {
280 if ignoreRecoverErr {
281 cm.logger.Error(errors.Wrapf(err, "recover chaos"), "ERR IGNORED")
282 } else {
283 return errors.Wrapf(err, "recover chaos")
284 }
285
286 }
287 delete(cm.taskMap, pid)
288 return nil
289 }
290 return ErrNotFoundID.WrapInput(pid).Err()
291 }
292