1
2
3
4
5
6
7
8
9
10
11
12
13
14 package event
15
16 import (
17 "context"
18 "fmt"
19 "sort"
20 "strconv"
21 "strings"
22 "time"
23
24 "github.com/jinzhu/gorm"
25
26 "github.com/chaos-mesh/chaos-mesh/pkg/core"
27 "github.com/chaos-mesh/chaos-mesh/pkg/store/dbstore"
28
29 ctrl "sigs.k8s.io/controller-runtime"
30 )
31
// log is the package-level logger, registered under the name "store/event".
var log = ctrl.Log.WithName("store/event")
33
34
35 func NewStore(db *dbstore.DB) core.EventStore {
36 db.AutoMigrate(&core.Event{})
37 db.AutoMigrate(&core.PodRecord{})
38
39 es := &eventStore{db}
40
41 if err := es.DeleteIncompleteEvents(context.Background()); err != nil && !gorm.IsRecordNotFoundError(err) {
42 log.Error(err, "failed to delete all incomplete events")
43 }
44
45 return es
46 }
47
// eventStore is the value NewStore returns as a core.EventStore. All of its
// methods read and write events and pod records through db.
type eventStore struct {
	// db is the database handle every query in this file is issued on.
	db *dbstore.DB
}
51
// min returns the smaller of x and y (x when they are equal).
func min(x, y int) int {
	if y < x {
		return y
	}
	return x
}
58
59
60 func (e *eventStore) findPodRecordsByEventID(_ context.Context, id uint) ([]*core.PodRecord, error) {
61 pods := make([]*core.PodRecord, 0)
62 if err := e.db.Where(
63 "event_id = ?", id).
64 Find(&pods).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
65 return nil, err
66 }
67 return pods, nil
68 }
69
70
71 func (e *eventStore) List(_ context.Context) ([]*core.Event, error) {
72 var resList []core.Event
73 eventList := make([]*core.Event, 0)
74
75 if err := e.db.Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
76 return nil, err
77 }
78
79 for _, et := range resList {
80 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
81 if err != nil {
82 return nil, err
83 }
84 var event core.Event
85 event = et
86 event.Pods = pods
87 eventList = append(eventList, &event)
88 }
89
90 return eventList, nil
91 }
92
93
94 func (e *eventStore) ListByUID(_ context.Context, uid string) ([]*core.Event, error) {
95 var resList []core.Event
96 eventList := make([]*core.Event, 0)
97
98 if err := e.db.Where(
99 "experiment_id = ?", uid).
100 Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
101 return nil, err
102 }
103
104 for _, et := range resList {
105 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
106 if err != nil {
107 return nil, err
108 }
109 var event core.Event
110 event = et
111 event.Pods = pods
112 eventList = append(eventList, &event)
113 }
114
115 return eventList, nil
116 }
117
118
119 func (e *eventStore) ListByExperiment(_ context.Context, namespace string, experiment string) ([]*core.Event, error) {
120 var resList []core.Event
121
122 if err := e.db.Where(
123 "namespace = ? and experiment = ? ",
124 namespace, experiment).
125 Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
126 return nil, err
127 }
128
129 eventList := make([]*core.Event, 0, len(resList))
130 for _, et := range resList {
131 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
132 if err != nil {
133 return nil, err
134 }
135 var event core.Event
136 event = et
137 event.Pods = pods
138 eventList = append(eventList, &event)
139 }
140
141 return eventList, nil
142 }
143
144
145 func (e *eventStore) ListByNamespace(_ context.Context, namespace string) ([]*core.Event, error) {
146 podRecords := make([]*core.PodRecord, 0)
147
148 if err := e.db.Where(
149 &core.PodRecord{Namespace: namespace}).
150 Find(&podRecords).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
151 return nil, err
152 }
153
154 eventList := make([]*core.Event, 0, len(podRecords))
155 for _, pr := range podRecords {
156 et := new(core.Event)
157 if err := e.db.Where(
158 "id = ?", pr.EventID).
159 First(et).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
160 return nil, err
161 }
162 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
163 if err != nil {
164 return nil, err
165 }
166 et.Pods = pods
167 eventList = append(eventList, et)
168 }
169 return eventList, nil
170 }
171
172
173 func (e *eventStore) ListByPod(_ context.Context, namespace string, name string) ([]*core.Event, error) {
174 podRecords := make([]*core.PodRecord, 0)
175
176 if err := e.db.Where(
177 &core.PodRecord{PodName: name, Namespace: namespace}).
178 Find(&podRecords).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
179 return nil, err
180 }
181
182 eventList := make([]*core.Event, 0, len(podRecords))
183 for _, pr := range podRecords {
184 et := new(core.Event)
185 if err := e.db.Where(
186 "id = ?", pr.EventID).
187 First(et).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
188 return nil, err
189 }
190
191 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
192 if err != nil {
193 return nil, err
194 }
195 et.Pods = pods
196 eventList = append(eventList, et)
197 }
198 return eventList, nil
199 }
200
201
202 func (e *eventStore) Find(_ context.Context, id uint) (*core.Event, error) {
203 et := new(core.Event)
204 if err := e.db.Where(
205 "id = ?", id).
206 First(et).Error; err != nil {
207 return nil, err
208 }
209 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
210 if err != nil {
211 return nil, err
212 }
213 et.Pods = pods
214 return et, nil
215 }
216
217 func (e *eventStore) FindByExperimentAndStartTime(
218 _ context.Context,
219 name, namespace string,
220 startTime *time.Time,
221 ) (*core.Event, error) {
222 et := new(core.Event)
223 if err := e.db.Where(
224 "namespace = ? and experiment = ? and start_time = ?",
225 namespace, name, startTime).
226 First(et).Error; err != nil {
227 return nil, err
228 }
229
230 var pods []*core.PodRecord
231
232 if err := e.db.Where(
233 "event_id = ?", et.ID).
234 Find(&pods).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
235 return nil, err
236 }
237
238 return et, nil
239 }
240
241
242 func (e *eventStore) Create(_ context.Context, et *core.Event) error {
243 if err := e.db.Create(et).Error; err != nil {
244 return err
245 }
246
247 for _, pod := range et.Pods {
248 pod.EventID = et.ID
249 if err := e.db.Create(pod).Error; err != nil {
250 return err
251 }
252 }
253
254 return nil
255 }
256
257
258 func (e *eventStore) Update(_ context.Context, et *core.Event) error {
259 return e.db.Model(core.Event{}).
260 Where(
261 "namespace = ? and experiment = ? and start_time = ?",
262 et.Namespace, et.Experiment, et.StartTime).
263 Update("finish_time", et.FinishTime).
264 Error
265 }
266
267
268 func (e *eventStore) DeleteIncompleteEvents(_ context.Context) error {
269 return e.db.Where("finish_time IS NULL").Unscoped().
270 Delete(core.Event{}).Error
271 }
272
273
274 func (e *eventStore) ListByFilter(_ context.Context, filter core.Filter) ([]*core.Event, error) {
275 var (
276 resList []*core.Event
277 err error
278 startTime, finishTime time.Time
279 limit int
280 )
281
282 if filter.LimitStr != "" {
283 limit, err = strconv.Atoi(filter.LimitStr)
284 if err != nil {
285 return nil, fmt.Errorf("the format of the limitStr is wrong")
286 }
287 }
288 if filter.StartTimeStr != "" {
289 startTime, err = time.Parse(time.RFC3339, strings.Replace(filter.StartTimeStr, " ", "+", -1))
290 if err != nil {
291 return nil, fmt.Errorf("the format of the startTime is wrong")
292 }
293 }
294 if filter.FinishTimeStr != "" {
295 finishTime, err = time.Parse(time.RFC3339, strings.Replace(filter.FinishTimeStr, " ", "+", -1))
296 if err != nil {
297 return nil, fmt.Errorf("the format of the finishTime is wrong")
298 }
299 }
300
301 if filter.PodName != "" {
302 resList, err = e.ListByPod(context.Background(), filter.PodNamespace, filter.PodName)
303 if err == nil && filter.LimitStr != "" {
304 sort.Slice(resList, func(i, j int) bool {
305 return resList[i].CreatedAt.After(resList[j].CreatedAt)
306 })
307 resList = resList[:min(limit, len(resList))]
308 }
309 } else if filter.PodNamespace != "" {
310 resList, err = e.ListByNamespace(context.Background(), filter.PodNamespace)
311 if err == nil && filter.LimitStr != "" {
312 sort.Slice(resList, func(i, j int) bool {
313 return resList[i].CreatedAt.After(resList[j].CreatedAt)
314 })
315 resList = resList[:min(limit, len(resList))]
316 }
317 } else {
318 resList, err = e.DryListByFilter(context.Background(), filter)
319 }
320 if err != nil {
321 return resList, err
322 }
323
324 eventList := make([]*core.Event, 0)
325 for _, event := range resList {
326 if filter.PodName != "" || filter.PodNamespace != "" {
327 if filter.ExperimentName != "" && event.Experiment != filter.ExperimentName {
328 continue
329 }
330 if filter.ExperimentNamespace != "" && event.Namespace != filter.ExperimentNamespace {
331 continue
332 }
333 if filter.UID != "" && event.ExperimentID != filter.UID {
334 continue
335 }
336 if filter.Kind != "" && event.Kind != filter.Kind {
337 continue
338 }
339 if filter.StartTimeStr != "" && event.StartTime.Before(startTime) && !event.StartTime.Equal(startTime) {
340 continue
341 }
342 if filter.FinishTimeStr != "" && event.FinishTime.After(finishTime) && !event.FinishTime.Equal(finishTime) {
343 continue
344 }
345 }
346 if filter.PodNamespace == "" {
347 pods, err := e.findPodRecordsByEventID(context.Background(), event.ID)
348 if err != nil {
349 return nil, err
350 }
351 event.Pods = pods
352 }
353 eventList = append(eventList, event)
354 }
355 return eventList, nil
356 }
357
358
359 func (e *eventStore) DryListByFilter(_ context.Context, filter core.Filter) ([]*core.Event, error) {
360 var (
361 resList []*core.Event
362 err error
363 db *dbstore.DB
364 limit int
365 )
366
367 if filter.LimitStr != "" {
368 limit, err = strconv.Atoi(filter.LimitStr)
369 if err != nil {
370 return nil, fmt.Errorf("the format of the limitStr is wrong")
371 }
372 }
373 if filter.StartTimeStr != "" {
374 _, err = time.Parse(time.RFC3339, strings.Replace(filter.StartTimeStr, " ", "+", -1))
375 if err != nil {
376 return nil, fmt.Errorf("the format of the startTime is wrong")
377 }
378 }
379 if filter.FinishTimeStr != "" {
380 _, err = time.Parse(time.RFC3339, strings.Replace(filter.FinishTimeStr, " ", "+", -1))
381 if err != nil {
382 return nil, fmt.Errorf("the format of the finishTime is wrong")
383 }
384 }
385
386 query, args := constructQueryArgs(filter.ExperimentName, filter.ExperimentNamespace, filter.UID, filter.Kind, filter.StartTimeStr, filter.FinishTimeStr)
387
388 if len(args) == 0 {
389 db = e.db
390 } else {
391 db = &dbstore.DB{DB: e.db.Where(query, args...)}
392 }
393 if filter.LimitStr != "" {
394 db = &dbstore.DB{DB: db.Order("created_at desc").Limit(limit)}
395 }
396 if err := db.Find(&resList).Error; err != nil &&
397 !gorm.IsRecordNotFoundError(err) {
398 return resList, err
399 }
400
401 return resList, err
402 }
403
404
405 func (e *eventStore) DeleteByFinishTime(_ context.Context, ttl time.Duration) error {
406 eventList, err := e.List(context.Background())
407 if err != nil {
408 return err
409 }
410 nowTime := time.Now()
411 for _, et := range eventList {
412 if et.FinishTime == nil {
413 continue
414 }
415 if et.FinishTime.Add(ttl).Before(nowTime) {
416 if err := e.db.Model(core.Event{}).Unscoped().Delete(*et).Error; err != nil {
417 return err
418 }
419
420 if err := e.db.Model(core.PodRecord{}).
421 Where(
422 "event_id = ? ",
423 et.ID).Unscoped().Delete(core.PodRecord{}).Error; err != nil {
424 return err
425 }
426 }
427 }
428 return nil
429 }
430
431
432 func (e *eventStore) DeleteByUID(_ context.Context, uid string) error {
433 eventList, err := e.ListByUID(context.Background(), uid)
434 if err != nil {
435 return err
436 }
437 for _, et := range eventList {
438 if err := e.db.Model(core.PodRecord{}).
439 Where(
440 "event_id = ? ",
441 et.ID).Unscoped().Delete(core.PodRecord{}).Error; err != nil {
442 return err
443 }
444 }
445 return e.db.Where("experiment_id = ?", uid).Unscoped().
446 Delete(core.Event{}).Error
447 }
448
449 func (e *eventStore) getUID(_ context.Context, ns, name string) (string, error) {
450 events := make([]*core.Event, 0)
451
452 if err := e.db.Where(
453 &core.Event{Experiment: name, Namespace: ns}).
454 Find(&events).Error; err != nil {
455 return "", err
456 }
457
458 if len(events) == 0 {
459 return "", fmt.Errorf("get UID failure, maybe name or namespace is wrong")
460 }
461
462 UID := events[0].ExperimentID
463 st := events[0].StartTime
464
465 for _, et := range events {
466 if st.Before(*et.StartTime) {
467 st = et.StartTime
468 UID = et.ExperimentID
469 }
470 }
471 return UID, nil
472 }
473
474
475 func (e *eventStore) UpdateIncompleteEvents(_ context.Context, ns, name string) error {
476 return e.db.Model(core.Event{}).
477 Where(
478 "namespace = ? and experiment = ? and finish_time IS NULL",
479 ns, name).
480 Update("finish_time", time.Now()).
481 Error
482 }
483
// constructQueryArgs builds a WHERE clause and its positional arguments from
// the non-empty criteria, joined with " AND " in a fixed order: experiment,
// namespace, experiment_id, kind, start_time (>=), finish_time (<=).
//
// Empty criteria are skipped. For the two time criteria every "T" in the
// value is replaced by a space before it is used as an argument. With no
// criteria set, the query is "" and the argument slice is empty but non-nil.
func constructQueryArgs(experimentName, experimentNamespace, uid, kind, startTime, finishTime string) (string, []interface{}) {
	criteria := []struct {
		clause string
		value  string
		isTime bool
	}{
		{"experiment = ?", experimentName, false},
		{"namespace = ?", experimentNamespace, false},
		{"experiment_id = ?", uid, false},
		{"kind = ?", kind, false},
		{"start_time >= ?", startTime, true},
		{"finish_time <= ?", finishTime, true},
	}

	clauses := make([]string, 0, len(criteria))
	args := make([]interface{}, 0)
	for _, c := range criteria {
		if c.value == "" {
			continue
		}
		value := c.value
		if c.isTime {
			value = strings.Replace(value, "T", " ", -1)
		}
		clauses = append(clauses, c.clause)
		args = append(args, value)
	}

	return strings.Join(clauses, " AND "), args
}
534