1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package collector
17
18 import (
19 "context"
20 "encoding/json"
21 "errors"
22
23 "github.com/go-logr/logr"
24 "github.com/jinzhu/gorm"
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/runtime"
28 ctrl "sigs.k8s.io/controller-runtime"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
33 )
34
35
36 type ChaosCollector struct {
37 client.Client
38 Log logr.Logger
39 apiType runtime.Object
40 archive core.ExperimentStore
41 event core.EventStore
42 }
43
44
45 func (r *ChaosCollector) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
46 var (
47 chaosMeta metav1.Object
48 ok bool
49 manageFlag bool
50 )
51
52 if r.apiType == nil {
53 r.Log.Error(nil, "apiType has not been initialized")
54 return ctrl.Result{}, nil
55 }
56
57 manageFlag = false
58
59 obj, ok := r.apiType.DeepCopyObject().(v1alpha1.InnerObject)
60 if !ok {
61 r.Log.Error(nil, "it's not a stateful object")
62 return ctrl.Result{}, nil
63 }
64
65 err := r.Get(ctx, req.NamespacedName, obj)
66 if apierrors.IsNotFound(err) {
67 if chaosMeta, ok = obj.(metav1.Object); !ok {
68 r.Log.Error(nil, "failed to get chaos meta information")
69 }
70 if chaosMeta.GetLabels()[v1alpha1.LabelManagedBy] != "" {
71 manageFlag = true
72 }
73 if !manageFlag {
74 if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
75 r.Log.Error(err, "failed to archive experiment")
76 }
77 } else {
78 if err = r.event.DeleteByUID(ctx, string(chaosMeta.GetUID())); err != nil {
79 r.Log.Error(err, "failed to delete experiment related events")
80 }
81 }
82 return ctrl.Result{}, nil
83 }
84
85 if err != nil {
86 r.Log.Error(err, "failed to get chaos object", "request", req.NamespacedName)
87 return ctrl.Result{}, nil
88 }
89
90 if chaosMeta, ok = obj.(metav1.Object); !ok {
91 r.Log.Error(nil, "failed to get chaos meta information")
92 }
93
94 if chaosMeta.GetLabels()[v1alpha1.LabelManagedBy] != "" {
95 manageFlag = true
96 }
97
98 if obj.IsDeleted() {
99 if !manageFlag {
100 if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
101 r.Log.Error(err, "failed to archive experiment")
102 }
103 } else {
104 if err = r.event.DeleteByUID(ctx, string(chaosMeta.GetUID())); err != nil {
105 r.Log.Error(err, "failed to delete experiment related events")
106 }
107 }
108 return ctrl.Result{}, nil
109 }
110
111 if err := r.setUnarchivedExperiment(req, obj); err != nil {
112 r.Log.Error(err, "failed to archive experiment")
113
114 }
115
116 return ctrl.Result{}, nil
117 }
118
119
120 func (r *ChaosCollector) Setup(mgr ctrl.Manager, apiType client.Object) error {
121 r.apiType = apiType
122
123 return ctrl.NewControllerManagedBy(mgr).
124 For(apiType).
125 Complete(r)
126 }
127
128 func (r *ChaosCollector) setUnarchivedExperiment(req ctrl.Request, obj v1alpha1.InnerObject) error {
129 var (
130 chaosMeta metav1.Object
131 ok bool
132 )
133
134 if chaosMeta, ok = obj.(metav1.Object); !ok {
135 r.Log.Error(nil, "failed to get chaos meta information")
136 }
137 UID := string(chaosMeta.GetUID())
138
139 archive := &core.Experiment{
140 ExperimentMeta: core.ExperimentMeta{
141 Namespace: req.Namespace,
142 Name: req.Name,
143 Kind: obj.GetObjectKind().GroupVersionKind().Kind,
144 UID: UID,
145 Archived: false,
146 },
147 }
148
149 switch chaos := obj.(type) {
150 case *v1alpha1.PodChaos:
151 archive.Action = string(chaos.Spec.Action)
152 case *v1alpha1.NetworkChaos:
153 archive.Action = string(chaos.Spec.Action)
154 case *v1alpha1.IOChaos:
155 archive.Action = string(chaos.Spec.Action)
156 case *v1alpha1.TimeChaos, *v1alpha1.KernelChaos, *v1alpha1.StressChaos, *v1alpha1.HTTPChaos:
157 archive.Action = ""
158 case *v1alpha1.DNSChaos:
159 archive.Action = string(chaos.Spec.Action)
160 case *v1alpha1.PhysicalMachineChaos:
161 archive.Action = string(chaos.Spec.Action)
162 case *v1alpha1.AWSChaos:
163 archive.Action = string(chaos.Spec.Action)
164 case *v1alpha1.GCPChaos:
165 archive.Action = string(chaos.Spec.Action)
166 case *v1alpha1.JVMChaos:
167 archive.Action = string(chaos.Spec.Action)
168 default:
169 return errors.New("unsupported chaos type " + archive.Kind)
170 }
171
172 archive.StartTime = chaosMeta.GetCreationTimestamp().Time
173 if chaosMeta.GetDeletionTimestamp() != nil {
174 archive.FinishTime = chaosMeta.GetDeletionTimestamp().Time
175 }
176
177 data, err := json.Marshal(chaosMeta)
178 if err != nil {
179 r.Log.Error(err, "failed to marshal chaos", "kind", archive.Kind,
180 "namespace", archive.Namespace, "name", archive.Name)
181 return err
182 }
183
184 archive.Experiment = string(data)
185
186 find, err := r.archive.FindByUID(context.Background(), UID)
187 if err != nil && !gorm.IsRecordNotFoundError(err) {
188 r.Log.Error(err, "failed to find experiment", "UID", UID)
189 return err
190 }
191
192 if find != nil {
193 archive.ID = find.ID
194 archive.CreatedAt = find.CreatedAt
195 archive.UpdatedAt = find.UpdatedAt
196 }
197
198 if err := r.archive.Set(context.Background(), archive); err != nil {
199 r.Log.Error(err, "failed to update experiment", "archive", archive)
200 return err
201 }
202
203 return nil
204 }
205
206 func (r *ChaosCollector) archiveExperiment(ns, name string) error {
207 if err := r.archive.Archive(context.Background(), ns, name); err != nil {
208 r.Log.Error(err, "failed to archive experiment", "namespace", ns, "name", name)
209 return err
210 }
211
212 return nil
213 }
214