...
1
2
3
4
5
6
7
8
9
10
11
12
13
14 package common
15
16 import (
17 "context"
18 "reflect"
19
20 "github.com/go-logr/logr"
21 "go.uber.org/fx"
22 "k8s.io/apimachinery/pkg/runtime"
23 k8sTypes "k8s.io/apimachinery/pkg/types"
24 ctrl "sigs.k8s.io/controller-runtime"
25 "sigs.k8s.io/controller-runtime/pkg/client"
26 "sigs.k8s.io/controller-runtime/pkg/handler"
27 "sigs.k8s.io/controller-runtime/pkg/reconcile"
28 "sigs.k8s.io/controller-runtime/pkg/source"
29
30 "github.com/chaos-mesh/chaos-mesh/controllers/types"
31 "github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
32 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
33 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
34 "github.com/chaos-mesh/chaos-mesh/pkg/selector"
35 )
36
37 type ChaosImplPair struct {
38 Name string
39 Object InnerObjectWithSelector
40 Impl ChaosImpl
41
42 ObjectList runtime.Object
43 Controlls []runtime.Object
44 }
45
46 type Params struct {
47 fx.In
48
49 Mgr ctrl.Manager
50 Client client.Client
51 Logger logr.Logger
52 Selector *selector.Selector
53 RecorderBuilder *recorder.RecorderBuilder
54 Impls []*ChaosImplPair `group:"impl"`
55 Reader client.Reader `name:"no-cache"`
56 }
57
58 func NewController(params Params) (types.Controller, error) {
59 logger := params.Logger
60 pairs := params.Impls
61 mgr := params.Mgr
62 client := params.Client
63 reader := params.Reader
64 selector := params.Selector
65 recorderBuilder := params.RecorderBuilder
66
67 setupLog := logger.WithName("setup-common")
68 for _, pair := range pairs {
69 setupLog.Info("setting up controller", "resource-name", pair.Name)
70
71 builder := builder.Default(mgr).
72 For(pair.Object).
73 Named(pair.Name + "-records")
74
75
76 if len(pair.Controlls) > 0 {
77 pair := pair
78 for _, obj := range pair.Controlls {
79 builder = builder.Watches(&source.Kind{Type: obj}, &handler.EnqueueRequestsFromMapFunc{
80 ToRequests: handler.ToRequestsFunc(func(obj handler.MapObject) []reconcile.Request {
81 reqs := []reconcile.Request{}
82 objName := k8sTypes.NamespacedName{
83 Namespace: obj.Meta.GetNamespace(),
84 Name: obj.Meta.GetName(),
85 }
86
87 list := pair.ObjectList.DeepCopyObject()
88 err := client.List(context.TODO(), list)
89 if err != nil {
90 setupLog.Error(err, "fail to list object")
91 }
92
93 items := reflect.ValueOf(list).Elem().FieldByName("Items")
94 for i := 0; i < items.Len(); i++ {
95 item := items.Index(i).Addr().Interface().(InnerObjectWithSelector)
96 for _, record := range item.GetStatus().Experiment.Records {
97 if controller.ParseNamespacedName(record.Id) == objName {
98 id := k8sTypes.NamespacedName{
99 Namespace: item.GetObjectMeta().Namespace,
100 Name: item.GetObjectMeta().Name,
101 }
102 setupLog.Info("mapping requests", "source", objName, "target", id)
103 reqs = append(reqs, reconcile.Request{
104 NamespacedName: id,
105 })
106 }
107 }
108 }
109
110 return reqs
111 }),
112 })
113 }
114 }
115
116 err := builder.Complete(&Reconciler{
117 Impl: pair.Impl,
118 Object: pair.Object,
119 Client: client,
120 Reader: reader,
121 Recorder: recorderBuilder.Build("records"),
122 Selector: selector,
123 Log: logger.WithName("records"),
124 })
125 if err != nil {
126 return "", err
127 }
128
129 }
130
131 return "records", nil
132 }
133