...

Source file src/github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager/provider/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager/provider

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package provider
    17  
    18  import (
    19  	"context"
    20  	"math"
    21  	"net"
    22  	"strconv"
    23  
    24  	"github.com/go-logr/logr"
    25  	lru "github.com/hashicorp/golang-lru/v2"
    26  	"go.uber.org/fx"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	"k8s.io/client-go/kubernetes"
    29  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    30  	authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
    31  	"k8s.io/client-go/rest"
    32  	"k8s.io/client-go/tools/record"
    33  	ctrl "sigs.k8s.io/controller-runtime"
    34  	"sigs.k8s.io/controller-runtime/pkg/cache"
    35  	"sigs.k8s.io/controller-runtime/pkg/client"
    36  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    37  	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
    38  	"sigs.k8s.io/controller-runtime/pkg/webhook"
    39  
    40  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    41  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    42  )
    43  
    44  var (
    45  	scheme = runtime.NewScheme()
    46  )
    47  
    48  func init() {
    49  	_ = clientgoscheme.AddToScheme(scheme)
    50  
    51  	_ = v1alpha1.AddToScheme(scheme)
    52  	// +kubebuilder:scaffold:scheme
    53  }
    54  
    55  // NewScheme returns the runtime.Scheme used by controller-runtime
    56  func NewScheme() *runtime.Scheme {
    57  	return scheme
    58  }
    59  
    60  // NewOption returns the manager.Options for build the controller-runtime Manager
    61  func NewOption(logger logr.Logger, scheme *runtime.Scheme) *ctrl.Options {
    62  	setupLog := logger.WithName("setup")
    63  
    64  	leaderElectionNamespace := config.ControllerCfg.Namespace
    65  	if len(leaderElectionNamespace) == 0 {
    66  		leaderElectionNamespace = "default"
    67  	}
    68  
    69  	options := ctrl.Options{
    70  		// TODO: accept the schema from parameter instead of using scheme directly
    71  		Scheme: scheme,
    72  		Metrics: metricsserver.Options{
    73  			BindAddress: net.JoinHostPort(config.ControllerCfg.MetricsHost, strconv.Itoa(config.ControllerCfg.MetricsPort)),
    74  		},
    75  		LeaderElection:             config.ControllerCfg.EnableLeaderElection,
    76  		LeaderElectionNamespace:    leaderElectionNamespace,
    77  		LeaderElectionResourceLock: "leases",
    78  		LeaderElectionID:           "chaos-mesh",
    79  		LeaseDuration:              &config.ControllerCfg.LeaderElectLeaseDuration,
    80  		RetryPeriod:                &config.ControllerCfg.LeaderElectRetryPeriod,
    81  		RenewDeadline:              &config.ControllerCfg.LeaderElectRenewDeadline,
    82  		// Don't aggregate events
    83  		EventBroadcaster: record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
    84  			MaxEvents:            math.MaxInt32,
    85  			MaxIntervalInSeconds: 1,
    86  		}),
    87  		WebhookServer: webhook.NewServer(webhook.Options{
    88  			Host:    config.ControllerCfg.WebhookHost,
    89  			Port:    config.ControllerCfg.WebhookPort,
    90  			CertDir: config.ControllerCfg.CertsDir,
    91  		}),
    92  	}
    93  
    94  	if config.ControllerCfg.ClusterScoped {
    95  		setupLog.Info("Chaos controller manager is running in cluster scoped mode.")
    96  		// will not specific a certain namespace
    97  	} else {
    98  		setupLog.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", config.ControllerCfg.TargetNamespace)
    99  		options.NewCache = func(cfg *rest.Config, opts cache.Options) (cache.Cache, error) {
   100  			opts.DefaultNamespaces = map[string]cache.Config{
   101  				config.ControllerCfg.TargetNamespace: {},
   102  			}
   103  			return cache.New(cfg, opts)
   104  		}
   105  	}
   106  
   107  	return &options
   108  }
   109  
   110  // NewConfig would fetch the rest.Config from environment. When it failed to fetch config, it would exit the whole application.
   111  func NewConfig() *rest.Config {
   112  	return ctrl.GetConfigOrDie()
   113  }
   114  
   115  // NewManager would build the controller-runtime manager with the given parameters.
   116  func NewManager(options *ctrl.Options, cfg *rest.Config) (ctrl.Manager, error) {
   117  	if config.ControllerCfg.QPS > 0 {
   118  		cfg.QPS = config.ControllerCfg.QPS
   119  	}
   120  	if config.ControllerCfg.Burst > 0 {
   121  		cfg.Burst = config.ControllerCfg.Burst
   122  	}
   123  
   124  	return ctrl.NewManager(cfg, *options)
   125  }
   126  
   127  // NewAuthCli would build the authorizationv1.AuthorizationV1Client with given parameters.
   128  func NewAuthCli(cfg *rest.Config) (*authorizationv1.AuthorizationV1Client, error) {
   129  
   130  	if config.ControllerCfg.QPS > 0 {
   131  		cfg.QPS = config.ControllerCfg.QPS
   132  	}
   133  	if config.ControllerCfg.Burst > 0 {
   134  		cfg.Burst = config.ControllerCfg.Burst
   135  	}
   136  
   137  	return authorizationv1.NewForConfig(cfg)
   138  }
   139  
   140  // NewClient would build the controller-runtime client.Client with given parameters.
   141  func NewClient(mgr ctrl.Manager, scheme *runtime.Scheme) (client.Client, error) {
   142  	// TODO: make this size configurable
   143  	cache, err := lru.New[string, runtime.Object](100)
   144  	if err != nil {
   145  		return nil, err
   146  	}
   147  	return &UpdatedClient{
   148  		client: mgr.GetClient(),
   149  		scheme: scheme,
   150  		cache:  cache,
   151  	}, nil
   152  }
   153  
   154  type noCacheReader struct {
   155  	fx.Out
   156  
   157  	client.Reader `name:"no-cache"`
   158  }
   159  
   160  // NewNoCacheReader builds a client.Reader with no cache.
   161  // TODO: we could return with fx.Annotate instead of struct noCacheReader and magic name "no-cache"
   162  func NewNoCacheReader(mgr ctrl.Manager) noCacheReader {
   163  	return noCacheReader{
   164  		Reader: mgr.GetAPIReader(),
   165  	}
   166  }
   167  
   168  type controlPlaneCacheReader struct {
   169  	fx.Out
   170  
   171  	client.Reader `name:"control-plane-cache"`
   172  }
   173  
   174  // NewControlPlaneCacheReader builds a client.Reader with cache for certain usage for control plane
   175  func NewControlPlaneCacheReader(logger logr.Logger, cfg *rest.Config) (controlPlaneCacheReader, error) {
   176  	httpClient, err := rest.HTTPClientFor(cfg)
   177  	if err != nil {
   178  		return controlPlaneCacheReader{}, err
   179  	}
   180  	mapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
   181  	if err != nil {
   182  		return controlPlaneCacheReader{}, err
   183  	}
   184  
   185  	scheme := runtime.NewScheme()
   186  	_ = clientgoscheme.AddToScheme(scheme)
   187  
   188  	// Create the cache for the cached read client and registering informers
   189  	cacheReader, err := cache.New(cfg, cache.Options{
   190  		Scheme: scheme,
   191  		Mapper: mapper,
   192  		DefaultNamespaces: map[string]cache.Config{
   193  			config.ControllerCfg.Namespace: {},
   194  		},
   195  	})
   196  	if err != nil {
   197  		return controlPlaneCacheReader{}, err
   198  	}
   199  	// TODO: store the channel and use it to stop
   200  	// FIXME: goroutine leaks
   201  	go func() {
   202  		// FIXME: get context from parameter
   203  		err := cacheReader.Start(context.TODO())
   204  		if err != nil {
   205  			logger.Error(err, "fail to start cached client")
   206  		}
   207  	}()
   208  
   209  	c, err := client.New(cfg, client.Options{Scheme: scheme, Mapper: mapper, Cache: &client.CacheOptions{
   210  		Reader:       cacheReader,
   211  		DisableFor:   nil,
   212  		Unstructured: false,
   213  	}})
   214  	if err != nil {
   215  		return controlPlaneCacheReader{}, err
   216  	}
   217  
   218  	return controlPlaneCacheReader{
   219  		Reader: c,
   220  	}, nil
   221  }
   222  
   223  func NewClientSet(config *rest.Config) (*kubernetes.Clientset, error) {
   224  	return kubernetes.NewForConfig(config)
   225  }
   226  
   227  // Module would provide objects to fx for dependency injection.
   228  var Module = fx.Provide(
   229  	NewOption,
   230  	NewClient,
   231  	NewClientSet,
   232  	NewManager,
   233  	NewAuthCli,
   234  	NewScheme,
   235  	NewConfig,
   236  	NewNoCacheReader,
   237  	NewControlPlaneCacheReader,
   238  )
   239