1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package experiment
17
import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"reflect"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/jinzhu/gorm"
	"golang.org/x/sync/errgroup"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
	"github.com/chaos-mesh/chaos-mesh/controllers/common/finalizers"
	"github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
	config "github.com/chaos-mesh/chaos-mesh/pkg/config/dashboard"
	u "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
	"github.com/chaos-mesh/chaos-mesh/pkg/status"
)
46
47 var log = u.Log.WithName("experiments")
48
49
50 type Service struct {
51 archive core.ExperimentStore
52 event core.EventStore
53 config *config.ChaosDashboardConfig
54 scheme *runtime.Scheme
55 }
56
57 func NewService(
58 archive core.ExperimentStore,
59 event core.EventStore,
60 config *config.ChaosDashboardConfig,
61 scheme *runtime.Scheme,
62 ) *Service {
63 return &Service{
64 archive: archive,
65 event: event,
66 config: config,
67 scheme: scheme,
68 }
69 }
70
71
72 func Register(r *gin.RouterGroup, s *Service) {
73 endpoint := r.Group("/experiments")
74
75 endpoint.GET("", s.list)
76 endpoint.POST("", s.create)
77 endpoint.GET("/:uid", s.get)
78 endpoint.DELETE("/:uid", s.delete)
79 endpoint.DELETE("", s.batchDelete)
80 endpoint.PUT("/pause/:uid", s.pause)
81 endpoint.PUT("/start/:uid", s.start)
82 endpoint.GET("/state", s.state)
83 }
84
85
86 type Experiment struct {
87 core.ObjectBase
88 Status status.ChaosStatus `json:"status"`
89 FailedMessage string `json:"failed_message,omitempty"`
90 }
91
92
93 type Detail struct {
94 Experiment
95 KubeObject core.KubeObjectDesc `json:"kube_object"`
96 }
97
98
99
100
101
102
103
104
105
106
107
108
109
110 func (s *Service) list(c *gin.Context) {
111 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
112 if err != nil {
113 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
114
115 return
116 }
117
118 ns, name, kind := c.Query("namespace"), c.Query("name"), c.Query("kind")
119
120 if ns == "" && !s.config.ClusterScoped && s.config.TargetNamespace != "" {
121 ns = s.config.TargetNamespace
122
123 log.V(1).Info("Replace query namespace with", ns)
124 }
125
126 exps := make([]*Experiment, 0)
127 for k, chaosKind := range v1alpha1.AllKinds() {
128 if kind != "" && k != kind {
129 continue
130 }
131
132 list := chaosKind.SpawnList()
133 if err := kubeCli.List(context.Background(), list, &client.ListOptions{Namespace: ns}); err != nil {
134 u.SetAPImachineryError(c, err)
135
136 return
137 }
138
139 for _, item := range list.GetItems() {
140 chaosName := item.GetName()
141
142 if name != "" && chaosName != name {
143 continue
144 }
145
146 exps = append(exps, &Experiment{
147 ObjectBase: core.ObjectBase{
148 Namespace: item.GetNamespace(),
149 Name: chaosName,
150 Kind: item.GetObjectKind().GroupVersionKind().Kind,
151 UID: string(item.GetUID()),
152 Created: item.GetCreationTimestamp().Format(time.RFC3339),
153 },
154 Status: status.GetChaosStatus(item.(v1alpha1.InnerObject)),
155 })
156 }
157 }
158
159 sort.Slice(exps, func(i, j int) bool {
160 return exps[i].Created > exps[j].Created
161 })
162
163 c.JSON(http.StatusOK, exps)
164 }
165
166
167
168
169
170
171
172
173
174
175
176 func (s *Service) create(c *gin.Context) {
177 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
178 if err != nil {
179 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
180
181 return
182 }
183
184 var exp map[string]interface{}
185 if err = u.ShouldBindBodyWithJSON(c, &exp); err != nil {
186 return
187 }
188 kind := exp["kind"].(string)
189
190 if chaosKind, ok := v1alpha1.AllKinds()[kind]; ok {
191 chaos := chaosKind.SpawnObject()
192 reflect.ValueOf(chaos).Elem().FieldByName("ObjectMeta").Set(reflect.ValueOf(metav1.ObjectMeta{}))
193
194 if err = u.ShouldBindBodyWithJSON(c, chaos); err != nil {
195 return
196 }
197
198 if err = kubeCli.Create(context.Background(), chaos); err != nil {
199 u.SetAPImachineryError(c, err)
200
201 return
202 }
203 } else {
204 u.SetAPIError(c, u.ErrBadRequest.New("Kind "+kind+" is not supported"))
205
206 return
207 }
208
209 c.JSON(http.StatusOK, exp)
210 }
211
212
213
214
215
216
217
218
219
220
221
222 func (s *Service) get(c *gin.Context) {
223 var (
224 exp *core.Experiment
225 expDetail *Detail
226 )
227
228 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
229 if err != nil {
230 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
231
232 return
233 }
234
235 uid := c.Param("uid")
236 if exp, err = s.archive.FindByUID(context.Background(), uid); err != nil {
237 if gorm.IsRecordNotFoundError(err) {
238 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
239 } else {
240 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
241 }
242
243 return
244 }
245
246 ns, name, kind := exp.Namespace, exp.Name, exp.Kind
247
248 if chaosKind, ok := v1alpha1.AllKinds()[kind]; ok {
249 expDetail = s.findChaosInCluster(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}, chaosKind.SpawnObject())
250
251 if expDetail == nil {
252 return
253 }
254 } else {
255 u.SetAPIError(c, u.ErrBadRequest.New("Kind "+kind+" is not supported"))
256
257 return
258 }
259
260 c.JSON(http.StatusOK, expDetail)
261 }
262
263 func (s *Service) findChaosInCluster(c *gin.Context, kubeCli client.Client, namespacedName types.NamespacedName, chaos client.Object) *Detail {
264 if err := kubeCli.Get(context.Background(), namespacedName, chaos); err != nil {
265 u.SetAPImachineryError(c, err)
266
267 return nil
268 }
269
270 gvk, err := apiutil.GVKForObject(chaos, s.scheme)
271 if err != nil {
272 u.SetAPImachineryError(c, err)
273
274 return nil
275 }
276
277 kind := gvk.Kind
278
279 return &Detail{
280 Experiment: Experiment{
281 ObjectBase: core.ObjectBase{
282 Namespace: reflect.ValueOf(chaos).MethodByName("GetNamespace").Call(nil)[0].String(),
283 Name: reflect.ValueOf(chaos).MethodByName("GetName").Call(nil)[0].String(),
284 Kind: kind,
285 UID: reflect.ValueOf(chaos).MethodByName("GetUID").Call(nil)[0].String(),
286 Created: reflect.ValueOf(chaos).MethodByName("GetCreationTimestamp").Call(nil)[0].Interface().(metav1.Time).Format(time.RFC3339),
287 },
288 Status: status.GetChaosStatus(chaos.(v1alpha1.InnerObject)),
289 },
290 KubeObject: core.KubeObjectDesc{
291 TypeMeta: metav1.TypeMeta{
292 APIVersion: gvk.GroupVersion().String(),
293 Kind: kind,
294 },
295 Meta: core.KubeObjectMeta{
296 Namespace: reflect.ValueOf(chaos).Elem().FieldByName("Namespace").String(),
297 Name: reflect.ValueOf(chaos).Elem().FieldByName("Name").String(),
298 Labels: reflect.ValueOf(chaos).Elem().FieldByName("Labels").Interface().(map[string]string),
299 Annotations: reflect.ValueOf(chaos).Elem().FieldByName("Annotations").Interface().(map[string]string),
300 },
301 Spec: reflect.ValueOf(chaos).Elem().FieldByName("Spec").Interface(),
302 },
303 }
304 }
305
306
307
308
309
310
311
312
313
314
315
316
317 func (s *Service) delete(c *gin.Context) {
318 var (
319 exp *core.Experiment
320 )
321
322 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
323 if err != nil {
324 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
325
326 return
327 }
328
329 uid := c.Param("uid")
330 if exp, err = s.archive.FindByUID(context.Background(), uid); err != nil {
331 if gorm.IsRecordNotFoundError(err) {
332 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
333 } else {
334 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
335 }
336
337 return
338 }
339
340 ns, name, kind, force := exp.Namespace, exp.Name, exp.Kind, c.DefaultQuery("force", "false")
341 if ok := checkAndDeleteChaos(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}, kind, force); !ok {
342 return
343 }
344
345 c.JSON(http.StatusOK, u.ResponseSuccess)
346 }
347
348
349
350
351
352
353
354
355
356
357
358
359 func (s *Service) batchDelete(c *gin.Context) {
360 var (
361 exp *core.Experiment
362 )
363
364 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
365 if err != nil {
366 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
367
368 return
369 }
370
371 uids := c.Query("uids")
372 if uids == "" {
373 u.SetAPIError(c, u.ErrInternalServer.New("The uids cannot be empty"))
374
375 return
376 }
377
378 uidSlice, force := strings.Split(uids, ","), c.DefaultQuery("force", "false")
379
380 if len(uidSlice) > 100 {
381 u.SetAPIError(c, u.ErrInternalServer.New("Too many uids, please delete less than 100 at a time"))
382
383 return
384 }
385
386 for _, uid := range uidSlice {
387 if exp, err = s.archive.FindByUID(context.Background(), uid); err != nil {
388 if gorm.IsRecordNotFoundError(err) {
389 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
390 } else {
391 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
392 }
393
394 return
395 }
396
397 ns, name, kind := exp.Namespace, exp.Name, exp.Kind
398 if ok := checkAndDeleteChaos(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}, kind, force); !ok {
399 return
400 }
401 }
402
403 c.JSON(http.StatusOK, u.ResponseSuccess)
404 }
405
406 func checkAndDeleteChaos(c *gin.Context, kubeCli client.Client, namespacedName types.NamespacedName, kind string, force string) bool {
407 var (
408 chaosKind *v1alpha1.ChaosKind
409 ok bool
410 err error
411 )
412
413 if chaosKind, ok = v1alpha1.AllKinds()[kind]; !ok {
414 u.SetAPIError(c, u.ErrBadRequest.New("Kind "+kind+" is not supported"))
415
416 return false
417 }
418
419 ctx := context.Background()
420 chaos := chaosKind.SpawnObject()
421
422 if err = kubeCli.Get(ctx, namespacedName, chaos); err != nil {
423 u.SetAPImachineryError(c, err)
424
425 return false
426 }
427
428 if force == "true" {
429 if err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
430 return forceClean(kubeCli, chaos)
431 }); err != nil {
432 u.SetAPIError(c, u.ErrInternalServer.New("Forced deletion failed"))
433
434 return false
435 }
436 }
437
438 if err := kubeCli.Delete(ctx, chaos); err != nil {
439 u.SetAPImachineryError(c, err)
440
441 return false
442 }
443
444 return true
445 }
446
447 func forceClean(kubeCli client.Client, chaos client.Object) error {
448 annotations := chaos.(metav1.Object).GetAnnotations()
449 if annotations == nil {
450 annotations = make(map[string]string)
451 }
452
453 annotations[finalizers.AnnotationCleanFinalizer] = finalizers.AnnotationCleanFinalizerForced
454 chaos.(metav1.Object).SetAnnotations(annotations)
455
456 return kubeCli.Update(context.Background(), chaos)
457 }
458
459
460
461
462
463
464
465
466
467
468
469 func (s *Service) pause(c *gin.Context) {
470 var exp *core.Experiment
471
472 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
473 if err != nil {
474 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
475
476 return
477 }
478
479 uid := c.Param("uid")
480 if exp, err = s.archive.FindByUID(context.Background(), uid); err != nil {
481 if gorm.IsRecordNotFoundError(err) {
482 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
483 } else {
484 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
485 }
486
487 return
488 }
489
490 annotations := map[string]string{
491 v1alpha1.PauseAnnotationKey: "true",
492 }
493 if err = patchExperiment(kubeCli, exp, annotations); err != nil {
494 u.SetAPImachineryError(c, err)
495
496 return
497 }
498
499 c.JSON(http.StatusOK, u.ResponseSuccess)
500 }
501
502
503
504
505
506
507
508
509
510
511
512 func (s *Service) start(c *gin.Context) {
513 var exp *core.Experiment
514
515 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
516 if err != nil {
517 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
518
519 return
520 }
521
522 uid := c.Param("uid")
523 if exp, err = s.archive.FindByUID(context.Background(), uid); err != nil {
524 if gorm.IsRecordNotFoundError(err) {
525 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
526 } else {
527 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
528 }
529
530 return
531 }
532
533 annotations := map[string]string{
534 v1alpha1.PauseAnnotationKey: "false",
535 }
536 if err = patchExperiment(kubeCli, exp, annotations); err != nil {
537 u.SetAPImachineryError(c, err)
538
539 return
540 }
541
542 c.JSON(http.StatusOK, u.ResponseSuccess)
543 }
544
545 func patchExperiment(kubeCli client.Client, exp *core.Experiment, annotations map[string]string) error {
546 chaos := v1alpha1.AllKinds()[exp.Kind].SpawnObject()
547
548 if err := kubeCli.Get(context.Background(), types.NamespacedName{Namespace: exp.Namespace, Name: exp.Name}, chaos); err != nil {
549 return err
550 }
551
552 var mergePatch []byte
553 mergePatch, _ = json.Marshal(map[string]interface{}{
554 "metadata": map[string]interface{}{
555 "annotations": annotations,
556 },
557 })
558
559 return kubeCli.Patch(context.Background(), chaos, client.RawPatch(types.MergePatchType, mergePatch))
560 }
561
562
563
564
565
566
567
568
569
570
571 func (s *Service) state(c *gin.Context) {
572 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
573 if err != nil {
574 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
575
576 return
577 }
578
579 ns := c.Query("namespace")
580 if ns == "" && !s.config.ClusterScoped && s.config.TargetNamespace != "" {
581 ns = s.config.TargetNamespace
582
583 log.V(1).Info("Replace query namespace with", ns)
584 }
585
586 allChaosStatus := status.AllChaosStatus{}
587
588 g, ctx := errgroup.WithContext(context.Background())
589 m := &sync.Mutex{}
590
591 var listOptions []client.ListOption
592 listOptions = append(listOptions, &client.ListOptions{Namespace: ns})
593
594 for _, chaosKind := range v1alpha1.AllKinds() {
595 list := chaosKind.SpawnList()
596
597 g.Go(func() error {
598 if err := kubeCli.List(ctx, list, listOptions...); err != nil {
599 return err
600 }
601 m.Lock()
602
603 for _, item := range list.GetItems() {
604 s := status.GetChaosStatus(item.(v1alpha1.InnerObject))
605
606 switch s {
607 case status.Injecting:
608 allChaosStatus.Injecting++
609 case status.Running:
610 allChaosStatus.Running++
611 case status.Finished:
612 allChaosStatus.Finished++
613 case status.Paused:
614 allChaosStatus.Paused++
615 }
616 }
617
618 m.Unlock()
619 return nil
620 })
621 }
622
623 if err = g.Wait(); err != nil {
624 u.SetAPImachineryError(c, err)
625
626 return
627 }
628
629 c.JSON(http.StatusOK, allChaosStatus)
630 }
631