1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package collector
17
18 import (
19 "context"
20 "encoding/json"
21
22 "github.com/go-logr/logr"
23 "github.com/jinzhu/gorm"
24 "github.com/pkg/errors"
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/runtime"
28 ctrl "sigs.k8s.io/controller-runtime"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
33 )
34
35
36 type ChaosCollector struct {
37 client.Client
38 Log logr.Logger
39 apiType runtime.Object
40 archive core.ExperimentStore
41 event core.EventStore
42 }
43
44
45 func (r *ChaosCollector) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
46 if r.apiType == nil {
47 r.Log.Error(nil, "apiType has not been initialized")
48 return ctrl.Result{}, nil
49 }
50
51 obj, ok := r.apiType.DeepCopyObject().(v1alpha1.InnerObject)
52 if !ok {
53 r.Log.Error(nil, "it's not a stateful object")
54 return ctrl.Result{}, nil
55 }
56
57 err := r.Get(ctx, req.NamespacedName, obj)
58 if apierrors.IsNotFound(err) {
59 if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
60 r.Log.Error(err, "failed to archive experiment")
61 }
62
63
64
65 if err = r.deleteManagedExperiments(req.Namespace, req.Name); err != nil {
66 r.Log.Error(err, "delete managed experiments", "namespace", req.Namespace, "name", req.Name)
67 }
68
69 return ctrl.Result{}, nil
70 }
71
72 if err != nil {
73 r.Log.Error(err, "failed to get chaos object", "request", req.NamespacedName)
74 return ctrl.Result{}, nil
75 }
76
77 if err := r.setUnarchivedExperiment(req, obj); err != nil {
78 r.Log.Error(err, "failed to archive experiment")
79
80 }
81
82 return ctrl.Result{}, nil
83 }
84
85
86 func (r *ChaosCollector) Setup(mgr ctrl.Manager, apiType client.Object) error {
87 r.apiType = apiType
88
89 return ctrl.NewControllerManagedBy(mgr).
90 For(apiType).
91 Complete(r)
92 }
93
94 func (r *ChaosCollector) setUnarchivedExperiment(req ctrl.Request, obj v1alpha1.InnerObject) error {
95 archive, err := convertInnerObjectToExperiment(obj)
96 if err != nil {
97 r.Log.Error(err, "failed to covert InnerObject")
98 return err
99 }
100
101 find, err := r.archive.FindByUID(context.Background(), archive.UID)
102 if err != nil && !gorm.IsRecordNotFoundError(err) {
103 r.Log.Error(err, "failed to find experiment", "UID", archive.UID)
104 return err
105 }
106
107 if find != nil {
108 archive.ID = find.ID
109 archive.CreatedAt = find.CreatedAt
110 archive.UpdatedAt = find.UpdatedAt
111 }
112
113 if err := r.archive.Set(context.Background(), archive); err != nil {
114 r.Log.Error(err, "failed to update experiment", "archive", archive)
115 return err
116 }
117
118 return nil
119 }
120
121 func (r *ChaosCollector) archiveExperiment(ns, name string) error {
122 if err := r.archive.Archive(context.Background(), ns, name); err != nil {
123 r.Log.Error(err, "failed to archive experiment", "namespace", ns, "name", name)
124 return err
125 }
126
127 return nil
128 }
129
130 func (r *ChaosCollector) deleteManagedExperiments(ns, name string) error {
131 archives, err := r.archive.FindManagedByNamespaceName(context.Background(), ns, name)
132 if gorm.IsRecordNotFoundError(err) {
133 return nil
134 }
135
136 if err != nil {
137 return err
138 }
139
140 for _, expr := range archives {
141 if err = r.event.DeleteByUID(context.Background(), expr.UID); err != nil {
142 r.Log.Error(err, "failed to delete experiment related events")
143 }
144
145 if err = r.archive.Delete(context.Background(), expr); err != nil {
146 r.Log.Error(err, "failed to delete managed experiment")
147 }
148 }
149
150 return nil
151 }
152
153 func convertInnerObjectToExperiment(obj v1alpha1.InnerObject) (*core.Experiment, error) {
154 chaosMeta, ok := obj.(metav1.Object)
155 if !ok {
156 return nil, errors.New("chaos meta information not found")
157 }
158 UID := string(chaosMeta.GetUID())
159
160 archive := &core.Experiment{
161 ExperimentMeta: core.ExperimentMeta{
162 Namespace: chaosMeta.GetNamespace(),
163 Name: chaosMeta.GetName(),
164 Kind: obj.GetObjectKind().GroupVersionKind().Kind,
165 UID: UID,
166 Archived: false,
167 },
168 }
169
170 switch chaos := obj.(type) {
171 case *v1alpha1.PodChaos:
172 archive.Action = string(chaos.Spec.Action)
173 case *v1alpha1.NetworkChaos:
174 archive.Action = string(chaos.Spec.Action)
175 case *v1alpha1.IOChaos:
176 archive.Action = string(chaos.Spec.Action)
177 case *v1alpha1.TimeChaos, *v1alpha1.KernelChaos, *v1alpha1.StressChaos, *v1alpha1.HTTPChaos:
178 archive.Action = ""
179 case *v1alpha1.DNSChaos:
180 archive.Action = string(chaos.Spec.Action)
181 case *v1alpha1.PhysicalMachineChaos:
182 archive.Action = string(chaos.Spec.Action)
183 case *v1alpha1.AWSChaos:
184 archive.Action = string(chaos.Spec.Action)
185 case *v1alpha1.GCPChaos:
186 archive.Action = string(chaos.Spec.Action)
187 case *v1alpha1.JVMChaos:
188 archive.Action = string(chaos.Spec.Action)
189 case *v1alpha1.BlockChaos:
190 archive.Action = string(chaos.Spec.Action)
191 default:
192 return nil, errors.New("unsupported chaos type " + archive.Kind)
193 }
194
195 archive.StartTime = chaosMeta.GetCreationTimestamp().Time
196 if chaosMeta.GetDeletionTimestamp() != nil {
197 archive.FinishTime = &chaosMeta.GetDeletionTimestamp().Time
198 }
199
200 data, err := json.Marshal(chaosMeta)
201 if err != nil {
202 return nil, err
203 }
204
205 archive.Experiment = string(data)
206
207 return archive, nil
208 }
209