1
2
3
4
5
6
7
8
9
10
11
12
13
14 package chaosdaemon
15
16 import (
17 "context"
18 "crypto/tls"
19 "crypto/x509"
20 "fmt"
21 "io/ioutil"
22 "net"
23
24 grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
25 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
26 "github.com/moby/locker"
27 "github.com/prometheus/client_golang/prometheus"
28 "golang.org/x/sync/errgroup"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/credentials"
31 "google.golang.org/grpc/reflection"
32 ctrl "sigs.k8s.io/controller-runtime"
33
34 "github.com/chaos-mesh/chaos-mesh/pkg/bpm"
35 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/crclients"
36 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
37 grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
38 )
39
// log is the package-level logger; it is derived from the controller-runtime
// root logger and named "chaos-daemon-server".
var log = ctrl.Log.WithName("chaos-daemon-server")
41
42
43
44
// Config holds the settings StartServer needs to bring up the chaos daemon
// endpoints.
type Config struct {
	// HTTPPort is the port of the HTTP endpoint (see HttpAddr).
	HTTPPort int
	// GRPCPort is the port of the gRPC endpoint (see GrpcAddr).
	GRPCPort int
	// Host is the host part shared by both the HTTP and the gRPC bind address.
	Host string
	// Runtime names the container runtime; it is handed to
	// crclients.CreateContainerRuntimeInfoClient when the gRPC server is built.
	Runtime string
	// Profiling is forwarded to the HTTP server builder's Profiling option.
	Profiling bool

	// tlsConfig carries the optional TLS material for the gRPC server.
	tlsConfig
}
54
55
// tlsConfig groups the file paths of the TLS material used by the gRPC server.
// When every field is empty (the zero value) the gRPC server is created
// without transport credentials.
type tlsConfig struct {
	// CaCert is the path of the PEM file whose certificates are used to verify
	// client certificates.
	CaCert string
	// Cert is the path of the server certificate file.
	Cert string
	// Key is the path of the private key file matching Cert.
	Key string
}
61
62
63 func (c *Config) HttpAddr() string {
64 return fmt.Sprintf("%s:%d", c.Host, c.HTTPPort)
65 }
66
67
68 func (c *Config) GrpcAddr() string {
69 return fmt.Sprintf("%s:%d", c.Host, c.GRPCPort)
70 }
71
72
// DaemonServer is the value registered as the ChaosDaemon gRPC service
// implementation (see newGRPCServer).
type DaemonServer struct {
	// crClient is the container runtime info client the server was built with.
	crClient crclients.ContainerRuntimeInfoClient
	// backgroundProcessManager is created once per server by
	// bpm.NewBackgroundProcessManager.
	backgroundProcessManager bpm.BackgroundProcessManager

	// IPSetLocker is a named-lock registry created with locker.New.
	// NOTE(review): presumably it serializes ipset operations per set name;
	// confirm against the ipset handlers.
	IPSetLocker *locker.Locker
}
79
80 func newDaemonServer(containerRuntime string) (*DaemonServer, error) {
81 crClient, err := crclients.CreateContainerRuntimeInfoClient(containerRuntime)
82 if err != nil {
83 return nil, err
84 }
85
86 return NewDaemonServerWithCRClient(crClient), nil
87 }
88
89
90 func NewDaemonServerWithCRClient(crClient crclients.ContainerRuntimeInfoClient) *DaemonServer {
91 return &DaemonServer{
92 IPSetLocker: locker.New(),
93 crClient: crClient,
94 backgroundProcessManager: bpm.NewBackgroundProcessManager(),
95 }
96 }
97
98 func newGRPCServer(containerRuntime string, reg prometheus.Registerer, tlsConf tlsConfig) (*grpc.Server, error) {
99 ds, err := newDaemonServer(containerRuntime)
100 if err != nil {
101 return nil, err
102 }
103
104 grpcMetrics := grpc_prometheus.NewServerMetrics()
105 grpcMetrics.EnableHandlingTimeHistogram(
106 grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10}),
107 )
108 reg.MustRegister(grpcMetrics)
109
110 grpcOpts := []grpc.ServerOption{
111 grpc_middleware.WithUnaryServerChain(
112 grpcUtils.TimeoutServerInterceptor,
113 grpcMetrics.UnaryServerInterceptor(),
114 ),
115 }
116
117 if tlsConf != (tlsConfig{}) {
118 caCert, err := ioutil.ReadFile(tlsConf.CaCert)
119 if err != nil {
120 return nil, err
121 }
122 caCertPool := x509.NewCertPool()
123 caCertPool.AppendCertsFromPEM(caCert)
124
125 serverCert, err := tls.LoadX509KeyPair(tlsConf.Cert, tlsConf.Key)
126 if err != nil {
127 return nil, err
128 }
129
130 creds := credentials.NewTLS(&tls.Config{
131 Certificates: []tls.Certificate{serverCert},
132 ClientCAs: caCertPool,
133 ClientAuth: tls.RequireAndVerifyClientCert,
134 MinVersion: tls.VersionTLS13,
135 })
136
137 grpcOpts = append(grpcOpts, grpc.Creds(creds))
138 }
139
140 s := grpc.NewServer(grpcOpts...)
141 grpcMetrics.InitializeMetrics(s)
142
143 pb.RegisterChaosDaemonServer(s, ds)
144 reflection.Register(s)
145
146 return s, nil
147 }
148
149
// RegisterGatherer combines prometheus.Registerer and prometheus.Gatherer.
// StartServer takes one value of this type and hands it both to the HTTP
// server builder's Metrics option and, as a Registerer, to the gRPC server.
type RegisterGatherer interface {
	prometheus.Registerer
	prometheus.Gatherer
}
154
155
156 func StartServer(conf *Config, reg RegisterGatherer) error {
157 g := &errgroup.Group{}
158
159 httpBindAddr := conf.HttpAddr()
160 httpServer := newHTTPServerBuilder().Addr(httpBindAddr).Metrics(reg).Profiling(conf.Profiling).Build()
161
162 grpcBindAddr := conf.GrpcAddr()
163 grpcListener, err := net.Listen("tcp", grpcBindAddr)
164 if err != nil {
165 log.Error(err, "failed to listen grpc address", "grpcBindAddr", grpcBindAddr)
166 return err
167 }
168
169 grpcServer, err := newGRPCServer(conf.Runtime, reg, conf.tlsConfig)
170 if err != nil {
171 log.Error(err, "failed to create grpc server")
172 return err
173 }
174
175 g.Go(func() error {
176 log.Info("Starting http endpoint", "address", httpBindAddr)
177 if err := httpServer.ListenAndServe(); err != nil {
178 log.Error(err, "failed to start http endpoint")
179 httpServer.Shutdown(context.Background())
180 return err
181 }
182 return nil
183 })
184
185 g.Go(func() error {
186 log.Info("Starting grpc endpoint", "address", grpcBindAddr, "runtime", conf.Runtime)
187 if err := grpcServer.Serve(grpcListener); err != nil {
188 log.Error(err, "failed to start grpc endpoint")
189 grpcServer.Stop()
190 return err
191 }
192 return nil
193 })
194
195 return g.Wait()
196 }
197