...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/server.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon

     1  // Copyright 2019 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package chaosdaemon
    15  
    16  import (
    17  	"context"
    18  	"crypto/tls"
    19  	"crypto/x509"
    20  	"fmt"
    21  	"io/ioutil"
    22  	"net"
    23  
    24  	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
    25  	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
    26  	"github.com/moby/locker"
    27  	"github.com/prometheus/client_golang/prometheus"
    28  	"golang.org/x/sync/errgroup"
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/credentials"
    31  	"google.golang.org/grpc/reflection"
    32  	ctrl "sigs.k8s.io/controller-runtime"
    33  
    34  	"github.com/chaos-mesh/chaos-mesh/pkg/bpm"
    35  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/crclients"
    36  	pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    37  	grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
    38  )
    39  
    40  var log = ctrl.Log.WithName("chaos-daemon-server")
    41  
    42  //go:generate protoc -I pb pb/chaosdaemon.proto --go_out=plugins=grpc:pb
    43  
    44  // Config contains the basic chaos daemon configuration.
    45  type Config struct {
    46  	HTTPPort  int
    47  	GRPCPort  int
    48  	Host      string
    49  	Runtime   string
    50  	Profiling bool
    51  
    52  	tlsConfig
    53  }
    54  
    55  // tlsConfig contains the config of TLS Server
    56  type tlsConfig struct {
    57  	CaCert string
    58  	Cert   string
    59  	Key    string
    60  }
    61  
    62  // Get the http address
    63  func (c *Config) HttpAddr() string {
    64  	return net.JoinHostPort(c.Host, fmt.Sprintf("%d", c.HTTPPort))
    65  }
    66  
    67  // Get the grpc address
    68  func (c *Config) GrpcAddr() string {
    69  	return net.JoinHostPort(c.Host, fmt.Sprintf("%d", c.GRPCPort))
    70  }
    71  
    72  // DaemonServer represents a grpc server for tc daemon
    73  type DaemonServer struct {
    74  	crClient                 crclients.ContainerRuntimeInfoClient
    75  	backgroundProcessManager bpm.BackgroundProcessManager
    76  
    77  	IPSetLocker *locker.Locker
    78  }
    79  
    80  func newDaemonServer(containerRuntime string) (*DaemonServer, error) {
    81  	crClient, err := crclients.CreateContainerRuntimeInfoClient(containerRuntime)
    82  	if err != nil {
    83  		return nil, err
    84  	}
    85  
    86  	return NewDaemonServerWithCRClient(crClient), nil
    87  }
    88  
    89  // NewDaemonServerWithCRClient returns DaemonServer with container runtime client
    90  func NewDaemonServerWithCRClient(crClient crclients.ContainerRuntimeInfoClient) *DaemonServer {
    91  	return &DaemonServer{
    92  		IPSetLocker:              locker.New(),
    93  		crClient:                 crClient,
    94  		backgroundProcessManager: bpm.NewBackgroundProcessManager(),
    95  	}
    96  }
    97  
    98  func newGRPCServer(containerRuntime string, reg prometheus.Registerer, tlsConf tlsConfig) (*grpc.Server, error) {
    99  	ds, err := newDaemonServer(containerRuntime)
   100  	if err != nil {
   101  		return nil, err
   102  	}
   103  
   104  	grpcMetrics := grpc_prometheus.NewServerMetrics()
   105  	grpcMetrics.EnableHandlingTimeHistogram(
   106  		grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10}),
   107  	)
   108  	reg.MustRegister(grpcMetrics)
   109  
   110  	grpcOpts := []grpc.ServerOption{
   111  		grpc_middleware.WithUnaryServerChain(
   112  			grpcUtils.TimeoutServerInterceptor,
   113  			grpcMetrics.UnaryServerInterceptor(),
   114  		),
   115  	}
   116  
   117  	if tlsConf != (tlsConfig{}) {
   118  		caCert, err := ioutil.ReadFile(tlsConf.CaCert)
   119  		if err != nil {
   120  			return nil, err
   121  		}
   122  		caCertPool := x509.NewCertPool()
   123  		caCertPool.AppendCertsFromPEM(caCert)
   124  
   125  		serverCert, err := tls.LoadX509KeyPair(tlsConf.Cert, tlsConf.Key)
   126  		if err != nil {
   127  			return nil, err
   128  		}
   129  
   130  		creds := credentials.NewTLS(&tls.Config{
   131  			Certificates: []tls.Certificate{serverCert},
   132  			ClientCAs:    caCertPool,
   133  			ClientAuth:   tls.RequireAndVerifyClientCert,
   134  		})
   135  
   136  		grpcOpts = append(grpcOpts, grpc.Creds(creds))
   137  	}
   138  
   139  	s := grpc.NewServer(grpcOpts...)
   140  	grpcMetrics.InitializeMetrics(s)
   141  
   142  	pb.RegisterChaosDaemonServer(s, ds)
   143  	reflection.Register(s)
   144  
   145  	return s, nil
   146  }
   147  
   148  // RegisterGatherer combine prometheus.Registerer and prometheus.Gatherer
   149  type RegisterGatherer interface {
   150  	prometheus.Registerer
   151  	prometheus.Gatherer
   152  }
   153  
   154  // StartServer starts chaos-daemon.
   155  func StartServer(conf *Config, reg RegisterGatherer) error {
   156  	g := &errgroup.Group{}
   157  
   158  	httpBindAddr := conf.HttpAddr()
   159  	httpServer := newHTTPServerBuilder().Addr(httpBindAddr).Metrics(reg).Profiling(conf.Profiling).Build()
   160  
   161  	grpcBindAddr := conf.GrpcAddr()
   162  	grpcListener, err := net.Listen("tcp", grpcBindAddr)
   163  	if err != nil {
   164  		log.Error(err, "failed to listen grpc address", "grpcBindAddr", grpcBindAddr)
   165  		return err
   166  	}
   167  
   168  	grpcServer, err := newGRPCServer(conf.Runtime, reg, conf.tlsConfig)
   169  	if err != nil {
   170  		log.Error(err, "failed to create grpc server")
   171  		return err
   172  	}
   173  
   174  	g.Go(func() error {
   175  		log.Info("Starting http endpoint", "address", httpBindAddr)
   176  		if err := httpServer.ListenAndServe(); err != nil {
   177  			log.Error(err, "failed to start http endpoint")
   178  			httpServer.Shutdown(context.Background())
   179  			return err
   180  		}
   181  		return nil
   182  	})
   183  
   184  	g.Go(func() error {
   185  		log.Info("Starting grpc endpoint", "address", grpcBindAddr, "runtime", conf.Runtime)
   186  		if err := grpcServer.Serve(grpcListener); err != nil {
   187  			log.Error(err, "failed to start grpc endpoint")
   188  			grpcServer.Stop()
   189  			return err
   190  		}
   191  		return nil
   192  	})
   193  
   194  	return g.Wait()
   195  }
   196