...

Source file src/github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager/provider/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager/provider

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package provider
    15  
    16  import (
    17  	"math"
    18  
    19  	"github.com/go-logr/logr"
    20  	lru "github.com/hashicorp/golang-lru"
    21  	"go.uber.org/fx"
    22  
    23  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    24  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    25  
    26  	"k8s.io/apimachinery/pkg/runtime"
    27  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    28  	authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
    29  	"k8s.io/client-go/rest"
    30  	"k8s.io/client-go/tools/record"
    31  	ctrl "sigs.k8s.io/controller-runtime"
    32  	"sigs.k8s.io/controller-runtime/pkg/cache"
    33  	"sigs.k8s.io/controller-runtime/pkg/client"
    34  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    35  )
    36  
    37  var (
    38  	scheme = runtime.NewScheme()
    39  )
    40  
    41  func init() {
    42  	_ = clientgoscheme.AddToScheme(scheme)
    43  
    44  	_ = v1alpha1.AddToScheme(scheme)
    45  	// +kubebuilder:scaffold:scheme
    46  }
    47  
    48  func NewScheme() *runtime.Scheme {
    49  	return scheme
    50  }
    51  
    52  func NewOption(logger logr.Logger) *ctrl.Options {
    53  	setupLog := logger.WithName("setup")
    54  
    55  	options := ctrl.Options{
    56  		Scheme:             scheme,
    57  		MetricsBindAddress: config.ControllerCfg.MetricsAddr,
    58  		LeaderElection:     config.ControllerCfg.EnableLeaderElection,
    59  		Port:               9443,
    60  		// Don't aggregate events
    61  		EventBroadcaster: record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
    62  			MaxEvents:            math.MaxInt32,
    63  			MaxIntervalInSeconds: 1,
    64  		}),
    65  	}
    66  
    67  	if config.ControllerCfg.ClusterScoped {
    68  		setupLog.Info("Chaos controller manager is running in cluster scoped mode.")
    69  		// will not specific a certain namespace
    70  	} else {
    71  		setupLog.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", config.ControllerCfg.TargetNamespace)
    72  		options.Namespace = config.ControllerCfg.TargetNamespace
    73  	}
    74  
    75  	return &options
    76  }
    77  
    78  func NewConfig() *rest.Config {
    79  	return ctrl.GetConfigOrDie()
    80  }
    81  
    82  func NewManager(options *ctrl.Options, cfg *rest.Config) (ctrl.Manager, error) {
    83  	if config.ControllerCfg.QPS > 0 {
    84  		cfg.QPS = config.ControllerCfg.QPS
    85  	}
    86  	if config.ControllerCfg.Burst > 0 {
    87  		cfg.Burst = config.ControllerCfg.Burst
    88  	}
    89  
    90  	return ctrl.NewManager(cfg, *options)
    91  }
    92  
    93  func NewAuthCli(cfg *rest.Config) (*authorizationv1.AuthorizationV1Client, error) {
    94  
    95  	if config.ControllerCfg.QPS > 0 {
    96  		cfg.QPS = config.ControllerCfg.QPS
    97  	}
    98  	if config.ControllerCfg.Burst > 0 {
    99  		cfg.Burst = config.ControllerCfg.Burst
   100  	}
   101  
   102  	return authorizationv1.NewForConfig(cfg)
   103  }
   104  
   105  func NewClient(mgr ctrl.Manager, scheme *runtime.Scheme) (client.Client, error) {
   106  	// TODO: make this size configurable
   107  	cache, err := lru.New(100)
   108  	if err != nil {
   109  		return nil, err
   110  	}
   111  	return &UpdatedClient{
   112  		client: mgr.GetClient(),
   113  		scheme: scheme,
   114  		cache:  cache,
   115  	}, nil
   116  }
   117  
   118  func NewLogger() logr.Logger {
   119  	return ctrl.Log
   120  }
   121  
   122  type noCacheReader struct {
   123  	fx.Out
   124  
   125  	client.Reader `name:"no-cache"`
   126  }
   127  
   128  func NewNoCacheReader(mgr ctrl.Manager) noCacheReader {
   129  	return noCacheReader{
   130  		Reader: mgr.GetAPIReader(),
   131  	}
   132  }
   133  
   134  type globalCacheReader struct {
   135  	fx.Out
   136  
   137  	client.Reader `name:"global-cache"`
   138  }
   139  
   140  func NewGlobalCacheReader(mgr ctrl.Manager) globalCacheReader {
   141  	return globalCacheReader{
   142  		Reader: mgr.GetClient(),
   143  	}
   144  }
   145  
   146  type controlPlaneCacheReader struct {
   147  	fx.Out
   148  
   149  	client.Reader `name:"control-plane-cache"`
   150  }
   151  
   152  func NewControlPlaneCacheReader(logger logr.Logger) (controlPlaneCacheReader, error) {
   153  	cfg := ctrl.GetConfigOrDie()
   154  
   155  	mapper, err := apiutil.NewDynamicRESTMapper(cfg)
   156  	if err != nil {
   157  		return controlPlaneCacheReader{}, err
   158  	}
   159  
   160  	scheme := runtime.NewScheme()
   161  	_ = clientgoscheme.AddToScheme(scheme)
   162  
   163  	// Create the cache for the cached read client and registering informers
   164  	cache, err := cache.New(cfg, cache.Options{Scheme: scheme, Mapper: mapper, Resync: nil, Namespace: config.ControllerCfg.Namespace})
   165  	if err != nil {
   166  		return controlPlaneCacheReader{}, err
   167  	}
   168  	// TODO: store the channel and use it to stop
   169  	go func() {
   170  		err := cache.Start(make(chan struct{}))
   171  		if err != nil {
   172  			logger.Error(err, "fail to start cached client")
   173  		}
   174  	}()
   175  
   176  	c, err := client.New(cfg, client.Options{Scheme: scheme, Mapper: mapper})
   177  	if err != nil {
   178  		return controlPlaneCacheReader{}, err
   179  	}
   180  
   181  	cachedClient := &client.DelegatingClient{
   182  		Reader: &client.DelegatingReader{
   183  			CacheReader:  cache,
   184  			ClientReader: c,
   185  		},
   186  		Writer:       c,
   187  		StatusClient: c,
   188  	}
   189  
   190  	return controlPlaneCacheReader{
   191  		Reader: cachedClient,
   192  	}, nil
   193  }
   194  
   195  var Module = fx.Provide(
   196  	NewOption,
   197  	NewClient,
   198  	NewManager,
   199  	NewLogger,
   200  	NewAuthCli,
   201  	NewScheme,
   202  	NewConfig,
   203  	NewNoCacheReader,
   204  	NewGlobalCacheReader,
   205  	NewControlPlaneCacheReader,
   206  )
   207