...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/collector/server.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/collector

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package collector
    17  
    18  import (
    19  	"context"
    20  	"net"
    21  	"os"
    22  	"strconv"
    23  
    24  	"github.com/go-logr/logr"
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/runtime"
    27  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    28  	"k8s.io/client-go/rest"
    29  	ctrl "sigs.k8s.io/controller-runtime"
    30  	"sigs.k8s.io/controller-runtime/pkg/cache"
    31  	"sigs.k8s.io/controller-runtime/pkg/client"
    32  	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
    33  	"sigs.k8s.io/controller-runtime/pkg/webhook"
    34  
    35  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    36  	"github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
    37  	config "github.com/chaos-mesh/chaos-mesh/pkg/config"
    38  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
    39  )
    40  
    41  var (
    42  	scheme = runtime.NewScheme()
    43  )
    44  
    45  func init() {
    46  	_ = clientgoscheme.AddToScheme(scheme)
    47  
    48  	_ = v1alpha1.AddToScheme(scheme)
    49  }
    50  
    51  // Server defines a server to manage collectors.
    52  type Server struct {
    53  	Manager ctrl.Manager
    54  	logger  logr.Logger
    55  }
    56  
    57  // NewServer returns a CollectorServer and Client.
    58  func NewServer(
    59  	conf *config.ChaosDashboardConfig,
    60  	experimentArchive core.ExperimentStore,
    61  	scheduleArchive core.ScheduleStore,
    62  	event core.EventStore,
    63  	workflowStore core.WorkflowStore,
    64  	logger logr.Logger,
    65  ) (*Server, client.Client, client.Reader, *runtime.Scheme) {
    66  	s := &Server{logger: logger}
    67  
    68  	// namespace scoped
    69  	options := ctrl.Options{
    70  		Scheme: scheme,
    71  		Metrics: metricsserver.Options{
    72  			BindAddress: net.JoinHostPort(conf.MetricHost, strconv.Itoa(conf.MetricPort)),
    73  		},
    74  		LeaderElection: conf.EnableLeaderElection,
    75  		WebhookServer: webhook.NewServer(webhook.Options{
    76  			Port: 9443,
    77  		}),
    78  	}
    79  	if conf.ClusterScoped {
    80  		logger.Info("Chaos controller manager is running in cluster scoped mode.")
    81  	} else {
    82  		logger.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", conf.TargetNamespace)
    83  		options.NewCache = func(cfg *rest.Config, opts cache.Options) (cache.Cache, error) {
    84  			opts.DefaultNamespaces = map[string]cache.Config{
    85  				conf.TargetNamespace: {},
    86  			}
    87  			return cache.New(cfg, opts)
    88  		}
    89  	}
    90  
    91  	var err error
    92  
    93  	cfg := ctrl.GetConfigOrDie()
    94  
    95  	if conf.QPS > 0 {
    96  		cfg.QPS = conf.QPS
    97  		cfg.Burst = conf.Burst
    98  	}
    99  
   100  	s.Manager, err = ctrl.NewManager(cfg, options)
   101  	if err != nil {
   102  		logger.Error(err, "unable to start collector")
   103  		os.Exit(1)
   104  	}
   105  
   106  	if conf.SecurityMode {
   107  		clientpool.K8sClients, err = clientpool.NewClientPool(cfg, scheme, 100)
   108  		if err != nil {
   109  			// this should never happen
   110  			logger.Error(err, "fail to create client pool")
   111  			os.Exit(1)
   112  		}
   113  	} else {
   114  		clientpool.K8sClients, err = clientpool.NewLocalClient(cfg, scheme)
   115  		if err != nil {
   116  			logger.Error(err, "fail to create client pool")
   117  			os.Exit(1)
   118  		}
   119  	}
   120  
   121  	for kind, chaosKind := range v1alpha1.AllKinds() {
   122  		if err = (&ChaosCollector{
   123  			Client:  s.Manager.GetClient(),
   124  			Log:     logger.WithName(kind),
   125  			archive: experimentArchive,
   126  			event:   event,
   127  		}).Setup(s.Manager, chaosKind.SpawnObject()); err != nil {
   128  			logger.Error(err, "unable to create collector", "collector", kind)
   129  			os.Exit(1)
   130  		}
   131  	}
   132  
   133  	if err = (&ScheduleCollector{
   134  		Client:  s.Manager.GetClient(),
   135  		Log:     logger.WithName("schedule-collector").WithName(v1alpha1.KindSchedule),
   136  		archive: scheduleArchive,
   137  	}).Setup(s.Manager, &v1alpha1.Schedule{}); err != nil {
   138  		logger.Error(err, "unable to create collector", "collector", v1alpha1.KindSchedule)
   139  		os.Exit(1)
   140  	}
   141  
   142  	if err = (&EventCollector{
   143  		Client: s.Manager.GetClient(),
   144  		Log:    logger.WithName("event-collector").WithName("Event"),
   145  		event:  event,
   146  	}).Setup(s.Manager, &v1.Event{}); err != nil {
   147  		logger.Error(err, "unable to create collector", "collector", v1alpha1.KindSchedule)
   148  		os.Exit(1)
   149  	}
   150  
   151  	if err = (&WorkflowCollector{
   152  		kubeClient: s.Manager.GetClient(),
   153  		Log:        logger.WithName("workflow-collector").WithName(v1alpha1.KindWorkflow),
   154  		store:      workflowStore,
   155  	}).Setup(s.Manager, &v1alpha1.Workflow{}); err != nil {
   156  		logger.Error(err, "unable to create collector", "collector", v1alpha1.KindWorkflow)
   157  		os.Exit(1)
   158  	}
   159  
   160  	return s, s.Manager.GetClient(), s.Manager.GetAPIReader(), s.Manager.GetScheme()
   161  }
   162  
   163  // Register starts collectors manager.
   164  func Register(ctx context.Context, s *Server) {
   165  	go func() {
   166  		s.logger.Info("Starting collector")
   167  		if err := s.Manager.Start(ctx); err != nil {
   168  			s.logger.Error(err, "could not start collector")
   169  			os.Exit(1)
   170  		}
   171  	}()
   172  }
   173