1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package experiment
17
18 import (
19 "context"
20 "encoding/json"
21 "net/http"
22 "reflect"
23 "sort"
24 "strings"
25 "sync"
26 "time"
27
28 "github.com/gin-gonic/gin"
29 "github.com/go-logr/logr"
30 "github.com/jinzhu/gorm"
31 "golang.org/x/sync/errgroup"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/runtime"
34 "k8s.io/apimachinery/pkg/types"
35 "k8s.io/client-go/util/retry"
36 "sigs.k8s.io/controller-runtime/pkg/client"
37 "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
38
39 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
40 "github.com/chaos-mesh/chaos-mesh/controllers/common/finalizers"
41 "github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
42 config "github.com/chaos-mesh/chaos-mesh/pkg/config"
43 apiservertypes "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/types"
44 u "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
45 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
46 "github.com/chaos-mesh/chaos-mesh/pkg/status"
47 )
48
49
50 type Service struct {
51 archive core.ExperimentStore
52 event core.EventStore
53 config *config.ChaosDashboardConfig
54 scheme *runtime.Scheme
55 log logr.Logger
56 }
57
58 func NewService(
59 archive core.ExperimentStore,
60 event core.EventStore,
61 config *config.ChaosDashboardConfig,
62 scheme *runtime.Scheme,
63 log logr.Logger,
64 ) *Service {
65 return &Service{
66 archive: archive,
67 event: event,
68 config: config,
69 scheme: scheme,
70 log: log,
71 }
72 }
73
74
75 func Register(r *gin.RouterGroup, s *Service) {
76 endpoint := r.Group("/experiments")
77
78 endpoint.GET("", s.list)
79 endpoint.POST("", s.create)
80 endpoint.GET("/:uid", s.get)
81 endpoint.DELETE("/:uid", s.delete)
82 endpoint.DELETE("", s.batchDelete)
83 endpoint.PUT("/pause/:uid", s.pause)
84 endpoint.PUT("/start/:uid", s.start)
85 endpoint.GET("/state", s.state)
86 }
87
88
89
90
91
92
93
94
95
96
97
98
99
100 func (s *Service) list(c *gin.Context) {
101 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
102 if err != nil {
103 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
104
105 return
106 }
107
108 ns, name, kind := c.Query("namespace"), c.Query("name"), c.Query("kind")
109
110 if ns == "" && !s.config.ClusterScoped && s.config.TargetNamespace != "" {
111 ns = s.config.TargetNamespace
112
113 s.log.V(1).Info("Replace query namespace", "ns", ns)
114 }
115
116 exps := make([]*apiservertypes.Experiment, 0)
117 for k, chaosKind := range v1alpha1.AllKinds() {
118 if kind != "" && k != kind {
119 continue
120 }
121
122 list := chaosKind.SpawnList()
123 if err := kubeCli.List(context.Background(), list, &client.ListOptions{Namespace: ns}); err != nil {
124 u.SetAPImachineryError(c, err)
125
126 return
127 }
128
129 for _, item := range list.GetItems() {
130 chaosName := item.GetName()
131
132 if name != "" && chaosName != name {
133 continue
134 }
135
136 exps = append(exps, &apiservertypes.Experiment{
137 ObjectBase: core.ObjectBase{
138 Namespace: item.GetNamespace(),
139 Name: chaosName,
140 Kind: item.GetObjectKind().GroupVersionKind().Kind,
141 UID: string(item.GetUID()),
142 Created: item.GetCreationTimestamp().Format(time.RFC3339),
143 },
144 Status: status.GetChaosStatus(item.(v1alpha1.InnerObject)),
145 })
146 }
147 }
148
149 sort.Slice(exps, func(i, j int) bool {
150 return exps[i].Created > exps[j].Created
151 })
152
153 c.JSON(http.StatusOK, exps)
154 }
155
156
157
158
159
160
161
162
163
164
165
166 func (s *Service) create(c *gin.Context) {
167 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
168 if err != nil {
169 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
170
171 return
172 }
173
174 var exp map[string]interface{}
175 if err = u.ShouldBindBodyWithJSON(c, &exp); err != nil {
176 return
177 }
178 kind := exp["kind"].(string)
179
180 if chaosKind, ok := v1alpha1.AllKinds()[kind]; ok {
181 chaos := chaosKind.SpawnObject()
182 reflect.ValueOf(chaos).Elem().FieldByName("ObjectMeta").Set(reflect.ValueOf(metav1.ObjectMeta{}))
183
184 if err = u.ShouldBindBodyWithJSON(c, chaos); err != nil {
185 return
186 }
187
188 if err = kubeCli.Create(context.Background(), chaos); err != nil {
189 u.SetAPImachineryError(c, err)
190
191 return
192 }
193 } else {
194 u.SetAPIError(c, u.ErrBadRequest.New("Kind "+kind+" is not supported"))
195
196 return
197 }
198
199 c.JSON(http.StatusOK, exp)
200 }
201
202
203
204
205
206
207
208
209
210
211
212 func (s *Service) get(c *gin.Context) {
213 var (
214 exp *core.Experiment
215 expDetail *apiservertypes.ExperimentDetail
216 )
217
218 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
219 if err != nil {
220 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
221
222 return
223 }
224
225 uid := c.Param("uid")
226 if exp, err = s.archive.FindByUID(context.Background(), uid); err != nil {
227 if gorm.IsRecordNotFoundError(err) {
228 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
229 } else {
230 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
231 }
232
233 return
234 }
235
236 ns, name, kind := exp.Namespace, exp.Name, exp.Kind
237
238 if chaosKind, ok := v1alpha1.AllKinds()[kind]; ok {
239 expDetail = s.findChaosInCluster(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}, chaosKind.SpawnObject())
240
241 if expDetail == nil {
242 return
243 }
244 } else {
245 u.SetAPIError(c, u.ErrBadRequest.New("Kind "+kind+" is not supported"))
246
247 return
248 }
249
250 c.JSON(http.StatusOK, expDetail)
251 }
252
253 func (s *Service) findChaosInCluster(c *gin.Context, kubeCli client.Client, namespacedName types.NamespacedName, chaos client.Object) *apiservertypes.ExperimentDetail {
254 if err := kubeCli.Get(context.Background(), namespacedName, chaos); err != nil {
255 u.SetAPImachineryError(c, err)
256
257 return nil
258 }
259
260 gvk, err := apiutil.GVKForObject(chaos, s.scheme)
261 if err != nil {
262 u.SetAPImachineryError(c, err)
263
264 return nil
265 }
266
267 kind := gvk.Kind
268
269 return &apiservertypes.ExperimentDetail{
270 Experiment: apiservertypes.Experiment{
271 ObjectBase: core.ObjectBase{
272 Namespace: reflect.ValueOf(chaos).MethodByName("GetNamespace").Call(nil)[0].String(),
273 Name: reflect.ValueOf(chaos).MethodByName("GetName").Call(nil)[0].String(),
274 Kind: kind,
275 UID: reflect.ValueOf(chaos).MethodByName("GetUID").Call(nil)[0].String(),
276 Created: reflect.ValueOf(chaos).MethodByName("GetCreationTimestamp").Call(nil)[0].Interface().(metav1.Time).Format(time.RFC3339),
277 },
278 Status: status.GetChaosStatus(chaos.(v1alpha1.InnerObject)),
279 },
280 KubeObject: core.KubeObjectDesc{
281 TypeMeta: metav1.TypeMeta{
282 APIVersion: gvk.GroupVersion().String(),
283 Kind: kind,
284 },
285 Meta: core.KubeObjectMeta{
286 Namespace: reflect.ValueOf(chaos).Elem().FieldByName("Namespace").String(),
287 Name: reflect.ValueOf(chaos).Elem().FieldByName("Name").String(),
288 Labels: reflect.ValueOf(chaos).Elem().FieldByName("Labels").Interface().(map[string]string),
289 Annotations: reflect.ValueOf(chaos).Elem().FieldByName("Annotations").Interface().(map[string]string),
290 },
291 Spec: reflect.ValueOf(chaos).Elem().FieldByName("Spec").Interface(),
292 },
293 }
294 }
295
296
297
298
299
300
301
302
303
304
305
306
307 func (s *Service) delete(c *gin.Context) {
308 var (
309 exp *core.Experiment
310 )
311
312 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
313 if err != nil {
314 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
315
316 return
317 }
318
319 uid := c.Param("uid")
320 if exp, err = s.archive.FindByUID(context.Background(), uid); err != nil {
321 if gorm.IsRecordNotFoundError(err) {
322 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
323 } else {
324 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
325 }
326
327 return
328 }
329
330 ns, name, kind, force := exp.Namespace, exp.Name, exp.Kind, c.DefaultQuery("force", "false")
331 if ok := checkAndDeleteChaos(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}, kind, force); !ok {
332 return
333 }
334
335 c.JSON(http.StatusOK, u.ResponseSuccess)
336 }
337
338
339
340
341
342
343
344
345
346
347
348
349 func (s *Service) batchDelete(c *gin.Context) {
350 var (
351 exp *core.Experiment
352 )
353
354 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
355 if err != nil {
356 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
357
358 return
359 }
360
361 uids := c.Query("uids")
362 if uids == "" {
363 u.SetAPIError(c, u.ErrInternalServer.New("The uids cannot be empty"))
364
365 return
366 }
367
368 uidSlice, force := strings.Split(uids, ","), c.DefaultQuery("force", "false")
369
370 if len(uidSlice) > 100 {
371 u.SetAPIError(c, u.ErrInternalServer.New("Too many uids, please delete less than 100 at a time"))
372
373 return
374 }
375
376 for _, uid := range uidSlice {
377 if exp, err = s.archive.FindByUID(context.Background(), uid); err != nil {
378 if gorm.IsRecordNotFoundError(err) {
379 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
380 } else {
381 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
382 }
383
384 return
385 }
386
387 ns, name, kind := exp.Namespace, exp.Name, exp.Kind
388 if ok := checkAndDeleteChaos(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}, kind, force); !ok {
389 return
390 }
391 }
392
393 c.JSON(http.StatusOK, u.ResponseSuccess)
394 }
395
396 func checkAndDeleteChaos(c *gin.Context, kubeCli client.Client, namespacedName types.NamespacedName, kind string, force string) bool {
397 var (
398 chaosKind *v1alpha1.ChaosKind
399 ok bool
400 err error
401 )
402
403 if chaosKind, ok = v1alpha1.AllKinds()[kind]; !ok {
404 u.SetAPIError(c, u.ErrBadRequest.New("Kind "+kind+" is not supported"))
405
406 return false
407 }
408
409 ctx := context.Background()
410 chaos := chaosKind.SpawnObject()
411
412 if err = kubeCli.Get(ctx, namespacedName, chaos); err != nil {
413 u.SetAPImachineryError(c, err)
414
415 return false
416 }
417
418 if force == "true" {
419 if err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
420 return forceClean(kubeCli, chaos)
421 }); err != nil {
422 u.SetAPIError(c, u.ErrInternalServer.New("Forced deletion failed"))
423
424 return false
425 }
426 }
427
428 if err := kubeCli.Delete(ctx, chaos); err != nil {
429 u.SetAPImachineryError(c, err)
430
431 return false
432 }
433
434 return true
435 }
436
437 func forceClean(kubeCli client.Client, chaos client.Object) error {
438 annotations := chaos.(metav1.Object).GetAnnotations()
439 if annotations == nil {
440 annotations = make(map[string]string)
441 }
442
443 annotations[finalizers.AnnotationCleanFinalizer] = finalizers.AnnotationCleanFinalizerForced
444 chaos.(metav1.Object).SetAnnotations(annotations)
445
446 return kubeCli.Update(context.Background(), chaos)
447 }
448
449
450
451
452
453
454
455
456
457
458
459 func (s *Service) pause(c *gin.Context) {
460 var exp *core.Experiment
461
462 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
463 if err != nil {
464 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
465
466 return
467 }
468
469 uid := c.Param("uid")
470 if exp, err = s.archive.FindByUID(context.Background(), uid); err != nil {
471 if gorm.IsRecordNotFoundError(err) {
472 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
473 } else {
474 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
475 }
476
477 return
478 }
479
480 annotations := map[string]string{
481 v1alpha1.PauseAnnotationKey: "true",
482 }
483 if err = patchExperiment(kubeCli, exp, annotations); err != nil {
484 u.SetAPImachineryError(c, err)
485
486 return
487 }
488
489 c.JSON(http.StatusOK, u.ResponseSuccess)
490 }
491
492
493
494
495
496
497
498
499
500
501
502 func (s *Service) start(c *gin.Context) {
503 var exp *core.Experiment
504
505 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
506 if err != nil {
507 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
508
509 return
510 }
511
512 uid := c.Param("uid")
513 if exp, err = s.archive.FindByUID(context.Background(), uid); err != nil {
514 if gorm.IsRecordNotFoundError(err) {
515 u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
516 } else {
517 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
518 }
519
520 return
521 }
522
523 annotations := map[string]string{
524 v1alpha1.PauseAnnotationKey: "false",
525 }
526 if err = patchExperiment(kubeCli, exp, annotations); err != nil {
527 u.SetAPImachineryError(c, err)
528
529 return
530 }
531
532 c.JSON(http.StatusOK, u.ResponseSuccess)
533 }
534
535 func patchExperiment(kubeCli client.Client, exp *core.Experiment, annotations map[string]string) error {
536 chaos := v1alpha1.AllKinds()[exp.Kind].SpawnObject()
537
538 if err := kubeCli.Get(context.Background(), types.NamespacedName{Namespace: exp.Namespace, Name: exp.Name}, chaos); err != nil {
539 return err
540 }
541
542 var mergePatch []byte
543 mergePatch, _ = json.Marshal(map[string]interface{}{
544 "metadata": map[string]interface{}{
545 "annotations": annotations,
546 },
547 })
548
549 return kubeCli.Patch(context.Background(), chaos, client.RawPatch(types.MergePatchType, mergePatch))
550 }
551
552
553
554
555
556
557
558
559
560
561 func (s *Service) state(c *gin.Context) {
562 kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
563 if err != nil {
564 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
565
566 return
567 }
568
569 ns := c.Query("namespace")
570 if ns == "" && !s.config.ClusterScoped && s.config.TargetNamespace != "" {
571 ns = s.config.TargetNamespace
572
573 s.log.V(1).Info("Replace query namespace", "ns", ns)
574 }
575
576 allChaosStatus := status.AllChaosStatus{}
577
578 g, ctx := errgroup.WithContext(context.Background())
579 m := &sync.Mutex{}
580
581 var listOptions []client.ListOption
582 listOptions = append(listOptions, &client.ListOptions{Namespace: ns})
583
584 for _, chaosKind := range v1alpha1.AllKinds() {
585 list := chaosKind.SpawnList()
586
587 g.Go(func() error {
588 if err := kubeCli.List(ctx, list, listOptions...); err != nil {
589 return err
590 }
591 m.Lock()
592
593 for _, item := range list.GetItems() {
594 s := status.GetChaosStatus(item.(v1alpha1.InnerObject))
595
596 switch s {
597 case status.Injecting:
598 allChaosStatus.Injecting++
599 case status.Running:
600 allChaosStatus.Running++
601 case status.Finished:
602 allChaosStatus.Finished++
603 case status.Paused:
604 allChaosStatus.Paused++
605 }
606 }
607
608 m.Unlock()
609 return nil
610 })
611 }
612
613 if err = g.Wait(); err != nil {
614 u.SetAPImachineryError(c, err)
615
616 return
617 }
618
619 c.JSON(http.StatusOK, allChaosStatus)
620 }
621