...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/collector/server.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/collector

     1  // Copyright 2020 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package collector
    15  
    16  import (
    17  	"os"
    18  
    19  	"k8s.io/apimachinery/pkg/runtime"
    20  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    21  	ctrl "sigs.k8s.io/controller-runtime"
    22  	"sigs.k8s.io/controller-runtime/pkg/client"
    23  
    24  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    25  	"github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
    26  	"github.com/chaos-mesh/chaos-mesh/pkg/config"
    27  	"github.com/chaos-mesh/chaos-mesh/pkg/core"
    28  )
    29  
    30  var (
    31  	scheme = runtime.NewScheme()
    32  	log    = ctrl.Log.WithName("collector")
    33  )
    34  
    35  func init() {
    36  	_ = clientgoscheme.AddToScheme(scheme)
    37  
    38  	_ = v1alpha1.AddToScheme(scheme)
    39  }
    40  
    41  // Server defines a server to manage collectors.
    42  type Server struct {
    43  	Manager ctrl.Manager
    44  }
    45  
    46  // NewServer returns a CollectorServer and Client.
    47  func NewServer(
    48  	conf *config.ChaosDashboardConfig,
    49  	archive core.ExperimentStore,
    50  	event core.EventStore,
    51  ) (*Server, client.Client, client.Reader, *runtime.Scheme) {
    52  	s := &Server{}
    53  
    54  	// namespace scoped
    55  	options := ctrl.Options{
    56  		Scheme:             scheme,
    57  		MetricsBindAddress: conf.MetricAddress,
    58  		LeaderElection:     conf.EnableLeaderElection,
    59  		Port:               9443,
    60  	}
    61  	if conf.ClusterScoped {
    62  		log.Info("Chaos controller manager is running in cluster scoped mode.")
    63  	} else {
    64  		log.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", conf.TargetNamespace)
    65  		options.Namespace = conf.TargetNamespace
    66  	}
    67  
    68  	var err error
    69  
    70  	cfg := ctrl.GetConfigOrDie()
    71  	s.Manager, err = ctrl.NewManager(cfg, options)
    72  	if err != nil {
    73  		log.Error(err, "unable to start collector")
    74  		os.Exit(1)
    75  	}
    76  
    77  	if conf.SecurityMode {
    78  		clientpool.K8sClients, err = clientpool.NewClientPool(cfg, scheme, 100)
    79  		if err != nil {
    80  			// this should never happen
    81  			log.Error(err, "fail to create client pool")
    82  			os.Exit(1)
    83  		}
    84  	} else {
    85  		clientpool.K8sClients, err = clientpool.NewLocalClient(cfg, scheme)
    86  		if err != nil {
    87  			log.Error(err, "fail to create client pool")
    88  			os.Exit(1)
    89  		}
    90  	}
    91  
    92  	for kind, chaosKind := range v1alpha1.AllKinds() {
    93  		if err = (&ChaosCollector{
    94  			Client:  s.Manager.GetClient(),
    95  			Log:     ctrl.Log.WithName("collector").WithName(kind),
    96  			archive: archive,
    97  			event:   event,
    98  		}).Setup(s.Manager, chaosKind.Chaos); err != nil {
    99  			log.Error(err, "unable to create collector", "collector", kind)
   100  			os.Exit(1)
   101  		}
   102  	}
   103  
   104  	return s, s.Manager.GetClient(), s.Manager.GetAPIReader(), s.Manager.GetScheme()
   105  }
   106  
   107  // Register starts collectors manager.
   108  func Register(s *Server, controllerRuntimeStopCh <-chan struct{}) {
   109  	go func() {
   110  		log.Info("Starting collector")
   111  		if err := s.Manager.Start(controllerRuntimeStopCh); err != nil {
   112  			log.Error(err, "could not start collector")
   113  			os.Exit(1)
   114  		}
   115  	}()
   116  }
   117