1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package grpc
17
18 import (
19 "context"
20 "crypto/tls"
21 "crypto/x509"
22 "net"
23 "os"
24 "strconv"
25 "time"
26
27 "github.com/pkg/errors"
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/credentials"
30 "google.golang.org/grpc/metadata"
31 "k8s.io/apimachinery/pkg/types"
32
33 "github.com/chaos-mesh/chaos-mesh/pkg/log"
34 )
35
36
const (
	// DefaultRPCTimeout is the per-call deadline installed by WithDefaultTimeout.
	DefaultRPCTimeout = 60 * time.Second

	// ChaosDaemonServerName is the server name placed in the TLS configuration
	// built by the file-based and raw credential providers.
	ChaosDaemonServerName = "chaos-daemon.chaos-mesh.org"
)

// RPCTimeout is a package-level timeout that starts out equal to
// DefaultRPCTimeout. Nothing in this file reads it.
// NOTE(review): presumably other packages override or consume it — confirm against callers.
var RPCTimeout = DefaultRPCTimeout
43
// TLSRaw holds TLS material in memory: the CA certificate, the client
// certificate and the client private key. RawProvider parses all three as PEM.
type TLSRaw struct {
	CaCert, Cert, Key []byte
}
49
// TLSFile holds the filesystem paths of the CA certificate, the client
// certificate and the client private key. FileProvider reads these paths
// when it builds its credentials.
type TLSFile struct {
	CaCert, Cert, Key string
}
55
56 type FileProvider struct {
57 file TLSFile
58 }
59
60 type RawProvider struct {
61 raw TLSRaw
62 }
63
// InsecureProvider is the CredentialProvider that dials without transport
// security. It carries no state.
type InsecureProvider struct{}
66
// CredentialProvider supplies the gRPC dial option that selects the transport
// credentials for a connection. GrpcBuilder.Build invokes it when dialing.
// Its only method is unexported, so implementations must live in this package.
type CredentialProvider interface {
	// getCredentialOption returns the credential dial option, or an error
	// when the credentials cannot be prepared.
	getCredentialOption() (grpc.DialOption, error)
}
70
71 func (it *FileProvider) getCredentialOption() (grpc.DialOption, error) {
72 caCert, err := os.ReadFile(it.file.CaCert)
73 if err != nil {
74 return nil, err
75 }
76 caCertPool := x509.NewCertPool()
77 caCertPool.AppendCertsFromPEM(caCert)
78
79 clientCert, err := tls.LoadX509KeyPair(it.file.Cert, it.file.Key)
80 if err != nil {
81 return nil, err
82 }
83
84 creds := credentials.NewTLS(&tls.Config{
85 Certificates: []tls.Certificate{clientCert},
86 RootCAs: caCertPool,
87 ServerName: ChaosDaemonServerName,
88 })
89 return grpc.WithTransportCredentials(creds), nil
90 }
91
92 func (it *RawProvider) getCredentialOption() (grpc.DialOption, error) {
93 caCertPool := x509.NewCertPool()
94 caCertPool.AppendCertsFromPEM(it.raw.CaCert)
95
96 clientCert, err := tls.X509KeyPair(it.raw.Cert, it.raw.Key)
97 if err != nil {
98 return nil, err
99 }
100
101 creds := credentials.NewTLS(&tls.Config{
102 Certificates: []tls.Certificate{clientCert},
103 RootCAs: caCertPool,
104 ServerName: ChaosDaemonServerName,
105 })
106 return grpc.WithTransportCredentials(creds), nil
107 }
108
109 func (it *InsecureProvider) getCredentialOption() (grpc.DialOption, error) {
110 return grpc.WithInsecure(), nil
111 }
112
113 type GrpcBuilder struct {
114 options []grpc.DialOption
115 credentialProvider CredentialProvider
116 address string
117 port int
118 }
119
120 func Builder(address string, port int) *GrpcBuilder {
121 return &GrpcBuilder{options: []grpc.DialOption{}, address: address, port: port}
122 }
123
124 func (it *GrpcBuilder) WithDefaultTimeout() *GrpcBuilder {
125 it.options = append(it.options, grpc.WithUnaryInterceptor(TimeoutClientInterceptor(DefaultRPCTimeout)))
126 return it
127 }
128
129 func (it *GrpcBuilder) WithTimeout(timeout time.Duration) *GrpcBuilder {
130 it.options = append(it.options, grpc.WithUnaryInterceptor(TimeoutClientInterceptor(timeout)))
131 return it
132 }
133
134 func namespacedNameInterceptor(id types.NamespacedName) grpc.UnaryClientInterceptor {
135 return func(ctx context.Context, method string, req, reply interface{}, conn *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
136 ctx = metadata.AppendToOutgoingContext(ctx, string(log.MetaNamespacedName), id.String())
137
138 return invoker(ctx, method, req, reply, conn, opts...)
139 }
140 }
141
142 func (it *GrpcBuilder) WithNamespacedName(id types.NamespacedName) *GrpcBuilder {
143 it.options = append(it.options, grpc.WithUnaryInterceptor(namespacedNameInterceptor(id)))
144 return it
145 }
146
147 func (it *GrpcBuilder) Insecure() *GrpcBuilder {
148 it.credentialProvider = &InsecureProvider{}
149 return it
150 }
151
152 func (it *GrpcBuilder) TLSFromRaw(caCert []byte, cert []byte, key []byte) *GrpcBuilder {
153 it.credentialProvider = &RawProvider{
154 raw: TLSRaw{
155 CaCert: caCert,
156 Cert: cert,
157 Key: key,
158 },
159 }
160
161 return it
162 }
163
164 func (it *GrpcBuilder) TLSFromFile(caCertPath string, certPath string, keyPath string) *GrpcBuilder {
165 it.credentialProvider = &FileProvider{
166 file: TLSFile{
167 CaCert: caCertPath,
168 Cert: certPath,
169 Key: keyPath,
170 },
171 }
172 return it
173 }
174
175 func (it *GrpcBuilder) Build() (*grpc.ClientConn, error) {
176 if it.credentialProvider == nil {
177 return nil, errors.New("an authorization method must be specified")
178 }
179 credentialOption, err := it.credentialProvider.getCredentialOption()
180 if err != nil {
181 return nil, err
182 }
183 it.options = append(it.options, credentialOption)
184 return grpc.Dial(net.JoinHostPort(it.address, strconv.Itoa(it.port)), it.options...)
185 }
186
187
188 func TimeoutClientInterceptor(timeout time.Duration) func(context.Context, string, interface{}, interface{},
189 *grpc.ClientConn, grpc.UnaryInvoker, ...grpc.CallOption) error {
190 return func(ctx context.Context, method string, req, reply interface{},
191 cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
192 ctx, cancel := context.WithTimeout(ctx, timeout)
193 defer cancel()
194 return invoker(ctx, method, req, reply, cc, opts...)
195 }
196 }
197
198
199
200 func TimeoutServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
201 handler grpc.UnaryHandler) (interface{}, error) {
202 if ctx.Err() != nil {
203 return nil, ctx.Err()
204 }
205 return handler(ctx, req)
206 }
207