1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package provider
17
18 import (
19 "context"
20 "math"
21 "net"
22 "strconv"
23
24 "github.com/go-logr/logr"
25 lru "github.com/hashicorp/golang-lru/v2"
26 "go.uber.org/fx"
27 "k8s.io/apimachinery/pkg/runtime"
28 "k8s.io/client-go/kubernetes"
29 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
30 authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
31 "k8s.io/client-go/rest"
32 "k8s.io/client-go/tools/record"
33 ctrl "sigs.k8s.io/controller-runtime"
34 "sigs.k8s.io/controller-runtime/pkg/cache"
35 "sigs.k8s.io/controller-runtime/pkg/client"
36 "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
37 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
38 "sigs.k8s.io/controller-runtime/pkg/webhook"
39
40 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
41 "github.com/chaos-mesh/chaos-mesh/controllers/config"
42 )
43
44 var (
45 scheme = runtime.NewScheme()
46 )
47
48 func init() {
49 _ = clientgoscheme.AddToScheme(scheme)
50
51 _ = v1alpha1.AddToScheme(scheme)
52
53 }
54
55
56 func NewScheme() *runtime.Scheme {
57 return scheme
58 }
59
60
61 func NewOption(logger logr.Logger, scheme *runtime.Scheme) *ctrl.Options {
62 setupLog := logger.WithName("setup")
63
64 leaderElectionNamespace := config.ControllerCfg.Namespace
65 if len(leaderElectionNamespace) == 0 {
66 leaderElectionNamespace = "default"
67 }
68
69 options := ctrl.Options{
70
71 Scheme: scheme,
72 Metrics: metricsserver.Options{
73 BindAddress: net.JoinHostPort(config.ControllerCfg.MetricsHost, strconv.Itoa(config.ControllerCfg.MetricsPort)),
74 },
75 LeaderElection: config.ControllerCfg.EnableLeaderElection,
76 LeaderElectionNamespace: leaderElectionNamespace,
77 LeaderElectionResourceLock: "leases",
78 LeaderElectionID: "chaos-mesh",
79 LeaseDuration: &config.ControllerCfg.LeaderElectLeaseDuration,
80 RetryPeriod: &config.ControllerCfg.LeaderElectRetryPeriod,
81 RenewDeadline: &config.ControllerCfg.LeaderElectRenewDeadline,
82
83 EventBroadcaster: record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
84 MaxEvents: math.MaxInt32,
85 MaxIntervalInSeconds: 1,
86 }),
87 WebhookServer: webhook.NewServer(webhook.Options{
88 Host: config.ControllerCfg.WebhookHost,
89 Port: config.ControllerCfg.WebhookPort,
90 CertDir: config.ControllerCfg.CertsDir,
91 }),
92 }
93
94 if config.ControllerCfg.ClusterScoped {
95 setupLog.Info("Chaos controller manager is running in cluster scoped mode.")
96
97 } else {
98 setupLog.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", config.ControllerCfg.TargetNamespace)
99 options.NewCache = func(cfg *rest.Config, opts cache.Options) (cache.Cache, error) {
100 opts.DefaultNamespaces = map[string]cache.Config{
101 config.ControllerCfg.TargetNamespace: {},
102 }
103 return cache.New(cfg, opts)
104 }
105 }
106
107 return &options
108 }
109
110
111 func NewConfig() *rest.Config {
112 return ctrl.GetConfigOrDie()
113 }
114
115
116 func NewManager(options *ctrl.Options, cfg *rest.Config) (ctrl.Manager, error) {
117 if config.ControllerCfg.QPS > 0 {
118 cfg.QPS = config.ControllerCfg.QPS
119 }
120 if config.ControllerCfg.Burst > 0 {
121 cfg.Burst = config.ControllerCfg.Burst
122 }
123
124 return ctrl.NewManager(cfg, *options)
125 }
126
127
128 func NewAuthCli(cfg *rest.Config) (*authorizationv1.AuthorizationV1Client, error) {
129
130 if config.ControllerCfg.QPS > 0 {
131 cfg.QPS = config.ControllerCfg.QPS
132 }
133 if config.ControllerCfg.Burst > 0 {
134 cfg.Burst = config.ControllerCfg.Burst
135 }
136
137 return authorizationv1.NewForConfig(cfg)
138 }
139
140
141 func NewClient(mgr ctrl.Manager, scheme *runtime.Scheme) (client.Client, error) {
142
143 cache, err := lru.New[string, runtime.Object](100)
144 if err != nil {
145 return nil, err
146 }
147 return &UpdatedClient{
148 client: mgr.GetClient(),
149 scheme: scheme,
150 cache: cache,
151 }, nil
152 }
153
154 type noCacheReader struct {
155 fx.Out
156
157 client.Reader `name:"no-cache"`
158 }
159
160
161
162 func NewNoCacheReader(mgr ctrl.Manager) noCacheReader {
163 return noCacheReader{
164 Reader: mgr.GetAPIReader(),
165 }
166 }
167
168 type controlPlaneCacheReader struct {
169 fx.Out
170
171 client.Reader `name:"control-plane-cache"`
172 }
173
174
175 func NewControlPlaneCacheReader(logger logr.Logger, cfg *rest.Config) (controlPlaneCacheReader, error) {
176 httpClient, err := rest.HTTPClientFor(cfg)
177 if err != nil {
178 return controlPlaneCacheReader{}, err
179 }
180 mapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
181 if err != nil {
182 return controlPlaneCacheReader{}, err
183 }
184
185 scheme := runtime.NewScheme()
186 _ = clientgoscheme.AddToScheme(scheme)
187
188
189 cacheReader, err := cache.New(cfg, cache.Options{
190 Scheme: scheme,
191 Mapper: mapper,
192 DefaultNamespaces: map[string]cache.Config{
193 config.ControllerCfg.Namespace: {},
194 },
195 })
196 if err != nil {
197 return controlPlaneCacheReader{}, err
198 }
199
200
201 go func() {
202
203 err := cacheReader.Start(context.TODO())
204 if err != nil {
205 logger.Error(err, "fail to start cached client")
206 }
207 }()
208
209 c, err := client.New(cfg, client.Options{Scheme: scheme, Mapper: mapper, Cache: &client.CacheOptions{
210 Reader: cacheReader,
211 DisableFor: nil,
212 Unstructured: false,
213 }})
214 if err != nil {
215 return controlPlaneCacheReader{}, err
216 }
217
218 return controlPlaneCacheReader{
219 Reader: c,
220 }, nil
221 }
222
223 func NewClientSet(config *rest.Config) (*kubernetes.Clientset, error) {
224 return kubernetes.NewForConfig(config)
225 }
226
227
228 var Module = fx.Provide(
229 NewOption,
230 NewClient,
231 NewClientSet,
232 NewManager,
233 NewAuthCli,
234 NewScheme,
235 NewConfig,
236 NewNoCacheReader,
237 NewControlPlaneCacheReader,
238 )
239