...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/store/workflow/workflow.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/store/workflow

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package workflow
    17  
    18  import (
    19  	"context"
    20  	"time"
    21  
    22  	"github.com/jinzhu/gorm"
    23  	ctrl "sigs.k8s.io/controller-runtime"
    24  
    25  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
    26  )
    27  
// log is the package-level logger, obtained from the controller-runtime root
// logger under the name "store/workflow".
var log = ctrl.Log.WithName("store/workflow")
    29  
// WorkflowStore reads and writes core.WorkflowEntity records through a gorm
// database handle. NewStore returns it as a core.WorkflowStore.
type WorkflowStore struct {
	// db is the gorm handle that every method of the store queries.
	db *gorm.DB
}
    33  
    34  func NewStore(db *gorm.DB) core.WorkflowStore {
    35  	db.AutoMigrate(&core.WorkflowEntity{})
    36  
    37  	return &WorkflowStore{db}
    38  }
    39  
    40  func (it *WorkflowStore) List(ctx context.Context, namespace, name string, archived bool) ([]*core.WorkflowEntity, error) {
    41  	var entities []core.WorkflowEntity
    42  	query, args := constructQueryArgs(namespace, name, "")
    43  
    44  	err := it.db.Where(query, args).Where("archived = ?", archived).Find(&entities).Error
    45  	if err != nil && !gorm.IsRecordNotFoundError(err) {
    46  		return nil, err
    47  	}
    48  
    49  	var result []*core.WorkflowEntity
    50  	for _, item := range entities {
    51  		item := item
    52  		result = append(result, &item)
    53  	}
    54  	return result, nil
    55  }
    56  
    57  func (it *WorkflowStore) ListMeta(ctx context.Context, namespace, name string, archived bool) ([]*core.WorkflowMeta, error) {
    58  	entities, err := it.List(ctx, namespace, name, archived)
    59  	if err != nil {
    60  		return nil, err
    61  	}
    62  	var result []*core.WorkflowMeta
    63  	for _, item := range entities {
    64  		item := item
    65  		result = append(result, &item.WorkflowMeta)
    66  	}
    67  	return result, nil
    68  }
    69  
    70  func (it *WorkflowStore) FindByID(ctx context.Context, id uint) (*core.WorkflowEntity, error) {
    71  	result := new(core.WorkflowEntity)
    72  	if err := it.db.Where(
    73  		"id = ?", id).
    74  		First(result).Error; err != nil {
    75  		return nil, err
    76  	}
    77  
    78  	return result, nil
    79  }
    80  
    81  func (it *WorkflowStore) FindByUID(ctx context.Context, uid string) (*core.WorkflowEntity, error) {
    82  	result := new(core.WorkflowEntity)
    83  	if err := it.db.Where(
    84  		"uid = ?", uid).
    85  		First(result).Error; err != nil {
    86  		return nil, err
    87  	}
    88  
    89  	return result, nil
    90  }
    91  
    92  func (it *WorkflowStore) FindMetaByUID(ctx context.Context, UID string) (*core.WorkflowMeta, error) {
    93  	entity, err := it.FindByUID(ctx, UID)
    94  	if err != nil {
    95  		return nil, err
    96  	}
    97  	return &entity.WorkflowMeta, nil
    98  }
    99  
   100  func (it *WorkflowStore) Save(ctx context.Context, entity *core.WorkflowEntity) error {
   101  	return it.db.Model(core.WorkflowEntity{}).Save(entity).Error
   102  }
   103  
   104  func (it *WorkflowStore) DeleteByUID(ctx context.Context, uid string) error {
   105  	return it.db.Where("uid = ?", uid).Unscoped().
   106  		Delete(core.WorkflowEntity{}).Error
   107  }
   108  
   109  func (it *WorkflowStore) DeleteByUIDs(ctx context.Context, uids []string) error {
   110  	return it.db.Where("uid IN (?)", uids).Unscoped().Delete(core.WorkflowEntity{}).Error
   111  }
   112  
   113  func (it *WorkflowStore) DeleteByFinishTime(ctx context.Context, ttl time.Duration) error {
   114  	workflows, err := it.ListMeta(context.Background(), "", "", true)
   115  	if err != nil {
   116  		return err
   117  	}
   118  
   119  	nowTime := time.Now()
   120  	for _, wfl := range workflows {
   121  		if wfl.FinishTime == nil {
   122  			log.Error(nil, "workflow finish time is nil when deleting archived workflow reocrds, skip it", "workflow", wfl)
   123  			continue
   124  		}
   125  		if wfl.FinishTime.Add(ttl).Before(nowTime) {
   126  			if err := it.db.Where("uid = ?", wfl.UID).Unscoped().Delete(*it).Error; err != nil {
   127  				return err
   128  			}
   129  		}
   130  	}
   131  
   132  	return nil
   133  }
   134  
   135  func (it *WorkflowStore) MarkAsArchived(ctx context.Context, namespace, name string) error {
   136  	if err := it.db.Model(core.WorkflowEntity{}).
   137  		Where("namespace = ? AND name = ? AND archived = ?", namespace, name, false).
   138  		Updates(map[string]interface{}{"archived": true, "finish_time": time.Now().Format(time.RFC3339)}).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   139  		return err
   140  	}
   141  	return nil
   142  }
   143  
   144  func (it *WorkflowStore) MarkAsArchivedWithUID(ctx context.Context, uid string) error {
   145  	if err := it.db.Model(core.WorkflowEntity{}).
   146  		Where("uid = ? AND archived = ?", uid, false).
   147  		Updates(map[string]interface{}{"archived": true, "end_time": time.Now().Format(time.RFC3339)}).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
   148  		return err
   149  	}
   150  	return nil
   151  }
   152  func constructQueryArgs(ns, name, uid string) (string, []string) {
   153  	query := ""
   154  	args := make([]string, 0)
   155  
   156  	if ns != "" {
   157  		if len(args) > 0 {
   158  			query += " AND namespace = ?"
   159  		} else {
   160  			query += "namespace = ?"
   161  		}
   162  		args = append(args, ns)
   163  	}
   164  
   165  	if name != "" {
   166  		if len(args) > 0 {
   167  			query += " AND name = ?"
   168  		} else {
   169  			query += "name = ?"
   170  		}
   171  		args = append(args, name)
   172  	}
   173  
   174  	if uid != "" {
   175  		if len(args) > 0 {
   176  			query += " AND uid = ?"
   177  		} else {
   178  			query += "uid = ?"
   179  		}
   180  		args = append(args, uid)
   181  	}
   182  
   183  	return query, args
   184  }
   185