...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/store/event/event.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/store/event

     1  // Copyright 2020 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package event
    15  
    16  import (
    17  	"context"
    18  	"fmt"
    19  	"sort"
    20  	"strconv"
    21  	"strings"
    22  	"time"
    23  
    24  	"github.com/jinzhu/gorm"
    25  
    26  	"github.com/chaos-mesh/chaos-mesh/pkg/core"
    27  	"github.com/chaos-mesh/chaos-mesh/pkg/store/dbstore"
    28  
    29  	ctrl "sigs.k8s.io/controller-runtime"
    30  )
    31  
// log is the package-level logger, named "store/event".
var log = ctrl.Log.WithName("store/event")
    33  
    34  // NewStore return a new EventStore.
    35  func NewStore(db *dbstore.DB) core.EventStore {
    36  	db.AutoMigrate(&core.Event{})
    37  	db.AutoMigrate(&core.PodRecord{})
    38  
    39  	es := &eventStore{db}
    40  
    41  	if err := es.DeleteIncompleteEvents(context.Background()); err != nil && !gorm.IsRecordNotFoundError(err) {
    42  		log.Error(err, "failed to delete all incomplete events")
    43  	}
    44  
    45  	return es
    46  }
    47  
// eventStore is the database-backed event store returned by NewStore.
type eventStore struct {
	// db is the handle every query in this file is issued through.
	db *dbstore.DB
}
    51  
    52  func min(x, y int) int {
    53  	if x > y {
    54  		return y
    55  	}
    56  	return x
    57  }
    58  
    59  // findPodRecordsByEventID returns the list of PodRecords according to the eventID
    60  func (e *eventStore) findPodRecordsByEventID(_ context.Context, id uint) ([]*core.PodRecord, error) {
    61  	pods := make([]*core.PodRecord, 0)
    62  	if err := e.db.Where(
    63  		"event_id = ?", id).
    64  		Find(&pods).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
    65  		return nil, err
    66  	}
    67  	return pods, nil
    68  }
    69  
    70  // List returns the list of events
    71  func (e *eventStore) List(_ context.Context) ([]*core.Event, error) {
    72  	var resList []core.Event
    73  	eventList := make([]*core.Event, 0)
    74  
    75  	if err := e.db.Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
    76  		return nil, err
    77  	}
    78  
    79  	for _, et := range resList {
    80  		pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
    81  		if err != nil {
    82  			return nil, err
    83  		}
    84  		var event core.Event
    85  		event = et
    86  		event.Pods = pods
    87  		eventList = append(eventList, &event)
    88  	}
    89  
    90  	return eventList, nil
    91  }
    92  
    93  // ListByUID returns an event list by the uid of the experiment.
    94  func (e *eventStore) ListByUID(_ context.Context, uid string) ([]*core.Event, error) {
    95  	var resList []core.Event
    96  	eventList := make([]*core.Event, 0)
    97  
    98  	if err := e.db.Where(
    99  		"experiment_id = ?", uid).
   100  		Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   101  		return nil, err
   102  	}
   103  
   104  	for _, et := range resList {
   105  		pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
   106  		if err != nil {
   107  			return nil, err
   108  		}
   109  		var event core.Event
   110  		event = et
   111  		event.Pods = pods
   112  		eventList = append(eventList, &event)
   113  	}
   114  
   115  	return eventList, nil
   116  }
   117  
   118  // ListByExperiment returns an event list by the name and namespace of the experiment.
   119  func (e *eventStore) ListByExperiment(_ context.Context, namespace string, experiment string) ([]*core.Event, error) {
   120  	var resList []core.Event
   121  
   122  	if err := e.db.Where(
   123  		"namespace = ? and experiment = ? ",
   124  		namespace, experiment).
   125  		Find(&resList).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   126  		return nil, err
   127  	}
   128  
   129  	eventList := make([]*core.Event, 0, len(resList))
   130  	for _, et := range resList {
   131  		pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
   132  		if err != nil {
   133  			return nil, err
   134  		}
   135  		var event core.Event
   136  		event = et
   137  		event.Pods = pods
   138  		eventList = append(eventList, &event)
   139  	}
   140  
   141  	return eventList, nil
   142  }
   143  
   144  // ListByNamespace returns the list of events according to the namespace
   145  func (e *eventStore) ListByNamespace(_ context.Context, namespace string) ([]*core.Event, error) {
   146  	podRecords := make([]*core.PodRecord, 0)
   147  
   148  	if err := e.db.Where(
   149  		&core.PodRecord{Namespace: namespace}).
   150  		Find(&podRecords).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   151  		return nil, err
   152  	}
   153  
   154  	eventList := make([]*core.Event, 0, len(podRecords))
   155  	for _, pr := range podRecords {
   156  		et := new(core.Event)
   157  		if err := e.db.Where(
   158  			"id = ?", pr.EventID).
   159  			First(et).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   160  			return nil, err
   161  		}
   162  		pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
   163  		if err != nil {
   164  			return nil, err
   165  		}
   166  		et.Pods = pods
   167  		eventList = append(eventList, et)
   168  	}
   169  	return eventList, nil
   170  }
   171  
   172  // ListByPod returns the list of events according to the pod
   173  func (e *eventStore) ListByPod(_ context.Context, namespace string, name string) ([]*core.Event, error) {
   174  	podRecords := make([]*core.PodRecord, 0)
   175  
   176  	if err := e.db.Where(
   177  		&core.PodRecord{PodName: name, Namespace: namespace}).
   178  		Find(&podRecords).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   179  		return nil, err
   180  	}
   181  
   182  	eventList := make([]*core.Event, 0, len(podRecords))
   183  	for _, pr := range podRecords {
   184  		et := new(core.Event)
   185  		if err := e.db.Where(
   186  			"id = ?", pr.EventID).
   187  			First(et).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   188  			return nil, err
   189  		}
   190  
   191  		pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
   192  		if err != nil {
   193  			return nil, err
   194  		}
   195  		et.Pods = pods
   196  		eventList = append(eventList, et)
   197  	}
   198  	return eventList, nil
   199  }
   200  
   201  // Find returns an event from the datastore by ID.
   202  func (e *eventStore) Find(_ context.Context, id uint) (*core.Event, error) {
   203  	et := new(core.Event)
   204  	if err := e.db.Where(
   205  		"id = ?", id).
   206  		First(et).Error; err != nil {
   207  		return nil, err
   208  	}
   209  	pods, err := e.findPodRecordsByEventID(context.Background(), et.ID)
   210  	if err != nil {
   211  		return nil, err
   212  	}
   213  	et.Pods = pods
   214  	return et, nil
   215  }
   216  
   217  func (e *eventStore) FindByExperimentAndStartTime(
   218  	_ context.Context,
   219  	name, namespace string,
   220  	startTime *time.Time,
   221  ) (*core.Event, error) {
   222  	et := new(core.Event)
   223  	if err := e.db.Where(
   224  		"namespace = ? and experiment = ? and start_time = ?",
   225  		namespace, name, startTime).
   226  		First(et).Error; err != nil {
   227  		return nil, err
   228  	}
   229  
   230  	var pods []*core.PodRecord
   231  
   232  	if err := e.db.Where(
   233  		"event_id = ?", et.ID).
   234  		Find(&pods).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   235  		return nil, err
   236  	}
   237  
   238  	return et, nil
   239  }
   240  
   241  // Create persists a new event to the datastore.
   242  func (e *eventStore) Create(_ context.Context, et *core.Event) error {
   243  	if err := e.db.Create(et).Error; err != nil {
   244  		return err
   245  	}
   246  
   247  	for _, pod := range et.Pods {
   248  		pod.EventID = et.ID
   249  		if err := e.db.Create(pod).Error; err != nil {
   250  			return err
   251  		}
   252  	}
   253  
   254  	return nil
   255  }
   256  
   257  // Update persists an updated event to the datastore.
   258  func (e *eventStore) Update(_ context.Context, et *core.Event) error {
   259  	return e.db.Model(core.Event{}).
   260  		Where(
   261  			"namespace = ? and experiment = ? and start_time = ?",
   262  			et.Namespace, et.Experiment, et.StartTime).
   263  		Update("finish_time", et.FinishTime).
   264  		Error
   265  }
   266  
   267  // DeleteIncompleteEvents implement core.EventStore interface.
   268  func (e *eventStore) DeleteIncompleteEvents(_ context.Context) error {
   269  	return e.db.Where("finish_time IS NULL").Unscoped().
   270  		Delete(core.Event{}).Error
   271  }
   272  
   273  // ListByFilter returns an event list by podName, podNamespace, experimentName, experimentNamespace, uid, kind, startTime and finishTime.
   274  func (e *eventStore) ListByFilter(_ context.Context, filter core.Filter) ([]*core.Event, error) {
   275  	var (
   276  		resList               []*core.Event
   277  		err                   error
   278  		startTime, finishTime time.Time
   279  		limit                 int
   280  	)
   281  
   282  	if filter.LimitStr != "" {
   283  		limit, err = strconv.Atoi(filter.LimitStr)
   284  		if err != nil {
   285  			return nil, fmt.Errorf("the format of the limitStr is wrong")
   286  		}
   287  	}
   288  	if filter.StartTimeStr != "" {
   289  		startTime, err = time.Parse(time.RFC3339, strings.Replace(filter.StartTimeStr, " ", "+", -1))
   290  		if err != nil {
   291  			return nil, fmt.Errorf("the format of the startTime is wrong")
   292  		}
   293  	}
   294  	if filter.FinishTimeStr != "" {
   295  		finishTime, err = time.Parse(time.RFC3339, strings.Replace(filter.FinishTimeStr, " ", "+", -1))
   296  		if err != nil {
   297  			return nil, fmt.Errorf("the format of the finishTime is wrong")
   298  		}
   299  	}
   300  
   301  	if filter.PodName != "" {
   302  		resList, err = e.ListByPod(context.Background(), filter.PodNamespace, filter.PodName)
   303  		if err == nil && filter.LimitStr != "" {
   304  			sort.Slice(resList, func(i, j int) bool {
   305  				return resList[i].CreatedAt.After(resList[j].CreatedAt)
   306  			})
   307  			resList = resList[:min(limit, len(resList))]
   308  		}
   309  	} else if filter.PodNamespace != "" {
   310  		resList, err = e.ListByNamespace(context.Background(), filter.PodNamespace)
   311  		if err == nil && filter.LimitStr != "" {
   312  			sort.Slice(resList, func(i, j int) bool {
   313  				return resList[i].CreatedAt.After(resList[j].CreatedAt)
   314  			})
   315  			resList = resList[:min(limit, len(resList))]
   316  		}
   317  	} else {
   318  		resList, err = e.DryListByFilter(context.Background(), filter)
   319  	}
   320  	if err != nil {
   321  		return resList, err
   322  	}
   323  
   324  	eventList := make([]*core.Event, 0)
   325  	for _, event := range resList {
   326  		if filter.PodName != "" || filter.PodNamespace != "" {
   327  			if filter.ExperimentName != "" && event.Experiment != filter.ExperimentName {
   328  				continue
   329  			}
   330  			if filter.ExperimentNamespace != "" && event.Namespace != filter.ExperimentNamespace {
   331  				continue
   332  			}
   333  			if filter.UID != "" && event.ExperimentID != filter.UID {
   334  				continue
   335  			}
   336  			if filter.Kind != "" && event.Kind != filter.Kind {
   337  				continue
   338  			}
   339  			if filter.StartTimeStr != "" && event.StartTime.Before(startTime) && !event.StartTime.Equal(startTime) {
   340  				continue
   341  			}
   342  			if filter.FinishTimeStr != "" && event.FinishTime.After(finishTime) && !event.FinishTime.Equal(finishTime) {
   343  				continue
   344  			}
   345  		}
   346  		if filter.PodNamespace == "" {
   347  			pods, err := e.findPodRecordsByEventID(context.Background(), event.ID)
   348  			if err != nil {
   349  				return nil, err
   350  			}
   351  			event.Pods = pods
   352  		}
   353  		eventList = append(eventList, event)
   354  	}
   355  	return eventList, nil
   356  }
   357  
   358  // DryListByFilter returns an event list by experimentName, experimentNamespace, uid, kind, startTime and finishTime.
   359  func (e *eventStore) DryListByFilter(_ context.Context, filter core.Filter) ([]*core.Event, error) {
   360  	var (
   361  		resList []*core.Event
   362  		err     error
   363  		db      *dbstore.DB
   364  		limit   int
   365  	)
   366  
   367  	if filter.LimitStr != "" {
   368  		limit, err = strconv.Atoi(filter.LimitStr)
   369  		if err != nil {
   370  			return nil, fmt.Errorf("the format of the limitStr is wrong")
   371  		}
   372  	}
   373  	if filter.StartTimeStr != "" {
   374  		_, err = time.Parse(time.RFC3339, strings.Replace(filter.StartTimeStr, " ", "+", -1))
   375  		if err != nil {
   376  			return nil, fmt.Errorf("the format of the startTime is wrong")
   377  		}
   378  	}
   379  	if filter.FinishTimeStr != "" {
   380  		_, err = time.Parse(time.RFC3339, strings.Replace(filter.FinishTimeStr, " ", "+", -1))
   381  		if err != nil {
   382  			return nil, fmt.Errorf("the format of the finishTime is wrong")
   383  		}
   384  	}
   385  
   386  	query, args := constructQueryArgs(filter.ExperimentName, filter.ExperimentNamespace, filter.UID, filter.Kind, filter.StartTimeStr, filter.FinishTimeStr)
   387  	// List all events
   388  	if len(args) == 0 {
   389  		db = e.db
   390  	} else {
   391  		db = &dbstore.DB{DB: e.db.Where(query, args...)}
   392  	}
   393  	if filter.LimitStr != "" {
   394  		db = &dbstore.DB{DB: db.Order("created_at desc").Limit(limit)}
   395  	}
   396  	if err := db.Find(&resList).Error; err != nil &&
   397  		!gorm.IsRecordNotFoundError(err) {
   398  		return resList, err
   399  	}
   400  
   401  	return resList, err
   402  }
   403  
   404  // DeleteByFinishTime deletes events and podrecords whose time difference is greater than the given time from FinishTime.
   405  func (e *eventStore) DeleteByFinishTime(_ context.Context, ttl time.Duration) error {
   406  	eventList, err := e.List(context.Background())
   407  	if err != nil {
   408  		return err
   409  	}
   410  	nowTime := time.Now()
   411  	for _, et := range eventList {
   412  		if et.FinishTime == nil {
   413  			continue
   414  		}
   415  		if et.FinishTime.Add(ttl).Before(nowTime) {
   416  			if err := e.db.Model(core.Event{}).Unscoped().Delete(*et).Error; err != nil {
   417  				return err
   418  			}
   419  
   420  			if err := e.db.Model(core.PodRecord{}).
   421  				Where(
   422  					"event_id = ? ",
   423  					et.ID).Unscoped().Delete(core.PodRecord{}).Error; err != nil {
   424  				return err
   425  			}
   426  		}
   427  	}
   428  	return nil
   429  }
   430  
   431  // DeleteByUID deletes events by the uid of the experiment.
   432  func (e *eventStore) DeleteByUID(_ context.Context, uid string) error {
   433  	eventList, err := e.ListByUID(context.Background(), uid)
   434  	if err != nil {
   435  		return err
   436  	}
   437  	for _, et := range eventList {
   438  		if err := e.db.Model(core.PodRecord{}).
   439  			Where(
   440  				"event_id = ? ",
   441  				et.ID).Unscoped().Delete(core.PodRecord{}).Error; err != nil {
   442  			return err
   443  		}
   444  	}
   445  	return e.db.Where("experiment_id = ?", uid).Unscoped().
   446  		Delete(core.Event{}).Error
   447  }
   448  
   449  func (e *eventStore) getUID(_ context.Context, ns, name string) (string, error) {
   450  	events := make([]*core.Event, 0)
   451  
   452  	if err := e.db.Where(
   453  		&core.Event{Experiment: name, Namespace: ns}).
   454  		Find(&events).Error; err != nil {
   455  		return "", err
   456  	}
   457  
   458  	if len(events) == 0 {
   459  		return "", fmt.Errorf("get UID failure, maybe name or namespace is wrong")
   460  	}
   461  
   462  	UID := events[0].ExperimentID
   463  	st := events[0].StartTime
   464  
   465  	for _, et := range events {
   466  		if st.Before(*et.StartTime) {
   467  			st = et.StartTime
   468  			UID = et.ExperimentID
   469  		}
   470  	}
   471  	return UID, nil
   472  }
   473  
   474  // UpdateIncompleteEvents updates the incomplete event by the namespace and name
   475  func (e *eventStore) UpdateIncompleteEvents(_ context.Context, ns, name string) error {
   476  	return e.db.Model(core.Event{}).
   477  		Where(
   478  			"namespace = ? and experiment = ? and finish_time IS NULL",
   479  			ns, name).
   480  		Update("finish_time", time.Now()).
   481  		Error
   482  }
   483  
   484  func constructQueryArgs(experimentName, experimentNamespace, uid, kind, startTime, finishTime string) (string, []interface{}) {
   485  	args := make([]interface{}, 0)
   486  	query := ""
   487  	if experimentName != "" {
   488  		query += "experiment = ?"
   489  		args = append(args, experimentName)
   490  	}
   491  	if experimentNamespace != "" {
   492  		if len(args) > 0 {
   493  			query += " AND namespace = ?"
   494  		} else {
   495  			query += "namespace = ?"
   496  		}
   497  		args = append(args, experimentNamespace)
   498  	}
   499  	if uid != "" {
   500  		if len(args) > 0 {
   501  			query += " AND experiment_id = ?"
   502  		} else {
   503  			query += "experiment_id = ?"
   504  		}
   505  		args = append(args, uid)
   506  	}
   507  	if kind != "" {
   508  		if len(args) > 0 {
   509  			query += " AND kind = ?"
   510  		} else {
   511  			query += "kind = ?"
   512  		}
   513  		args = append(args, kind)
   514  	}
   515  	if startTime != "" {
   516  		if len(args) > 0 {
   517  			query += " AND start_time >= ?"
   518  		} else {
   519  			query += "start_time >= ?"
   520  		}
   521  		args = append(args, strings.Replace(startTime, "T", " ", -1))
   522  	}
   523  	if finishTime != "" {
   524  		if len(args) > 0 {
   525  			query += " AND finish_time <= ?"
   526  		} else {
   527  			query += "finish_time <= ?"
   528  		}
   529  		args = append(args, strings.Replace(finishTime, "T", " ", -1))
   530  	}
   531  
   532  	return query, args
   533  }
   534