1
2
3
4
5
6
7
8
9
10
11
12
13
14 package event
15
16 import (
17 "context"
18 "fmt"
19 "sort"
20 "strconv"
21 "strings"
22 "time"
23
24 "github.com/jinzhu/gorm"
25
26 "github.com/chaos-mesh/chaos-mesh/pkg/core"
27 "github.com/chaos-mesh/chaos-mesh/pkg/store/dbstore"
28
29 ctrl "sigs.k8s.io/controller-runtime"
30 )
31
32 var log = ctrl.Log.WithName("store/event")
33
34
35 func NewStore(db *dbstore.DB) core.EventStore {
36 db.AutoMigrate(&core.Event{})
37 db.AutoMigrate(&core.PodRecord{})
38
39 es := &eventStore{db}
40
41 if err := es.DeleteIncompleteEvents(context.Background()); err != nil && !gorm.IsRecordNotFoundError(err) {
42 log.Error(err, "failed to delete all incomplete events")
43 }
44
45 return es
46 }
47
48 type eventStore struct {
49 db *dbstore.DB
50 }
51
52 func min(x, y int) int {
53 if x > y {
54 return y
55 }
56 return x
57 }
58
59
60 func (e *eventStore) findPodRecordsByEventID(_ context.Context, id uint) ([]*core.PodRecord, error) {
61 pods := make([]*core.PodRecord, 0)
62 if err := e.db.Where(
63 "event_id = ?", id).
64 Find(&pods).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
65 return nil, err
66 }
67 return pods, nil
68 }
69
70
71 func (e *eventStore) List(_ context.Context) ([]*core.Event, error) {
72 var resList []core.Event
73 eventList := make([]*core.Event, 0)
74
75 if err := e.db.Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
76 return nil, err
77 }
78
79 for _, et := range resList {
80 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
81 if err != nil {
82 return nil, err
83 }
84 var event core.Event = et
85 event.Pods = pods
86 eventList = append(eventList, &event)
87 }
88
89 return eventList, nil
90 }
91
92
93 func (e *eventStore) ListByUID(_ context.Context, uid string) ([]*core.Event, error) {
94 var resList []core.Event
95 eventList := make([]*core.Event, 0)
96
97 if err := e.db.Where(
98 "experiment_id = ?", uid).
99 Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
100 return nil, err
101 }
102
103 for _, et := range resList {
104 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
105 if err != nil {
106 return nil, err
107 }
108 var event core.Event = et
109 event.Pods = pods
110 eventList = append(eventList, &event)
111 }
112
113 return eventList, nil
114 }
115
116
117 func (e *eventStore) ListByUIDs(_ context.Context, uids []string) ([]*core.Event, error) {
118 var resList []core.Event
119 eventList := make([]*core.Event, 0)
120
121 if err := e.db.Table("events").Where(
122 "experiment_id IN (?)", uids).
123 Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
124 return nil, err
125 }
126
127 for _, et := range resList {
128 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
129 if err != nil {
130 return nil, err
131 }
132 var event core.Event = et
133 event.Pods = pods
134 eventList = append(eventList, &event)
135 }
136
137 return eventList, nil
138 }
139
140
141 func (e *eventStore) ListByExperiment(_ context.Context, namespace string, experiment string) ([]*core.Event, error) {
142 var resList []core.Event
143
144 if err := e.db.Where(
145 "namespace = ? and experiment = ? ",
146 namespace, experiment).
147 Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
148 return nil, err
149 }
150
151 eventList := make([]*core.Event, 0, len(resList))
152 for _, et := range resList {
153 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
154 if err != nil {
155 return nil, err
156 }
157 var event core.Event = et
158 event.Pods = pods
159 eventList = append(eventList, &event)
160 }
161
162 return eventList, nil
163 }
164
165
166 func (e *eventStore) ListByNamespace(_ context.Context, namespace string) ([]*core.Event, error) {
167 podRecords := make([]*core.PodRecord, 0)
168
169 if err := e.db.Where(
170 &core.PodRecord{Namespace: namespace}).
171 Find(&podRecords).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
172 return nil, err
173 }
174
175 eventList := make([]*core.Event, 0, len(podRecords))
176 for _, pr := range podRecords {
177 et := new(core.Event)
178 if err := e.db.Where(
179 "id = ?", pr.EventID).
180 First(et).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
181 return nil, err
182 }
183 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
184 if err != nil {
185 return nil, err
186 }
187 et.Pods = pods
188 eventList = append(eventList, et)
189 }
190 return eventList, nil
191 }
192
193
194 func (e *eventStore) ListByPod(_ context.Context, namespace string, name string) ([]*core.Event, error) {
195 podRecords := make([]*core.PodRecord, 0)
196
197 if err := e.db.Where(
198 &core.PodRecord{PodName: name, Namespace: namespace}).
199 Find(&podRecords).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
200 return nil, err
201 }
202
203 eventList := make([]*core.Event, 0, len(podRecords))
204 for _, pr := range podRecords {
205 et := new(core.Event)
206 if err := e.db.Where(
207 "id = ?", pr.EventID).
208 First(et).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
209 return nil, err
210 }
211
212 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
213 if err != nil {
214 return nil, err
215 }
216 et.Pods = pods
217 eventList = append(eventList, et)
218 }
219 return eventList, nil
220 }
221
222
223 func (e *eventStore) Find(_ context.Context, id uint) (*core.Event, error) {
224 et := new(core.Event)
225 if err := e.db.Where(
226 "id = ?", id).
227 First(et).Error; err != nil {
228 return nil, err
229 }
230 pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
231 if err != nil {
232 return nil, err
233 }
234 et.Pods = pods
235 return et, nil
236 }
237
238 func (e *eventStore) FindByExperimentAndStartTime(
239 _ context.Context,
240 name, namespace string,
241 startTime *time.Time,
242 ) (*core.Event, error) {
243 et := new(core.Event)
244 if err := e.db.Where(
245 "namespace = ? and experiment = ? and start_time = ?",
246 namespace, name, startTime).
247 First(et).Error; err != nil {
248 return nil, err
249 }
250
251 var pods []*core.PodRecord
252
253 if err := e.db.Where(
254 "event_id = ?", et.ID).
255 Find(&pods).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
256 return nil, err
257 }
258
259 return et, nil
260 }
261
262
263 func (e *eventStore) Create(_ context.Context, et *core.Event) error {
264 if err := e.db.Create(et).Error; err != nil {
265 return err
266 }
267
268 for _, pod := range et.Pods {
269 pod.EventID = et.ID
270 if err := e.db.Create(pod).Error; err != nil {
271 return err
272 }
273 }
274
275 return nil
276 }
277
278
279 func (e *eventStore) Update(_ context.Context, et *core.Event) error {
280 return e.db.Model(core.Event{}).
281 Where(
282 "namespace = ? and experiment = ? and start_time = ?",
283 et.Namespace, et.Experiment, et.StartTime).
284 Update("finish_time", et.FinishTime).
285 Error
286 }
287
288
289 func (e *eventStore) DeleteIncompleteEvents(_ context.Context) error {
290 return e.db.Where("finish_time IS NULL").Unscoped().
291 Delete(core.Event{}).Error
292 }
293
294
295 func (e *eventStore) ListByFilter(_ context.Context, filter core.Filter) ([]*core.Event, error) {
296 var (
297 resList []*core.Event
298 err error
299 startTime, finishTime time.Time
300 limit int
301 )
302
303 if filter.LimitStr != "" {
304 limit, err = strconv.Atoi(filter.LimitStr)
305 if err != nil {
306 return nil, fmt.Errorf("the format of the limitStr is wrong")
307 }
308 }
309 if filter.StartTimeStr != "" {
310 startTime, err = time.Parse(time.RFC3339, strings.Replace(filter.StartTimeStr, " ", "+", -1))
311 if err != nil {
312 return nil, fmt.Errorf("the format of the startTime is wrong")
313 }
314 }
315 if filter.FinishTimeStr != "" {
316 finishTime, err = time.Parse(time.RFC3339, strings.Replace(filter.FinishTimeStr, " ", "+", -1))
317 if err != nil {
318 return nil, fmt.Errorf("the format of the finishTime is wrong")
319 }
320 }
321
322 if filter.PodName != "" {
323 resList, err = e.ListByPod(context.Background(), filter.PodNamespace, filter.PodName)
324 if err == nil && filter.LimitStr != "" {
325 sort.Slice(resList, func(i, j int) bool {
326 return resList[i].CreatedAt.After(resList[j].CreatedAt)
327 })
328 resList = resList[:min(limit, len(resList))]
329 }
330 } else if filter.PodNamespace != "" {
331 resList, err = e.ListByNamespace(context.Background(), filter.PodNamespace)
332 if err == nil && filter.LimitStr != "" {
333 sort.Slice(resList, func(i, j int) bool {
334 return resList[i].CreatedAt.After(resList[j].CreatedAt)
335 })
336 resList = resList[:min(limit, len(resList))]
337 }
338 } else {
339 resList, err = e.DryListByFilter(context.Background(), filter)
340 }
341 if err != nil {
342 return resList, err
343 }
344
345 eventList := make([]*core.Event, 0)
346 for _, event := range resList {
347 if filter.PodName != "" || filter.PodNamespace != "" {
348 if filter.ExperimentName != "" && event.Experiment != filter.ExperimentName {
349 continue
350 }
351 if filter.ExperimentNamespace != "" && event.Namespace != filter.ExperimentNamespace {
352 continue
353 }
354 if filter.UID != "" && event.ExperimentID != filter.UID {
355 continue
356 }
357 if filter.Kind != "" && event.Kind != filter.Kind {
358 continue
359 }
360 if filter.StartTimeStr != "" && event.StartTime.Before(startTime) && !event.StartTime.Equal(startTime) {
361 continue
362 }
363 if filter.FinishTimeStr != "" && event.FinishTime.After(finishTime) && !event.FinishTime.Equal(finishTime) {
364 continue
365 }
366 }
367 if filter.PodNamespace == "" {
368 pods, err := e.findPodRecordsByEventID(context.Background(), event.ID)
369 if err != nil {
370 return nil, err
371 }
372 event.Pods = pods
373 }
374 eventList = append(eventList, event)
375 }
376 return eventList, nil
377 }
378
379
380 func (e *eventStore) DryListByFilter(_ context.Context, filter core.Filter) ([]*core.Event, error) {
381 var (
382 resList []*core.Event
383 err error
384 db *dbstore.DB
385 limit int
386 )
387
388 if filter.LimitStr != "" {
389 limit, err = strconv.Atoi(filter.LimitStr)
390 if err != nil {
391 return nil, fmt.Errorf("the format of the limitStr is wrong")
392 }
393 }
394 if filter.StartTimeStr != "" {
395 _, err = time.Parse(time.RFC3339, strings.Replace(filter.StartTimeStr, " ", "+", -1))
396 if err != nil {
397 return nil, fmt.Errorf("the format of the startTime is wrong")
398 }
399 }
400 if filter.FinishTimeStr != "" {
401 _, err = time.Parse(time.RFC3339, strings.Replace(filter.FinishTimeStr, " ", "+", -1))
402 if err != nil {
403 return nil, fmt.Errorf("the format of the finishTime is wrong")
404 }
405 }
406
407 query, args := constructQueryArgs(filter.ExperimentName, filter.ExperimentNamespace, filter.UID, filter.Kind, filter.StartTimeStr, filter.FinishTimeStr)
408
409 if len(args) == 0 {
410 db = e.db
411 } else {
412 db = &dbstore.DB{DB: e.db.Where(query, args...)}
413 }
414 if filter.LimitStr != "" {
415 db = &dbstore.DB{DB: db.Order("created_at desc").Limit(limit)}
416 }
417 if err := db.Find(&resList).Error; err != nil &&
418 !gorm.IsRecordNotFoundError(err) {
419 return resList, err
420 }
421
422 return resList, err
423 }
424
425
426 func (e *eventStore) DeleteByFinishTime(_ context.Context, ttl time.Duration) error {
427 eventList, err := e.List(context.Background())
428 if err != nil {
429 return err
430 }
431 nowTime := time.Now()
432 for _, et := range eventList {
433 if et.FinishTime == nil {
434 continue
435 }
436 if et.FinishTime.Add(ttl).Before(nowTime) {
437 if err := e.db.Model(core.Event{}).Unscoped().Delete(*et).Error; err != nil {
438 return err
439 }
440
441 if err := e.db.Model(core.PodRecord{}).
442 Where(
443 "event_id = ? ",
444 et.ID).Unscoped().Delete(core.PodRecord{}).Error; err != nil {
445 return err
446 }
447 }
448 }
449 return nil
450 }
451
452
453 func (e *eventStore) DeleteByUID(_ context.Context, uid string) error {
454 eventList, err := e.ListByUID(context.Background(), uid)
455 if err != nil {
456 return err
457 }
458 for _, et := range eventList {
459 if err := e.db.Model(core.PodRecord{}).
460 Where(
461 "event_id = ? ",
462 et.ID).Unscoped().Delete(core.PodRecord{}).Error; err != nil {
463 return err
464 }
465 }
466 return e.db.Where("experiment_id = ?", uid).Unscoped().
467 Delete(core.Event{}).Error
468 }
469
470
471 func (e *eventStore) DeleteByUIDs(_ context.Context, uids []string) error {
472 eventList, err := e.ListByUIDs(context.Background(), uids)
473 if err != nil {
474 return err
475 }
476 eventIDList := make([]uint, len(eventList))
477 for _, et := range eventList {
478 eventIDList = append(eventIDList, et.ID)
479 }
480 if err = e.db.Model(core.PodRecord{}).Where("event_id IN (?)", eventIDList).Unscoped().Delete(core.PodRecord{}).Error; err != nil {
481 return err
482 }
483 return e.db.Where("experiment_id IN (?)", uids).Unscoped().Delete(core.Event{}).Error
484 }
485
486 func (e *eventStore) getUID(_ context.Context, ns, name string) (string, error) {
487 events := make([]*core.Event, 0)
488
489 if err := e.db.Where(
490 &core.Event{Experiment: name, Namespace: ns}).
491 Find(&events).Error; err != nil {
492 return "", err
493 }
494
495 if len(events) == 0 {
496 return "", fmt.Errorf("get UID failure, maybe name or namespace is wrong")
497 }
498
499 UID := events[0].ExperimentID
500 st := events[0].StartTime
501
502 for _, et := range events {
503 if st.Before(*et.StartTime) {
504 st = et.StartTime
505 UID = et.ExperimentID
506 }
507 }
508 return UID, nil
509 }
510
511
512 func (e *eventStore) UpdateIncompleteEvents(_ context.Context, ns, name string) error {
513 return e.db.Model(core.Event{}).
514 Where(
515 "namespace = ? and experiment = ? and finish_time IS NULL",
516 ns, name).
517 Update("finish_time", time.Now()).
518 Error
519 }
520
521 func constructQueryArgs(experimentName, experimentNamespace, uid, kind, startTime, finishTime string) (string, []interface{}) {
522 args := make([]interface{}, 0)
523 query := ""
524 if experimentName != "" {
525 query += "experiment = ?"
526 args = append(args, experimentName)
527 }
528 if experimentNamespace != "" {
529 if len(args) > 0 {
530 query += " AND namespace = ?"
531 } else {
532 query += "namespace = ?"
533 }
534 args = append(args, experimentNamespace)
535 }
536 if uid != "" {
537 if len(args) > 0 {
538 query += " AND experiment_id = ?"
539 } else {
540 query += "experiment_id = ?"
541 }
542 args = append(args, uid)
543 }
544 if kind != "" {
545 if len(args) > 0 {
546 query += " AND kind = ?"
547 } else {
548 query += "kind = ?"
549 }
550 args = append(args, kind)
551 }
552 if startTime != "" {
553 if len(args) > 0 {
554 query += " AND start_time >= ?"
555 } else {
556 query += "start_time >= ?"
557 }
558 args = append(args, strings.Replace(startTime, "T", " ", -1))
559 }
560 if finishTime != "" {
561 if len(args) > 0 {
562 query += " AND finish_time <= ?"
563 } else {
564 query += "finish_time <= ?"
565 }
566 args = append(args, strings.Replace(finishTime, "T", " ", -1))
567 }
568
569 return query, args
570 }
571