1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package grpc
17
18 import (
19 "context"
20 "crypto/tls"
21 "crypto/x509"
22 "net"
23 "os"
24 "strconv"
25 "time"
26
27 "google.golang.org/grpc/credentials/insecure"
28
29 "github.com/pkg/errors"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/credentials"
32 "google.golang.org/grpc/metadata"
33 "k8s.io/apimachinery/pkg/types"
34
35 "github.com/chaos-mesh/chaos-mesh/pkg/log"
36 )
37
38
// Constants shared by the gRPC client helpers in this package.
const (
	// DefaultRPCTimeout is the per-call deadline that WithDefaultTimeout
	// installs on every unary RPC.
	DefaultRPCTimeout = 60 * time.Second

	// ChaosDaemonServerName is the server name set in the client TLS
	// configuration built by the file-based and raw credential providers.
	ChaosDaemonServerName = "chaos-daemon.chaos-mesh.org"
)

// RPCTimeout is initialised to DefaultRPCTimeout. Being a variable, it can be
// reassigned by other code.
// NOTE(review): the builder methods use DefaultRPCTimeout rather than this
// variable — confirm where RPCTimeout is consumed.
var RPCTimeout = DefaultRPCTimeout
45
// TLSRaw holds TLS material as in-memory bytes: the CA certificate (CaCert)
// and the client certificate (Cert) with its private key (Key). RawProvider
// feeds CaCert to x509.CertPool.AppendCertsFromPEM and Cert/Key to
// tls.X509KeyPair, so all three are expected to be PEM encoded.
type TLSRaw struct {
	CaCert, Cert, Key []byte
}
51
// TLSFile holds TLS material as file paths: the CA certificate (CaCert) and
// the client certificate (Cert) with its private key (Key). FileProvider reads
// CaCert with os.ReadFile and passes Cert/Key to tls.LoadX509KeyPair.
type TLSFile struct {
	CaCert, Cert, Key string
}
57
58 type FileProvider struct {
59 file TLSFile
60 }
61
62 type RawProvider struct {
63 raw TLSRaw
64 }
65
// InsecureProvider is the CredentialProvider that yields insecure (non-TLS)
// transport credentials. It carries no state.
type InsecureProvider struct{}
68
69 type CredentialProvider interface {
70 getCredentialOption() (grpc.DialOption, error)
71 }
72
73 func (it *FileProvider) getCredentialOption() (grpc.DialOption, error) {
74 caCert, err := os.ReadFile(it.file.CaCert)
75 if err != nil {
76 return nil, err
77 }
78 caCertPool := x509.NewCertPool()
79 caCertPool.AppendCertsFromPEM(caCert)
80
81 clientCert, err := tls.LoadX509KeyPair(it.file.Cert, it.file.Key)
82 if err != nil {
83 return nil, err
84 }
85
86 creds := credentials.NewTLS(&tls.Config{
87 Certificates: []tls.Certificate{clientCert},
88 RootCAs: caCertPool,
89 ServerName: ChaosDaemonServerName,
90 })
91 return grpc.WithTransportCredentials(creds), nil
92 }
93
94 func (it *RawProvider) getCredentialOption() (grpc.DialOption, error) {
95 caCertPool := x509.NewCertPool()
96 caCertPool.AppendCertsFromPEM(it.raw.CaCert)
97
98 clientCert, err := tls.X509KeyPair(it.raw.Cert, it.raw.Key)
99 if err != nil {
100 return nil, err
101 }
102
103 creds := credentials.NewTLS(&tls.Config{
104 Certificates: []tls.Certificate{clientCert},
105 RootCAs: caCertPool,
106 ServerName: ChaosDaemonServerName,
107 })
108 return grpc.WithTransportCredentials(creds), nil
109 }
110
111 func (it *InsecureProvider) getCredentialOption() (grpc.DialOption, error) {
112 return grpc.WithTransportCredentials(insecure.NewCredentials()), nil
113 }
114
115 type GrpcBuilder struct {
116 options []grpc.DialOption
117 credentialProvider CredentialProvider
118 address string
119 port int
120 }
121
122 func Builder(address string, port int) *GrpcBuilder {
123 return &GrpcBuilder{options: []grpc.DialOption{}, address: address, port: port}
124 }
125
126 func (it *GrpcBuilder) WithDefaultTimeout() *GrpcBuilder {
127 it.options = append(it.options, grpc.WithUnaryInterceptor(TimeoutClientInterceptor(DefaultRPCTimeout)))
128 return it
129 }
130
131 func (it *GrpcBuilder) WithTimeout(timeout time.Duration) *GrpcBuilder {
132 it.options = append(it.options, grpc.WithUnaryInterceptor(TimeoutClientInterceptor(timeout)))
133 return it
134 }
135
136 func namespacedNameInterceptor(id types.NamespacedName) grpc.UnaryClientInterceptor {
137 return func(ctx context.Context, method string, req, reply interface{}, conn *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
138 ctx = metadata.AppendToOutgoingContext(ctx, string(log.MetaNamespacedName), id.String())
139
140 return invoker(ctx, method, req, reply, conn, opts...)
141 }
142 }
143
144 func (it *GrpcBuilder) WithNamespacedName(id types.NamespacedName) *GrpcBuilder {
145 it.options = append(it.options, grpc.WithUnaryInterceptor(namespacedNameInterceptor(id)))
146 return it
147 }
148
149 func (it *GrpcBuilder) Insecure() *GrpcBuilder {
150 it.credentialProvider = &InsecureProvider{}
151 return it
152 }
153
154 func (it *GrpcBuilder) TLSFromRaw(caCert []byte, cert []byte, key []byte) *GrpcBuilder {
155 it.credentialProvider = &RawProvider{
156 raw: TLSRaw{
157 CaCert: caCert,
158 Cert: cert,
159 Key: key,
160 },
161 }
162
163 return it
164 }
165
166 func (it *GrpcBuilder) TLSFromFile(caCertPath string, certPath string, keyPath string) *GrpcBuilder {
167 it.credentialProvider = &FileProvider{
168 file: TLSFile{
169 CaCert: caCertPath,
170 Cert: certPath,
171 Key: keyPath,
172 },
173 }
174 return it
175 }
176
177 func (it *GrpcBuilder) Build() (*grpc.ClientConn, error) {
178 if it.credentialProvider == nil {
179 return nil, errors.New("an authorization method must be specified")
180 }
181 credentialOption, err := it.credentialProvider.getCredentialOption()
182 if err != nil {
183 return nil, err
184 }
185 it.options = append(it.options, credentialOption)
186 return grpc.Dial(net.JoinHostPort(it.address, strconv.Itoa(it.port)), it.options...)
187 }
188
189
190 func TimeoutClientInterceptor(timeout time.Duration) func(context.Context, string, interface{}, interface{},
191 *grpc.ClientConn, grpc.UnaryInvoker, ...grpc.CallOption) error {
192 return func(ctx context.Context, method string, req, reply interface{},
193 cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
194 ctx, cancel := context.WithTimeout(ctx, timeout)
195 defer cancel()
196 return invoker(ctx, method, req, reply, cc, opts...)
197 }
198 }
199
200
201
202 func TimeoutServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
203 handler grpc.UnaryHandler) (interface{}, error) {
204 if ctx.Err() != nil {
205 return nil, ctx.Err()
206 }
207 return handler(ctx, req)
208 }
209