...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/multicluster/clusterregistry/registry.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/multicluster/clusterregistry

// Copyright 2022 Chaos Mesh Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package clusterregistry

import (
	"context"
	"os"
	"sync"

	fxlogr "github.com/chaos-mesh/fx-logr"
	"github.com/go-logr/logr"
	"github.com/pkg/errors"
	"go.uber.org/fx"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	controllermetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

	"github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager/provider"
	"github.com/chaos-mesh/chaos-mesh/controllers/config"
	"github.com/chaos-mesh/chaos-mesh/controllers/multicluster/remotechaosmonitor"
	"github.com/chaos-mesh/chaos-mesh/controllers/types"
)

// remoteCluster bundles the fx application that drives a remote cluster's
// controller manager together with the client used to access that cluster.
type remoteCluster struct {
	app *fx.App

	client.Client
}

// RemoteClusterRegistry manages all controllers running against remote
// clusters. The construction of these controllers (managers) is handled by
// `fx`. Constructing a controller manager for a remote cluster is nearly the
// same as constructing the main one; the only differences are that we need to
// provide a new `RestConfig`, and `Populate` the client so that others can
// use it.
type RemoteClusterRegistry struct {
	clusters map[string]*remoteCluster
	logger   logr.Logger
	client   client.Client

	lock *sync.Mutex
}

// New creates a RemoteClusterRegistry with an empty cluster map.
func New(logger logr.Logger, client client.Client) *RemoteClusterRegistry {
	return &RemoteClusterRegistry{
		clusters: make(map[string]*remoteCluster),
		logger:   logger.WithName("clusterregistry"),
		client:   client,

		lock: &sync.Mutex{},
	}
}

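// A minimal end-to-end usage sketch (hypothetical names; assumes a configured
// logr.Logger, a client for the local management cluster, and a *rest.Config
// pointing at the remote cluster):
//
//	registry := New(logger, localClient)
//	if err := registry.Spawn("cluster-1", remoteConfig); err != nil {
//		return err
//	}
//	defer registry.Stop(context.Background(), "cluster-1")
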
// run starts the controller manager of the remote cluster and wires its
// shutdown into the fx lifecycle.
func run(lc fx.Lifecycle, mgr ctrl.Manager, logger logr.Logger) error {
	// TODO: use the global signal context with a cancel function
	executionCtx, cancel := context.WithCancel(context.TODO())
	stopChan := make(chan struct{})

	go func() {
		setupLog := logger.WithName("setup")
		setupLog.Info("Starting manager")

		if err := mgr.Start(executionCtx); err != nil {
			setupLog.Error(err, "unable to start manager")
			os.Exit(1)
		}

		// signal OnStop that the manager has fully stopped
		stopChan <- struct{}{}
	}()

	lc.Append(fx.Hook{
		OnStop: func(ctx context.Context) error {
			// cancel the manager's context, then wait for it to exit
			cancel()
			<-stopChan
			return nil
		},
	})

	return nil
}

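// The TODO in run above could be addressed by deriving the execution context
// from a signal-aware parent instead of context.TODO(), for example with
// controller-runtime's helper (a sketch, not what the code does today; note
// that ctrl.SetupSignalHandler may only be called once per process):
//
//	executionCtx, cancel := context.WithCancel(ctrl.SetupSignalHandler())
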
// TODO: unify this option with global provider.NewOption
func controllerManagerOption(scheme *runtime.Scheme) *ctrl.Options {
	options := ctrl.Options{
		// TODO: accept the schema from parameter instead of using scheme directly
		Scheme: scheme,
		Metrics: metricsserver.Options{
			// "0" disables serving metrics for this manager
			BindAddress: "0",
		},
		// TODO: enable leader election
		LeaderElection: false,
		RetryPeriod:    &config.ControllerCfg.LeaderElectRetryPeriod,
		RenewDeadline:  &config.ControllerCfg.LeaderElectRenewDeadline,
	}

	// TODO: consider the cluster scope / namespace scope with multi-cluster

	return &options
}

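// The scoping TODO above could be handled by restricting the manager's cache,
// e.g. via controller-runtime v0.16+ cache options (a sketch, assuming an
// import of "sigs.k8s.io/controller-runtime/pkg/cache" and a hypothetical
// targetNamespace):
//
//	options.Cache = cache.Options{
//		DefaultNamespaces: map[string]cache.Config{targetNamespace: {}},
//	}
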
// WithClient lets a developer obtain a client for a registered remote cluster
// and use it to operate inside that cluster.
//
// TODO: add more kinds of client, like `no-cache`, to this registry if they
// are needed
func (r *RemoteClusterRegistry) WithClient(name string, f func(c client.Client) error) error {
	r.lock.Lock()
	defer r.lock.Unlock()

	cluster, ok := r.clusters[name]
	if !ok {
		return errors.Wrapf(ErrNotExist, "lookup cluster: %s", name)
	}

	return f(cluster.Client)
}

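// For example, listing pods in a registered remote cluster might look like
// this (a sketch; "cluster-1" is hypothetical and corev1 stands for
// "k8s.io/api/core/v1"):
//
//	var pods corev1.PodList
//	err := registry.WithClient("cluster-1", func(c client.Client) error {
//		return c.List(ctx, &pods)
//	})
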
// Stop stops the running controller-manager which watches the remote cluster.
func (r *RemoteClusterRegistry) Stop(ctx context.Context, name string) error {
	r.lock.Lock()
	defer r.lock.Unlock()

	cluster, ok := r.clusters[name]
	if !ok {
		return errors.Wrapf(ErrNotExist, "lookup cluster: %s", name)
	}

	err := cluster.app.Stop(ctx)
	if err != nil {
		return errors.Wrapf(err, "stop fx app: %s", name)
	}
	delete(r.clusters, name)

	r.logger.Info("controller manager stopped", "name", name)

	return nil
}

// Spawn starts a controller-manager that watches the remote cluster
func (r *RemoteClusterRegistry) Spawn(name string, config *rest.Config) error {
	r.lock.Lock()
	defer r.lock.Unlock()

	if _, ok := r.clusters[name]; ok {
		return errors.Wrapf(ErrAlreadyExist, "spawn cluster: %s", name)
	}

	remoteFxLogger := r.logger.WithName("remotecluster-fx-" + name)

	localClient := r.client
	var remoteClient client.Client
	app := fx.New(
		fx.WithLogger(fxlogr.WithLogr(&remoteFxLogger)),
		fx.Supply(controllermetrics.Registry),
		fx.Supply(r.logger.WithName("remotecluster-"+name)),
		fx.Supply(config),
		fx.Supply(fx.Annotated{
			Name:   "cluster-name",
			Target: name,
		}),
		// expose the client of the local (management) cluster under the
		// name "manage-client"
		fx.Provide(
			fx.Annotate(func() client.Client {
				return localClient
			}, fx.ResultTags(`name:"manage-client"`)),
		),
		fx.Provide(
			controllerManagerOption,
			provider.NewClient,
			provider.NewManager,
			provider.NewScheme,
		),
		fx.Option(types.ChaosObjects),
		// more reconcilers can be listed here to add themselves to the
		// controller manager
		remotechaosmonitor.Module,
		// extract the remote cluster's client so the registry can hand it
		// out through WithClient
		fx.Populate(&remoteClient),
		fx.Invoke(run),
	)

	err := app.Start(context.TODO())
	if err != nil {
		return errors.Wrapf(err, "start controller-manager of remote cluster %s", name)
	}

	r.clusters[name] = &remoteCluster{
		app:    app,
		Client: remoteClient,
	}

	return nil
}
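
// For reference, a reconciler module listed alongside
// remotechaosmonitor.Module would usually be an fx.Option of this shape
// (a sketch, not the actual remotechaosmonitor implementation; Reconciler
// and the target type are illustrative):
//
//	var Module = fx.Options(
//		fx.Invoke(func(mgr ctrl.Manager) error {
//			return ctrl.NewControllerManagedBy(mgr).
//				For(&v1alpha1.NetworkChaos{}).
//				Complete(&Reconciler{})
//		}),
//	)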