1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package workflow
17
18 import (
19 "encoding/json"
20 "fmt"
21 "net/http"
22 "sort"
23
24 "github.com/gin-gonic/gin"
25 "github.com/go-logr/logr"
26 apierrors "k8s.io/apimachinery/pkg/api/errors"
27
28 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
29 "github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
30 config "github.com/chaos-mesh/chaos-mesh/pkg/config"
31 "github.com/chaos-mesh/chaos-mesh/pkg/curl"
32 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
33 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
34 )
35
36 func Register(r *gin.RouterGroup, s *Service) {
37 endpoint := r.Group("/workflows")
38 endpoint.GET("", s.listWorkflows)
39 endpoint.POST("", s.createWorkflow)
40 endpoint.GET("/:uid", s.getWorkflowDetailByUID)
41 endpoint.PUT("/:uid", s.updateWorkflow)
42 endpoint.DELETE("/:uid", s.deleteWorkflow)
43 endpoint.POST("/render-task/http", s.renderHTTPTask)
44 endpoint.POST("/parse-task/http", s.parseHTTPTask)
45 endpoint.POST("/validate-task/http", s.isValidRenderedHTTPTask)
46 }
47
48
49 type Service struct {
50 conf *config.ChaosDashboardConfig
51 store core.WorkflowStore
52 logger logr.Logger
53 }
54
55 func NewService(conf *config.ChaosDashboardConfig, store core.WorkflowStore, logger logr.Logger) *Service {
56 return &Service{conf: conf, store: store, logger: logger}
57 }
58
59
60
61
62
63
64
65
66
67
68 func (it *Service) renderHTTPTask(c *gin.Context) {
69 requestBody := curl.RequestForm{}
70 if err := c.ShouldBindJSON(&requestBody); err != nil {
71 utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
72 return
73 }
74 result, err := curl.RenderWorkflowTaskTemplate(requestBody)
75 if err != nil {
76 utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
77 return
78 }
79 c.JSON(http.StatusOK, result)
80 }
81
82
83
84
85
86
87
88
89
90
91 func (it *Service) isValidRenderedHTTPTask(c *gin.Context) {
92 requestBody := v1alpha1.Template{}
93 if err := c.ShouldBindJSON(&requestBody); err != nil {
94 utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
95 return
96 }
97 result := curl.IsValidRenderedTask(&requestBody)
98 c.JSON(http.StatusOK, result)
99 }
100
101
102
103
104
105
106
107
108
109
110 func (it *Service) parseHTTPTask(c *gin.Context) {
111 requestBody := v1alpha1.Template{}
112 if err := c.ShouldBindJSON(&requestBody); err != nil {
113 utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
114 return
115 }
116 result, err := curl.ParseWorkflowTaskTemplate(&requestBody)
117 if err != nil {
118 utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
119 return
120 }
121 c.JSON(http.StatusOK, result)
122 }
123
124
125
126
127
128
129
130
131
132
133 func (it *Service) listWorkflows(c *gin.Context) {
134 namespace := c.Query("namespace")
135 if len(namespace) == 0 && !it.conf.ClusterScoped &&
136 len(it.conf.TargetNamespace) != 0 {
137 namespace = it.conf.TargetNamespace
138 }
139
140 result := make([]core.WorkflowMeta, 0)
141
142 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
143 if err != nil {
144 _ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
145 return
146 }
147 repo := core.NewKubeWorkflowRepository(kubeClient)
148
149 if namespace != "" {
150 workflowFromNs, err := repo.ListByNamespace(c.Request.Context(), namespace)
151 if err != nil {
152 utils.SetAPImachineryError(c, err)
153 return
154 }
155 result = append(result, workflowFromNs...)
156 } else {
157 allWorkflow, err := repo.List(c.Request.Context())
158 if err != nil {
159 utils.SetAPImachineryError(c, err)
160 return
161 }
162 result = append(result, allWorkflow...)
163 }
164
165
166 for index, item := range result {
167 entity, err := it.store.FindByUID(c.Request.Context(), string(item.UID))
168 if err != nil {
169 it.logger.Info("warning: workflow does not have a record in database",
170 "namespaced name", fmt.Sprintf("%s/%s", item.Namespace, item.Name),
171 "uid", item.UID,
172 )
173 }
174
175 if entity != nil {
176 result[index].ID = entity.ID
177 }
178 }
179
180 sort.Slice(result, func(i, j int) bool {
181 return result[i].CreatedAt.After(result[i].CreatedAt)
182 })
183
184 c.JSON(http.StatusOK, result)
185 }
186
187
188
189
190
191
192
193
194
195
196 func (it *Service) getWorkflowDetailByUID(c *gin.Context) {
197 uid := c.Param("uid")
198
199 entity, err := it.store.FindByUID(c.Request.Context(), uid)
200 if err != nil {
201 utils.SetAPImachineryError(c, err)
202 return
203 }
204
205 namespace := entity.Namespace
206 name := entity.Name
207
208 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
209 if err != nil {
210 if apierrors.IsNotFound(err) {
211
212 workflowDetail, err := core.WorkflowEntity2WorkflowDetail(entity)
213 if err != nil {
214 utils.SetAPImachineryError(c, err)
215 return
216 }
217 c.JSON(http.StatusOK, workflowDetail)
218 return
219 }
220 _ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
221 return
222 }
223
224
225 repo := core.NewKubeWorkflowRepository(kubeClient)
226
227 workflowCRInKubernetes, err := repo.Get(c.Request.Context(), namespace, name)
228 if err != nil {
229 utils.SetAPImachineryError(c, err)
230 return
231 }
232 result, err := core.WorkflowEntity2WorkflowDetail(entity)
233 if err != nil {
234 utils.SetAPImachineryError(c, err)
235 return
236 }
237 result.Topology = workflowCRInKubernetes.Topology
238 result.KubeObject = workflowCRInKubernetes.KubeObject
239
240 c.JSON(http.StatusOK, result)
241 }
242
243
244
245
246
247
248
249
250
251
252 func (it *Service) createWorkflow(c *gin.Context) {
253 payload := v1alpha1.Workflow{}
254
255 err := json.NewDecoder(c.Request.Body).Decode(&payload)
256 if err != nil {
257 _ = c.Error(utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
258 return
259 }
260
261 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
262 if err != nil {
263 utils.SetAPImachineryError(c, err)
264 return
265 }
266
267 repo := core.NewKubeWorkflowRepository(kubeClient)
268
269 result, err := repo.Create(c.Request.Context(), payload)
270 if err != nil {
271 utils.SetAPImachineryError(c, err)
272 return
273 }
274 c.JSON(http.StatusOK, result)
275 }
276
277
278
279
280
281
282
283
284
285
286
287 func (it *Service) deleteWorkflow(c *gin.Context) {
288 uid := c.Param("uid")
289
290 entity, err := it.store.FindByUID(c.Request.Context(), uid)
291 if err != nil {
292 utils.SetAPImachineryError(c, err)
293 return
294 }
295
296 namespace := entity.Namespace
297 name := entity.Name
298
299 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
300 if err != nil {
301 _ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
302 return
303 }
304
305 repo := core.NewKubeWorkflowRepository(kubeClient)
306
307 err = repo.Delete(c.Request.Context(), namespace, name)
308 if err != nil {
309 utils.SetAPImachineryError(c, err)
310 return
311 }
312 c.JSON(http.StatusOK, utils.ResponseSuccess)
313 }
314
315
316
317
318
319
320
321
322
323
324
325 func (it *Service) updateWorkflow(c *gin.Context) {
326 payload := v1alpha1.Workflow{}
327
328 err := json.NewDecoder(c.Request.Body).Decode(&payload)
329 if err != nil {
330 utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
331 return
332 }
333 uid := c.Param("uid")
334 entity, err := it.store.FindByUID(c.Request.Context(), uid)
335 if err != nil {
336 utils.SetAPImachineryError(c, err)
337 return
338 }
339
340 namespace := entity.Namespace
341 name := entity.Name
342
343 if namespace != payload.Namespace {
344 utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err,
345 "namespace is not consistent, pathParameter: %s, metaInRaw: %s",
346 namespace,
347 payload.Namespace))
348 return
349 }
350 if name != payload.Name {
351 utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err,
352 "name is not consistent, pathParameter: %s, metaInRaw: %s",
353 name,
354 payload.Name))
355 return
356 }
357
358 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
359 if err != nil {
360 utils.SetAPImachineryError(c, err)
361 return
362 }
363
364 repo := core.NewKubeWorkflowRepository(kubeClient)
365
366 result, err := repo.Update(c.Request.Context(), namespace, name, payload)
367 if err != nil {
368 utils.SetAPImachineryError(c, err)
369 return
370 }
371
372 c.JSON(http.StatusOK, result)
373 }
374