1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package schedule
17
18 import (
19 "context"
20 "encoding/json"
21 "net/http"
22 "sort"
23 "strings"
24 "time"
25
26 "github.com/gin-gonic/gin"
27 "github.com/jinzhu/gorm"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/apimachinery/pkg/types"
31 "sigs.k8s.io/controller-runtime/pkg/client"
32 "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
33
34 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
35 "github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
36 config "github.com/chaos-mesh/chaos-mesh/pkg/config/dashboard"
37 u "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
38 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
39 "github.com/chaos-mesh/chaos-mesh/pkg/status"
40 )
41
42 var log = u.Log.WithName("schedules")
43
44
45 type Service struct {
46 schedule core.ScheduleStore
47 event core.EventStore
48 config *config.ChaosDashboardConfig
49 scheme *runtime.Scheme
50 }
51
52 func NewService(
53 schedule core.ScheduleStore,
54 event core.EventStore,
55 config *config.ChaosDashboardConfig,
56 scheme *runtime.Scheme,
57 ) *Service {
58 return &Service{
59 schedule: schedule,
60 event: event,
61 config: config,
62 scheme: scheme,
63 }
64 }
65
66
67 func Register(r *gin.RouterGroup, s *Service) {
68 endpoint := r.Group("/schedules")
69
70 endpoint.GET("", s.list)
71 endpoint.POST("", s.create)
72 endpoint.GET("/:uid", s.get)
73 endpoint.DELETE("/:uid", s.delete)
74 endpoint.DELETE("", s.batchDelete)
75 endpoint.PUT("/pause/:uid", s.pauseSchedule)
76 endpoint.PUT("/start/:uid", s.startSchedule)
77 }
78
79
80 type Schedule struct {
81 core.ObjectBase
82 Status status.ScheduleStatus `json:"status"`
83 }
84
85
86 type Detail struct {
87 Schedule
88 ExperimentUIDs []string `json:"experiment_uids"`
89 KubeObject core.KubeObjectDesc `json:"kube_object"`
90 }
91
92
93
94
95
96
97
98
99
100
101
102 func (s *Service) list(c *gin.Context) {
103 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
104 if err != nil {
105 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
106
107 return
108 }
109
110 ns, name := c.Query("namespace"), c.Query("name")
111
112 if ns == "" && !s.config.ClusterScoped && s.config.TargetNamespace != "" {
113 ns = s.config.TargetNamespace
114
115 log.V(1).Info("Replace query namespace with", ns)
116 }
117
118 ScheduleList := v1alpha1.ScheduleList{}
119 if err = kubeCli.List(context.Background(), &ScheduleList, &client.ListOptions{Namespace: ns}); err != nil {
120 u.SetAPImachineryError(c, err)
121
122 return
123 }
124
125 sches := make([]*Schedule, 0)
126 for _, schedule := range ScheduleList.Items {
127 if name != "" && schedule.Name != name {
128 continue
129 }
130
131 sches = append(sches, &Schedule{
132 ObjectBase: core.ObjectBase{
133 Namespace: schedule.Namespace,
134 Name: schedule.Name,
135 Kind: string(schedule.Spec.Type),
136 UID: string(schedule.UID),
137 Created: schedule.CreationTimestamp.Format(time.RFC3339),
138 },
139 Status: status.GetScheduleStatus(schedule),
140 })
141 }
142
143 sort.Slice(sches, func(i, j int) bool {
144 return sches[i].Created > sches[j].Created
145 })
146
147 c.JSON(http.StatusOK, sches)
148 }
149
150
151
152
153
154
155
156
157
158
159
160 func (s *Service) create(c *gin.Context) {
161 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
162 if err != nil {
163 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
164
165 return
166 }
167
168 var sch v1alpha1.Schedule
169 if err = u.ShouldBindBodyWithJSON(c, &sch); err != nil {
170 return
171 }
172
173 if err = kubeCli.Create(context.Background(), &sch); err != nil {
174 u.SetAPImachineryError(c, err)
175
176 return
177 }
178
179 c.JSON(http.StatusOK, sch)
180 }
181
182
183
184
185
186
187
188
189
190
191
192 func (s *Service) get(c *gin.Context) {
193 var (
194 sch *core.Schedule
195 schDetail *Detail
196 )
197
198 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
199 if err != nil {
200 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
201
202 return
203 }
204
205 uid := c.Param("uid")
206 if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
207 if gorm.IsRecordNotFoundError(err) {
208 u.SetAPIError(c, u.ErrBadRequest.New("Schedule "+uid+"not found"))
209 } else {
210 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
211 }
212
213 return
214 }
215
216 ns, name := sch.Namespace, sch.Name
217 schDetail = s.findScheduleInCluster(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name})
218 if schDetail == nil {
219 return
220 }
221
222 c.JSON(http.StatusOK, schDetail)
223 }
224
225 func (s *Service) findScheduleInCluster(c *gin.Context, kubeCli client.Client, namespacedName types.NamespacedName) *Detail {
226 var sch v1alpha1.Schedule
227
228 if err := kubeCli.Get(context.Background(), namespacedName, &sch); err != nil {
229 u.SetAPImachineryError(c, err)
230
231 return nil
232 }
233
234 gvk, err := apiutil.GVKForObject(&sch, s.scheme)
235 if err != nil {
236 u.SetAPImachineryError(c, err)
237
238 return nil
239 }
240
241 UIDList := make([]string, 0)
242 schType := string(sch.Spec.Type)
243 chaosKind, ok := v1alpha1.AllScheduleItemKinds()[schType]
244 if !ok {
245 u.SetAPIError(c, u.ErrInternalServer.New("Kind "+schType+" is not supported"))
246
247 return nil
248 }
249
250 selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
251 MatchLabels: map[string]string{v1alpha1.LabelManagedBy: sch.Name},
252 })
253 if err != nil {
254 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
255
256 return nil
257 }
258
259 chaosList := chaosKind.SpawnList()
260 err = kubeCli.List(context.Background(), chaosList, &client.ListOptions{
261 Namespace: sch.Namespace,
262 LabelSelector: selector,
263 })
264 if err != nil {
265 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
266
267 return nil
268 }
269
270 items := chaosList.GetItems()
271 for _, item := range items {
272 UIDList = append(UIDList, string(item.GetUID()))
273 }
274
275 return &Detail{
276 Schedule: Schedule{
277 ObjectBase: core.ObjectBase{
278 Namespace: sch.Namespace,
279 Name: sch.Name,
280 Kind: string(sch.Spec.Type),
281 UID: string(sch.UID),
282 Created: sch.CreationTimestamp.Format(time.RFC3339),
283 },
284 Status: status.GetScheduleStatus(sch),
285 },
286 ExperimentUIDs: UIDList,
287 KubeObject: core.KubeObjectDesc{
288 TypeMeta: metav1.TypeMeta{
289 APIVersion: gvk.GroupVersion().String(),
290 Kind: gvk.Kind,
291 },
292 Meta: core.KubeObjectMeta{
293 Namespace: sch.Namespace,
294 Name: sch.Name,
295 Labels: sch.Labels,
296 Annotations: sch.Annotations,
297 },
298 Spec: sch.Spec,
299 },
300 }
301 }
302
303
304
305
306
307
308
309
310
311
312
313 func (s *Service) delete(c *gin.Context) {
314 var (
315 sch *core.Schedule
316 )
317
318 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
319 if err != nil {
320 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
321
322 return
323 }
324
325 uid := c.Param("uid")
326 if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
327 if gorm.IsRecordNotFoundError(err) {
328 u.SetAPIError(c, u.ErrNotFound.New("Schedule "+uid+" not found"))
329 } else {
330 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
331 }
332
333 return
334 }
335
336 ns, name := sch.Namespace, sch.Name
337 if err = checkAndDeleteSchedule(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}); err != nil {
338 u.SetAPImachineryError(c, err)
339
340 return
341 }
342
343 c.JSON(http.StatusOK, u.ResponseSuccess)
344 }
345
346
347
348
349
350
351
352
353
354
355
356 func (s *Service) batchDelete(c *gin.Context) {
357 var (
358 sch *core.Schedule
359 )
360
361 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
362 if err != nil {
363 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
364
365 return
366 }
367
368 uids := c.Query("uids")
369 if uids == "" {
370 u.SetAPIError(c, u.ErrInternalServer.New("The uids cannot be empty"))
371
372 return
373 }
374
375 uidSlice := strings.Split(uids, ",")
376
377 if len(uidSlice) > 100 {
378 u.SetAPIError(c, u.ErrInternalServer.New("Too many uids, please delete less than 100 at a time"))
379
380 return
381 }
382
383 for _, uid := range uidSlice {
384 if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
385 if gorm.IsRecordNotFoundError(err) {
386 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
387 } else {
388 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
389 }
390
391 return
392 }
393
394 ns, name := sch.Namespace, sch.Name
395 if err = checkAndDeleteSchedule(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}); err != nil {
396 u.SetAPImachineryError(c, err)
397
398 return
399 }
400
401 }
402
403 c.JSON(http.StatusOK, u.ResponseSuccess)
404 }
405
406 func checkAndDeleteSchedule(c *gin.Context, kubeCli client.Client, namespacedName types.NamespacedName) (err error) {
407 ctx := context.Background()
408 var sch v1alpha1.Schedule
409
410 if err = kubeCli.Get(ctx, namespacedName, &sch); err != nil {
411 return
412 }
413
414 if err = kubeCli.Delete(ctx, &sch); err != nil {
415 return
416 }
417
418 return
419 }
420
421
422
423
424
425
426
427
428
429
430
431 func (s *Service) pauseSchedule(c *gin.Context) {
432 var sch *core.Schedule
433
434 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
435 if err != nil {
436 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
437
438 return
439 }
440
441 uid := c.Param("uid")
442 if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
443 if gorm.IsRecordNotFoundError(err) {
444 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
445 } else {
446 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
447 }
448
449 return
450 }
451
452 annotations := map[string]string{
453 v1alpha1.PauseAnnotationKey: "true",
454 }
455 if err = patchSchedule(kubeCli, sch, annotations); err != nil {
456 u.SetAPImachineryError(c, err)
457
458 return
459 }
460 c.JSON(http.StatusOK, u.ResponseSuccess)
461 }
462
463
464
465
466
467
468
469
470
471
472
473 func (s *Service) startSchedule(c *gin.Context) {
474 var sch *core.Schedule
475
476 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
477 if err != nil {
478 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
479
480 return
481 }
482
483 uid := c.Param("uid")
484 if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
485 if gorm.IsRecordNotFoundError(err) {
486 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
487 } else {
488 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
489 }
490
491 return
492 }
493
494 annotations := map[string]string{
495 v1alpha1.PauseAnnotationKey: "false",
496 }
497 if err = patchSchedule(kubeCli, sch, annotations); err != nil {
498 u.SetAPImachineryError(c, err)
499
500 return
501 }
502 c.JSON(http.StatusOK, u.ResponseSuccess)
503 }
504
505 func patchSchedule(kubeCli client.Client, sch *core.Schedule, annotations map[string]string) error {
506 var tmp v1alpha1.Schedule
507
508 if err := kubeCli.Get(context.Background(), types.NamespacedName{Namespace: sch.Namespace, Name: sch.Name}, &tmp); err != nil {
509 return err
510 }
511
512 var mergePatch []byte
513 mergePatch, _ = json.Marshal(map[string]interface{}{
514 "metadata": map[string]interface{}{
515 "annotations": annotations,
516 },
517 })
518
519 return kubeCli.Patch(context.Background(), &tmp, client.RawPatch(types.MergePatchType, mergePatch))
520 }
521