...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/event/event.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/event

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package event
    17  
    18  import (
    19  	"context"
    20  	"net/http"
    21  	"sort"
    22  	"strconv"
    23  	"time"
    24  
    25  	"github.com/gin-gonic/gin"
    26  	"github.com/jinzhu/gorm"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"sigs.k8s.io/controller-runtime/pkg/client"
    29  
    30  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    31  	"github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
    32  	config "github.com/chaos-mesh/chaos-mesh/pkg/config/dashboard"
    33  	u "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
    34  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
    35  )
    36  
// log is the package-level logger for the event handlers; it is derived from
// the shared apiserver utils logger under the name "events".
var log = u.Log.WithName("events")
    38  
// Service defines a handler service for events.
//
// It holds the event store queried by the handlers, the workflow store used
// to resolve a workflow by UID, and the dashboard configuration, which the
// handlers read for ClusterScoped / TargetNamespace and pass to the auth
// middleware.
type Service struct {
	event         core.EventStore
	workflowStore core.WorkflowStore
	conf          *config.ChaosDashboardConfig
}
    45  
    46  func NewService(
    47  	event core.EventStore,
    48  	workflowStore core.WorkflowStore,
    49  	conf *config.ChaosDashboardConfig,
    50  ) *Service {
    51  	return &Service{
    52  		event:         event,
    53  		workflowStore: workflowStore,
    54  		conf:          conf,
    55  	}
    56  }
    57  
    58  // Register events RouterGroup.
    59  func Register(r *gin.RouterGroup, s *Service) {
    60  	endpoint := r.Group("/events")
    61  	endpoint.Use(func(c *gin.Context) {
    62  		u.AuthMiddleware(c, s.conf)
    63  	})
    64  
    65  	endpoint.GET("", s.list)
    66  	endpoint.GET("/:id", s.get)
    67  	endpoint.GET("/workflow/:uid", s.cascadeFetchEventsForWorkflow)
    68  }
    69  
// layout is the timestamp format ("YYYY-MM-DD hh:mm:ss") used to render the
// start/end bounds placed into an event filter; the handlers convert the
// bounds to UTC before formatting them.
const layout = "2006-01-02 15:04:05"
    71  
    72  // @Summary list events.
    73  // @Description Get events from db.
    74  // @Tags events
    75  // @Produce json
    76  // @Param created_at query string false "The create time of events"
    77  // @Param name query string false "The name of the object"
    78  // @Param namespace query string false "The namespace of the object"
    79  // @Param object_id query string false "The UID of the object"
    80  // @Param kind query string false "kind" Enums(PodChaos, IOChaos, NetworkChaos, TimeChaos, KernelChaos, StressChaos, AWSChaos, GCPChaos, DNSChaos, Schedule)
    81  // @Param limit query string false "The max length of events list"
    82  // @Success 200 {array} core.Event
    83  // @Failure 500 {object} utils.APIError
    84  // @Router /events [get]
    85  func (s *Service) list(c *gin.Context) {
    86  	ns := c.Query("namespace")
    87  
    88  	if ns == "" && !s.conf.ClusterScoped && s.conf.TargetNamespace != "" {
    89  		ns = s.conf.TargetNamespace
    90  
    91  		log.V(1).Info("Replace query namespace with", ns)
    92  	}
    93  
    94  	start, _ := time.Parse(time.RFC3339, c.Query("start"))
    95  	end, _ := time.Parse(time.RFC3339, c.Query("end"))
    96  
    97  	filter := core.Filter{
    98  		ObjectID:  c.Query("object_id"),
    99  		Start:     start.UTC().Format(layout),
   100  		End:       end.UTC().Format(layout),
   101  		Namespace: ns,
   102  		Name:      c.Query("name"),
   103  		Kind:      c.Query("kind"),
   104  		Limit:     c.Query("limit"),
   105  	}
   106  
   107  	events, err := s.event.ListByFilter(context.Background(), filter)
   108  	if err != nil {
   109  		u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   110  
   111  		return
   112  	}
   113  
   114  	c.JSON(http.StatusOK, events)
   115  }
   116  
   117  // @Summary cascadeFetchEventsForWorkflow list all events for Workflow and related WorkflowNode.
   118  // @Description list all events for Workflow and related WorkflowNode.
   119  // @Tags events
   120  // @Produce json
   121  // @Param namespace query string false "The namespace of the object"
   122  // @Param uid path string false "The UID of the Workflow"
   123  // @Param limit query string false "The max length of events list"
   124  // @Success 200 {array} core.Event
   125  // @Failure 500 {object} utils.APIError
   126  // @Router /events/workflow/{uid} [get]
   127  func (s *Service) cascadeFetchEventsForWorkflow(c *gin.Context) {
   128  	ctx := c.Request.Context()
   129  	ns := c.Query("namespace")
   130  	uid := c.Param("uid")
   131  	start, _ := time.Parse(time.RFC3339, c.Query("start"))
   132  	end, _ := time.Parse(time.RFC3339, c.Query("end"))
   133  	limit := 0
   134  	limitString := c.Query("limit")
   135  	if len(limitString) > 0 {
   136  		parsedLimit, err := strconv.Atoi(limitString)
   137  		if err != nil {
   138  			u.SetAPIError(c, u.ErrBadRequest.Wrap(err, "parameter limit should be a integer"))
   139  			return
   140  		}
   141  		limit = parsedLimit
   142  	}
   143  
   144  	if ns == "" && !s.conf.ClusterScoped && s.conf.TargetNamespace != "" {
   145  		ns = s.conf.TargetNamespace
   146  
   147  		log.V(1).Info("Replace query namespace with", ns)
   148  	}
   149  
   150  	// we should fetch the events for Workflow and related WorkflowNode, so we need namespaced name at first
   151  	workflowEntity, err := s.workflowStore.FindByUID(ctx, uid)
   152  	if err != nil {
   153  		if gorm.IsRecordNotFoundError(err) {
   154  			u.SetAPIError(c, u.ErrNotFound.Wrap(err, "this requested workflow is not found, uid: %s", uid))
   155  		} else {
   156  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   157  		}
   158  		return
   159  	}
   160  
   161  	// if workflow has been archived, the Workflow CR and WorkflowNode CR also has been deleted from kubernetes, it's
   162  	// no way to "cascade fetch" anymore
   163  	if workflowEntity.Archived {
   164  		u.SetAPIError(c, u.ErrBadRequest.New("this requested workflow already been archived, can not list events for it"))
   165  		return
   166  	}
   167  
   168  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   169  	if err != nil {
   170  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   171  		return
   172  	}
   173  
   174  	// fetch all related WorkflowNodes
   175  	workflowNodeList := v1alpha1.WorkflowNodeList{}
   176  	controlledByThisWorkflow, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{
   177  		v1alpha1.LabelWorkflow: workflowEntity.Name,
   178  	}})
   179  	if err != nil {
   180  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   181  		return
   182  	}
   183  	err = kubeClient.List(ctx, &workflowNodeList, &client.ListOptions{
   184  		Namespace:     ns,
   185  		LabelSelector: controlledByThisWorkflow,
   186  	})
   187  	if err != nil {
   188  		u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   189  		return
   190  	}
   191  
   192  	result := make([]*core.Event, 0)
   193  	// fetch events of Workflow
   194  	eventsForWorkflow, err := s.event.ListByFilter(ctx, core.Filter{
   195  		ObjectID:  uid,
   196  		Namespace: ns,
   197  		Start:     start.UTC().Format(layout),
   198  		End:       end.UTC().Format(layout),
   199  	})
   200  	if err != nil {
   201  		u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   202  		return
   203  	}
   204  	result = append(result, eventsForWorkflow...)
   205  
   206  	// fetch all events of WorkflowNodes
   207  	for _, workflowNode := range workflowNodeList.Items {
   208  		eventsForWorkflowNode, err := s.event.ListByFilter(ctx, core.Filter{
   209  			Namespace: ns,
   210  			Name:      workflowNode.GetName(),
   211  			Start:     start.UTC().Format(layout),
   212  			End:       end.UTC().Format(layout),
   213  		})
   214  		if err != nil {
   215  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   216  			return
   217  		}
   218  		result = append(result, eventsForWorkflowNode...)
   219  	}
   220  
   221  	// sort by CreatedAt
   222  	sort.Slice(result, func(i, j int) bool {
   223  		return result[i].CreatedAt.UnixNano() > result[j].CreatedAt.UnixNano()
   224  	})
   225  
   226  	if limit > 0 && len(result) > limit {
   227  		c.JSON(http.StatusOK, result[:limit])
   228  		return
   229  	}
   230  	c.JSON(http.StatusOK, result)
   231  }
   232  
   233  // @Summary Get an event.
   234  // @Description Get the event from db by ID.
   235  // @Tags events
   236  // @Produce json
   237  // @Param id path uint true "The event ID"
   238  // @Success 200 {object} core.Event
   239  // @Failure 400 {object} utils.APIError
   240  // @Failure 404 {object} utils.APIError
   241  // @Failure 500 {object} utils.APIError
   242  // @Router /events/{id} [get]
   243  func (s *Service) get(c *gin.Context) {
   244  	id, ns := c.Param("id"), c.Query("namespace")
   245  
   246  	if id == "" {
   247  		u.SetAPIError(c, u.ErrBadRequest.New("ID cannot be empty"))
   248  
   249  		return
   250  	}
   251  
   252  	intID, err := strconv.Atoi(id)
   253  	if err != nil {
   254  		u.SetAPIError(c, u.ErrBadRequest.New("ID is not a number"))
   255  
   256  		return
   257  	}
   258  
   259  	if ns == "" && !s.conf.ClusterScoped && s.conf.TargetNamespace != "" {
   260  		ns = s.conf.TargetNamespace
   261  
   262  		log.V(1).Info("Replace query namespace with", ns)
   263  	}
   264  
   265  	event, err := s.event.Find(context.Background(), uint(intID))
   266  	if err != nil {
   267  		if gorm.IsRecordNotFoundError(err) {
   268  			u.SetAPIError(c, u.ErrNotFound.New("Event "+id+" not found"))
   269  		} else {
   270  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   271  		}
   272  
   273  		return
   274  	}
   275  
   276  	if len(ns) != 0 && event.Namespace != ns {
   277  		u.SetAPIError(c, u.ErrInternalServer.New("The namespace of event %s is %s instead of the %s in the request", id, event.Namespace, ns))
   278  
   279  		return
   280  	}
   281  
   282  	c.JSON(http.StatusOK, event)
   283  }
   284