1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package chaosdaemon
17
18 import (
19 "context"
20 "crypto/tls"
21 "crypto/x509"
22 "fmt"
23 "net"
24 "net/http"
25 "os"
26 "sync"
27
28 "github.com/go-logr/logr"
29 grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
30 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
31 "github.com/moby/locker"
32 "github.com/pkg/errors"
33 "github.com/prometheus/client_golang/prometheus"
34 "golang.org/x/sync/errgroup"
35 "google.golang.org/grpc"
36 "google.golang.org/grpc/credentials"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/reflection"
39
40 "github.com/chaos-mesh/chaos-mesh/pkg/bpm"
41 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/crclients"
42 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
43 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/tasks"
44 grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
45 "github.com/chaos-mesh/chaos-mesh/pkg/log"
46 "github.com/chaos-mesh/chaos-mesh/pkg/metrics"
47 )
48
49
50
51
52 type Config struct {
53 HTTPPort int
54 GRPCPort int
55 Host string
56 CrClientConfig *crclients.CrClientConfig
57 Profiling bool
58
59 tlsConfig
60 }
61
62
// tlsConfig groups the file paths of the TLS material used by the gRPC
// server: CaCert is read as a PEM bundle for verifying client certificates,
// while Cert and Key are loaded together as the server's X.509 key pair.
type tlsConfig struct {
	CaCert, Cert, Key string
}
68
69
70 func (c *Config) HttpAddr() string {
71 return net.JoinHostPort(c.Host, fmt.Sprintf("%d", c.HTTPPort))
72 }
73
74
75 func (c *Config) GrpcAddr() string {
76 return net.JoinHostPort(c.Host, fmt.Sprintf("%d", c.GRPCPort))
77 }
78
79
80 type DaemonServer struct {
81 crClient crclients.ContainerRuntimeInfoClient
82 backgroundProcessManager *bpm.BackgroundProcessManager
83 rootLogger logr.Logger
84
85
86 tproxyLocker *sync.Map
87
88 IPSetLocker *locker.Locker
89 timeChaosServer TimeChaosServer
90 }
91
92 func (s *DaemonServer) getLoggerFromContext(ctx context.Context) logr.Logger {
93 return log.EnrichLoggerWithContext(ctx, s.rootLogger)
94 }
95
96 func newDaemonServer(clientConfig *crclients.CrClientConfig, reg prometheus.Registerer, log logr.Logger) (*DaemonServer, error) {
97 crClient, err := crclients.CreateContainerRuntimeInfoClient(clientConfig)
98 if err != nil {
99 return nil, err
100 }
101
102 return NewDaemonServerWithCRClient(crClient, reg, log), nil
103 }
104
105
106 func NewDaemonServerWithCRClient(crClient crclients.ContainerRuntimeInfoClient, reg prometheus.Registerer, log logr.Logger) *DaemonServer {
107 return &DaemonServer{
108 IPSetLocker: locker.New(),
109 crClient: crClient,
110 backgroundProcessManager: bpm.StartBackgroundProcessManager(reg, log),
111 tproxyLocker: new(sync.Map),
112 rootLogger: log,
113 timeChaosServer: TimeChaosServer{
114 podContainerNameProcessMap: tasks.NewPodProcessMap(),
115 manager: tasks.NewTaskManager(logr.New(log.GetSink()).WithName("TimeChaos")),
116 nameLocker: tasks.NewLockMap[tasks.PodContainerName](),
117 logger: logr.New(log.GetSink()).WithName("TimeChaos"),
118 },
119 }
120 }
121
122 func newGRPCServer(daemonServer *DaemonServer, reg prometheus.Registerer, tlsConf tlsConfig) (*grpc.Server, error) {
123 grpcMetrics := grpc_prometheus.NewServerMetrics()
124 grpcMetrics.EnableHandlingTimeHistogram(
125 grpc_prometheus.WithHistogramBuckets(metrics.ChaosDaemonGrpcServerBuckets),
126 metrics.WithHistogramName("chaos_daemon_grpc_server_handling_seconds"),
127 )
128 reg.MustRegister(
129 grpcMetrics,
130 metrics.DefaultChaosDaemonMetricsCollector.InjectCrClient(daemonServer.crClient),
131 )
132
133 grpcOpts := []grpc.ServerOption{
134 grpc_middleware.WithUnaryServerChain(
135 grpcUtils.TimeoutServerInterceptor,
136 grpcMetrics.UnaryServerInterceptor(),
137 MetadataExtractor(log.MetaNamespacedName),
138 ),
139 }
140
141 if tlsConf != (tlsConfig{}) {
142 caCert, err := os.ReadFile(tlsConf.CaCert)
143 if err != nil {
144 return nil, err
145 }
146 caCertPool := x509.NewCertPool()
147 caCertPool.AppendCertsFromPEM(caCert)
148
149 serverCert, err := tls.LoadX509KeyPair(tlsConf.Cert, tlsConf.Key)
150 if err != nil {
151 return nil, err
152 }
153
154 creds := credentials.NewTLS(&tls.Config{
155 Certificates: []tls.Certificate{serverCert},
156 ClientCAs: caCertPool,
157 ClientAuth: tls.RequireAndVerifyClientCert,
158 MinVersion: tls.VersionTLS13,
159 })
160
161 grpcOpts = append(grpcOpts, grpc.Creds(creds))
162 }
163
164 s := grpc.NewServer(grpcOpts...)
165 grpcMetrics.InitializeMetrics(s)
166
167 pb.RegisterChaosDaemonServer(s, daemonServer)
168 reflection.Register(s)
169
170 return s, nil
171 }
172
173 func MetadataExtractor(keys ...log.Metadatkey) grpc.UnaryServerInterceptor {
174 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
175
176 md, ok := metadata.FromIncomingContext(ctx)
177 if !ok {
178 return nil, errors.New("couldn't parse incoming context metadata")
179 }
180 for _, key := range keys {
181 values := md.Get(string(key))
182 if len(values) > 0 {
183 ctx = context.WithValue(ctx, key, values[0])
184 }
185 }
186
187 return handler(ctx, req)
188 }
189 }
190
191
192 type RegisterGatherer interface {
193 prometheus.Registerer
194 prometheus.Gatherer
195 }
196
197
198 type Server struct {
199 daemonServer *DaemonServer
200 httpServer *http.Server
201 grpcServer *grpc.Server
202
203 conf *Config
204 logger logr.Logger
205 }
206
207
208 func BuildServer(conf *Config, reg RegisterGatherer, log logr.Logger) (*Server, error) {
209 server := &Server{conf: conf, logger: log}
210 var err error
211 server.daemonServer, err = newDaemonServer(conf.CrClientConfig, reg, log)
212 if err != nil {
213 return nil, errors.Wrap(err, "create daemon server")
214 }
215
216 server.httpServer = newHTTPServerBuilder().Addr(conf.HttpAddr()).Metrics(reg).Profiling(conf.Profiling).Build()
217 server.grpcServer, err = newGRPCServer(server.daemonServer, reg, conf.tlsConfig)
218 if err != nil {
219 return nil, errors.Wrap(err, "create grpc server")
220 }
221
222 return server, nil
223 }
224
225
226 func (s *Server) Start() error {
227 grpcBindAddr := s.conf.GrpcAddr()
228 grpcListener, err := net.Listen("tcp", grpcBindAddr)
229 if err != nil {
230 return errors.Wrapf(err, "listen grpc address %s", grpcBindAddr)
231 }
232
233 var eg errgroup.Group
234
235 eg.Go(func() error {
236 s.logger.Info("Starting http endpoint", "address", s.conf.HttpAddr())
237 if err := s.httpServer.ListenAndServe(); err != nil {
238 return errors.Wrap(err, "start http endpoint")
239 }
240 return nil
241 })
242
243 eg.Go(func() error {
244 s.logger.Info("Starting grpc endpoint", "address", grpcBindAddr, "runtime", s.conf.CrClientConfig.Runtime)
245 if err := s.grpcServer.Serve(grpcListener); err != nil {
246 return errors.Wrap(err, "start grpc endpoint")
247 }
248 return nil
249 })
250
251 return eg.Wait()
252 }
253
254 func (s *Server) Shutdown() error {
255 if err := s.httpServer.Shutdown(context.TODO()); err != nil {
256 return errors.Wrap(err, "shut grpc endpoint down")
257 }
258 s.grpcServer.GracefulStop()
259 s.daemonServer.backgroundProcessManager.Shutdown(context.TODO())
260 return nil
261 }
262