1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package event
17
18 import (
19 "context"
20 "net/http"
21 "sort"
22 "strconv"
23 "time"
24
25 "github.com/gin-gonic/gin"
26 "github.com/go-logr/logr"
27 "github.com/jinzhu/gorm"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 "github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
33 config "github.com/chaos-mesh/chaos-mesh/pkg/config"
34 u "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/apiserver/utils"
35 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
36 )
37
38
39 type Service struct {
40 event core.EventStore
41 workflowStore core.WorkflowStore
42 conf *config.ChaosDashboardConfig
43 logger logr.Logger
44 }
45
46 func NewService(
47 event core.EventStore,
48 workflowStore core.WorkflowStore,
49 conf *config.ChaosDashboardConfig,
50 logger logr.Logger,
51 ) *Service {
52 return &Service{
53 event: event,
54 workflowStore: workflowStore,
55 conf: conf,
56 logger: logger.WithName("events"),
57 }
58 }
59
60
61 func Register(r *gin.RouterGroup, s *Service) {
62 endpoint := r.Group("/events")
63 endpoint.Use(func(c *gin.Context) {
64 u.AuthMiddleware(c, s.conf)
65 })
66
67 endpoint.GET("", s.list)
68 endpoint.GET("/:id", s.get)
69 endpoint.GET("/workflow/:uid", s.cascadeFetchEventsForWorkflow)
70 }
71
72 const layout = "2006-01-02 15:04:05"
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 func (s *Service) list(c *gin.Context) {
88 ns := c.Query("namespace")
89
90 if ns == "" && !s.conf.ClusterScoped && s.conf.TargetNamespace != "" {
91 ns = s.conf.TargetNamespace
92
93 s.logger.V(1).Info("Replace query namespace", "ns", ns)
94 }
95
96 start, _ := time.Parse(time.RFC3339, c.Query("start"))
97 end, _ := time.Parse(time.RFC3339, c.Query("end"))
98
99 filter := core.Filter{
100 ObjectID: c.Query("object_id"),
101 Start: start.UTC().Format(layout),
102 End: end.UTC().Format(layout),
103 Namespace: ns,
104 Name: c.Query("name"),
105 Kind: c.Query("kind"),
106 Limit: c.Query("limit"),
107 }
108
109 events, err := s.event.ListByFilter(context.Background(), filter)
110 if err != nil {
111 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
112
113 return
114 }
115
116 c.JSON(http.StatusOK, events)
117 }
118
119
120
121
122
123
124
125
126
127
128
129 func (s *Service) cascadeFetchEventsForWorkflow(c *gin.Context) {
130 ctx := c.Request.Context()
131 ns := c.Query("namespace")
132 uid := c.Param("uid")
133 start, _ := time.Parse(time.RFC3339, c.Query("start"))
134 end, _ := time.Parse(time.RFC3339, c.Query("end"))
135 limit := 0
136 limitString := c.Query("limit")
137 if len(limitString) > 0 {
138 parsedLimit, err := strconv.Atoi(limitString)
139 if err != nil {
140 u.SetAPIError(c, u.ErrBadRequest.Wrap(err, "parameter limit should be a integer"))
141 return
142 }
143 limit = parsedLimit
144 }
145
146 if ns == "" && !s.conf.ClusterScoped && s.conf.TargetNamespace != "" {
147 ns = s.conf.TargetNamespace
148
149 s.logger.V(1).Info("Replace query namespace", "ns", ns)
150 }
151
152
153 workflowEntity, err := s.workflowStore.FindByUID(ctx, uid)
154 if err != nil {
155 if gorm.IsRecordNotFoundError(err) {
156 u.SetAPIError(c, u.ErrNotFound.Wrap(err, "this requested workflow is not found, uid: %s", uid))
157 } else {
158 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
159 }
160 return
161 }
162
163
164
165 if workflowEntity.Archived {
166 u.SetAPIError(c, u.ErrBadRequest.New("this requested workflow already been archived, can not list events for it"))
167 return
168 }
169
170 kubeClient, err := clientpool.ExtractTokenAndGetClient(c.Request.Header)
171 if err != nil {
172 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
173 return
174 }
175
176
177 workflowNodeList := v1alpha1.WorkflowNodeList{}
178 controlledByThisWorkflow, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{
179 v1alpha1.LabelWorkflow: workflowEntity.Name,
180 }})
181 if err != nil {
182 u.SetAPIError(c, u.ErrBadRequest.WrapWithNoMessage(err))
183 return
184 }
185 err = kubeClient.List(ctx, &workflowNodeList, &client.ListOptions{
186 Namespace: ns,
187 LabelSelector: controlledByThisWorkflow,
188 })
189 if err != nil {
190 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
191 return
192 }
193
194 result := make([]*core.Event, 0)
195
196 eventsForWorkflow, err := s.event.ListByFilter(ctx, core.Filter{
197 ObjectID: uid,
198 Namespace: ns,
199 Start: start.UTC().Format(layout),
200 End: end.UTC().Format(layout),
201 })
202 if err != nil {
203 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
204 return
205 }
206 result = append(result, eventsForWorkflow...)
207
208
209 for _, workflowNode := range workflowNodeList.Items {
210 eventsForWorkflowNode, err := s.event.ListByFilter(ctx, core.Filter{
211 Namespace: ns,
212 Name: workflowNode.GetName(),
213 Start: start.UTC().Format(layout),
214 End: end.UTC().Format(layout),
215 })
216 if err != nil {
217 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
218 return
219 }
220 result = append(result, eventsForWorkflowNode...)
221 }
222
223
224 sort.Slice(result, func(i, j int) bool {
225 return result[i].CreatedAt.UnixNano() > result[j].CreatedAt.UnixNano()
226 })
227
228 if limit > 0 && len(result) > limit {
229 c.JSON(http.StatusOK, result[:limit])
230 return
231 }
232 c.JSON(http.StatusOK, result)
233 }
234
235
236
237
238
239
240
241
242
243
244
245 func (s *Service) get(c *gin.Context) {
246 id, ns := c.Param("id"), c.Query("namespace")
247
248 if id == "" {
249 u.SetAPIError(c, u.ErrBadRequest.New("ID cannot be empty"))
250
251 return
252 }
253
254 intID, err := strconv.Atoi(id)
255 if err != nil {
256 u.SetAPIError(c, u.ErrBadRequest.New("ID is not a number"))
257
258 return
259 }
260
261 if ns == "" && !s.conf.ClusterScoped && s.conf.TargetNamespace != "" {
262 ns = s.conf.TargetNamespace
263
264 s.logger.V(1).Info("Replace query namespace", "ns", ns)
265 }
266
267 event, err := s.event.Find(context.Background(), uint(intID))
268 if err != nil {
269 if gorm.IsRecordNotFoundError(err) {
270 u.SetAPIError(c, u.ErrNotFound.New("Event "+id+" not found"))
271 } else {
272 u.SetAPIError(c, u.ErrInternalServer.WrapWithNoMessage(err))
273 }
274
275 return
276 }
277
278 if len(ns) != 0 && event.Namespace != ns {
279 u.SetAPIError(c, u.ErrInternalServer.New("The namespace of event %s is %s instead of the %s in the request", id, event.Namespace, ns))
280
281 return
282 }
283
284 c.JSON(http.StatusOK, event)
285 }
286