1
2
3
4
5
6
7
8
9
10
11
12
13
14 package collector
15
16 import (
17 "context"
18 "encoding/json"
19 "errors"
20
21 "github.com/go-logr/logr"
22 "github.com/jinzhu/gorm"
23 apierrors "k8s.io/apimachinery/pkg/api/errors"
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/apimachinery/pkg/runtime"
26 ctrl "sigs.k8s.io/controller-runtime"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 "github.com/chaos-mesh/chaos-mesh/pkg/core"
31 )
32
33
34 type ChaosCollector struct {
35 client.Client
36 Log logr.Logger
37 apiType runtime.Object
38 archive core.ExperimentStore
39 event core.EventStore
40 }
41
42
43 func (r *ChaosCollector) Reconcile(req ctrl.Request) (ctrl.Result, error) {
44 var (
45 chaosMeta metav1.Object
46 ok bool
47 manageFlag bool
48 )
49
50 if r.apiType == nil {
51 r.Log.Error(nil, "apiType has not been initialized")
52 return ctrl.Result{}, nil
53 }
54 ctx := context.Background()
55 manageFlag = false
56
57 obj, ok := r.apiType.DeepCopyObject().(v1alpha1.InnerObject)
58 if !ok {
59 r.Log.Error(nil, "it's not a stateful object")
60 return ctrl.Result{}, nil
61 }
62
63 err := r.Get(ctx, req.NamespacedName, obj)
64 if apierrors.IsNotFound(err) {
65 if chaosMeta, ok = obj.(metav1.Object); !ok {
66 r.Log.Error(nil, "failed to get chaos meta information")
67 }
68 if chaosMeta.GetLabels()["managed-by"] != "" {
69 manageFlag = true
70 }
71 if !manageFlag {
72 if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
73 r.Log.Error(err, "failed to archive experiment")
74 }
75 } else {
76 if err = r.event.DeleteByUID(ctx, string(chaosMeta.GetUID())); err != nil {
77 r.Log.Error(err, "failed to delete experiment related events")
78 }
79 }
80 return ctrl.Result{}, nil
81 }
82
83 if err != nil {
84 r.Log.Error(err, "failed to get chaos object", "request", req.NamespacedName)
85 return ctrl.Result{}, nil
86 }
87
88 if chaosMeta, ok = obj.(metav1.Object); !ok {
89 r.Log.Error(nil, "failed to get chaos meta information")
90 }
91
92 if chaosMeta.GetLabels()["managed-by"] != "" {
93 manageFlag = true
94 }
95
96 if obj.IsDeleted() {
97 if !manageFlag {
98 if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
99 r.Log.Error(err, "failed to archive experiment")
100 }
101 } else {
102 if err = r.event.DeleteByUID(ctx, string(chaosMeta.GetUID())); err != nil {
103 r.Log.Error(err, "failed to delete experiment related events")
104 }
105 }
106 return ctrl.Result{}, nil
107 }
108
109 if err := r.setUnarchivedExperiment(req, obj); err != nil {
110 r.Log.Error(err, "failed to archive experiment")
111
112 }
113
114 return ctrl.Result{}, nil
115 }
116
117
118 func (r *ChaosCollector) Setup(mgr ctrl.Manager, apiType runtime.Object) error {
119 r.apiType = apiType
120
121 return ctrl.NewControllerManagedBy(mgr).
122 For(apiType).
123 Complete(r)
124 }
125
126 func (r *ChaosCollector) setUnarchivedExperiment(req ctrl.Request, obj v1alpha1.InnerObject) error {
127 var (
128 chaosMeta metav1.Object
129 ok bool
130 )
131
132 if chaosMeta, ok = obj.(metav1.Object); !ok {
133 r.Log.Error(nil, "failed to get chaos meta information")
134 }
135 UID := string(chaosMeta.GetUID())
136
137 archive := &core.Experiment{
138 ExperimentMeta: core.ExperimentMeta{
139 Namespace: req.Namespace,
140 Name: req.Name,
141 Kind: obj.GetObjectKind().GroupVersionKind().Kind,
142 UID: UID,
143 Archived: false,
144 },
145 }
146
147 switch chaos := obj.(type) {
148 case *v1alpha1.PodChaos:
149 archive.Action = string(chaos.Spec.Action)
150 case *v1alpha1.NetworkChaos:
151 archive.Action = string(chaos.Spec.Action)
152 case *v1alpha1.IOChaos:
153 archive.Action = string(chaos.Spec.Action)
154 case *v1alpha1.TimeChaos, *v1alpha1.KernelChaos, *v1alpha1.StressChaos:
155 archive.Action = ""
156 case *v1alpha1.DNSChaos:
157 archive.Action = string(chaos.Spec.Action)
158 case *v1alpha1.AWSChaos:
159 archive.Action = string(chaos.Spec.Action)
160 case *v1alpha1.GCPChaos:
161 archive.Action = string(chaos.Spec.Action)
162 default:
163 return errors.New("unsupported chaos type " + archive.Kind)
164 }
165
166 archive.StartTime = chaosMeta.GetCreationTimestamp().Time
167 if chaosMeta.GetDeletionTimestamp() != nil {
168 archive.FinishTime = chaosMeta.GetDeletionTimestamp().Time
169 }
170
171 data, err := json.Marshal(chaosMeta)
172 if err != nil {
173 r.Log.Error(err, "failed to marshal chaos", "kind", archive.Kind,
174 "namespace", archive.Namespace, "name", archive.Name)
175 return err
176 }
177
178 archive.Experiment = string(data)
179
180 find, err := r.archive.FindByUID(context.Background(), UID)
181 if err != nil && !gorm.IsRecordNotFoundError(err) {
182 r.Log.Error(err, "failed to find experiment", "UID", UID)
183 return err
184 }
185
186 if find != nil {
187 archive.ID = find.ID
188 archive.CreatedAt = find.CreatedAt
189 archive.UpdatedAt = find.UpdatedAt
190 }
191
192 if err := r.archive.Set(context.Background(), archive); err != nil {
193 r.Log.Error(err, "failed to update experiment", "archive", archive)
194 return err
195 }
196
197 return nil
198 }
199
200 func (r *ChaosCollector) archiveExperiment(ns, name string) error {
201 if err := r.archive.Archive(context.Background(), ns, name); err != nil {
202 r.Log.Error(err, "failed to archive experiment", "namespace", ns, "name", name)
203 return err
204 }
205
206 return nil
207 }
208