...

Source file src/github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager/main.go

Documentation: github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager

     1  // Copyright 2019 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package main
    15  
    16  import (
    17  	"flag"
    18  	"net/http"
    19  	_ "net/http/pprof"
    20  	"os"
    21  	"time"
    22  
    23  	"golang.org/x/time/rate"
    24  
    25  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    26  	apiWebhook "github.com/chaos-mesh/chaos-mesh/api/webhook"
    27  	ccfg "github.com/chaos-mesh/chaos-mesh/controllers/config"
    28  	"github.com/chaos-mesh/chaos-mesh/controllers/metrics"
    29  	"github.com/chaos-mesh/chaos-mesh/controllers/podiochaos"
    30  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos"
    31  	grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
    32  	"github.com/chaos-mesh/chaos-mesh/pkg/router"
    33  	"github.com/chaos-mesh/chaos-mesh/pkg/version"
    34  	"github.com/chaos-mesh/chaos-mesh/pkg/webhook/config"
    35  	"github.com/chaos-mesh/chaos-mesh/pkg/webhook/config/watcher"
    36  
    37  	_ "github.com/chaos-mesh/chaos-mesh/controllers/awschaos/detachvolume"
    38  	_ "github.com/chaos-mesh/chaos-mesh/controllers/awschaos/ec2restart"
    39  	_ "github.com/chaos-mesh/chaos-mesh/controllers/awschaos/ec2stop"
    40  	_ "github.com/chaos-mesh/chaos-mesh/controllers/dnschaos"
    41  	_ "github.com/chaos-mesh/chaos-mesh/controllers/httpchaos"
    42  	_ "github.com/chaos-mesh/chaos-mesh/controllers/iochaos"
    43  	_ "github.com/chaos-mesh/chaos-mesh/controllers/jvmchaos"
    44  	_ "github.com/chaos-mesh/chaos-mesh/controllers/kernelchaos"
    45  	_ "github.com/chaos-mesh/chaos-mesh/controllers/networkchaos/partition"
    46  	_ "github.com/chaos-mesh/chaos-mesh/controllers/networkchaos/trafficcontrol"
    47  	_ "github.com/chaos-mesh/chaos-mesh/controllers/podchaos/containerkill"
    48  	_ "github.com/chaos-mesh/chaos-mesh/controllers/podchaos/podfailure"
    49  	_ "github.com/chaos-mesh/chaos-mesh/controllers/podchaos/podkill"
    50  	_ "github.com/chaos-mesh/chaos-mesh/controllers/stresschaos"
    51  	_ "github.com/chaos-mesh/chaos-mesh/controllers/timechaos"
    52  
    53  	"k8s.io/apimachinery/pkg/runtime"
    54  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    55  	authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
    56  	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    57  	"k8s.io/client-go/rest"
    58  	"k8s.io/client-go/util/workqueue"
    59  
    60  	ctrl "sigs.k8s.io/controller-runtime"
    61  	"sigs.k8s.io/controller-runtime/pkg/log/zap"
    62  
    63  	controllermetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
    64  	"sigs.k8s.io/controller-runtime/pkg/webhook"
    65  	// +kubebuilder:scaffold:imports
    66  )
    67  
    68  var (
    69  	scheme   = runtime.NewScheme()
    70  	setupLog = ctrl.Log.WithName("setup")
    71  )
    72  
    73  var (
    74  	printVersion                   bool
    75  	restConfigQPS, restConfigBurst int
    76  )
    77  
    78  func init() {
    79  	_ = clientgoscheme.AddToScheme(scheme)
    80  
    81  	_ = v1alpha1.AddToScheme(scheme)
    82  	// +kubebuilder:scaffold:scheme
    83  }
    84  
    85  func parseFlags() {
    86  	flag.BoolVar(&printVersion, "version", false, "print version information and exit")
    87  	flag.IntVar(&restConfigQPS, "rest-config-qps", 30, "QPS of rest config.")
    88  	flag.IntVar(&restConfigBurst, "rest-config-burst", 50, "burst of rest config.")
    89  	flag.Parse()
    90  }
    91  
// main is the entry point of the chaos controller manager.
//
// In order, it:
//  1. parses the flags and prints the version banner (exiting on -version);
//  2. builds a controller-runtime manager from ccfg.ControllerCfg and the
//     REST config tuned by setRestConfig;
//  3. calls router.SetupWithManagerAndConfigs and sets up the PodIOChaos and
//     PodNetworkChaos webhooks;
//  4. starts the optional pprof HTTP server and the ConfigMap watcher;
//  5. registers the /inject-v1-pod and /validate-auth admission handlers;
//  6. hands control to mgr.Start with the stop channel obtained from
//     ctrl.SetupSignalHandler.
//
// Every setup failure is logged through setupLog and ends the process with
// exit code 1.
func main() {
	parseFlags()
	// The version banner is printed on every start; with -version the process
	// exits right after printing it.
	version.PrintVersionInfo("Controller manager")
	if printVersion {
		os.Exit(0)
	}

	// set RPCTimeout config
	grpcUtils.RPCTimeout = ccfg.ControllerCfg.RPCTimeout

	ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

	options := ctrl.Options{
		Scheme:             scheme,
		MetricsBindAddress: ccfg.ControllerCfg.MetricsAddr,
		LeaderElection:     ccfg.ControllerCfg.EnableLeaderElection,
		Port:               9443,
	}

	if ccfg.ControllerCfg.ClusterScoped {
		setupLog.Info("Chaos controller manager is running in cluster scoped mode.")
		// options.Namespace is left unset, so no single namespace is targeted.
	} else {
		setupLog.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", ccfg.ControllerCfg.TargetNamespace)
		options.Namespace = ccfg.ControllerCfg.TargetNamespace
	}

	// Apply the -rest-config-qps / -rest-config-burst flags before the manager
	// and the authorization client are built from the same REST config.
	cfg := ctrl.GetConfigOrDie()
	setRestConfig(cfg)
	mgr, err := ctrl.NewManager(cfg, options)
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	// authCli is passed to the auth validator registered on /validate-auth
	// below.
	authCli, err := authorizationv1.NewForConfig(cfg)
	if err != nil {
		setupLog.Error(err, "unable to get authorization client")
		os.Exit(1)
	}

	// NOTE(review): presumably the blank-imported controller packages register
	// themselves with the router and are wired to the manager here - confirm
	// against pkg/router.
	err = router.SetupWithManagerAndConfigs(mgr, ccfg.ControllerCfg)
	if err != nil {
		setupLog.Error(err, "fail to setup with manager")
		os.Exit(1)
	}

	// Only a webhook is set up for PodIOChaos. The logic that applies the chaos
	// lives in the mutation webhook, because the IO chaos reconciler needs to
	// get the running result synchronously.
	v1alpha1.RegisterPodIoHandler(&podiochaos.Handler{
		Client: mgr.GetClient(),
		Log:    ctrl.Log.WithName("handler").WithName("PodIOChaos"),
	})
	if err = (&v1alpha1.PodIoChaos{}).SetupWebhookWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create webhook", "webhook", "PodIOChaos")
		os.Exit(1)
	}

	// Only a webhook is set up for PodNetworkChaos. The logic that applies the
	// chaos lives in the validation webhook, because the network chaos
	// reconciler needs to get the running result synchronously.
	v1alpha1.RegisterRawPodNetworkHandler(&podnetworkchaos.Handler{
		Client:                  mgr.GetClient(),
		Reader:                  mgr.GetAPIReader(),
		Log:                     ctrl.Log.WithName("handler").WithName("PodNetworkChaos"),
		AllowHostNetworkTesting: ccfg.ControllerCfg.AllowHostNetworkTesting,
	})
	if err = (&v1alpha1.PodNetworkChaos{}).SetupWebhookWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create webhook", "webhook", "PodNetworkChaos")
		os.Exit(1)
	}

	// Init metrics collector
	metricsCollector := metrics.NewChaosCollector(mgr.GetCache(), controllermetrics.Registry)

	setupLog.Info("Setting up webhook server")
	hookServer := mgr.GetWebhookServer()
	hookServer.CertDir = ccfg.ControllerCfg.CertsDir
	// conf is shared between the config watcher goroutine, which replaces its
	// injection configs, and the pod injector webhook registered below.
	conf := config.NewConfigWatcherConf()

	// stopCh is handed to both the config watcher and mgr.Start.
	stopCh := ctrl.SetupSignalHandler()

	// A PprofAddr of "0" disables the pprof server. The nil handler selects
	// http.DefaultServeMux, on which the blank net/http/pprof import registers
	// its handlers.
	if ccfg.ControllerCfg.PprofAddr != "0" {
		go func() {
			if err := http.ListenAndServe(ccfg.ControllerCfg.PprofAddr, nil); err != nil {
				setupLog.Error(err, "unable to start pprof server")
				os.Exit(1)
			}
		}()
	}

	if err = ccfg.ControllerCfg.WatcherConfig.Verify(); err != nil {
		setupLog.Error(err, "invalid environment configuration")
		os.Exit(1)
	}
	configWatcher, err := watcher.New(*ccfg.ControllerCfg.WatcherConfig, metricsCollector)
	if err != nil {
		setupLog.Error(err, "unable to create config watcher")
		os.Exit(1)
	}

	// Keep conf up to date in the background until stopCh is closed.
	go watchConfig(configWatcher, conf, stopCh)
	hookServer.Register("/inject-v1-pod", &webhook.Admission{
		Handler: &apiWebhook.PodInjector{
			Config:        conf,
			ControllerCfg: ccfg.ControllerCfg,
			Metrics:       metricsCollector,
		}},
	)

	hookServer.Register("/validate-auth", &webhook.Admission{
		Handler: apiWebhook.NewAuthValidator(ccfg.ControllerCfg.SecurityMode, mgr.GetClient(), mgr.GetAPIReader(), authCli,
			ccfg.ControllerCfg.ClusterScoped, ccfg.ControllerCfg.TargetNamespace, ccfg.ControllerCfg.EnableFilterNamespace),
	},
	)

	// +kubebuilder:scaffold:builder

	setupLog.Info("Starting manager")
	if err := mgr.Start(stopCh); err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}
}
   215  
   216  func setRestConfig(c *rest.Config) {
   217  	if restConfigQPS > 0 {
   218  		c.QPS = float32(restConfigQPS)
   219  	}
   220  	if restConfigBurst > 0 {
   221  		c.Burst = restConfigBurst
   222  	}
   223  }
   224  
   225  func setupWatchQueue(stopCh <-chan struct{}, configWatcher *watcher.K8sConfigMapWatcher) workqueue.Interface {
   226  	// watch for reconciliation signals, and grab configmaps, then update the running configuration
   227  	// for the server
   228  	sigChan := make(chan interface{}, 10)
   229  
   230  	queue := workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(0.5), 1)})
   231  
   232  	go func() {
   233  		for {
   234  			select {
   235  			case <-stopCh:
   236  				queue.ShutDown()
   237  				return
   238  			case <-sigChan:
   239  				queue.AddRateLimited(struct{}{})
   240  			}
   241  		}
   242  	}()
   243  
   244  	go func() {
   245  		for {
   246  			setupLog.Info("Launching watcher for ConfigMaps")
   247  			if err := configWatcher.Watch(sigChan, stopCh); err != nil {
   248  				switch err {
   249  				case watcher.ErrWatchChannelClosed:
   250  					// known issue: https://github.com/kubernetes/client-go/issues/334
   251  					setupLog.Info("watcher channel has closed, restart watcher")
   252  				default:
   253  					setupLog.Error(err, "unable to watch new ConfigMaps")
   254  					os.Exit(1)
   255  				}
   256  			}
   257  
   258  			select {
   259  			case <-stopCh:
   260  				close(sigChan)
   261  				return
   262  			default:
   263  				// sleep 2 seconds to prevent excessive log due to infinite restart
   264  				time.Sleep(2 * time.Second)
   265  			}
   266  		}
   267  	}()
   268  
   269  	return queue
   270  }
   271  
   272  func watchConfig(configWatcher *watcher.K8sConfigMapWatcher, cfg *config.Config, stopCh <-chan struct{}) {
   273  	queue := setupWatchQueue(stopCh, configWatcher)
   274  
   275  	for {
   276  		item, shutdown := queue.Get()
   277  		if shutdown {
   278  			break
   279  		}
   280  		func() {
   281  			defer queue.Done(item)
   282  
   283  			setupLog.Info("Triggering ConfigMap reconciliation")
   284  			updatedInjectionConfigs, err := configWatcher.GetInjectionConfigs()
   285  			if err != nil {
   286  				setupLog.Error(err, "unable to get ConfigMaps")
   287  				return
   288  			}
   289  
   290  			setupLog.Info("Updating server with newly loaded configurations",
   291  				"original configs count", len(cfg.Injections), "updated configs count", len(updatedInjectionConfigs))
   292  			cfg.ReplaceInjectionConfigs(updatedInjectionConfigs)
   293  			setupLog.Info("Configuration replaced")
   294  		}()
   295  	}
   296  }
   297