1
2
3
4
5
6
7
8
9
10
11
12
13
14 package provider
15
16 import (
17 "math"
18
19 "github.com/go-logr/logr"
20 lru "github.com/hashicorp/golang-lru"
21 "go.uber.org/fx"
22
23 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
24 "github.com/chaos-mesh/chaos-mesh/controllers/config"
25
26 "k8s.io/apimachinery/pkg/runtime"
27 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
28 authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
29 "k8s.io/client-go/rest"
30 "k8s.io/client-go/tools/record"
31 ctrl "sigs.k8s.io/controller-runtime"
32 "sigs.k8s.io/controller-runtime/pkg/cache"
33 "sigs.k8s.io/controller-runtime/pkg/client"
34 "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
35 )
36
37 var (
38 scheme = runtime.NewScheme()
39 )
40
41 func init() {
42 _ = clientgoscheme.AddToScheme(scheme)
43
44 _ = v1alpha1.AddToScheme(scheme)
45
46 }
47
48 func NewScheme() *runtime.Scheme {
49 return scheme
50 }
51
52 func NewOption(logger logr.Logger) *ctrl.Options {
53 setupLog := logger.WithName("setup")
54
55 options := ctrl.Options{
56 Scheme: scheme,
57 MetricsBindAddress: config.ControllerCfg.MetricsAddr,
58 LeaderElection: config.ControllerCfg.EnableLeaderElection,
59 Port: 9443,
60
61 EventBroadcaster: record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
62 MaxEvents: math.MaxInt32,
63 MaxIntervalInSeconds: 1,
64 }),
65 }
66
67 if config.ControllerCfg.ClusterScoped {
68 setupLog.Info("Chaos controller manager is running in cluster scoped mode.")
69
70 } else {
71 setupLog.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", config.ControllerCfg.TargetNamespace)
72 options.Namespace = config.ControllerCfg.TargetNamespace
73 }
74
75 return &options
76 }
77
78 func NewConfig() *rest.Config {
79 return ctrl.GetConfigOrDie()
80 }
81
82 func NewManager(options *ctrl.Options, cfg *rest.Config) (ctrl.Manager, error) {
83 if config.ControllerCfg.QPS > 0 {
84 cfg.QPS = config.ControllerCfg.QPS
85 }
86 if config.ControllerCfg.Burst > 0 {
87 cfg.Burst = config.ControllerCfg.Burst
88 }
89
90 return ctrl.NewManager(cfg, *options)
91 }
92
93 func NewAuthCli(cfg *rest.Config) (*authorizationv1.AuthorizationV1Client, error) {
94
95 if config.ControllerCfg.QPS > 0 {
96 cfg.QPS = config.ControllerCfg.QPS
97 }
98 if config.ControllerCfg.Burst > 0 {
99 cfg.Burst = config.ControllerCfg.Burst
100 }
101
102 return authorizationv1.NewForConfig(cfg)
103 }
104
105 func NewClient(mgr ctrl.Manager, scheme *runtime.Scheme) (client.Client, error) {
106
107 cache, err := lru.New(100)
108 if err != nil {
109 return nil, err
110 }
111 return &UpdatedClient{
112 client: mgr.GetClient(),
113 scheme: scheme,
114 cache: cache,
115 }, nil
116 }
117
118 func NewLogger() logr.Logger {
119 return ctrl.Log
120 }
121
122 type noCacheReader struct {
123 fx.Out
124
125 client.Reader `name:"no-cache"`
126 }
127
128 func NewNoCacheReader(mgr ctrl.Manager) noCacheReader {
129 return noCacheReader{
130 Reader: mgr.GetAPIReader(),
131 }
132 }
133
134 type globalCacheReader struct {
135 fx.Out
136
137 client.Reader `name:"global-cache"`
138 }
139
140 func NewGlobalCacheReader(mgr ctrl.Manager) globalCacheReader {
141 return globalCacheReader{
142 Reader: mgr.GetClient(),
143 }
144 }
145
146 type controlPlaneCacheReader struct {
147 fx.Out
148
149 client.Reader `name:"control-plane-cache"`
150 }
151
152 func NewControlPlaneCacheReader(logger logr.Logger) (controlPlaneCacheReader, error) {
153 cfg := ctrl.GetConfigOrDie()
154
155 mapper, err := apiutil.NewDynamicRESTMapper(cfg)
156 if err != nil {
157 return controlPlaneCacheReader{}, err
158 }
159
160 scheme := runtime.NewScheme()
161 _ = clientgoscheme.AddToScheme(scheme)
162
163
164 cache, err := cache.New(cfg, cache.Options{Scheme: scheme, Mapper: mapper, Resync: nil, Namespace: config.ControllerCfg.Namespace})
165 if err != nil {
166 return controlPlaneCacheReader{}, err
167 }
168
169 go func() {
170 err := cache.Start(make(chan struct{}))
171 if err != nil {
172 logger.Error(err, "fail to start cached client")
173 }
174 }()
175
176 c, err := client.New(cfg, client.Options{Scheme: scheme, Mapper: mapper})
177 if err != nil {
178 return controlPlaneCacheReader{}, err
179 }
180
181 cachedClient := &client.DelegatingClient{
182 Reader: &client.DelegatingReader{
183 CacheReader: cache,
184 ClientReader: c,
185 },
186 Writer: c,
187 StatusClient: c,
188 }
189
190 return controlPlaneCacheReader{
191 Reader: cachedClient,
192 }, nil
193 }
194
195 var Module = fx.Provide(
196 NewOption,
197 NewClient,
198 NewManager,
199 NewLogger,
200 NewAuthCli,
201 NewScheme,
202 NewConfig,
203 NewNoCacheReader,
204 NewGlobalCacheReader,
205 NewControlPlaneCacheReader,
206 )
207