...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/event/event.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/event

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package event
    17  
    18  import (
    19  	"context"
    20  	"net/http"
    21  	"sort"
    22  	"strconv"
    23  	"time"
    24  
    25  	"github.com/gin-gonic/gin"
    26  	"github.com/go-logr/logr"
    27  	"github.com/jinzhu/gorm"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  
    31  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    32  	"github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
    33  	config "github.com/chaos-mesh/chaos-mesh/pkg/config"
    34  	u "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
    35  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
    36  )
    37  
    38  // Service defines a handler service for events.
    39  type Service struct {
    40  	event         core.EventStore
    41  	workflowStore core.WorkflowStore
    42  	conf          *config.ChaosDashboardConfig
    43  	logger        logr.Logger
    44  }
    45  
    46  func NewService(
    47  	event core.EventStore,
    48  	workflowStore core.WorkflowStore,
    49  	conf *config.ChaosDashboardConfig,
    50  	logger logr.Logger,
    51  ) *Service {
    52  	return &Service{
    53  		event:         event,
    54  		workflowStore: workflowStore,
    55  		conf:          conf,
    56  		logger:        logger.WithName("events"),
    57  	}
    58  }
    59  
    60  // Register events RouterGroup.
    61  func Register(r *gin.RouterGroup, s *Service) {
    62  	endpoint := r.Group("/events")
    63  	endpoint.Use(func(c *gin.Context) {
    64  		u.AuthMiddleware(c, s.conf)
    65  	})
    66  
    67  	endpoint.GET("", s.list)
    68  	endpoint.GET("/:id", s.get)
    69  	endpoint.GET("/workflow/:uid", s.cascadeFetchEventsForWorkflow)
    70  }
    71  
    72  const layout = "2006-01-02 15:04:05"
    73  
    74  // @Summary list events.
    75  // @Description Get events from db.
    76  // @Tags events
    77  // @Produce json
    78  // @Param created_at query string false "The create time of events"
    79  // @Param name query string false "The name of the object"
    80  // @Param namespace query string false "The namespace of the object"
    81  // @Param object_id query string false "The UID of the object"
    82  // @Param kind query string false "kind" Enums(PodChaos, IOChaos, NetworkChaos, TimeChaos, KernelChaos, StressChaos, AWSChaos, GCPChaos, DNSChaos, Schedule)
    83  // @Param limit query number false "The max length of events list"
    84  // @Success 200 {array} core.Event
    85  // @Failure 500 {object} u.APIError
    86  // @Router /events [get]
    87  func (s *Service) list(c *gin.Context) {
    88  	ns := c.Query("namespace")
    89  
    90  	if ns == "" && !s.conf.ClusterScoped && s.conf.TargetNamespace != "" {
    91  		ns = s.conf.TargetNamespace
    92  
    93  		s.logger.V(1).Info("Replace query namespace", "ns", ns)
    94  	}
    95  
    96  	start, _ := time.Parse(time.RFC3339, c.Query("start"))
    97  	end, _ := time.Parse(time.RFC3339, c.Query("end"))
    98  
    99  	filter := core.Filter{
   100  		ObjectID:  c.Query("object_id"),
   101  		Start:     start.UTC().Format(layout),
   102  		End:       end.UTC().Format(layout),
   103  		Namespace: ns,
   104  		Name:      c.Query("name"),
   105  		Kind:      c.Query("kind"),
   106  		Limit:     c.Query("limit"),
   107  	}
   108  
   109  	events, err := s.event.ListByFilter(context.Background(), filter)
   110  	if err != nil {
   111  		u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   112  
   113  		return
   114  	}
   115  
   116  	c.JSON(http.StatusOK, events)
   117  }
   118  
   119  // @Summary cascadeFetchEventsForWorkflow list all events for Workflow and related WorkflowNode.
   120  // @Description list all events for Workflow and related WorkflowNode.
   121  // @Tags events
   122  // @Produce json
   123  // @Param uid path string true "The UID of the Workflow"
   124  // @Param namespace query string false "The namespace of the object"
   125  // @Param limit query number false "The max length of events list"
   126  // @Success 200 {array} core.Event
   127  // @Failure 500 {object} u.APIError
   128  // @Router /events/workflow/{uid} [get]
   129  func (s *Service) cascadeFetchEventsForWorkflow(c *gin.Context) {
   130  	ctx := c.Request.Context()
   131  	ns := c.Query("namespace")
   132  	uid := c.Param("uid")
   133  	start, _ := time.Parse(time.RFC3339, c.Query("start"))
   134  	end, _ := time.Parse(time.RFC3339, c.Query("end"))
   135  	limit := 0
   136  	limitString := c.Query("limit")
   137  	if len(limitString) > 0 {
   138  		parsedLimit, err := strconv.Atoi(limitString)
   139  		if err != nil {
   140  			u.SetAPIError(c, u.ErrBadRequest.Wrap(err, "parameter limit should be a integer"))
   141  			return
   142  		}
   143  		limit = parsedLimit
   144  	}
   145  
   146  	if ns == "" && !s.conf.ClusterScoped && s.conf.TargetNamespace != "" {
   147  		ns = s.conf.TargetNamespace
   148  
   149  		s.logger.V(1).Info("Replace query namespace", "ns", ns)
   150  	}
   151  
   152  	// we should fetch the events for Workflow and related WorkflowNode, so we need namespaced name at first
   153  	workflowEntity, err := s.workflowStore.FindByUID(ctx, uid)
   154  	if err != nil {
   155  		if gorm.IsRecordNotFoundError(err) {
   156  			u.SetAPIError(c, u.ErrNotFound.Wrap(err, "this requested workflow is not found, uid: %s", uid))
   157  		} else {
   158  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   159  		}
   160  		return
   161  	}
   162  
   163  	// if workflow has been archived, the Workflow CR and WorkflowNode CR also has been deleted from kubernetes, it's
   164  	// no way to "cascade fetch" anymore
   165  	if workflowEntity.Archived {
   166  		u.SetAPIError(c, u.ErrBadRequest.New("this requested workflow already been archived, can not list events for it"))
   167  		return
   168  	}
   169  
   170  	kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
   171  	if err != nil {
   172  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   173  		return
   174  	}
   175  
   176  	// fetch all related WorkflowNodes
   177  	workflowNodeList := v1alpha1.WorkflowNodeList{}
   178  	controlledByThisWorkflow, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{
   179  		v1alpha1.LabelWorkflow: workflowEntity.Name,
   180  	}})
   181  	if err != nil {
   182  		u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
   183  		return
   184  	}
   185  	err = kubeClient.List(ctx, &workflowNodeList, &client.ListOptions{
   186  		Namespace:     ns,
   187  		LabelSelector: controlledByThisWorkflow,
   188  	})
   189  	if err != nil {
   190  		u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   191  		return
   192  	}
   193  
   194  	result := make([]*core.Event, 0)
   195  	// fetch events of Workflow
   196  	eventsForWorkflow, err := s.event.ListByFilter(ctx, core.Filter{
   197  		ObjectID:  uid,
   198  		Namespace: ns,
   199  		Start:     start.UTC().Format(layout),
   200  		End:       end.UTC().Format(layout),
   201  	})
   202  	if err != nil {
   203  		u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   204  		return
   205  	}
   206  	result = append(result, eventsForWorkflow...)
   207  
   208  	// fetch all events of WorkflowNodes
   209  	for _, workflowNode := range workflowNodeList.Items {
   210  		eventsForWorkflowNode, err := s.event.ListByFilter(ctx, core.Filter{
   211  			Namespace: ns,
   212  			Name:      workflowNode.GetName(),
   213  			Start:     start.UTC().Format(layout),
   214  			End:       end.UTC().Format(layout),
   215  		})
   216  		if err != nil {
   217  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   218  			return
   219  		}
   220  		result = append(result, eventsForWorkflowNode...)
   221  	}
   222  
   223  	// sort by CreatedAt
   224  	sort.Slice(result, func(i, j int) bool {
   225  		return result[i].CreatedAt.UnixNano() > result[j].CreatedAt.UnixNano()
   226  	})
   227  
   228  	if limit > 0 && len(result) > limit {
   229  		c.JSON(http.StatusOK, result[:limit])
   230  		return
   231  	}
   232  	c.JSON(http.StatusOK, result)
   233  }
   234  
   235  // @Summary Get an event.
   236  // @Description Get the event from db by ID.
   237  // @Tags events
   238  // @Produce json
   239  // @Param id path uint true "The event ID"
   240  // @Success 200 {object} core.Event
   241  // @Failure 400 {object} u.APIError
   242  // @Failure 404 {object} u.APIError
   243  // @Failure 500 {object} u.APIError
   244  // @Router /events/{id} [get]
   245  func (s *Service) get(c *gin.Context) {
   246  	id, ns := c.Param("id"), c.Query("namespace")
   247  
   248  	if id == "" {
   249  		u.SetAPIError(c, u.ErrBadRequest.New("ID cannot be empty"))
   250  
   251  		return
   252  	}
   253  
   254  	intID, err := strconv.Atoi(id)
   255  	if err != nil {
   256  		u.SetAPIError(c, u.ErrBadRequest.New("ID is not a number"))
   257  
   258  		return
   259  	}
   260  
   261  	if ns == "" && !s.conf.ClusterScoped && s.conf.TargetNamespace != "" {
   262  		ns = s.conf.TargetNamespace
   263  
   264  		s.logger.V(1).Info("Replace query namespace", "ns", ns)
   265  	}
   266  
   267  	event, err := s.event.Find(context.Background(), uint(intID))
   268  	if err != nil {
   269  		if gorm.IsRecordNotFoundError(err) {
   270  			u.SetAPIError(c, u.ErrNotFound.New("Event "+id+" not found"))
   271  		} else {
   272  			u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
   273  		}
   274  
   275  		return
   276  	}
   277  
   278  	if len(ns) != 0 && event.Namespace != ns {
   279  		u.SetAPIError(c, u.ErrInternalServer.New("The namespace of event %s is %s instead of the %s in the request", id, event.Namespace, ns))
   280  
   281  		return
   282  	}
   283  
   284  	c.JSON(http.StatusOK, event)
   285  }
   286