...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/collector/server.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/collector

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package collector
    17  
    18  import (
    19  	"context"
    20  	"os"
    21  
    22  	v1 "k8s.io/api/core/v1"
    23  	"k8s.io/apimachinery/pkg/runtime"
    24  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    25  	ctrl "sigs.k8s.io/controller-runtime"
    26  	"sigs.k8s.io/controller-runtime/pkg/client"
    27  
    28  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    29  	"github.com/chaos-mesh/chaos-mesh/pkg/clientpool"
    30  	config "github.com/chaos-mesh/chaos-mesh/pkg/config/dashboard"
    31  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
    32  )
    33  
    34  var (
    35  	scheme = runtime.NewScheme()
    36  	log    = ctrl.Log.WithName("collector")
    37  )
    38  
    39  func init() {
    40  	_ = clientgoscheme.AddToScheme(scheme)
    41  
    42  	_ = v1alpha1.AddToScheme(scheme)
    43  }
    44  
// Server defines a server to manage collectors.
//
// It wraps the controller-runtime manager that the collectors are set up
// against in NewServer and that Register starts.
type Server struct {
	// Manager is the controller-runtime manager created by NewServer.
	Manager ctrl.Manager
}
    49  
    50  // NewServer returns a CollectorServer and Client.
    51  func NewServer(
    52  	conf *config.ChaosDashboardConfig,
    53  	experimentArchive core.ExperimentStore,
    54  	scheduleArchive core.ScheduleStore,
    55  	event core.EventStore,
    56  	workflowStore core.WorkflowStore,
    57  ) (*Server, client.Client, client.Reader, *runtime.Scheme) {
    58  	s := &Server{}
    59  
    60  	// namespace scoped
    61  	options := ctrl.Options{
    62  		Scheme:             scheme,
    63  		MetricsBindAddress: conf.MetricAddress,
    64  		LeaderElection:     conf.EnableLeaderElection,
    65  		Port:               9443,
    66  	}
    67  	if conf.ClusterScoped {
    68  		log.Info("Chaos controller manager is running in cluster scoped mode.")
    69  	} else {
    70  		log.Info("Chaos controller manager is running in namespace scoped mode.", "targetNamespace", conf.TargetNamespace)
    71  		options.Namespace = conf.TargetNamespace
    72  	}
    73  
    74  	var err error
    75  
    76  	cfg := ctrl.GetConfigOrDie()
    77  	s.Manager, err = ctrl.NewManager(cfg, options)
    78  	if err != nil {
    79  		log.Error(err, "unable to start collector")
    80  		os.Exit(1)
    81  	}
    82  
    83  	if conf.SecurityMode {
    84  		clientpool.K8sClients, err = clientpool.NewClientPool(cfg, scheme, 100)
    85  		if err != nil {
    86  			// this should never happen
    87  			log.Error(err, "fail to create client pool")
    88  			os.Exit(1)
    89  		}
    90  	} else {
    91  		clientpool.K8sClients, err = clientpool.NewLocalClient(cfg, scheme)
    92  		if err != nil {
    93  			log.Error(err, "fail to create client pool")
    94  			os.Exit(1)
    95  		}
    96  	}
    97  
    98  	for kind, chaosKind := range v1alpha1.AllKinds() {
    99  		if err = (&ChaosCollector{
   100  			Client:  s.Manager.GetClient(),
   101  			Log:     ctrl.Log.WithName("collector").WithName(kind),
   102  			archive: experimentArchive,
   103  			event:   event,
   104  		}).Setup(s.Manager, chaosKind.SpawnObject()); err != nil {
   105  			log.Error(err, "unable to create collector", "collector", kind)
   106  			os.Exit(1)
   107  		}
   108  	}
   109  
   110  	if err = (&ScheduleCollector{
   111  		Client:  s.Manager.GetClient(),
   112  		Log:     ctrl.Log.WithName("schedule-collector").WithName(v1alpha1.KindSchedule),
   113  		archive: scheduleArchive,
   114  	}).Setup(s.Manager, &v1alpha1.Schedule{}); err != nil {
   115  		log.Error(err, "unable to create collector", "collector", v1alpha1.KindSchedule)
   116  		os.Exit(1)
   117  	}
   118  
   119  	if err = (&EventCollector{
   120  		Client: s.Manager.GetClient(),
   121  		Log:    ctrl.Log.WithName("event-collector").WithName("Event"),
   122  		event:  event,
   123  	}).Setup(s.Manager, &v1.Event{}); err != nil {
   124  		log.Error(err, "unable to create collector", "collector", v1alpha1.KindSchedule)
   125  		os.Exit(1)
   126  	}
   127  
   128  	if err = (&WorkflowCollector{
   129  		kubeClient: s.Manager.GetClient(),
   130  		Log:        ctrl.Log.WithName("workflow-collector").WithName(v1alpha1.KindWorkflow),
   131  		store:      workflowStore,
   132  	}).Setup(s.Manager, &v1alpha1.Workflow{}); err != nil {
   133  		log.Error(err, "unable to create collector", "collector", v1alpha1.KindWorkflow)
   134  		os.Exit(1)
   135  	}
   136  
   137  	return s, s.Manager.GetClient(), s.Manager.GetAPIReader(), s.Manager.GetScheme()
   138  }
   139  
   140  // Register starts collectors manager.
   141  func Register(ctx context.Context, s *Server) {
   142  	go func() {
   143  		log.Info("Starting collector")
   144  		if err := s.Manager.Start(ctx); err != nil {
   145  			log.Error(err, "could not start collector")
   146  			os.Exit(1)
   147  		}
   148  	}()
   149  }
   150