...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/server.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package chaosdaemon
    17  
    18  import (
    19  	"context"
    20  	"crypto/tls"
    21  	"crypto/x509"
    22  	"fmt"
    23  	"net"
    24  	"net/http"
    25  	"os"
    26  	"sync"
    27  
    28  	"github.com/go-logr/logr"
    29  	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
    30  	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
    31  	"github.com/moby/locker"
    32  	"github.com/pkg/errors"
    33  	"github.com/prometheus/client_golang/prometheus"
    34  	"golang.org/x/sync/errgroup"
    35  	"google.golang.org/grpc"
    36  	"google.golang.org/grpc/credentials"
    37  	"google.golang.org/grpc/metadata"
    38  	"google.golang.org/grpc/reflection"
    39  
    40  	"github.com/chaos-mesh/chaos-mesh/pkg/bpm"
    41  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/crclients"
    42  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    43  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/tasks"
    44  	grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
    45  	"github.com/chaos-mesh/chaos-mesh/pkg/log"
    46  	"github.com/chaos-mesh/chaos-mesh/pkg/metrics"
    47  )
    48  
    49  //go:generate protoc -I pb pb/chaosdaemon.proto --go_out=plugins=grpc:pb
    50  
// Config contains the basic chaos daemon configuration.
//
//   - HTTPPort and GRPCPort are the ports of the HTTP and gRPC endpoints; each
//     is joined with Host (see HttpAddr and GrpcAddr) to form a listen address.
//   - CrClientConfig configures the container runtime client that BuildServer
//     creates; its Runtime is also logged when the gRPC endpoint starts.
//   - Profiling is passed to the HTTP server builder's Profiling option.
//   - The embedded tlsConfig, when left at its zero value, makes the gRPC
//     endpoint serve without TLS; otherwise it enables mutual TLS.
type Config struct {
	HTTPPort       int
	GRPCPort       int
	Host           string
	CrClientConfig *crclients.CrClientConfig
	Profiling      bool

	tlsConfig
}
    61  
// tlsConfig contains the TLS settings of the gRPC server.
//
// All three fields are file paths. CaCert is read and its certificates become
// the only CAs trusted to sign client certificates; Cert and Key are loaded as
// the server's X.509 key pair. The zero value means the gRPC server does not
// use TLS at all.
type tlsConfig struct {
	CaCert string
	Cert   string
	Key    string
}
    68  
    69  // Get the http address
    70  func (c *Config) HttpAddr() string {
    71  	return net.JoinHostPort(c.Host, fmt.Sprintf("%d", c.HTTPPort))
    72  }
    73  
    74  // Get the grpc address
    75  func (c *Config) GrpcAddr() string {
    76  	return net.JoinHostPort(c.Host, fmt.Sprintf("%d", c.GRPCPort))
    77  }
    78  
// DaemonServer implements the ChaosDaemon gRPC service; newGRPCServer
// registers it with pb.RegisterChaosDaemonServer.
//
// crClient is the container runtime info client, which is also injected into
// the daemon metrics collector. backgroundProcessManager is started when the
// DaemonServer is constructed and shut down by Server.Shutdown. rootLogger is
// the base logger that getLoggerFromContext enriches per request.
// NOTE(review): IPSetLocker is a name-keyed locker, presumably used to
// serialize ipset operations — confirm against its callers.
type DaemonServer struct {
	crClient                 crclients.ContainerRuntimeInfoClient
	backgroundProcessManager *bpm.BackgroundProcessManager
	rootLogger               logr.Logger

	// tproxyLocker is a set of tproxy processes to lock stdin/stdout/stderr
	tproxyLocker *sync.Map

	IPSetLocker     *locker.Locker
	timeChaosServer TimeChaosServer
}
    91  
    92  func (s *DaemonServer) getLoggerFromContext(ctx context.Context) logr.Logger {
    93  	return log.EnrichLoggerWithContext(ctx, s.rootLogger)
    94  }
    95  
    96  func newDaemonServer(clientConfig *crclients.CrClientConfig, reg prometheus.Registerer, log logr.Logger) (*DaemonServer, error) {
    97  	crClient, err := crclients.CreateContainerRuntimeInfoClient(clientConfig)
    98  	if err != nil {
    99  		return nil, err
   100  	}
   101  
   102  	return NewDaemonServerWithCRClient(crClient, reg, log), nil
   103  }
   104  
   105  // NewDaemonServerWithCRClient returns DaemonServer with container runtime client
   106  func NewDaemonServerWithCRClient(crClient crclients.ContainerRuntimeInfoClient, reg prometheus.Registerer, log logr.Logger) *DaemonServer {
   107  	return &DaemonServer{
   108  		IPSetLocker:              locker.New(),
   109  		crClient:                 crClient,
   110  		backgroundProcessManager: bpm.StartBackgroundProcessManager(reg, log),
   111  		tproxyLocker:             new(sync.Map),
   112  		rootLogger:               log,
   113  		timeChaosServer: TimeChaosServer{
   114  			podContainerNameProcessMap: tasks.NewPodProcessMap(),
   115  			manager:                    tasks.NewTaskManager(logr.New(log.GetSink()).WithName("TimeChaos")),
   116  			nameLocker:                 tasks.NewLockMap[tasks.PodContainerName](),
   117  			logger:                     logr.New(log.GetSink()).WithName("TimeChaos"),
   118  		},
   119  	}
   120  }
   121  
   122  func newGRPCServer(daemonServer *DaemonServer, reg prometheus.Registerer, tlsConf tlsConfig) (*grpc.Server, error) {
   123  	grpcMetrics := grpc_prometheus.NewServerMetrics()
   124  	grpcMetrics.EnableHandlingTimeHistogram(
   125  		grpc_prometheus.WithHistogramBuckets(metrics.ChaosDaemonGrpcServerBuckets),
   126  		metrics.WithHistogramName("chaos_daemon_grpc_server_handling_seconds"),
   127  	)
   128  	reg.MustRegister(
   129  		grpcMetrics,
   130  		metrics.DefaultChaosDaemonMetricsCollector.InjectCrClient(daemonServer.crClient),
   131  	)
   132  
   133  	grpcOpts := []grpc.ServerOption{
   134  		grpc_middleware.WithUnaryServerChain(
   135  			grpcUtils.TimeoutServerInterceptor,
   136  			grpcMetrics.UnaryServerInterceptor(),
   137  			MetadataExtractor(log.MetaNamespacedName),
   138  		),
   139  	}
   140  
   141  	if tlsConf != (tlsConfig{}) {
   142  		caCert, err := os.ReadFile(tlsConf.CaCert)
   143  		if err != nil {
   144  			return nil, err
   145  		}
   146  		caCertPool := x509.NewCertPool()
   147  		caCertPool.AppendCertsFromPEM(caCert)
   148  
   149  		serverCert, err := tls.LoadX509KeyPair(tlsConf.Cert, tlsConf.Key)
   150  		if err != nil {
   151  			return nil, err
   152  		}
   153  
   154  		creds := credentials.NewTLS(&tls.Config{
   155  			Certificates: []tls.Certificate{serverCert},
   156  			ClientCAs:    caCertPool,
   157  			ClientAuth:   tls.RequireAndVerifyClientCert,
   158  			MinVersion:   tls.VersionTLS13,
   159  		})
   160  
   161  		grpcOpts = append(grpcOpts, grpc.Creds(creds))
   162  	}
   163  
   164  	s := grpc.NewServer(grpcOpts...)
   165  	grpcMetrics.InitializeMetrics(s)
   166  
   167  	pb.RegisterChaosDaemonServer(s, daemonServer)
   168  	reflection.Register(s)
   169  
   170  	return s, nil
   171  }
   172  
   173  func MetadataExtractor(keys ...log.Metadatkey) grpc.UnaryServerInterceptor {
   174  	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
   175  		// Get the metadata from the incoming context
   176  		md, ok := metadata.FromIncomingContext(ctx)
   177  		if !ok {
   178  			return nil, errors.New("couldn't parse incoming context metadata")
   179  		}
   180  		for _, key := range keys {
   181  			values := md.Get(string(key))
   182  			if len(values) > 0 {
   183  				ctx = context.WithValue(ctx, key, values[0])
   184  			}
   185  		}
   186  
   187  		return handler(ctx, req)
   188  	}
   189  }
   190  
// RegisterGatherer combines prometheus.Registerer and prometheus.Gatherer, so
// one registry can both accept the daemon's collectors and be gathered from.
// BuildServer uses the same value to register metrics and to configure the
// HTTP server's Metrics option.
type RegisterGatherer interface {
	prometheus.Registerer
	prometheus.Gatherer
}
   196  
// Server is the server for chaos daemon. It bundles the DaemonServer
// implementation with the HTTP and gRPC endpoints built by BuildServer, plus
// the configuration and logger they were built from. Start runs both
// endpoints; Shutdown stops them.
type Server struct {
	daemonServer *DaemonServer
	httpServer   *http.Server
	grpcServer   *grpc.Server

	conf   *Config
	logger logr.Logger
}
   206  
   207  // BuildServer builds a chaos daemon server
   208  func BuildServer(conf *Config, reg RegisterGatherer, log logr.Logger) (*Server, error) {
   209  	server := &Server{conf: conf, logger: log}
   210  	var err error
   211  	server.daemonServer, err = newDaemonServer(conf.CrClientConfig, reg, log)
   212  	if err != nil {
   213  		return nil, errors.Wrap(err, "create daemon server")
   214  	}
   215  
   216  	server.httpServer = newHTTPServerBuilder().Addr(conf.HttpAddr()).Metrics(reg).Profiling(conf.Profiling).Build()
   217  	server.grpcServer, err = newGRPCServer(server.daemonServer, reg, conf.tlsConfig)
   218  	if err != nil {
   219  		return nil, errors.Wrap(err, "create grpc server")
   220  	}
   221  
   222  	return server, nil
   223  }
   224  
   225  // Start starts chaos-daemon.
   226  func (s *Server) Start() error {
   227  	grpcBindAddr := s.conf.GrpcAddr()
   228  	grpcListener, err := net.Listen("tcp", grpcBindAddr)
   229  	if err != nil {
   230  		return errors.Wrapf(err, "listen grpc address %s", grpcBindAddr)
   231  	}
   232  
   233  	var eg errgroup.Group
   234  
   235  	eg.Go(func() error {
   236  		s.logger.Info("Starting http endpoint", "address", s.conf.HttpAddr())
   237  		if err := s.httpServer.ListenAndServe(); err != nil {
   238  			return errors.Wrap(err, "start http endpoint")
   239  		}
   240  		return nil
   241  	})
   242  
   243  	eg.Go(func() error {
   244  		s.logger.Info("Starting grpc endpoint", "address", grpcBindAddr, "runtime", s.conf.CrClientConfig.Runtime)
   245  		if err := s.grpcServer.Serve(grpcListener); err != nil {
   246  			return errors.Wrap(err, "start grpc endpoint")
   247  		}
   248  		return nil
   249  	})
   250  
   251  	return eg.Wait()
   252  }
   253  
   254  func (s *Server) Shutdown() error {
   255  	if err := s.httpServer.Shutdown(context.TODO()); err != nil {
   256  		return errors.Wrap(err, "shut grpc endpoint down")
   257  	}
   258  	s.grpcServer.GracefulStop()
   259  	s.daemonServer.backgroundProcessManager.Shutdown(context.TODO())
   260  	return nil
   261  }
   262