1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package workflow
17
18 import (
19 "context"
20 "time"
21
22 "github.com/jinzhu/gorm"
23 ctrl "sigs.k8s.io/controller-runtime"
24
25 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
26 )
27
// log is the package-level logger; it is derived from the controller-runtime
// root logger and named "store/workflow".
var log = ctrl.Log.WithName("store/workflow")
29
// WorkflowStore persists workflow records through gorm. NewStore returns it
// as a core.WorkflowStore.
type WorkflowStore struct {
	// db is the gorm handle every query in this store is issued on.
	db *gorm.DB
}
33
34 func NewStore(db *gorm.DB) core.WorkflowStore {
35 db.AutoMigrate(&core.WorkflowEntity{})
36
37 return &WorkflowStore{db}
38 }
39
40 func (it *WorkflowStore) List(ctx context.Context, namespace, name string, archived bool) ([]*core.WorkflowEntity, error) {
41 var entities []core.WorkflowEntity
42 query, args := constructQueryArgs(namespace, name, "")
43
44 err := it.db.Where(query, args).Where("archived = ?", archived).Find(&entities).Error
45 if err != nil && !gorm.IsRecordNotFoundError(err) {
46 return nil, err
47 }
48
49 var result []*core.WorkflowEntity
50 for _, item := range entities {
51 item := item
52 result = append(result, &item)
53 }
54 return result, nil
55 }
56
57 func (it *WorkflowStore) ListMeta(ctx context.Context, namespace, name string, archived bool) ([]*core.WorkflowMeta, error) {
58 entities, err := it.List(ctx, namespace, name, archived)
59 if err != nil {
60 return nil, err
61 }
62 var result []*core.WorkflowMeta
63 for _, item := range entities {
64 item := item
65 result = append(result, &item.WorkflowMeta)
66 }
67 return result, nil
68 }
69
70 func (it *WorkflowStore) FindByID(ctx context.Context, id uint) (*core.WorkflowEntity, error) {
71 result := new(core.WorkflowEntity)
72 if err := it.db.Where(
73 "id = ?", id).
74 First(result).Error; err != nil {
75 return nil, err
76 }
77
78 return result, nil
79 }
80
81 func (it *WorkflowStore) FindByUID(ctx context.Context, uid string) (*core.WorkflowEntity, error) {
82 result := new(core.WorkflowEntity)
83 if err := it.db.Where(
84 "uid = ?", uid).
85 First(result).Error; err != nil {
86 return nil, err
87 }
88
89 return result, nil
90 }
91
92 func (it *WorkflowStore) FindMetaByUID(ctx context.Context, UID string) (*core.WorkflowMeta, error) {
93 entity, err := it.FindByUID(ctx, UID)
94 if err != nil {
95 return nil, err
96 }
97 return &entity.WorkflowMeta, nil
98 }
99
100 func (it *WorkflowStore) Save(ctx context.Context, entity *core.WorkflowEntity) error {
101 return it.db.Model(core.WorkflowEntity{}).Save(entity).Error
102 }
103
104 func (it *WorkflowStore) DeleteByUID(ctx context.Context, uid string) error {
105 return it.db.Where("uid = ?", uid).Unscoped().
106 Delete(core.WorkflowEntity{}).Error
107 }
108
109 func (it *WorkflowStore) DeleteByUIDs(ctx context.Context, uids []string) error {
110 return it.db.Where("uid IN (?)", uids).Unscoped().Delete(core.WorkflowEntity{}).Error
111 }
112
113 func (it *WorkflowStore) DeleteByFinishTime(ctx context.Context, ttl time.Duration) error {
114 workflows, err := it.ListMeta(context.Background(), "", "", true)
115 if err != nil {
116 return err
117 }
118
119 nowTime := time.Now()
120 for _, wfl := range workflows {
121 if wfl.FinishTime == nil {
122 log.Error(nil, "workflow finish time is nil when deleting archived workflow records, skip it", "workflow", wfl)
123 continue
124 }
125 if wfl.FinishTime.Add(ttl).Before(nowTime) {
126 if err := it.db.Where("uid = ?", wfl.UID).Unscoped().Delete(*it).Error; err != nil {
127 return err
128 }
129 }
130 }
131
132 return nil
133 }
134
135 func (it *WorkflowStore) MarkAsArchived(ctx context.Context, namespace, name string) error {
136 if err := it.db.Model(core.WorkflowEntity{}).
137 Where("namespace = ? AND name = ? AND archived = ?", namespace, name, false).
138 Updates(map[string]interface{}{"archived": true, "finish_time": time.Now().Format(time.RFC3339)}).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
139 return err
140 }
141 return nil
142 }
143
144 func (it *WorkflowStore) MarkAsArchivedWithUID(ctx context.Context, uid string) error {
145 if err := it.db.Model(core.WorkflowEntity{}).
146 Where("uid = ? AND archived = ?", uid, false).
147 Updates(map[string]interface{}{"archived": true, "end_time": time.Now().Format(time.RFC3339)}).Error; err != nil && !gorm.IsRecordNotFoundError(err) {
148 return err
149 }
150 return nil
151 }
// constructQueryArgs builds a SQL condition and its bind values from the
// optional namespace, name and uid filters.
//
// Each non-empty filter contributes "<column> = ?" to the condition, joined
// with " AND " in the fixed order namespace, name, uid, and its value is
// appended to the returned slice in the same order. When every filter is empty
// the condition is "" and the slice is empty (but non-nil).
func constructQueryArgs(ns, name, uid string) (string, []string) {
	filters := [...]struct{ column, value string }{
		{"namespace", ns},
		{"name", name},
		{"uid", uid},
	}

	query := ""
	args := make([]string, 0, len(filters))
	for _, f := range filters {
		if f.value == "" {
			continue
		}
		// Only conditions after the first need the AND separator.
		if query != "" {
			query += " AND "
		}
		query += f.column + " = ?"
		args = append(args, f.value)
	}

	return query, args
}
185