...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/schedule/schedule.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/schedule

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package schedule
    17  
    18  import (
    19  	"context"
    20  	"encoding/json"
    21  	"net/http"
    22  	"sort"
    23  	"strings"
    24  	"time"
    25  
    26  	"github.com/gin-gonic/gin"
    27  	"github.com/jinzhu/gorm"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/runtime"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	"sigs.k8s.io/controller-runtime/pkg/client"
    32  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    33  
    34  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    35  	"github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
    36  	config "github.com/chaos-mesh/chaos-mesh/pkg/config/dashboard"
    37  	u "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
    38  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
    39  	"github.com/chaos-mesh/chaos-mesh/pkg/status"
    40  )
    41  
    42  var log = u.Log.WithName("schedules")
    43  
    44  // Service defines a handler service for schedules.
    45  type Service struct {
    46  	schedule core.ScheduleStore
    47  	event    core.EventStore
    48  	config   *config.ChaosDashboardConfig
    49  	scheme   *runtime.Scheme
    50  }
    51  
    52  func NewService(
    53  	schedule core.ScheduleStore,
    54  	event core.EventStore,
    55  	config *config.ChaosDashboardConfig,
    56  	scheme *runtime.Scheme,
    57  ) *Service {
    58  	return &Service{
    59  		schedule: schedule,
    60  		event:    event,
    61  		config:   config,
    62  		scheme:   scheme,
    63  	}
    64  }
    65  
    66  // Register schedules RouterGroup.
    67  func Register(r *gin.RouterGroup, s *Service) {
    68  	endpoint := r.Group("/schedules")
    69  
    70  	endpoint.GET("", s.list)
    71  	endpoint.POST("", s.create)
    72  	endpoint.GET("/:uid", s.get)
    73  	endpoint.DELETE("/:uid", s.delete)
    74  	endpoint.DELETE("", s.batchDelete)
    75  	endpoint.PUT("/pause/:uid", s.pauseSchedule)
    76  	endpoint.PUT("/start/:uid", s.startSchedule)
    77  }
    78  
    79  // Schedule defines the information of a schedule.
    80  type Schedule struct {
    81  	core.ObjectBase
    82  	Status status.ScheduleStatus `json:"status"`
    83  }
    84  
    85  // Detail adds KubeObjectDesc on Schedule.
    86  type Detail struct {
    87  	Schedule
    88  	ExperimentUIDs []string            `json:"experiment_uids"`
    89  	KubeObject     core.KubeObjectDesc `json:"kube_object"`
    90  }
    91  
    92  // @Summary List chaos schedules.
    93  // @Description Get chaos schedules from k8s cluster in real time.
    94  // @Tags schedules
    95  // @Produce json
    96  // @Param namespace query string false "filter schedules by namespace"
    97  // @Param name query string false "filter schedules by name"
    98  // @Success 200 {array} Schedule
    99  // @Failure 400 {object} utils.APIError
   100  // @Failure 500 {object} utils.APIError
   101  // @Router /schedules [get]
   102  func (s *Service) list(c *gin.Context) {
   103  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   104  	if err != nil {
   105  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   106  
   107  		return
   108  	}
   109  
   110  	ns, name := c.Query("namespace"), c.Query("name")
   111  
   112  	if ns == "" && !s.config.ClusterScoped && s.config.TargetNamespace != "" {
   113  		ns = s.config.TargetNamespace
   114  
   115  		log.V(1).Info("Replace query namespace with", ns)
   116  	}
   117  
   118  	ScheduleList := v1alpha1.ScheduleList{}
   119  	if err = kubeCli.List(context.Background(), &ScheduleList, &client.ListOptions{Namespace: ns}); err != nil {
   120  		u.SetAPImachineryError(c, err)
   121  
   122  		return
   123  	}
   124  
   125  	sches := make([]*Schedule, 0)
   126  	for _, schedule := range ScheduleList.Items {
   127  		if name != "" && schedule.Name != name {
   128  			continue
   129  		}
   130  
   131  		sches = append(sches, &Schedule{
   132  			ObjectBase: core.ObjectBase{
   133  				Namespace: schedule.Namespace,
   134  				Name:      schedule.Name,
   135  				Kind:      string(schedule.Spec.Type),
   136  				UID:       string(schedule.UID),
   137  				Created:   schedule.CreationTimestamp.Format(time.RFC3339),
   138  			},
   139  			Status: status.GetScheduleStatus(schedule),
   140  		})
   141  	}
   142  
   143  	sort.Slice(sches, func(i, j int) bool {
   144  		return sches[i].Created > sches[j].Created
   145  	})
   146  
   147  	c.JSON(http.StatusOK, sches)
   148  }
   149  
   150  // @Summary Create a new schedule.
   151  // @Description Pass a JSON object to create a new schedule. The schema for JSON is the same as the YAML schema for the Kubernetes object.
   152  // @Tags schedules
   153  // @Accept json
   154  // @Produce json
   155  // @Param schedule body v1alpha1.Schedule true "the schedule definition"
   156  // @Success 200 {object} v1alpha1.Schedule
   157  // @Failure 400 {object} utils.APIError
   158  // @Failure 500 {object} utils.APIError
   159  // @Router /schedules [post]
   160  func (s *Service) create(c *gin.Context) {
   161  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   162  	if err != nil {
   163  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   164  
   165  		return
   166  	}
   167  
   168  	var sch v1alpha1.Schedule
   169  	if err = u.ShouldBindBodyWithJSON(c, &sch); err != nil {
   170  		return
   171  	}
   172  
   173  	if err = kubeCli.Create(context.Background(), &sch); err != nil {
   174  		u.SetAPImachineryError(c, err)
   175  
   176  		return
   177  	}
   178  
   179  	c.JSON(http.StatusOK, sch)
   180  }
   181  
   182  // @Summary Get a schedule.
   183  // @Description Get the schedule's detail by uid.
   184  // @Tags schedules
   185  // @Produce json
   186  // @Param uid path string true "the schedule uid"
   187  // @Success 200 {object} Detail
   188  // @Failure 400 {object} utils.APIError
   189  // @Failure 404 {object} utils.APIError
   190  // @Failure 500 {object} utils.APIError
   191  // @Router /schedules/{uid} [get]
   192  func (s *Service) get(c *gin.Context) {
   193  	var (
   194  		sch       *core.Schedule
   195  		schDetail *Detail
   196  	)
   197  
   198  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   199  	if err != nil {
   200  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   201  
   202  		return
   203  	}
   204  
   205  	uid := c.Param("uid")
   206  	if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
   207  		if gorm.IsRecordNotFoundError(err) {
   208  			u.SetAPIError(c, u.ErrBadRequest.New("Schedule "+uid+"not found"))
   209  		} else {
   210  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   211  		}
   212  
   213  		return
   214  	}
   215  
   216  	ns, name := sch.Namespace, sch.Name
   217  	schDetail = s.findScheduleInCluster(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name})
   218  	if schDetail == nil {
   219  		return
   220  	}
   221  
   222  	c.JSON(http.StatusOK, schDetail)
   223  }
   224  
   225  func (s *Service) findScheduleInCluster(c *gin.Context, kubeCli client.Client, namespacedName types.NamespacedName) *Detail {
   226  	var sch v1alpha1.Schedule
   227  
   228  	if err := kubeCli.Get(context.Background(), namespacedName, &sch); err != nil {
   229  		u.SetAPImachineryError(c, err)
   230  
   231  		return nil
   232  	}
   233  
   234  	gvk, err := apiutil.GVKForObject(&sch, s.scheme)
   235  	if err != nil {
   236  		u.SetAPImachineryError(c, err)
   237  
   238  		return nil
   239  	}
   240  
   241  	UIDList := make([]string, 0)
   242  	schType := string(sch.Spec.Type)
   243  	chaosKind, ok := v1alpha1.AllScheduleItemKinds()[schType]
   244  	if !ok {
   245  		u.SetAPIError(c, u.ErrInternalServer.New("Kind "+schType+" is not supported"))
   246  
   247  		return nil
   248  	}
   249  
   250  	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
   251  		MatchLabels: map[string]string{v1alpha1.LabelManagedBy: sch.Name},
   252  	})
   253  	if err != nil {
   254  		u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   255  
   256  		return nil
   257  	}
   258  
   259  	chaosList := chaosKind.SpawnList()
   260  	err = kubeCli.List(context.Background(), chaosList, &client.ListOptions{
   261  		Namespace:     sch.Namespace,
   262  		LabelSelector: selector,
   263  	})
   264  	if err != nil {
   265  		u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   266  
   267  		return nil
   268  	}
   269  
   270  	items := chaosList.GetItems()
   271  	for _, item := range items {
   272  		UIDList = append(UIDList, string(item.GetUID()))
   273  	}
   274  
   275  	return &Detail{
   276  		Schedule: Schedule{
   277  			ObjectBase: core.ObjectBase{
   278  				Namespace: sch.Namespace,
   279  				Name:      sch.Name,
   280  				Kind:      string(sch.Spec.Type),
   281  				UID:       string(sch.UID),
   282  				Created:   sch.CreationTimestamp.Format(time.RFC3339),
   283  			},
   284  			Status: status.GetScheduleStatus(sch),
   285  		},
   286  		ExperimentUIDs: UIDList,
   287  		KubeObject: core.KubeObjectDesc{
   288  			TypeMeta: metav1.TypeMeta{
   289  				APIVersion: gvk.GroupVersion().String(),
   290  				Kind:       gvk.Kind,
   291  			},
   292  			Meta: core.KubeObjectMeta{
   293  				Namespace:   sch.Namespace,
   294  				Name:        sch.Name,
   295  				Labels:      sch.Labels,
   296  				Annotations: sch.Annotations,
   297  			},
   298  			Spec: sch.Spec,
   299  		},
   300  	}
   301  }
   302  
   303  // @Summary Delete a schedule.
   304  // @Description Delete the schedule by uid.
   305  // @Tags schedules
   306  // @Produce json
   307  // @Param uid path string true "the schedule uid"
   308  // @Success 200 {object} utils.Response
   309  // @Failure 400 {object} utils.APIError
   310  // @Failure 404 {object} utils.APIError
   311  // @Failure 500 {object} utils.APIError
   312  // @Router /schedules/{uid} [delete]
   313  func (s *Service) delete(c *gin.Context) {
   314  	var (
   315  		sch *core.Schedule
   316  	)
   317  
   318  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   319  	if err != nil {
   320  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   321  
   322  		return
   323  	}
   324  
   325  	uid := c.Param("uid")
   326  	if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
   327  		if gorm.IsRecordNotFoundError(err) {
   328  			u.SetAPIError(c, u.ErrNotFound.New("Schedule "+uid+" not found"))
   329  		} else {
   330  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   331  		}
   332  
   333  		return
   334  	}
   335  
   336  	ns, name := sch.Namespace, sch.Name
   337  	if err = checkAndDeleteSchedule(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}); err != nil {
   338  		u.SetAPImachineryError(c, err)
   339  
   340  		return
   341  	}
   342  
   343  	c.JSON(http.StatusOK, u.ResponseSuccess)
   344  }
   345  
   346  // @Summary Batch delete schedules.
   347  // @Description Batch delete schedules by uids.
   348  // @Tags schedules
   349  // @Produce json
   350  // @Param uids query string true "the schedule uids, split with comma. Example: ?uids=uid1,uid2"
   351  // @Success 200 {object} utils.Response
   352  // @Failure 400 {object} utils.APIError
   353  // @Failure 404 {object} utils.APIError
   354  // @Failure 500 {object} utils.APIError
   355  // @Router /schedules [delete]
   356  func (s *Service) batchDelete(c *gin.Context) {
   357  	var (
   358  		sch *core.Schedule
   359  	)
   360  
   361  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   362  	if err != nil {
   363  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   364  
   365  		return
   366  	}
   367  
   368  	uids := c.Query("uids")
   369  	if uids == "" {
   370  		u.SetAPIError(c, u.ErrInternalServer.New("The uids cannot be empty"))
   371  
   372  		return
   373  	}
   374  
   375  	uidSlice := strings.Split(uids, ",")
   376  
   377  	if len(uidSlice) > 100 {
   378  		u.SetAPIError(c, u.ErrInternalServer.New("Too many uids, please delete less than 100 at a time"))
   379  
   380  		return
   381  	}
   382  
   383  	for _, uid := range uidSlice {
   384  		if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
   385  			if gorm.IsRecordNotFoundError(err) {
   386  				u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
   387  			} else {
   388  				u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   389  			}
   390  
   391  			return
   392  		}
   393  
   394  		ns, name := sch.Namespace, sch.Name
   395  		if err = checkAndDeleteSchedule(c, kubeCli, types.NamespacedName{Namespace: ns, Name: name}); err != nil {
   396  			u.SetAPImachineryError(c, err)
   397  
   398  			return
   399  		}
   400  
   401  	}
   402  
   403  	c.JSON(http.StatusOK, u.ResponseSuccess)
   404  }
   405  
   406  func checkAndDeleteSchedule(c *gin.Context, kubeCli client.Client, namespacedName types.NamespacedName) (err error) {
   407  	ctx := context.Background()
   408  	var sch v1alpha1.Schedule
   409  
   410  	if err = kubeCli.Get(ctx, namespacedName, &sch); err != nil {
   411  		return
   412  	}
   413  
   414  	if err = kubeCli.Delete(ctx, &sch); err != nil {
   415  		return
   416  	}
   417  
   418  	return
   419  }
   420  
   421  // @Summary Pause a schedule.
   422  // @Description Pause a schedule.
   423  // @Tags schedules
   424  // @Produce json
   425  // @Param uid path string true "the schedule uid"
   426  // @Success 200 {object} utils.Response
   427  // @Failure 400 {object} utils.APIError
   428  // @Failure 404 {object} utils.APIError
   429  // @Failure 500 {object} utils.APIError
   430  // @Router /schedules/pause/{uid} [put]
   431  func (s *Service) pauseSchedule(c *gin.Context) {
   432  	var sch *core.Schedule
   433  
   434  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   435  	if err != nil {
   436  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   437  
   438  		return
   439  	}
   440  
   441  	uid := c.Param("uid")
   442  	if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
   443  		if gorm.IsRecordNotFoundError(err) {
   444  			u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
   445  		} else {
   446  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   447  		}
   448  
   449  		return
   450  	}
   451  
   452  	annotations := map[string]string{
   453  		v1alpha1.PauseAnnotationKey: "true",
   454  	}
   455  	if err = patchSchedule(kubeCli, sch, annotations); err != nil {
   456  		u.SetAPImachineryError(c, err)
   457  
   458  		return
   459  	}
   460  	c.JSON(http.StatusOK, u.ResponseSuccess)
   461  }
   462  
   463  // @Summary Start a schedule.
   464  // @Description Start a schedule.
   465  // @Tags schedules
   466  // @Produce json
   467  // @Param uid path string true "the schedule uid"
   468  // @Success 200 {object} utils.Response
   469  // @Failure 400 {object} utils.APIError
   470  // @Failure 404 {object} utils.APIError
   471  // @Failure 500 {object} utils.APIError
   472  // @Router /schedules/start/{uid} [put]
   473  func (s *Service) startSchedule(c *gin.Context) {
   474  	var sch *core.Schedule
   475  
   476  	kubeCli, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   477  	if err != nil {
   478  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   479  
   480  		return
   481  	}
   482  
   483  	uid := c.Param("uid")
   484  	if sch, err = s.schedule.FindByUID(context.Background(), uid); err != nil {
   485  		if gorm.IsRecordNotFoundError(err) {
   486  			u.SetAPIError(c, u.ErrNotFound.New("Experiment "+uid+" not found"))
   487  		} else {
   488  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   489  		}
   490  
   491  		return
   492  	}
   493  
   494  	annotations := map[string]string{
   495  		v1alpha1.PauseAnnotationKey: "false",
   496  	}
   497  	if err = patchSchedule(kubeCli, sch, annotations); err != nil {
   498  		u.SetAPImachineryError(c, err)
   499  
   500  		return
   501  	}
   502  	c.JSON(http.StatusOK, u.ResponseSuccess)
   503  }
   504  
   505  func patchSchedule(kubeCli client.Client, sch *core.Schedule, annotations map[string]string) error {
   506  	var tmp v1alpha1.Schedule
   507  
   508  	if err := kubeCli.Get(context.Background(), types.NamespacedName{Namespace: sch.Namespace, Name: sch.Name}, &tmp); err != nil {
   509  		return err
   510  	}
   511  
   512  	var mergePatch []byte
   513  	mergePatch, _ = json.Marshal(map[string]interface{}{
   514  		"metadata": map[string]interface{}{
   515  			"annotations": annotations,
   516  		},
   517  	})
   518  
   519  	return kubeCli.Patch(context.Background(), &tmp, client.RawPatch(types.MergePatchType, mergePatch))
   520  }
   521