...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package common
17
18 import (
19 "context"
20 "reflect"
21
22 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
23
24 "github.com/go-logr/logr"
25 "go.uber.org/fx"
26 k8sTypes "k8s.io/apimachinery/pkg/types"
27 ctrl "sigs.k8s.io/controller-runtime"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29 "sigs.k8s.io/controller-runtime/pkg/handler"
30 "sigs.k8s.io/controller-runtime/pkg/reconcile"
31 "sigs.k8s.io/controller-runtime/pkg/source"
32
33 chaosimpltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
34 "github.com/chaos-mesh/chaos-mesh/controllers/common/pipeline"
35 "github.com/chaos-mesh/chaos-mesh/controllers/config"
36 "github.com/chaos-mesh/chaos-mesh/controllers/types"
37 "github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
38 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
39 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
40 "github.com/chaos-mesh/chaos-mesh/pkg/selector"
41 )
42
43 type Params struct {
44 fx.In
45
46 Mgr ctrl.Manager
47 Client client.Client
48 Logger logr.Logger
49 Selector *selector.Selector
50 RecorderBuilder *recorder.RecorderBuilder
51 Impls []*chaosimpltypes.ChaosImplPair `group:"impl"`
52 Reader client.Reader `name:"no-cache"`
53 Steps []pipeline.PipelineStep
54 }
55
56 func Bootstrap(params Params) error {
57 logger := params.Logger
58 pairs := params.Impls
59 mgr := params.Mgr
60 kubeclient := params.Client
61 reader := params.Reader
62 selector := params.Selector
63 recorderBuilder := params.RecorderBuilder
64
65 setupLog := logger.WithName("setup-common")
66 for _, pair := range pairs {
67 name := pair.Name + "-records"
68 if !config.ShouldSpawnController(name) {
69 return nil
70 }
71
72 setupLog.Info("setting up controller", "resource-name", pair.Name)
73
74 builder := builder.Default(mgr).
75 For(pair.Object).
76 Named(pair.Name + "-pipeline")
77
78
79 if len(pair.Controlls) > 0 {
80 pair := pair
81 for _, obj := range pair.Controlls {
82 builder.Watches(&source.Kind{
83 Type: obj,
84 },
85 handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
86 reqs := []reconcile.Request{}
87 objName := k8sTypes.NamespacedName{
88 Namespace: obj.GetNamespace(),
89 Name: obj.GetName(),
90 }
91
92 list := pair.ObjectList.DeepCopyList()
93 err := kubeclient.List(context.TODO(), list)
94 if err != nil {
95 setupLog.Error(err, "fail to list object")
96 }
97
98 items := reflect.ValueOf(list).Elem().FieldByName("Items")
99 for i := 0; i < items.Len(); i++ {
100 item := items.Index(i).Addr().Interface().(v1alpha1.InnerObjectWithSelector)
101 for _, record := range item.GetStatus().Experiment.Records {
102 namespacedName, err := controller.ParseNamespacedName(record.Id)
103 if err != nil {
104 setupLog.Error(err, "failed to parse record", "record", record.Id)
105 continue
106 }
107 if namespacedName == objName {
108 id := k8sTypes.NamespacedName{
109 Namespace: item.GetNamespace(),
110 Name: item.GetName(),
111 }
112 setupLog.Info("mapping requests", "source", objName, "target", id)
113 reqs = append(reqs, reconcile.Request{
114 NamespacedName: id,
115 })
116 }
117 }
118 }
119 return reqs
120 }),
121 )
122 }
123 }
124
125 pipe := pipeline.NewPipeline(&pipeline.PipelineContext{
126 Logger: logger,
127 Object: &types.Object{
128 Name: pair.Name,
129 Object: pair.Object,
130 },
131 Impl: pair.Impl,
132 Mgr: mgr,
133 Client: kubeclient,
134 Reader: reader,
135 RecorderBuilder: recorderBuilder,
136 Selector: selector,
137 })
138
139 pipe.AddSteps(params.Steps...)
140 err := builder.Complete(pipe)
141 if err != nil {
142 return err
143 }
144
145 }
146
147 return nil
148 }
149