1
2
3
4
5
6
7
8
9
10
11
12
13
14 package workflow
15
16 import (
17 "encoding/json"
18 "fmt"
19 "net/http"
20 "sort"
21
22 "github.com/gin-gonic/gin"
23 apierrors "k8s.io/apimachinery/pkg/api/errors"
24 ctrl "sigs.k8s.io/controller-runtime"
25
26 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
27 "github.com/chaos-mesh/chaos-mesh/pkg/apiserver/utils"
28 "github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
29 config "github.com/chaos-mesh/chaos-mesh/pkg/config/dashboard"
30 "github.com/chaos-mesh/chaos-mesh/pkg/core"
31 )
32
33 var log = ctrl.Log.WithName("workflow api")
34
35
36 type StatusResponse struct {
37 Status string `json:"status"`
38 }
39
40 func Register(r *gin.RouterGroup, s *Service) {
41 endpoint := r.Group("/workflows")
42 endpoint.GET("", s.listWorkflows)
43 endpoint.POST("", s.createWorkflow)
44 endpoint.GET("/:uid", s.getWorkflowDetailByUID)
45 endpoint.PUT("/:uid", s.updateWorkflow)
46 endpoint.DELETE("/:uid", s.deleteWorkflow)
47 }
48
49
50 type Service struct {
51 conf *config.ChaosDashboardConfig
52 store core.WorkflowStore
53 }
54
55 func NewService(conf *config.ChaosDashboardConfig, store core.WorkflowStore) *Service {
56 return &Service{conf: conf, store: store}
57 }
58
59
60
61
62
63
64
65
66
67
68 func (it *Service) listWorkflows(c *gin.Context) {
69 namespace := c.Query("namespace")
70 if len(namespace) == 0 && !it.conf.ClusterScoped &&
71 len(it.conf.TargetNamespace) != 0 {
72 namespace = it.conf.TargetNamespace
73 }
74
75 result := make([]core.WorkflowMeta, 0)
76
77 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
78 if err != nil {
79 _ = c.Error(utils.ErrInvalidRequest.WrapWithNoMessage(err))
80 return
81 }
82 repo := core.NewKubeWorkflowRepository(kubeClient)
83
84 if namespace != "" {
85 workflowFromNs, err := repo.ListByNamespace(c.Request.Context(), namespace)
86 if err != nil {
87 utils.SetErrorForGinCtx(c, err)
88 return
89 }
90 result = append(result, workflowFromNs...)
91 } else {
92 allWorkflow, err := repo.List(c.Request.Context())
93 if err != nil {
94 utils.SetErrorForGinCtx(c, err)
95 return
96 }
97 result = append(result, allWorkflow...)
98 }
99
100
101 for index, item := range result {
102 entity, err := it.store.FindByUID(c.Request.Context(), string(item.UID))
103 if err != nil {
104 log.Info("warning: workflow does not have a record in database",
105 "namespaced name", fmt.Sprintf("%s/%s", item.Namespace, item.Name),
106 "uid", item.UID,
107 )
108 }
109
110 if entity != nil {
111 result[index].ID = entity.ID
112 }
113 }
114
115 sort.Slice(result, func(i, j int) bool {
116 return result[i].CreatedAt.After(result[i].CreatedAt)
117 })
118
119 c.JSON(http.StatusOK, result)
120 }
121
122
123
124
125
126
127
128
129
130
131 func (it *Service) getWorkflowDetailByUID(c *gin.Context) {
132 uid := c.Param("uid")
133
134 entity, err := it.store.FindByUID(c.Request.Context(), uid)
135 if err != nil {
136 utils.SetErrorForGinCtx(c, err)
137 return
138 }
139
140 namespace := entity.Namespace
141 name := entity.Name
142
143 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
144 if err != nil {
145 if apierrors.IsNotFound(err) {
146
147 workflowDetail, err := core.WorkflowEntity2WorkflowDetail(entity)
148 if err != nil {
149 utils.SetErrorForGinCtx(c, err)
150 return
151 }
152 c.JSON(http.StatusOK, workflowDetail)
153 return
154 }
155 _ = c.Error(utils.ErrInvalidRequest.WrapWithNoMessage(err))
156 return
157 }
158
159
160 repo := core.NewKubeWorkflowRepository(kubeClient)
161
162 workflowCRInKubernetes, err := repo.Get(c.Request.Context(), namespace, name)
163 if err != nil {
164 utils.SetErrorForGinCtx(c, err)
165 return
166 }
167 result, err := core.WorkflowEntity2WorkflowDetail(entity)
168 if err != nil {
169 utils.SetErrorForGinCtx(c, err)
170 return
171 }
172 result.Topology = workflowCRInKubernetes.Topology
173 result.KubeObject = workflowCRInKubernetes.KubeObject
174
175 c.JSON(http.StatusOK, result)
176 }
177
178
179
180
181
182
183
184
185
186
187 func (it *Service) createWorkflow(c *gin.Context) {
188 payload := v1alpha1.Workflow{}
189
190 err := json.NewDecoder(c.Request.Body).Decode(&payload)
191 if err != nil {
192 _ = c.Error(utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
193 return
194 }
195
196 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
197 if err != nil {
198 _ = c.Error(utils.ErrInvalidRequest.WrapWithNoMessage(err))
199 return
200 }
201
202 repo := core.NewKubeWorkflowRepository(kubeClient)
203
204 result, err := repo.Create(c.Request.Context(), payload)
205 if err != nil {
206 _ = c.Error(utils.ErrInternalServer.WrapWithNoMessage(err))
207 return
208 }
209 c.JSON(http.StatusOK, result)
210 }
211
212
213
214
215
216
217
218
219
220
221
222 func (it *Service) deleteWorkflow(c *gin.Context) {
223 uid := c.Param("uid")
224
225 entity, err := it.store.FindByUID(c.Request.Context(), uid)
226 if err != nil {
227 utils.SetErrorForGinCtx(c, err)
228 return
229 }
230
231 namespace := entity.Namespace
232 name := entity.Name
233
234 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
235 if err != nil {
236 _ = c.Error(utils.ErrInvalidRequest.WrapWithNoMessage(err))
237 return
238 }
239
240 repo := core.NewKubeWorkflowRepository(kubeClient)
241
242 err = repo.Delete(c.Request.Context(), namespace, name)
243 if err != nil {
244 utils.SetErrorForGinCtx(c, err)
245 return
246 }
247 c.JSON(http.StatusOK, StatusResponse{Status: "success"})
248 }
249
250
251
252
253
254
255
256
257
258
259
260 func (it *Service) updateWorkflow(c *gin.Context) {
261 payload := v1alpha1.Workflow{}
262
263 err := json.NewDecoder(c.Request.Body).Decode(&payload)
264 if err != nil {
265 _ = c.Error(utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
266 return
267 }
268 uid := c.Param("uid")
269 entity, err := it.store.FindByUID(c.Request.Context(), uid)
270 if err != nil {
271 utils.SetErrorForGinCtx(c, err)
272 return
273 }
274
275 namespace := entity.Namespace
276 name := entity.Name
277
278 if namespace != payload.Namespace {
279 _ = c.Error(utils.ErrInvalidRequest.Wrap(err,
280 "namespace is not consistent, pathParameter: %s, metaInRaw: %s",
281 namespace,
282 payload.Namespace),
283 )
284 return
285 }
286 if name != payload.Name {
287 _ = c.Error(utils.ErrInvalidRequest.Wrap(err,
288 "name is not consistent, pathParameter: %s, metaInRaw: %s",
289 name,
290 payload.Name),
291 )
292 return
293 }
294
295 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
296 if err != nil {
297 _ = c.Error(utils.ErrInvalidRequest.WrapWithNoMessage(err))
298 return
299 }
300
301 repo := core.NewKubeWorkflowRepository(kubeClient)
302
303 result, err := repo.Update(c.Request.Context(), namespace, name, payload)
304 if err != nil {
305 _ = c.Error(utils.ErrInternalServer.WrapWithNoMessage(err))
306 return
307 }
308
309 c.JSON(http.StatusOK, result)
310 }
311