...

Source file src/github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager/main.go

Documentation: github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager

     1  // Copyright 2019 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package main
    15  
    16  import (
    17  	"flag"
    18  	"net/http"
    19  	_ "net/http/pprof"
    20  	"os"
    21  	"time"
    22  
    23  	"github.com/go-logr/logr"
    24  	"go.uber.org/fx"
    25  	"golang.org/x/time/rate"
    26  	authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
    27  	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    28  	"k8s.io/client-go/util/workqueue"
    29  	ctrl "sigs.k8s.io/controller-runtime"
    30  	"sigs.k8s.io/controller-runtime/pkg/log/zap"
    31  	controllermetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
    32  	"sigs.k8s.io/controller-runtime/pkg/webhook"
    33  
    34  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    35  	apiWebhook "github.com/chaos-mesh/chaos-mesh/api/webhook"
    36  	"github.com/chaos-mesh/chaos-mesh/cmd/chaos-controller-manager/provider"
    37  	"github.com/chaos-mesh/chaos-mesh/controllers"
    38  	ccfg "github.com/chaos-mesh/chaos-mesh/controllers/config"
    39  	"github.com/chaos-mesh/chaos-mesh/controllers/metrics"
    40  	"github.com/chaos-mesh/chaos-mesh/controllers/types"
    41  	grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
    42  	"github.com/chaos-mesh/chaos-mesh/pkg/selector"
    43  	"github.com/chaos-mesh/chaos-mesh/pkg/version"
    44  	"github.com/chaos-mesh/chaos-mesh/pkg/webhook/config"
    45  	"github.com/chaos-mesh/chaos-mesh/pkg/webhook/config/watcher"
    46  )
    47  
    48  var (
    49  	printVersion bool
    50  	setupLog     = ctrl.Log.WithName("setup")
    51  )
    52  
    53  func parseFlags() {
    54  	flag.BoolVar(&printVersion, "version", false, "print version information and exit")
    55  	flag.Parse()
    56  }
    57  
    58  func main() {
    59  	parseFlags()
    60  	version.PrintVersionInfo("Controller manager")
    61  	if printVersion {
    62  		os.Exit(0)
    63  	}
    64  
    65  	// set RPCTimeout config
    66  	grpcUtils.RPCTimeout = ccfg.ControllerCfg.RPCTimeout
    67  	ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
    68  
    69  	app := fx.New(
    70  		fx.Options(
    71  			provider.Module,
    72  			controllers.Module,
    73  			selector.Module,
    74  			types.ChaosObjects,
    75  		),
    76  		fx.Invoke(Run),
    77  	)
    78  
    79  	app.Run()
    80  }
    81  
    82  type RunParams struct {
    83  	fx.In
    84  
    85  	Mgr     ctrl.Manager
    86  	Logger  logr.Logger
    87  	AuthCli *authorizationv1.AuthorizationV1Client
    88  
    89  	Controllers []types.Controller `group:"controller"`
    90  	Objs        []types.Object     `group:"objs"`
    91  }
    92  
    93  func Run(params RunParams) error {
    94  	mgr := params.Mgr
    95  	authCli := params.AuthCli
    96  
    97  	var err error
    98  	for _, obj := range params.Objs {
    99  		err = ctrl.NewWebhookManagedBy(mgr).
   100  			For(obj.Object).
   101  			Complete()
   102  		if err != nil {
   103  			return err
   104  		}
   105  	}
   106  
   107  	// setup schedule webhook
   108  	err = ctrl.NewWebhookManagedBy(mgr).
   109  		For(&v1alpha1.Schedule{}).
   110  		Complete()
   111  	if err != nil {
   112  		return err
   113  	}
   114  
   115  	// setup workflow webhook
   116  	err = ctrl.NewWebhookManagedBy(mgr).
   117  		For(&v1alpha1.Workflow{}).
   118  		Complete()
   119  	if err != nil {
   120  		return err
   121  	}
   122  
   123  	// Init metrics collector
   124  	metricsCollector := metrics.NewChaosCollector(mgr.GetCache(), controllermetrics.Registry)
   125  
   126  	setupLog.Info("Setting up webhook server")
   127  	hookServer := mgr.GetWebhookServer()
   128  	hookServer.CertDir = ccfg.ControllerCfg.CertsDir
   129  	conf := config.NewConfigWatcherConf()
   130  
   131  	stopCh := ctrl.SetupSignalHandler()
   132  
   133  	if ccfg.ControllerCfg.PprofAddr != "0" {
   134  		go func() {
   135  			if err := http.ListenAndServe(ccfg.ControllerCfg.PprofAddr, nil); err != nil {
   136  				setupLog.Error(err, "unable to start pprof server")
   137  				os.Exit(1)
   138  			}
   139  		}()
   140  	}
   141  
   142  	if err = ccfg.ControllerCfg.WatcherConfig.Verify(); err != nil {
   143  		setupLog.Error(err, "invalid environment configuration")
   144  		os.Exit(1)
   145  	}
   146  	configWatcher, err := watcher.New(*ccfg.ControllerCfg.WatcherConfig, metricsCollector)
   147  	if err != nil {
   148  		setupLog.Error(err, "unable to create config watcher")
   149  		os.Exit(1)
   150  	}
   151  
   152  	go watchConfig(configWatcher, conf, stopCh)
   153  	hookServer.Register("/inject-v1-pod", &webhook.Admission{
   154  		Handler: &apiWebhook.PodInjector{
   155  			Config:        conf,
   156  			ControllerCfg: ccfg.ControllerCfg,
   157  			Metrics:       metricsCollector,
   158  		}},
   159  	)
   160  	hookServer.Register("/validate-auth", &webhook.Admission{
   161  		Handler: apiWebhook.NewAuthValidator(ccfg.ControllerCfg.SecurityMode, authCli,
   162  			ccfg.ControllerCfg.ClusterScoped, ccfg.ControllerCfg.TargetNamespace, ccfg.ControllerCfg.EnableFilterNamespace),
   163  	},
   164  	)
   165  
   166  	setupLog.Info("Starting manager")
   167  	if err := mgr.Start(stopCh); err != nil {
   168  		setupLog.Error(err, "unable to start manager")
   169  		os.Exit(1)
   170  	}
   171  
   172  	return nil
   173  }
   174  
   175  func setupWatchQueue(stopCh <-chan struct{}, configWatcher *watcher.K8sConfigMapWatcher) workqueue.Interface {
   176  	// watch for reconciliation signals, and grab configmaps, then update the running configuration
   177  	// for the server
   178  	sigChan := make(chan interface{}, 10)
   179  
   180  	queue := workqueue.NewRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(0.5), 1)})
   181  
   182  	go func() {
   183  		for {
   184  			select {
   185  			case <-stopCh:
   186  				queue.ShutDown()
   187  				return
   188  			case <-sigChan:
   189  				queue.AddRateLimited(struct{}{})
   190  			}
   191  		}
   192  	}()
   193  
   194  	go func() {
   195  		for {
   196  			setupLog.Info("Launching watcher for ConfigMaps")
   197  			if err := configWatcher.Watch(sigChan, stopCh); err != nil {
   198  				switch err {
   199  				case watcher.ErrWatchChannelClosed:
   200  					// known issue: https://github.com/kubernetes/client-go/issues/334
   201  					setupLog.Info("watcher channel has closed, restart watcher")
   202  				default:
   203  					setupLog.Error(err, "unable to watch new ConfigMaps")
   204  					os.Exit(1)
   205  				}
   206  			}
   207  
   208  			select {
   209  			case <-stopCh:
   210  				close(sigChan)
   211  				return
   212  			default:
   213  				// sleep 2 seconds to prevent excessive log due to infinite restart
   214  				time.Sleep(2 * time.Second)
   215  			}
   216  		}
   217  	}()
   218  
   219  	return queue
   220  }
   221  
   222  func watchConfig(configWatcher *watcher.K8sConfigMapWatcher, cfg *config.Config, stopCh <-chan struct{}) {
   223  	queue := setupWatchQueue(stopCh, configWatcher)
   224  
   225  	for {
   226  		item, shutdown := queue.Get()
   227  		if shutdown {
   228  			break
   229  		}
   230  		func() {
   231  			defer queue.Done(item)
   232  
   233  			setupLog.Info("Triggering ConfigMap reconciliation")
   234  			updatedInjectionConfigs, err := configWatcher.GetInjectionConfigs()
   235  			if err != nil {
   236  				setupLog.Error(err, "unable to get ConfigMaps")
   237  				return
   238  			}
   239  
   240  			setupLog.Info("Updating server with newly loaded configurations",
   241  				"original configs count", len(cfg.Injections), "updated configs count", len(updatedInjectionConfigs))
   242  			cfg.ReplaceInjectionConfigs(updatedInjectionConfigs)
   243  			setupLog.Info("Configuration replaced")
   244  		}()
   245  	}
   246  }
   247