1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package workflow
17
18 import (
19 "encoding/json"
20 "fmt"
21 "net/http"
22 "sort"
23
24 "github.com/gin-gonic/gin"
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 ctrl "sigs.k8s.io/controller-runtime"
27
28 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
29 "github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
30 config "github.com/chaos-mesh/chaos-mesh/pkg/config/dashboard"
31 "github.com/chaos-mesh/chaos-mesh/pkg/curl"
32 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
33 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
34 )
35
36 var log = ctrl.Log.WithName("workflow api")
37
38
39 type StatusResponse struct {
40 Status string `json:"status"`
41 }
42
43 func Register(r *gin.RouterGroup, s *Service) {
44 endpoint := r.Group("/workflows")
45 endpoint.GET("", s.listWorkflows)
46 endpoint.POST("", s.createWorkflow)
47 endpoint.GET("/:uid", s.getWorkflowDetailByUID)
48 endpoint.PUT("/:uid", s.updateWorkflow)
49 endpoint.DELETE("/:uid", s.deleteWorkflow)
50 endpoint.POST("/render-task/http", s.renderHTTPTask)
51 endpoint.POST("/parse-task/http", s.parseHTTPTask)
52 endpoint.POST("/validate-task/http", s.isValidRenderedHTTPTask)
53 }
54
55
56 type Service struct {
57 conf *config.ChaosDashboardConfig
58 store core.WorkflowStore
59 }
60
61 func NewService(conf *config.ChaosDashboardConfig, store core.WorkflowStore) *Service {
62 return &Service{conf: conf, store: store}
63 }
64
65
66
67
68
69
70
71
72
73
74 func (it *Service) renderHTTPTask(c *gin.Context) {
75 requestBody := curl.RequestForm{}
76 if err := c.ShouldBindJSON(&requestBody); err != nil {
77 utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
78 return
79 }
80 result, err := curl.RenderWorkflowTaskTemplate(requestBody)
81 if err != nil {
82 utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
83 return
84 }
85 c.JSON(http.StatusOK, result)
86 }
87
88
89
90
91
92
93
94
95
96
97 func (it *Service) isValidRenderedHTTPTask(c *gin.Context) {
98 requestBody := v1alpha1.Template{}
99 if err := c.ShouldBindJSON(&requestBody); err != nil {
100 utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
101 return
102 }
103 result := curl.IsValidRenderedTask(&requestBody)
104 c.JSON(http.StatusOK, result)
105 }
106
107
108
109
110
111
112
113
114
115
116 func (it *Service) parseHTTPTask(c *gin.Context) {
117 requestBody := v1alpha1.Template{}
118 if err := c.ShouldBindJSON(&requestBody); err != nil {
119 utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err, "failed to parse request body"))
120 return
121 }
122 result, err := curl.ParseWorkflowTaskTemplate(&requestBody)
123 if err != nil {
124 utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
125 return
126 }
127 c.JSON(http.StatusOK, result)
128 }
129
130
131
132
133
134
135
136
137
138
139 func (it *Service) listWorkflows(c *gin.Context) {
140 namespace := c.Query("namespace")
141 if len(namespace) == 0 && !it.conf.ClusterScoped &&
142 len(it.conf.TargetNamespace) != 0 {
143 namespace = it.conf.TargetNamespace
144 }
145
146 result := make([]core.WorkflowMeta, 0)
147
148 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
149 if err != nil {
150 _ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
151 return
152 }
153 repo := core.NewKubeWorkflowRepository(kubeClient)
154
155 if namespace != "" {
156 workflowFromNs, err := repo.ListByNamespace(c.Request.Context(), namespace)
157 if err != nil {
158 utils.SetAPImachineryError(c, err)
159 return
160 }
161 result = append(result, workflowFromNs...)
162 } else {
163 allWorkflow, err := repo.List(c.Request.Context())
164 if err != nil {
165 utils.SetAPImachineryError(c, err)
166 return
167 }
168 result = append(result, allWorkflow...)
169 }
170
171
172 for index, item := range result {
173 entity, err := it.store.FindByUID(c.Request.Context(), string(item.UID))
174 if err != nil {
175 log.Info("warning: workflow does not have a record in database",
176 "namespaced name", fmt.Sprintf("%s/%s", item.Namespace, item.Name),
177 "uid", item.UID,
178 )
179 }
180
181 if entity != nil {
182 result[index].ID = entity.ID
183 }
184 }
185
186 sort.Slice(result, func(i, j int) bool {
187 return result[i].CreatedAt.After(result[i].CreatedAt)
188 })
189
190 c.JSON(http.StatusOK, result)
191 }
192
193
194
195
196
197
198
199
200
201
202 func (it *Service) getWorkflowDetailByUID(c *gin.Context) {
203 uid := c.Param("uid")
204
205 entity, err := it.store.FindByUID(c.Request.Context(), uid)
206 if err != nil {
207 utils.SetAPImachineryError(c, err)
208 return
209 }
210
211 namespace := entity.Namespace
212 name := entity.Name
213
214 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
215 if err != nil {
216 if apierrors.IsNotFound(err) {
217
218 workflowDetail, err := core.WorkflowEntity2WorkflowDetail(entity)
219 if err != nil {
220 utils.SetAPImachineryError(c, err)
221 return
222 }
223 c.JSON(http.StatusOK, workflowDetail)
224 return
225 }
226 _ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
227 return
228 }
229
230
231 repo := core.NewKubeWorkflowRepository(kubeClient)
232
233 workflowCRInKubernetes, err := repo.Get(c.Request.Context(), namespace, name)
234 if err != nil {
235 utils.SetAPImachineryError(c, err)
236 return
237 }
238 result, err := core.WorkflowEntity2WorkflowDetail(entity)
239 if err != nil {
240 utils.SetAPImachineryError(c, err)
241 return
242 }
243 result.Topology = workflowCRInKubernetes.Topology
244 result.KubeObject = workflowCRInKubernetes.KubeObject
245
246 c.JSON(http.StatusOK, result)
247 }
248
249
250
251
252
253
254
255
256
257
258 func (it *Service) createWorkflow(c *gin.Context) {
259 payload := v1alpha1.Workflow{}
260
261 err := json.NewDecoder(c.Request.Body).Decode(&payload)
262 if err != nil {
263 _ = c.Error(utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
264 return
265 }
266
267 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
268 if err != nil {
269 utils.SetAPImachineryError(c, err)
270 return
271 }
272
273 repo := core.NewKubeWorkflowRepository(kubeClient)
274
275 result, err := repo.Create(c.Request.Context(), payload)
276 if err != nil {
277 utils.SetAPImachineryError(c, err)
278 return
279 }
280 c.JSON(http.StatusOK, result)
281 }
282
283
284
285
286
287
288
289
290
291
292
293 func (it *Service) deleteWorkflow(c *gin.Context) {
294 uid := c.Param("uid")
295
296 entity, err := it.store.FindByUID(c.Request.Context(), uid)
297 if err != nil {
298 utils.SetAPImachineryError(c, err)
299 return
300 }
301
302 namespace := entity.Namespace
303 name := entity.Name
304
305 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
306 if err != nil {
307 _ = c.Error(utils.ErrBadRequest.WrapWithNoMessage(err))
308 return
309 }
310
311 repo := core.NewKubeWorkflowRepository(kubeClient)
312
313 err = repo.Delete(c.Request.Context(), namespace, name)
314 if err != nil {
315 utils.SetAPImachineryError(c, err)
316 return
317 }
318 c.JSON(http.StatusOK, StatusResponse{Status: "success"})
319 }
320
321
322
323
324
325
326
327
328
329
330
331 func (it *Service) updateWorkflow(c *gin.Context) {
332 payload := v1alpha1.Workflow{}
333
334 err := json.NewDecoder(c.Request.Body).Decode(&payload)
335 if err != nil {
336 utils.SetAPIError(c, utils.ErrInternalServer.Wrap(err, "failed to parse request body"))
337 return
338 }
339 uid := c.Param("uid")
340 entity, err := it.store.FindByUID(c.Request.Context(), uid)
341 if err != nil {
342 utils.SetAPImachineryError(c, err)
343 return
344 }
345
346 namespace := entity.Namespace
347 name := entity.Name
348
349 if namespace != payload.Namespace {
350 utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err,
351 "namespace is not consistent, pathParameter: %s, metaInRaw: %s",
352 namespace,
353 payload.Namespace))
354 return
355 }
356 if name != payload.Name {
357 utils.SetAPIError(c, utils.ErrBadRequest.Wrap(err,
358 "name is not consistent, pathParameter: %s, metaInRaw: %s",
359 name,
360 payload.Name))
361 return
362 }
363
364 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
365 if err != nil {
366 utils.SetAPImachineryError(c, err)
367 return
368 }
369
370 repo := core.NewKubeWorkflowRepository(kubeClient)
371
372 result, err := repo.Update(c.Request.Context(), namespace, name, payload)
373 if err != nil {
374 utils.SetAPImachineryError(c, err)
375 return
376 }
377
378 c.JSON(http.StatusOK, result)
379 }
380