1
2
3
4
5
6
7
8
9
10
11
12
13
14 package collector
15
16 import (
17 "context"
18 "encoding/json"
19 "errors"
20 "fmt"
21
22 "github.com/go-logr/logr"
23 "github.com/jinzhu/gorm"
24
25 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
26 "github.com/chaos-mesh/chaos-mesh/pkg/core"
27
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/runtime"
31 ctrl "sigs.k8s.io/controller-runtime"
32 "sigs.k8s.io/controller-runtime/pkg/client"
33 )
34
35
// ChaosCollector is a controller-runtime reconciler that watches chaos
// objects of a single API type and mirrors their state into the experiment
// archive store and the event store.
type ChaosCollector struct {
	// Client is used by Reconcile to fetch the watched object.
	client.Client
	Log logr.Logger
	// apiType is the prototype object of the watched kind. It is set by Setup;
	// Reconcile deep-copies it for every request and refuses to run while it
	// is nil.
	apiType runtime.Object
	// archive holds experiment records, looked up by object UID.
	archive core.ExperimentStore
	// event holds the events recorded for experiment runs.
	event core.EventStore
}
43
44
45 func (r *ChaosCollector) Reconcile(req ctrl.Request) (ctrl.Result, error) {
46 if r.apiType == nil {
47 r.Log.Error(nil, "apiType has not been initialized")
48 return ctrl.Result{}, nil
49 }
50 ctx := context.Background()
51
52 obj, ok := r.apiType.DeepCopyObject().(v1alpha1.InnerObject)
53 if !ok {
54 r.Log.Error(nil, "it's not a stateful object")
55 return ctrl.Result{}, nil
56 }
57
58 err := r.Get(ctx, req.NamespacedName, obj)
59 if apierrors.IsNotFound(err) {
60 if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
61 r.Log.Error(err, "failed to archive experiment")
62 }
63 return ctrl.Result{}, nil
64 }
65
66 if err != nil {
67 r.Log.Error(err, "failed to get chaos object", "request", req.NamespacedName)
68 return ctrl.Result{}, nil
69 }
70
71 if obj.IsDeleted() {
72 if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
73 r.Log.Error(err, "failed to archive experiment")
74 }
75 return ctrl.Result{}, nil
76 }
77
78 if err := r.setUnarchivedExperiment(req, obj); err != nil {
79 r.Log.Error(err, "failed to archive experiment")
80
81 }
82
83 if err := r.recordEvent(req, obj); err != nil {
84 r.Log.Error(err, "failed to record event")
85 }
86
87 return ctrl.Result{}, nil
88 }
89
90
91 func (r *ChaosCollector) Setup(mgr ctrl.Manager, apiType runtime.Object) error {
92 r.apiType = apiType
93
94 return ctrl.NewControllerManagedBy(mgr).
95 For(apiType).
96 Complete(r)
97 }
98
99 func (r *ChaosCollector) recordEvent(req ctrl.Request, obj v1alpha1.InnerObject) error {
100 var (
101 chaosMeta metav1.Object
102 ok bool
103 )
104
105 if chaosMeta, ok = obj.(metav1.Object); !ok {
106 return errors.New("failed to get chaos meta information")
107 }
108
109 UID := chaosMeta.GetUID()
110 status := obj.GetStatus()
111 kind := obj.GetObjectKind().GroupVersionKind().Kind
112
113 switch status.Experiment.Phase {
114 case v1alpha1.ExperimentPhaseRunning:
115 return r.createEvent(req, kind, status, string(UID))
116 case v1alpha1.ExperimentPhaseFinished, v1alpha1.ExperimentPhasePaused, v1alpha1.ExperimentPhaseWaiting:
117 return r.updateOrCreateEvent(req, kind, status, string(UID))
118 }
119
120 return nil
121 }
122
123 func (r *ChaosCollector) createEvent(req ctrl.Request, kind string, status *v1alpha1.ChaosStatus, UID string) error {
124 if status.Experiment.StartTime == nil {
125 r.Log.Info("failed to create event, because experiment startTime is empty")
126 return fmt.Errorf("failed to create event, because experiment startTime is empty")
127 }
128
129 event := &core.Event{
130 Experiment: req.Name,
131 Namespace: req.Namespace,
132 Kind: kind,
133 StartTime: &status.Experiment.StartTime.Time,
134 ExperimentID: UID,
135
136 Message: status.FailedMessage,
137 }
138
139 if _, err := r.event.FindByExperimentAndStartTime(
140 context.Background(), event.Experiment, event.Namespace, event.StartTime); err == nil {
141 r.Log.Info("event has been created")
142 return nil
143 }
144
145 for _, pod := range status.Experiment.PodRecords {
146 podRecord := &core.PodRecord{
147 EventID: event.ID,
148 PodIP: pod.PodIP,
149 PodName: pod.Name,
150 Namespace: pod.Namespace,
151 Message: pod.Message,
152 Action: pod.Action,
153 }
154 event.Pods = append(event.Pods, podRecord)
155 }
156 if err := r.event.Create(context.Background(), event); err != nil {
157 r.Log.Error(err, "failed to store event", "event", event)
158 return err
159 }
160
161 return nil
162 }
163
164 func (r *ChaosCollector) updateOrCreateEvent(req ctrl.Request, kind string, status *v1alpha1.ChaosStatus, UID string) error {
165 if status.Experiment.StartTime == nil || status.Experiment.EndTime == nil {
166 return fmt.Errorf("failed to get experiment time, startTime or endTime is empty")
167 }
168
169 event := &core.Event{
170 Experiment: req.Name,
171 Namespace: req.Namespace,
172 Kind: kind,
173 StartTime: &status.Experiment.StartTime.Time,
174 FinishTime: &status.Experiment.EndTime.Time,
175 Duration: status.Experiment.Duration,
176 ExperimentID: UID,
177 }
178
179 if _, err := r.event.FindByExperimentAndStartTime(
180 context.Background(), event.Experiment, event.Namespace, event.StartTime); err != nil && gorm.IsRecordNotFoundError(err) {
181 if err := r.createEvent(req, kind, status, UID); err != nil {
182 return err
183 }
184 }
185
186 if err := r.event.Update(context.Background(), event); err != nil {
187 r.Log.Error(err, "failed to update event", "event", event)
188 return err
189 }
190
191 return nil
192 }
193
194 func (r *ChaosCollector) setUnarchivedExperiment(req ctrl.Request, obj v1alpha1.InnerObject) error {
195 var (
196 chaosMeta metav1.Object
197 ok bool
198 )
199
200 if chaosMeta, ok = obj.(metav1.Object); !ok {
201 r.Log.Error(nil, "failed to get chaos meta information")
202 }
203 UID := string(chaosMeta.GetUID())
204
205 archive := &core.Experiment{
206 ExperimentMeta: core.ExperimentMeta{
207 Namespace: req.Namespace,
208 Name: req.Name,
209 Kind: obj.GetObjectKind().GroupVersionKind().Kind,
210 UID: UID,
211 Archived: false,
212 },
213 }
214
215 switch chaos := obj.(type) {
216 case *v1alpha1.PodChaos:
217 archive.Action = string(chaos.Spec.Action)
218 case *v1alpha1.NetworkChaos:
219 archive.Action = string(chaos.Spec.Action)
220 case *v1alpha1.IoChaos:
221 archive.Action = string(chaos.Spec.Action)
222 case *v1alpha1.TimeChaos, *v1alpha1.KernelChaos, *v1alpha1.StressChaos:
223 archive.Action = ""
224 case *v1alpha1.DNSChaos:
225 archive.Action = string(chaos.Spec.Action)
226 default:
227 return errors.New("unsupported chaos type " + archive.Kind)
228 }
229
230 archive.StartTime = chaosMeta.GetCreationTimestamp().Time
231 if chaosMeta.GetDeletionTimestamp() != nil {
232 archive.FinishTime = chaosMeta.GetDeletionTimestamp().Time
233 }
234
235 data, err := json.Marshal(chaosMeta)
236 if err != nil {
237 r.Log.Error(err, "failed to marshal chaos", "kind", archive.Kind,
238 "namespace", archive.Namespace, "name", archive.Name)
239 return err
240 }
241
242 archive.Experiment = string(data)
243
244 find, err := r.archive.FindByUID(context.Background(), UID)
245 if err != nil && !gorm.IsRecordNotFoundError(err) {
246 r.Log.Error(err, "failed to find experiment", "UID", UID)
247 return err
248 }
249
250 if find != nil {
251 archive.ID = find.ID
252 archive.CreatedAt = find.CreatedAt
253 archive.UpdatedAt = find.UpdatedAt
254 }
255
256 if err := r.archive.Set(context.Background(), archive); err != nil {
257 r.Log.Error(err, "failed to update experiment", "archive", archive)
258 return err
259 }
260
261 return nil
262 }
263
264 func (r *ChaosCollector) archiveExperiment(ns, name string) error {
265 if err := r.event.UpdateIncompleteEvents(context.Background(), ns, name); err != nil {
266 r.Log.Error(err, "failed to update incomplete events", "namespace", ns, "name", name)
267 return err
268 }
269
270 if err := r.archive.Archive(context.Background(), ns, name); err != nil {
271 r.Log.Error(err, "failed to archive experiment", "namespace", ns, "name", name)
272 return err
273 }
274
275 return nil
276 }
277