...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/store/event/event.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/store/event

     1  // Copyright 2020 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package event
    15  
    16  import (
    17  	"context"
    18  	"fmt"
    19  	"sort"
    20  	"strconv"
    21  	"strings"
    22  	"time"
    23  
    24  	"github.com/jinzhu/gorm"
    25  
    26  	"github.com/chaos-mesh/chaos-mesh/pkg/core"
    27  	"github.com/chaos-mesh/chaos-mesh/pkg/store/dbstore"
    28  
    29  	ctrl "sigs.k8s.io/controller-runtime"
    30  )
    31  
    32  var log = ctrl.Log.WithName("store/event")
    33  
    34  // NewStore return a new EventStore.
    35  func NewStore(db *dbstore.DB) core.EventStore {
    36  	db.AutoMigrate(&core.Event{})
    37  	db.AutoMigrate(&core.PodRecord{})
    38  
    39  	es := &eventStore{db}
    40  
    41  	if err := es.DeleteIncompleteEvents(context.Background()); err != nil && !gorm.IsRecordNotFoundError(err) {
    42  		log.Error(err, "failed to delete all incomplete events")
    43  	}
    44  
    45  	return es
    46  }
    47  
    48  type eventStore struct {
    49  	db *dbstore.DB
    50  }
    51  
    52  func min(x, y int) int {
    53  	if x > y {
    54  		return y
    55  	}
    56  	return x
    57  }
    58  
    59  // findPodRecordsByEventID returns the list of PodRecords according to the eventID
    60  func (e *eventStore) findPodRecordsByEventID(_ context.Context, id uint) ([]*core.PodRecord, error) {
    61  	pods := make([]*core.PodRecord, 0)
    62  	if err := e.db.Where(
    63  		"event_id = ?", id).
    64  		Find(&pods).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
    65  		return nil, err
    66  	}
    67  	return pods, nil
    68  }
    69  
    70  // List returns the list of events
    71  func (e *eventStore) List(_ context.Context) ([]*core.Event, error) {
    72  	var resList []core.Event
    73  	eventList := make([]*core.Event, 0)
    74  
    75  	if err := e.db.Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
    76  		return nil, err
    77  	}
    78  
    79  	for _, et := range resList {
    80  		pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
    81  		if err != nil {
    82  			return nil, err
    83  		}
    84  		var event core.Event = et
    85  		event.Pods = pods
    86  		eventList = append(eventList, &event)
    87  	}
    88  
    89  	return eventList, nil
    90  }
    91  
    92  // ListByUID returns an event list by the uid of the experiment.
    93  func (e *eventStore) ListByUID(_ context.Context, uid string) ([]*core.Event, error) {
    94  	var resList []core.Event
    95  	eventList := make([]*core.Event, 0)
    96  
    97  	if err := e.db.Where(
    98  		"experiment_id = ?", uid).
    99  		Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   100  		return nil, err
   101  	}
   102  
   103  	for _, et := range resList {
   104  		pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
   105  		if err != nil {
   106  			return nil, err
   107  		}
   108  		var event core.Event = et
   109  		event.Pods = pods
   110  		eventList = append(eventList, &event)
   111  	}
   112  
   113  	return eventList, nil
   114  }
   115  
   116  // ListByUID returns an event list by the uid of the experiment.
   117  func (e *eventStore) ListByUIDs(_ context.Context, uids []string) ([]*core.Event, error) {
   118  	var resList []core.Event
   119  	eventList := make([]*core.Event, 0)
   120  
   121  	if err := e.db.Table("events").Where(
   122  		"experiment_id IN (?)", uids).
   123  		Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   124  		return nil, err
   125  	}
   126  
   127  	for _, et := range resList {
   128  		pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
   129  		if err != nil {
   130  			return nil, err
   131  		}
   132  		var event core.Event = et
   133  		event.Pods = pods
   134  		eventList = append(eventList, &event)
   135  	}
   136  
   137  	return eventList, nil
   138  }
   139  
   140  // ListByExperiment returns an event list by the name and namespace of the experiment.
   141  func (e *eventStore) ListByExperiment(_ context.Context, namespace string, experiment string) ([]*core.Event, error) {
   142  	var resList []core.Event
   143  
   144  	if err := e.db.Where(
   145  		"namespace = ? and experiment = ? ",
   146  		namespace, experiment).
   147  		Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   148  		return nil, err
   149  	}
   150  
   151  	eventList := make([]*core.Event, 0, len(resList))
   152  	for _, et := range resList {
   153  		pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
   154  		if err != nil {
   155  			return nil, err
   156  		}
   157  		var event core.Event = et
   158  		event.Pods = pods
   159  		eventList = append(eventList, &event)
   160  	}
   161  
   162  	return eventList, nil
   163  }
   164  
   165  // ListByNamespace returns the list of events according to the namespace
   166  func (e *eventStore) ListByNamespace(_ context.Context, namespace string) ([]*core.Event, error) {
   167  	podRecords := make([]*core.PodRecord, 0)
   168  
   169  	if err := e.db.Where(
   170  		&core.PodRecord{Namespace: namespace}).
   171  		Find(&podRecords).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   172  		return nil, err
   173  	}
   174  
   175  	eventList := make([]*core.Event, 0, len(podRecords))
   176  	for _, pr := range podRecords {
   177  		et := new(core.Event)
   178  		if err := e.db.Where(
   179  			"id = ?", pr.EventID).
   180  			First(et).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   181  			return nil, err
   182  		}
   183  		pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
   184  		if err != nil {
   185  			return nil, err
   186  		}
   187  		et.Pods = pods
   188  		eventList = append(eventList, et)
   189  	}
   190  	return eventList, nil
   191  }
   192  
   193  // ListByPod returns the list of events according to the pod
   194  func (e *eventStore) ListByPod(_ context.Context, namespace string, name string) ([]*core.Event, error) {
   195  	podRecords := make([]*core.PodRecord, 0)
   196  
   197  	if err := e.db.Where(
   198  		&core.PodRecord{PodName: name, Namespace: namespace}).
   199  		Find(&podRecords).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   200  		return nil, err
   201  	}
   202  
   203  	eventList := make([]*core.Event, 0, len(podRecords))
   204  	for _, pr := range podRecords {
   205  		et := new(core.Event)
   206  		if err := e.db.Where(
   207  			"id = ?", pr.EventID).
   208  			First(et).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   209  			return nil, err
   210  		}
   211  
   212  		pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
   213  		if err != nil {
   214  			return nil, err
   215  		}
   216  		et.Pods = pods
   217  		eventList = append(eventList, et)
   218  	}
   219  	return eventList, nil
   220  }
   221  
   222  // Find returns an event from the datastore by ID.
   223  func (e *eventStore) Find(_ context.Context, id uint) (*core.Event, error) {
   224  	et := new(core.Event)
   225  	if err := e.db.Where(
   226  		"id = ?", id).
   227  		First(et).Error; err != nil {
   228  		return nil, err
   229  	}
   230  	pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
   231  	if err != nil {
   232  		return nil, err
   233  	}
   234  	et.Pods = pods
   235  	return et, nil
   236  }
   237  
   238  func (e *eventStore) FindByExperimentAndStartTime(
   239  	_ context.Context,
   240  	name, namespace string,
   241  	startTime *time.Time,
   242  ) (*core.Event, error) {
   243  	et := new(core.Event)
   244  	if err := e.db.Where(
   245  		"namespace = ? and experiment = ? and start_time = ?",
   246  		namespace, name, startTime).
   247  		First(et).Error; err != nil {
   248  		return nil, err
   249  	}
   250  
   251  	var pods []*core.PodRecord
   252  
   253  	if err := e.db.Where(
   254  		"event_id = ?", et.ID).
   255  		Find(&pods).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   256  		return nil, err
   257  	}
   258  
   259  	return et, nil
   260  }
   261  
   262  // Create persists a new event to the datastore.
   263  func (e *eventStore) Create(_ context.Context, et *core.Event) error {
   264  	if err := e.db.Create(et).Error; err != nil {
   265  		return err
   266  	}
   267  
   268  	for _, pod := range et.Pods {
   269  		pod.EventID = et.ID
   270  		if err := e.db.Create(pod).Error; err != nil {
   271  			return err
   272  		}
   273  	}
   274  
   275  	return nil
   276  }
   277  
   278  // Update persists an updated event to the datastore.
   279  func (e *eventStore) Update(_ context.Context, et *core.Event) error {
   280  	return e.db.Model(core.Event{}).
   281  		Where(
   282  			"namespace = ? and experiment = ? and start_time = ?",
   283  			et.Namespace, et.Experiment, et.StartTime).
   284  		Update("finish_time", et.FinishTime).
   285  		Error
   286  }
   287  
   288  // DeleteIncompleteEvents implement core.EventStore interface.
   289  func (e *eventStore) DeleteIncompleteEvents(_ context.Context) error {
   290  	return e.db.Where("finish_time IS NULL").Unscoped().
   291  		Delete(core.Event{}).Error
   292  }
   293  
   294  // ListByFilter returns an event list by podName, podNamespace, experimentName, experimentNamespace, uid, kind, startTime and finishTime.
   295  func (e *eventStore) ListByFilter(_ context.Context, filter core.Filter) ([]*core.Event, error) {
   296  	var (
   297  		resList               []*core.Event
   298  		err                   error
   299  		startTime, finishTime time.Time
   300  		limit                 int
   301  	)
   302  
   303  	if filter.LimitStr != "" {
   304  		limit, err = strconv.Atoi(filter.LimitStr)
   305  		if err != nil {
   306  			return nil, fmt.Errorf("the format of the limitStr is wrong")
   307  		}
   308  	}
   309  	if filter.StartTimeStr != "" {
   310  		startTime, err = time.Parse(time.RFC3339, strings.Replace(filter.StartTimeStr, " ", "+", -1))
   311  		if err != nil {
   312  			return nil, fmt.Errorf("the format of the startTime is wrong")
   313  		}
   314  	}
   315  	if filter.FinishTimeStr != "" {
   316  		finishTime, err = time.Parse(time.RFC3339, strings.Replace(filter.FinishTimeStr, " ", "+", -1))
   317  		if err != nil {
   318  			return nil, fmt.Errorf("the format of the finishTime is wrong")
   319  		}
   320  	}
   321  
   322  	if filter.PodName != "" {
   323  		resList, err = e.ListByPod(context.Background(), filter.PodNamespace, filter.PodName)
   324  		if err == nil && filter.LimitStr != "" {
   325  			sort.Slice(resList, func(i, j int) bool {
   326  				return resList[i].CreatedAt.After(resList[j].CreatedAt)
   327  			})
   328  			resList = resList[:min(limit, len(resList))]
   329  		}
   330  	} else if filter.PodNamespace != "" {
   331  		resList, err = e.ListByNamespace(context.Background(), filter.PodNamespace)
   332  		if err == nil && filter.LimitStr != "" {
   333  			sort.Slice(resList, func(i, j int) bool {
   334  				return resList[i].CreatedAt.After(resList[j].CreatedAt)
   335  			})
   336  			resList = resList[:min(limit, len(resList))]
   337  		}
   338  	} else {
   339  		resList, err = e.DryListByFilter(context.Background(), filter)
   340  	}
   341  	if err != nil {
   342  		return resList, err
   343  	}
   344  
   345  	eventList := make([]*core.Event, 0)
   346  	for _, event := range resList {
   347  		if filter.PodName != "" || filter.PodNamespace != "" {
   348  			if filter.ExperimentName != "" && event.Experiment != filter.ExperimentName {
   349  				continue
   350  			}
   351  			if filter.ExperimentNamespace != "" && event.Namespace != filter.ExperimentNamespace {
   352  				continue
   353  			}
   354  			if filter.UID != "" && event.ExperimentID != filter.UID {
   355  				continue
   356  			}
   357  			if filter.Kind != "" && event.Kind != filter.Kind {
   358  				continue
   359  			}
   360  			if filter.StartTimeStr != "" && event.StartTime.Before(startTime) && !event.StartTime.Equal(startTime) {
   361  				continue
   362  			}
   363  			if filter.FinishTimeStr != "" && event.FinishTime.After(finishTime) && !event.FinishTime.Equal(finishTime) {
   364  				continue
   365  			}
   366  		}
   367  		if filter.PodNamespace == "" {
   368  			pods, err := e.findPodRecordsByEventID(context.Background(), event.ID)
   369  			if err != nil {
   370  				return nil, err
   371  			}
   372  			event.Pods = pods
   373  		}
   374  		eventList = append(eventList, event)
   375  	}
   376  	return eventList, nil
   377  }
   378  
   379  // DryListByFilter returns an event list by experimentName, experimentNamespace, uid, kind, startTime and finishTime.
   380  func (e *eventStore) DryListByFilter(_ context.Context, filter core.Filter) ([]*core.Event, error) {
   381  	var (
   382  		resList []*core.Event
   383  		err     error
   384  		db      *dbstore.DB
   385  		limit   int
   386  	)
   387  
   388  	if filter.LimitStr != "" {
   389  		limit, err = strconv.Atoi(filter.LimitStr)
   390  		if err != nil {
   391  			return nil, fmt.Errorf("the format of the limitStr is wrong")
   392  		}
   393  	}
   394  	if filter.StartTimeStr != "" {
   395  		_, err = time.Parse(time.RFC3339, strings.Replace(filter.StartTimeStr, " ", "+", -1))
   396  		if err != nil {
   397  			return nil, fmt.Errorf("the format of the startTime is wrong")
   398  		}
   399  	}
   400  	if filter.FinishTimeStr != "" {
   401  		_, err = time.Parse(time.RFC3339, strings.Replace(filter.FinishTimeStr, " ", "+", -1))
   402  		if err != nil {
   403  			return nil, fmt.Errorf("the format of the finishTime is wrong")
   404  		}
   405  	}
   406  
   407  	query, args := constructQueryArgs(filter.ExperimentName, filter.ExperimentNamespace, filter.UID, filter.Kind, filter.StartTimeStr, filter.FinishTimeStr)
   408  	// List all events
   409  	if len(args) == 0 {
   410  		db = e.db
   411  	} else {
   412  		db = &dbstore.DB{DB: e.db.Where(query, args...)}
   413  	}
   414  	if filter.LimitStr != "" {
   415  		db = &dbstore.DB{DB: db.Order("created_at desc").Limit(limit)}
   416  	}
   417  	if err := db.Find(&resList).Error; err != nil &&
   418  		!gorm.IsRecordNotFoundError(err) {
   419  		return resList, err
   420  	}
   421  
   422  	return resList, err
   423  }
   424  
   425  // DeleteByFinishTime deletes events and podrecords whose time difference is greater than the given time from FinishTime.
   426  func (e *eventStore) DeleteByFinishTime(_ context.Context, ttl time.Duration) error {
   427  	eventList, err := e.List(context.Background())
   428  	if err != nil {
   429  		return err
   430  	}
   431  	nowTime := time.Now()
   432  	for _, et := range eventList {
   433  		if et.FinishTime == nil {
   434  			continue
   435  		}
   436  		if et.FinishTime.Add(ttl).Before(nowTime) {
   437  			if err := e.db.Model(core.Event{}).Unscoped().Delete(*et).Error; err != nil {
   438  				return err
   439  			}
   440  
   441  			if err := e.db.Model(core.PodRecord{}).
   442  				Where(
   443  					"event_id = ? ",
   444  					et.ID).Unscoped().Delete(core.PodRecord{}).Error; err != nil {
   445  				return err
   446  			}
   447  		}
   448  	}
   449  	return nil
   450  }
   451  
   452  // DeleteByUID deletes events by the uid of the experiment.
   453  func (e *eventStore) DeleteByUID(_ context.Context, uid string) error {
   454  	eventList, err := e.ListByUID(context.Background(), uid)
   455  	if err != nil {
   456  		return err
   457  	}
   458  	for _, et := range eventList {
   459  		if err := e.db.Model(core.PodRecord{}).
   460  			Where(
   461  				"event_id = ? ",
   462  				et.ID).Unscoped().Delete(core.PodRecord{}).Error; err != nil {
   463  			return err
   464  		}
   465  	}
   466  	return e.db.Where("experiment_id = ?", uid).Unscoped().
   467  		Delete(core.Event{}).Error
   468  }
   469  
   470  // DeleteByUIDs deletes events by the uid list of the experiment.
   471  func (e *eventStore) DeleteByUIDs(_ context.Context, uids []string) error {
   472  	eventList, err := e.ListByUIDs(context.Background(), uids)
   473  	if err != nil {
   474  		return err
   475  	}
   476  	eventIDList := make([]uint, len(eventList))
   477  	for _, et := range eventList {
   478  		eventIDList = append(eventIDList, et.ID)
   479  	}
   480  	if err = e.db.Model(core.PodRecord{}).Where("event_id IN (?)", eventIDList).Unscoped().Delete(core.PodRecord{}).Error; err != nil {
   481  		return err
   482  	}
   483  	return e.db.Where("experiment_id IN (?)", uids).Unscoped().Delete(core.Event{}).Error
   484  }
   485  
   486  func (e *eventStore) getUID(_ context.Context, ns, name string) (string, error) {
   487  	events := make([]*core.Event, 0)
   488  
   489  	if err := e.db.Where(
   490  		&core.Event{Experiment: name, Namespace: ns}).
   491  		Find(&events).Error; err != nil {
   492  		return "", err
   493  	}
   494  
   495  	if len(events) == 0 {
   496  		return "", fmt.Errorf("get UID failure, maybe name or namespace is wrong")
   497  	}
   498  
   499  	UID := events[0].ExperimentID
   500  	st := events[0].StartTime
   501  
   502  	for _, et := range events {
   503  		if st.Before(*et.StartTime) {
   504  			st = et.StartTime
   505  			UID = et.ExperimentID
   506  		}
   507  	}
   508  	return UID, nil
   509  }
   510  
   511  // UpdateIncompleteEvents updates the incomplete event by the namespace and name
   512  func (e *eventStore) UpdateIncompleteEvents(_ context.Context, ns, name string) error {
   513  	return e.db.Model(core.Event{}).
   514  		Where(
   515  			"namespace = ? and experiment = ? and finish_time IS NULL",
   516  			ns, name).
   517  		Update("finish_time", time.Now()).
   518  		Error
   519  }
   520  
   521  func constructQueryArgs(experimentName, experimentNamespace, uid, kind, startTime, finishTime string) (string, []interface{}) {
   522  	args := make([]interface{}, 0)
   523  	query := ""
   524  	if experimentName != "" {
   525  		query += "experiment = ?"
   526  		args = append(args, experimentName)
   527  	}
   528  	if experimentNamespace != "" {
   529  		if len(args) > 0 {
   530  			query += " AND namespace = ?"
   531  		} else {
   532  			query += "namespace = ?"
   533  		}
   534  		args = append(args, experimentNamespace)
   535  	}
   536  	if uid != "" {
   537  		if len(args) > 0 {
   538  			query += " AND experiment_id = ?"
   539  		} else {
   540  			query += "experiment_id = ?"
   541  		}
   542  		args = append(args, uid)
   543  	}
   544  	if kind != "" {
   545  		if len(args) > 0 {
   546  			query += " AND kind = ?"
   547  		} else {
   548  			query += "kind = ?"
   549  		}
   550  		args = append(args, kind)
   551  	}
   552  	if startTime != "" {
   553  		if len(args) > 0 {
   554  			query += " AND start_time >= ?"
   555  		} else {
   556  			query += "start_time >= ?"
   557  		}
   558  		args = append(args, strings.Replace(startTime, "T", " ", -1))
   559  	}
   560  	if finishTime != "" {
   561  		if len(args) > 0 {
   562  			query += " AND finish_time <= ?"
   563  		} else {
   564  			query += "finish_time <= ?"
   565  		}
   566  		args = append(args, strings.Replace(finishTime, "T", " ", -1))
   567  	}
   568  
   569  	return query, args
   570  }
   571