1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package clusterregistry
17
18 import (
19 "context"
20 "os"
21 "sync"
22
23 fxlogr "github.com/chaos-mesh/fx-logr"
24 "github.com/go-logr/logr"
25 "github.com/pkg/errors"
26 "go.uber.org/fx"
27 "k8s.io/apimachinery/pkg/runtime"
28 "k8s.io/client-go/rest"
29 ctrl "sigs.k8s.io/controller-runtime"
30 "sigs.k8s.io/controller-runtime/pkg/client"
31 controllermetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
32 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
33
34 "github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager/provider"
35 "github.com/chaos-mesh/chaos-mesh/controllers/config"
36 "github.com/chaos-mesh/chaos-mesh/controllers/multicluster/remotechaosmonitor"
37 "github.com/chaos-mesh/chaos-mesh/controllers/types"
38 )
39
// remoteCluster is one entry of RemoteClusterRegistry. It pairs the fx
// application built by Spawn with the client populated from that
// application's dependency graph.
type remoteCluster struct {
	// app is the fx application started in Spawn and stopped in Stop.
	app *fx.App

	// Client is the client obtained via fx.Populate in Spawn (presumably
	// built by provider.NewClient for the remote cluster — confirm); it is
	// what WithClient passes to its callback.
	client.Client
}
45
46
47
48
49
50
// RemoteClusterRegistry keeps track of the controller managers spawned for
// remote clusters, keyed by cluster name. Each entry is an fx application
// created in Spawn and torn down in Stop. Use New to construct one; the zero
// value has a nil map and a nil lock and is not usable.
type RemoteClusterRegistry struct {
	// clusters maps a cluster name to its running remote cluster.
	clusters map[string]*remoteCluster

	// logger is the parent logger; per-cluster loggers are derived from it.
	logger logr.Logger

	// client is the local (managing) cluster's client; Spawn provides it to
	// every remote fx graph under the name "manage-client".
	client client.Client

	// lock guards clusters. WithClient, Stop and Spawn hold it for their
	// whole duration.
	lock *sync.Mutex
}
58
59 func New(logger logr.Logger, client client.Client) *RemoteClusterRegistry {
60 return &RemoteClusterRegistry{
61 clusters: make(map[string]*remoteCluster),
62 logger: logger.WithName("clusterregistry"),
63 client: client,
64
65 lock: &sync.Mutex{},
66 }
67 }
68
69
70 func run(lc fx.Lifecycle, mgr ctrl.Manager, logger logr.Logger) error {
71
72 executionCtx, cancel := context.WithCancel(context.TODO())
73 stopChan := make(chan struct{})
74
75 go func() {
76 setupLog := logger.WithName("setup")
77 setupLog.Info("Starting manager")
78
79 if err := mgr.Start(executionCtx); err != nil {
80 setupLog.Error(err, "unable to start manager")
81 os.Exit(1)
82 }
83
84 stopChan <- struct{}{}
85 }()
86
87 lc.Append(fx.Hook{
88 OnStop: func(ctx context.Context) error {
89 cancel()
90 <-stopChan
91 return nil
92 },
93 })
94
95 return nil
96 }
97
98
99 func controllerManagerOption(scheme *runtime.Scheme) *ctrl.Options {
100 options := ctrl.Options{
101
102 Scheme: scheme,
103 Metrics: metricsserver.Options{
104 BindAddress: "0",
105 },
106
107 LeaderElection: false,
108 RetryPeriod: &config.ControllerCfg.LeaderElectRetryPeriod,
109 RenewDeadline: &config.ControllerCfg.LeaderElectRenewDeadline,
110 }
111
112
113
114 return &options
115 }
116
117
118
119
120
121
122 func (r *RemoteClusterRegistry) WithClient(name string, f func(c client.Client) error) error {
123 r.lock.Lock()
124 defer r.lock.Unlock()
125
126 cluster, ok := r.clusters[name]
127 if !ok {
128 return errors.Wrapf(ErrNotExist, "lookup cluster: %s", name)
129 }
130
131 return f(cluster.Client)
132 }
133
134
135 func (r *RemoteClusterRegistry) Stop(ctx context.Context, name string) error {
136 r.lock.Lock()
137 defer r.lock.Unlock()
138
139 cluster, ok := r.clusters[name]
140 if !ok {
141 return errors.Wrapf(ErrNotExist, "lookup cluster: %s", name)
142 }
143
144 err := cluster.app.Stop(ctx)
145 if err != nil {
146 return errors.Wrapf(err, "stop fx app: %s", name)
147 }
148 delete(r.clusters, name)
149
150 r.logger.Info("controller manager stopped", "name", name)
151
152 return nil
153 }
154
155
156 func (r *RemoteClusterRegistry) Spawn(name string, config *rest.Config) error {
157 r.lock.Lock()
158 defer r.lock.Unlock()
159
160 if _, ok := r.clusters[name]; ok {
161 return errors.Wrapf(ErrAlreadyExist, "spawn cluster: %s", name)
162 }
163
164 remoteFxLogger := r.logger.WithName("remotecluster-fx-" + name)
165
166 localClient := r.client
167 var remoteClient client.Client
168 app := fx.New(
169 fx.WithLogger(fxlogr.WithLogr(&remoteFxLogger)),
170 fx.Supply(controllermetrics.Registry),
171 fx.Supply(r.logger.WithName("remotecluster-"+name)),
172 fx.Supply(config),
173 fx.Supply(fx.Annotated{
174 Name: "cluster-name",
175 Target: name,
176 }),
177 fx.Provide(
178 fx.Annotate(func() client.Client {
179 return localClient
180 }, fx.ResultTags(`name:"manage-client"`)),
181 ),
182 fx.Provide(
183 controllerManagerOption,
184 provider.NewClient,
185 provider.NewManager,
186 provider.NewScheme,
187 ),
188 fx.Option(types.ChaosObjects),
189
190
191 remotechaosmonitor.Module,
192 fx.Populate(&remoteClient),
193 fx.Invoke(run),
194 )
195
196 err := app.Start(context.TODO())
197 if err != nil {
198 return errors.Wrapf(err, "start controller-manager of remote cluster %s", name)
199 }
200
201 r.clusters[name] = &remoteCluster{
202 app: app,
203 Client: remoteClient,
204 }
205
206 return nil
207 }
208