1
2
3
4
5
6
7
8
9
10
11
12
13
14 package chaosdaemon
15
16 import (
17 "context"
18 "crypto/tls"
19 "crypto/x509"
20 "fmt"
21 "io/ioutil"
22 "net"
23
24 grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
25 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
26 "github.com/moby/locker"
27 "github.com/prometheus/client_golang/prometheus"
28 "golang.org/x/sync/errgroup"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/credentials"
31 "google.golang.org/grpc/reflection"
32 ctrl "sigs.k8s.io/controller-runtime"
33
34 "github.com/chaos-mesh/chaos-mesh/pkg/bpm"
35 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/crclients"
36 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
37 grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
38 )
39
40 var log = ctrl.Log.WithName("chaos-daemon-server")
41
42
43
44
45 type Config struct {
46 HTTPPort int
47 GRPCPort int
48 Host string
49 Runtime string
50 Profiling bool
51
52 tlsConfig
53 }
54
55
56 type tlsConfig struct {
57 CaCert string
58 Cert string
59 Key string
60 }
61
62
63 func (c *Config) HttpAddr() string {
64 return net.JoinHostPort(c.Host, fmt.Sprintf("%d", c.HTTPPort))
65 }
66
67
68 func (c *Config) GrpcAddr() string {
69 return net.JoinHostPort(c.Host, fmt.Sprintf("%d", c.GRPCPort))
70 }
71
72
73 type DaemonServer struct {
74 crClient crclients.ContainerRuntimeInfoClient
75 backgroundProcessManager bpm.BackgroundProcessManager
76
77 IPSetLocker *locker.Locker
78 }
79
80 func newDaemonServer(containerRuntime string) (*DaemonServer, error) {
81 crClient, err := crclients.CreateContainerRuntimeInfoClient(containerRuntime)
82 if err != nil {
83 return nil, err
84 }
85
86 return NewDaemonServerWithCRClient(crClient), nil
87 }
88
89
90 func NewDaemonServerWithCRClient(crClient crclients.ContainerRuntimeInfoClient) *DaemonServer {
91 return &DaemonServer{
92 IPSetLocker: locker.New(),
93 crClient: crClient,
94 backgroundProcessManager: bpm.NewBackgroundProcessManager(),
95 }
96 }
97
98 func newGRPCServer(containerRuntime string, reg prometheus.Registerer, tlsConf tlsConfig) (*grpc.Server, error) {
99 ds, err := newDaemonServer(containerRuntime)
100 if err != nil {
101 return nil, err
102 }
103
104 grpcMetrics := grpc_prometheus.NewServerMetrics()
105 grpcMetrics.EnableHandlingTimeHistogram(
106 grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10}),
107 )
108 reg.MustRegister(grpcMetrics)
109
110 grpcOpts := []grpc.ServerOption{
111 grpc_middleware.WithUnaryServerChain(
112 grpcUtils.TimeoutServerInterceptor,
113 grpcMetrics.UnaryServerInterceptor(),
114 ),
115 }
116
117 if tlsConf != (tlsConfig{}) {
118 caCert, err := ioutil.ReadFile(tlsConf.CaCert)
119 if err != nil {
120 return nil, err
121 }
122 caCertPool := x509.NewCertPool()
123 caCertPool.AppendCertsFromPEM(caCert)
124
125 serverCert, err := tls.LoadX509KeyPair(tlsConf.Cert, tlsConf.Key)
126 if err != nil {
127 return nil, err
128 }
129
130 creds := credentials.NewTLS(&tls.Config{
131 Certificates: []tls.Certificate{serverCert},
132 ClientCAs: caCertPool,
133 ClientAuth: tls.RequireAndVerifyClientCert,
134 })
135
136 grpcOpts = append(grpcOpts, grpc.Creds(creds))
137 }
138
139 s := grpc.NewServer(grpcOpts...)
140 grpcMetrics.InitializeMetrics(s)
141
142 pb.RegisterChaosDaemonServer(s, ds)
143 reflection.Register(s)
144
145 return s, nil
146 }
147
148
149 type RegisterGatherer interface {
150 prometheus.Registerer
151 prometheus.Gatherer
152 }
153
154
155 func StartServer(conf *Config, reg RegisterGatherer) error {
156 g := &errgroup.Group{}
157
158 httpBindAddr := conf.HttpAddr()
159 httpServer := newHTTPServerBuilder().Addr(httpBindAddr).Metrics(reg).Profiling(conf.Profiling).Build()
160
161 grpcBindAddr := conf.GrpcAddr()
162 grpcListener, err := net.Listen("tcp", grpcBindAddr)
163 if err != nil {
164 log.Error(err, "failed to listen grpc address", "grpcBindAddr", grpcBindAddr)
165 return err
166 }
167
168 grpcServer, err := newGRPCServer(conf.Runtime, reg, conf.tlsConfig)
169 if err != nil {
170 log.Error(err, "failed to create grpc server")
171 return err
172 }
173
174 g.Go(func() error {
175 log.Info("Starting http endpoint", "address", httpBindAddr)
176 if err := httpServer.ListenAndServe(); err != nil {
177 log.Error(err, "failed to start http endpoint")
178 httpServer.Shutdown(context.Background())
179 return err
180 }
181 return nil
182 })
183
184 g.Go(func() error {
185 log.Info("Starting grpc endpoint", "address", grpcBindAddr, "runtime", conf.Runtime)
186 if err := grpcServer.Serve(grpcListener); err != nil {
187 log.Error(err, "failed to start grpc endpoint")
188 grpcServer.Stop()
189 return err
190 }
191 return nil
192 })
193
194 return g.Wait()
195 }
196