...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/schedule/schedule.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/schedule

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package schedule
    17  
    18  import (
    19  	"context"
    20  	"encoding/json"
    21  	"net/http"
    22  	"sort"
    23  	"strings"
    24  	"time"
    25  
    26  	"github.com/gin-gonic/gin"
    27  	"github.com/go-logr/logr"
    28  	"github.com/jinzhu/gorm"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  	"k8s.io/apimachinery/pkg/types"
    32  	"sigs.k8s.io/controller-runtime/pkg/client"
    33  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    34  
    35  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    36  	"github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
    37  	config "github.com/chaos-mesh/chaos-mesh/pkg/config"
    38  	apiservertypes "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/types"
    39  	u "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
    40  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
    41  	"github.com/chaos-mesh/chaos-mesh/pkg/status"
    42  )
    43  
// Service defines a handler service for schedules.
//
// It bundles the dependencies used by the HTTP handlers in this file:
//   - schedule: store queried by UID to resolve a schedule's namespace and name.
//   - event: event store supplied at construction; none of the handlers
//     visible in this file read it.
//   - config: dashboard configuration; list consults ClusterScoped and
//     TargetNamespace to pick a default namespace.
//   - scheme: runtime scheme used to resolve the GroupVersionKind of a
//     Schedule object.
//   - log: logger used for handler diagnostics.
type Service struct {
	schedule core.ScheduleStore
	event    core.EventStore
	config   *config.ChaosDashboardConfig
	scheme   *runtime.Scheme
	log      logr.Logger
}
    52  
    53  func NewService(
    54  	schedule core.ScheduleStore,
    55  	event core.EventStore,
    56  	config *config.ChaosDashboardConfig,
    57  	scheme *runtime.Scheme,
    58  	log logr.Logger,
    59  ) *Service {
    60  	return &Service{
    61  		schedule: schedule,
    62  		event:    event,
    63  		config:   config,
    64  		scheme:   scheme,
    65  		log:      log,
    66  	}
    67  }
    68  
    69  // Register schedules RouterGroup.
    70  func Register(r *gin.RouterGroup, s *Service) {
    71  	endpoint := r.Group("/schedules")
    72  
    73  	endpoint.GET("", s.list)
    74  	endpoint.POST("", s.create)
    75  	endpoint.GET("/:uid", s.get)
    76  	endpoint.DELETE("/:uid", s.delete)
    77  	endpoint.DELETE("", s.batchDelete)
    78  	endpoint.PUT("/pause/:uid", s.pauseSchedule)
    79  	endpoint.PUT("/start/:uid", s.startSchedule)
    80  }
    81  
    82  // @Summary List chaos schedules.
    83  // @Description Get chaos schedules from k8s cluster in real time.
    84  // @Tags schedules
    85  // @Produce json
    86  // @Param namespace query string false "filter schedules by namespace"
    87  // @Param name query string false "filter schedules by name"
    88  // @Success 200 {array} apiservertypes.Schedule
    89  // @Failure 400 {object} u.APIError
    90  // @Failure 500 {object} u.APIError
    91  // @Router /schedules [get]
    92  func (s *Service) list(c *gin.Context) {
    93  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
    94  	if err != nil {
    95  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
    96  
    97  		return
    98  	}
    99  
   100  	ns, name := c.Query("namespace"), c.Query("name")
   101  
   102  	if ns == "" && !s.config.ClusterScoped && s.config.TargetNamespace != "" {
   103  		ns = s.config.TargetNamespace
   104  
   105  		s.log.V(1).Info("Replace query namespace", "ns", ns)
   106  	}
   107  
   108  	ScheduleList := v1alpha1.ScheduleList{}
   109  	if err = kubeCli.List(context.Background(), &ScheduleList, &client.ListOptions{Namespace: ns}); err != nil {
   110  		u.SetAPImachineryError(c, err)
   111  
   112  		return
   113  	}
   114  
   115  	sches := make([]*apiservertypes.Schedule, 0)
   116  	for _, schedule := range ScheduleList.Items {
   117  		if name != "" && schedule.Name != name {
   118  			continue
   119  		}
   120  
   121  		sches = append(sches, &apiservertypes.Schedule{
   122  			ObjectBase: core.ObjectBase{
   123  				Namespace: schedule.Namespace,
   124  				Name:      schedule.Name,
   125  				Kind:      string(schedule.Spec.Type),
   126  				UID:       string(schedule.UID),
   127  				Created:   schedule.CreationTimestamp.Format(time.RFC3339),
   128  			},
   129  			Status: status.GetScheduleStatus(schedule),
   130  		})
   131  	}
   132  
   133  	sort.Slice(sches, func(i, j int) bool {
   134  		return sches[i].Created > sches[j].Created
   135  	})
   136  
   137  	c.JSON(http.StatusOK, sches)
   138  }
   139  
   140  // @Summary Create a new schedule.
   141  // @Description Pass a JSON object to create a new schedule. The schema for JSON is the same as the YAML schema for the Kubernetes object.
   142  // @Tags schedules
   143  // @Accept json
   144  // @Produce json
   145  // @Param schedule body v1alpha1.Schedule true "the schedule definition"
   146  // @Success 200 {object} v1alpha1.Schedule
   147  // @Failure 400 {object} u.APIError
   148  // @Failure 500 {object} u.APIError
   149  // @Router /schedules [post]
   150  func (s *Service) create(c *gin.Context) {
   151  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   152  	if err != nil {
   153  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   154  
   155  		return
   156  	}
   157  
   158  	var sch v1alpha1.Schedule
   159  	if err = u.ShouldBindBodyWithJSON(c, &sch); err != nil {
   160  		return
   161  	}
   162  
   163  	if err = kubeCli.Create(context.Background(), &sch); err != nil {
   164  		u.SetAPImachineryError(c, err)
   165  
   166  		return
   167  	}
   168  
   169  	c.JSON(http.StatusOK, sch)
   170  }
   171  
   172  // @Summary Get a schedule.
   173  // @Description Get the schedule's detail by uid.
   174  // @Tags schedules
   175  // @Produce json
   176  // @Param uid path string true "the schedule uid"
   177  // @Success 200 {object} apiservertypes.ScheduleDetail
   178  // @Failure 400 {object} u.APIError
   179  // @Failure 404 {object} u.APIError
   180  // @Failure 500 {object} u.APIError
   181  // @Router /schedules/{uid} [get]
   182  func (s *Service) get(c *gin.Context) {
   183  	var (
   184  		sch       *core.Schedule
   185  		schDetail *apiservertypes.ScheduleDetail
   186  	)
   187  
   188  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   189  	if err != nil {
   190  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   191  
   192  		return
   193  	}
   194  
   195  	uid := c.Param("uid")
   196  	if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
   197  		if gorm.IsRecordNotFoundError(err) {
   198  			u.SetAPIError(c, u.ErrBadRequest.New("Schedule "+uid+"not found"))
   199  		} else {
   200  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   201  		}
   202  
   203  		return
   204  	}
   205  
   206  	ns, name := sch.Namespace, sch.Name
   207  	schDetail = s.findScheduleInCluster(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name})
   208  	if schDetail == nil {
   209  		return
   210  	}
   211  
   212  	c.JSON(http.StatusOK, schDetail)
   213  }
   214  
   215  func (s *Service) findScheduleInCluster(c *gin.Context, kubeCli client.Client, namespacedName types.NamespacedName) *apiservertypes.ScheduleDetail {
   216  	var sch v1alpha1.Schedule
   217  
   218  	if err := kubeCli.Get(context.Background(), namespacedName, &sch); err != nil {
   219  		u.SetAPImachineryError(c, err)
   220  
   221  		return nil
   222  	}
   223  
   224  	gvk, err := apiutil.GVKForObject(&sch, s.scheme)
   225  	if err != nil {
   226  		u.SetAPImachineryError(c, err)
   227  
   228  		return nil
   229  	}
   230  
   231  	UIDList := make([]string, 0)
   232  	schType := string(sch.Spec.Type)
   233  	chaosKind, ok := v1alpha1.AllScheduleItemKinds()[schType]
   234  	if !ok {
   235  		u.SetAPIError(c, u.ErrInternalServer.New("Kind "+schType+" is not supported"))
   236  
   237  		return nil
   238  	}
   239  
   240  	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   241  		MatchLabels: map[string]string{v1alpha1.LabelManagedBy: sch.Name},
   242  	})
   243  	if err != nil {
   244  		u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   245  
   246  		return nil
   247  	}
   248  
   249  	chaosList := chaosKind.SpawnList()
   250  	err = kubeCli.List(context.Background(), chaosList, &client.ListOptions{
   251  		Namespace:     sch.Namespace,
   252  		LabelSelector: selector,
   253  	})
   254  	if err != nil {
   255  		u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   256  
   257  		return nil
   258  	}
   259  
   260  	items := chaosList.GetItems()
   261  	for _, item := range items {
   262  		UIDList = append(UIDList, string(item.GetUID()))
   263  	}
   264  
   265  	return &apiservertypes.ScheduleDetail{
   266  		Schedule: apiservertypes.Schedule{
   267  			ObjectBase: core.ObjectBase{
   268  				Namespace: sch.Namespace,
   269  				Name:      sch.Name,
   270  				Kind:      string(sch.Spec.Type),
   271  				UID:       string(sch.UID),
   272  				Created:   sch.CreationTimestamp.Format(time.RFC3339),
   273  			},
   274  			Status: status.GetScheduleStatus(sch),
   275  		},
   276  		ExperimentUIDs: UIDList,
   277  		KubeObject: core.KubeObjectDesc{
   278  			TypeMeta: metav1.TypeMeta{
   279  				APIVersion: gvk.GroupVersion().String(),
   280  				Kind:       gvk.Kind,
   281  			},
   282  			Meta: core.KubeObjectMeta{
   283  				Namespace:   sch.Namespace,
   284  				Name:        sch.Name,
   285  				Labels:      sch.Labels,
   286  				Annotations: sch.Annotations,
   287  			},
   288  			Spec: sch.Spec,
   289  		},
   290  	}
   291  }
   292  
   293  // @Summary Delete a schedule.
   294  // @Description Delete the schedule by uid.
   295  // @Tags schedules
   296  // @Produce json
   297  // @Param uid path string true "the schedule uid"
   298  // @Success 200 {object} u.Response
   299  // @Failure 400 {object} u.APIError
   300  // @Failure 404 {object} u.APIError
   301  // @Failure 500 {object} u.APIError
   302  // @Router /schedules/{uid} [delete]
   303  func (s *Service) delete(c *gin.Context) {
   304  	var (
   305  		sch *core.Schedule
   306  	)
   307  
   308  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   309  	if err != nil {
   310  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   311  
   312  		return
   313  	}
   314  
   315  	uid := c.Param("uid")
   316  	if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
   317  		if gorm.IsRecordNotFoundError(err) {
   318  			u.SetAPIError(c, u.ErrNotFound.New("Schedule "+uid+" not found"))
   319  		} else {
   320  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   321  		}
   322  
   323  		return
   324  	}
   325  
   326  	ns, name := sch.Namespace, sch.Name
   327  	if err = checkAndDeleteSchedule(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}); err != nil {
   328  		u.SetAPImachineryError(c, err)
   329  
   330  		return
   331  	}
   332  
   333  	c.JSON(http.StatusOK, u.ResponseSuccess)
   334  }
   335  
   336  // @Summary Batch delete schedules.
   337  // @Description Batch delete schedules by uids.
   338  // @Tags schedules
   339  // @Produce json
   340  // @Param uids query string true "the schedule uids, split with comma. Example: ?uids=uid1,uid2"
   341  // @Success 200 {object} u.Response
   342  // @Failure 400 {object} u.APIError
   343  // @Failure 404 {object} u.APIError
   344  // @Failure 500 {object} u.APIError
   345  // @Router /schedules [delete]
   346  func (s *Service) batchDelete(c *gin.Context) {
   347  	var (
   348  		sch *core.Schedule
   349  	)
   350  
   351  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   352  	if err != nil {
   353  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   354  
   355  		return
   356  	}
   357  
   358  	uids := c.Query("uids")
   359  	if uids == "" {
   360  		u.SetAPIError(c, u.ErrInternalServer.New("The uids cannot be empty"))
   361  
   362  		return
   363  	}
   364  
   365  	uidSlice := strings.Split(uids, ",")
   366  
   367  	if len(uidSlice) > 100 {
   368  		u.SetAPIError(c, u.ErrInternalServer.New("Too many uids, please delete less than 100 at a time"))
   369  
   370  		return
   371  	}
   372  
   373  	for _, uid := range uidSlice {
   374  		if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
   375  			if gorm.IsRecordNotFoundError(err) {
   376  				u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
   377  			} else {
   378  				u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   379  			}
   380  
   381  			return
   382  		}
   383  
   384  		ns, name := sch.Namespace, sch.Name
   385  		if err = checkAndDeleteSchedule(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}); err != nil {
   386  			u.SetAPImachineryError(c, err)
   387  
   388  			return
   389  		}
   390  
   391  	}
   392  
   393  	c.JSON(http.StatusOK, u.ResponseSuccess)
   394  }
   395  
   396  func checkAndDeleteSchedule(c *gin.Context, kubeCli client.Client, namespacedName types.NamespacedName) (err error) {
   397  	ctx := context.Background()
   398  	var sch v1alpha1.Schedule
   399  
   400  	if err = kubeCli.Get(ctx, namespacedName, &sch); err != nil {
   401  		return
   402  	}
   403  
   404  	if err = kubeCli.Delete(ctx, &sch); err != nil {
   405  		return
   406  	}
   407  
   408  	return
   409  }
   410  
   411  // @Summary Pause a schedule.
   412  // @Description Pause a schedule.
   413  // @Tags schedules
   414  // @Produce json
   415  // @Param uid path string true "the schedule uid"
   416  // @Success 200 {object} u.Response
   417  // @Failure 400 {object} u.APIError
   418  // @Failure 404 {object} u.APIError
   419  // @Failure 500 {object} u.APIError
   420  // @Router /schedules/pause/{uid} [put]
   421  func (s *Service) pauseSchedule(c *gin.Context) {
   422  	var sch *core.Schedule
   423  
   424  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   425  	if err != nil {
   426  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   427  
   428  		return
   429  	}
   430  
   431  	uid := c.Param("uid")
   432  	if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
   433  		if gorm.IsRecordNotFoundError(err) {
   434  			u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
   435  		} else {
   436  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   437  		}
   438  
   439  		return
   440  	}
   441  
   442  	annotations := map[string]string{
   443  		v1alpha1.PauseAnnotationKey: "true",
   444  	}
   445  	if err = patchSchedule(kubeCli, sch, annotations); err != nil {
   446  		u.SetAPImachineryError(c, err)
   447  
   448  		return
   449  	}
   450  	c.JSON(http.StatusOK, u.ResponseSuccess)
   451  }
   452  
   453  // @Summary Start a schedule.
   454  // @Description Start a schedule.
   455  // @Tags schedules
   456  // @Produce json
   457  // @Param uid path string true "the schedule uid"
   458  // @Success 200 {object} u.Response
   459  // @Failure 400 {object} u.APIError
   460  // @Failure 404 {object} u.APIError
   461  // @Failure 500 {object} u.APIError
   462  // @Router /schedules/start/{uid} [put]
   463  func (s *Service) startSchedule(c *gin.Context) {
   464  	var sch *core.Schedule
   465  
   466  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   467  	if err != nil {
   468  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   469  
   470  		return
   471  	}
   472  
   473  	uid := c.Param("uid")
   474  	if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
   475  		if gorm.IsRecordNotFoundError(err) {
   476  			u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
   477  		} else {
   478  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   479  		}
   480  
   481  		return
   482  	}
   483  
   484  	annotations := map[string]string{
   485  		v1alpha1.PauseAnnotationKey: "false",
   486  	}
   487  	if err = patchSchedule(kubeCli, sch, annotations); err != nil {
   488  		u.SetAPImachineryError(c, err)
   489  
   490  		return
   491  	}
   492  	c.JSON(http.StatusOK, u.ResponseSuccess)
   493  }
   494  
   495  func patchSchedule(kubeCli client.Client, sch *core.Schedule, annotations map[string]string) error {
   496  	var tmp v1alpha1.Schedule
   497  
   498  	if err := kubeCli.Get(context.Background(), types.NamespacedName{Namespace: sch.Namespace, Name: sch.Name}, &tmp); err != nil {
   499  		return err
   500  	}
   501  
   502  	var mergePatch []byte
   503  	mergePatch, _ = json.Marshal(map[string]interface{}{
   504  		"metadata": map[string]interface{}{
   505  			"annotations": annotations,
   506  		},
   507  	})
   508  
   509  	return kubeCli.Patch(context.Background(), &tmp, client.RawPatch(types.MergePatchType, mergePatch))
   510  }
   511