1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package common
17
18 import (
19 "context"
20 "reflect"
21
22 "github.com/go-logr/logr"
23 "go.uber.org/fx"
24 k8sTypes "k8s.io/apimachinery/pkg/types"
25 ctrl "sigs.k8s.io/controller-runtime"
26 "sigs.k8s.io/controller-runtime/pkg/client"
27 "sigs.k8s.io/controller-runtime/pkg/event"
28 "sigs.k8s.io/controller-runtime/pkg/handler"
29 "sigs.k8s.io/controller-runtime/pkg/predicate"
30 "sigs.k8s.io/controller-runtime/pkg/reconcile"
31
32 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
33 chaosimpltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
34 "github.com/chaos-mesh/chaos-mesh/controllers/common/pipeline"
35 "github.com/chaos-mesh/chaos-mesh/controllers/config"
36 "github.com/chaos-mesh/chaos-mesh/controllers/types"
37 "github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
38 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
39 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
40 "github.com/chaos-mesh/chaos-mesh/pkg/selector"
41 )
42
// Params collects the dependencies consumed by Bootstrap. It embeds fx.In, so
// the fields are populated by the fx container rather than by hand.
type Params struct {
	fx.In

	// Mgr is the manager every controller built by Bootstrap is registered with.
	Mgr ctrl.Manager
	// Client lists chaos objects when mapping watch events and is handed to
	// each pipeline context.
	Client client.Client
	// Logger is the base logger; Bootstrap derives its "setup-common" logger
	// from it and passes it to each pipeline context.
	Logger logr.Logger
	// Selector is forwarded to each pipeline context.
	Selector *selector.Selector
	// RecorderBuilder is forwarded to each pipeline context.
	RecorderBuilder *recorder.RecorderBuilder
	// Impls holds the chaos implementations collected from the fx value group
	// "impl"; Bootstrap considers one controller per entry.
	Impls []*chaosimpltypes.ChaosImplPair `group:"impl"`
	// Reader is the fx value named "no-cache" and is forwarded to each pipeline
	// context. NOTE(review): presumably a reader that bypasses the informer
	// cache — confirm against the provider.
	Reader client.Reader `name:"no-cache"`
	// Steps are appended, in order, to every pipeline Bootstrap creates.
	Steps []pipeline.PipelineStep
}
55
56 func Bootstrap(params Params) error {
57 logger := params.Logger
58 pairs := params.Impls
59 mgr := params.Mgr
60 kubeclient := params.Client
61 reader := params.Reader
62 selector := params.Selector
63 recorderBuilder := params.RecorderBuilder
64
65 setupLog := logger.WithName("setup-common")
66 for _, pair := range pairs {
67 name := pair.Name + "-records"
68 if !config.ShouldSpawnController(name) {
69 return nil
70 }
71
72 setupLog.Info("setting up controller", "resource-name", pair.Name)
73
74 builder := builder.Default(mgr).
75 For(pair.Object).
76 Named(pair.Name + "-pipeline")
77
78
79
80 predicaters := []predicate.Predicate{StatusRecordEventsChangePredicate{}}
81
82
83 if len(pair.Controlls) > 0 {
84 pair := pair
85 for _, obj := range pair.Controlls {
86 builder.Watches(obj,
87 handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
88 reqs := []reconcile.Request{}
89 objName := k8sTypes.NamespacedName{
90 Namespace: obj.GetNamespace(),
91 Name: obj.GetName(),
92 }
93
94 list := pair.ObjectList.DeepCopyList()
95 err := kubeclient.List(context.TODO(), list)
96 if err != nil {
97 setupLog.Error(err, "fail to list object")
98 }
99
100 items := reflect.ValueOf(list).Elem().FieldByName("Items")
101 for i := 0; i < items.Len(); i++ {
102 item := items.Index(i).Addr().Interface().(v1alpha1.InnerObjectWithSelector)
103 for _, record := range item.GetStatus().Experiment.Records {
104 namespacedName, err := controller.ParseNamespacedName(record.Id)
105 if err != nil {
106 setupLog.Error(err, "failed to parse record", "record", record.Id)
107 continue
108 }
109 if namespacedName == objName {
110 id := k8sTypes.NamespacedName{
111 Namespace: item.GetNamespace(),
112 Name: item.GetName(),
113 }
114 setupLog.Info("mapping requests", "source", objName, "target", id)
115 reqs = append(reqs, reconcile.Request{
116 NamespacedName: id,
117 })
118 }
119 }
120 }
121 return reqs
122 }),
123 )
124 }
125 predicaters = append(predicaters, PickChildCRDPredicate{})
126 }
127
128 pipe := pipeline.NewPipeline(&pipeline.PipelineContext{
129 Logger: logger,
130 Object: &types.Object{
131 Name: pair.Name,
132 Object: pair.Object,
133 },
134 Impl: pair.Impl,
135 Mgr: mgr,
136 Client: kubeclient,
137 Reader: reader,
138 RecorderBuilder: recorderBuilder,
139 Selector: selector,
140 })
141
142 pipe.AddSteps(params.Steps...)
143 builder = builder.WithEventFilter(predicate.And(predicate.Or(predicaters...), RemoteChaosPredicate{}))
144 err := builder.Complete(pipe)
145 if err != nil {
146 return err
147 }
148
149 }
150
151 return nil
152 }
153
154
155
156
157
158 type PickChildCRDPredicate struct {
159 predicate.Funcs
160 }
161
162
163 func (PickChildCRDPredicate) Update(e event.UpdateEvent) bool {
164 switch e.ObjectNew.(type) {
165 case *v1alpha1.PodHttpChaos, *v1alpha1.PodIOChaos, *v1alpha1.PodNetworkChaos:
166 return true
167 }
168 return false
169 }
170
171
172
173 type StatusRecordEventsChangePredicate struct {
174 predicate.Funcs
175 }
176
177
178
179 func (StatusRecordEventsChangePredicate) Update(e event.UpdateEvent) bool {
180 objNew, ok := e.ObjectNew.DeepCopyObject().(v1alpha1.StatefulObject)
181 if !ok {
182 return false
183 }
184 objOld, ok := e.ObjectOld.DeepCopyObject().(v1alpha1.StatefulObject)
185 if !ok {
186 return false
187 }
188 statusNew := objNew.GetStatus()
189 statusOld := objOld.GetStatus()
190 if statusNew == nil || statusOld == nil {
191 return true
192 }
193 objNew.SetGeneration(0)
194 objOld.SetGeneration(0)
195 objNew.SetResourceVersion("")
196 objOld.SetResourceVersion("")
197 for i := range statusNew.Experiment.Records {
198 statusNew.Experiment.Records[i].Events = nil
199 }
200 for i := range statusOld.Experiment.Records {
201 statusOld.Experiment.Records[i].Events = nil
202 }
203 return !reflect.DeepEqual(objNew, objOld)
204 }
205
206 type RemoteChaosPredicate struct {
207 predicate.Funcs
208 }
209
210 func (RemoteChaosPredicate) Create(e event.CreateEvent) bool {
211 obj, ok := e.Object.DeepCopyObject().(v1alpha1.RemoteObject)
212 if !ok {
213 return true
214 }
215
216 if obj.GetRemoteCluster() == "" {
217 return true
218 }
219
220 return false
221 }
222
223 func (RemoteChaosPredicate) Update(e event.UpdateEvent) bool {
224 obj, ok := e.ObjectNew.DeepCopyObject().(v1alpha1.RemoteObject)
225 if !ok {
226 return true
227 }
228
229 if obj.GetRemoteCluster() == "" {
230 return true
231 }
232
233 return false
234 }
235
236 func (RemoteChaosPredicate) Delete(e event.DeleteEvent) bool {
237 obj, ok := e.Object.DeepCopyObject().(v1alpha1.RemoteObject)
238 if !ok {
239 return true
240 }
241
242 if obj.GetRemoteCluster() == "" {
243 return true
244 }
245
246 return false
247 }
248
249 func (RemoteChaosPredicate) Generic(e event.GenericEvent) bool {
250 obj, ok := e.Object.DeepCopyObject().(v1alpha1.RemoteObject)
251 if !ok {
252 return true
253 }
254
255 if obj.GetRemoteCluster() == "" {
256 return true
257 }
258
259 return false
260 }
261