...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/grpc/utils.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/grpc

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package grpc
    17  
    18  import (
    19  	"context"
    20  	"crypto/tls"
    21  	"crypto/x509"
    22  	"net"
    23  	"os"
    24  	"strconv"
    25  	"time"
    26  
    27  	"google.golang.org/grpc/credentials/insecure"
    28  
    29  	"github.com/pkg/errors"
    30  	"google.golang.org/grpc"
    31  	"google.golang.org/grpc/credentials"
    32  	"google.golang.org/grpc/metadata"
    33  	"k8s.io/apimachinery/pkg/types"
    34  
    35  	"github.com/chaos-mesh/chaos-mesh/pkg/log"
    36  )
    37  
    38  // DefaultRPCTimeout specifies default timeout of RPC between controller and chaos-operator
    39  const DefaultRPCTimeout = 60 * time.Second
    40  
    41  // RPCTimeout specifies timeout of RPC between controller and chaos-operator
    42  var RPCTimeout = DefaultRPCTimeout
    43  
    44  const ChaosDaemonServerName = "chaos-daemon.chaos-mesh.org"
    45  
// TLSRaw carries in-memory TLS material for a RawProvider.
type TLSRaw struct {
	// CaCert is the PEM data added to the root certificate pool.
	CaCert []byte
	// Cert is the PEM-encoded client certificate handed to tls.X509KeyPair.
	Cert []byte
	// Key is the PEM-encoded private key handed to tls.X509KeyPair.
	Key []byte
}
    51  
// TLSFile carries the file paths of the TLS material for a FileProvider.
type TLSFile struct {
	// CaCert is the path of the PEM file whose contents are added to the root
	// certificate pool.
	CaCert string
	// Cert is the path of the client certificate handed to tls.LoadX509KeyPair.
	Cert string
	// Key is the path of the private key handed to tls.LoadX509KeyPair.
	Key string
}
    57  
// FileProvider is a CredentialProvider that loads its TLS material from the
// files named in file.
type FileProvider struct {
	file TLSFile
}
    61  
// RawProvider is a CredentialProvider that uses the in-memory TLS material
// held in raw.
type RawProvider struct {
	raw TLSRaw
}
    65  
// InsecureProvider is a CredentialProvider that yields insecure transport
// credentials (insecure.NewCredentials); it carries no configuration.
type InsecureProvider struct {
}
    68  
// CredentialProvider supplies the transport-credential dial option used by
// GrpcBuilder.Build. FileProvider, RawProvider and InsecureProvider implement
// it in this file.
type CredentialProvider interface {
	// getCredentialOption returns the grpc.DialOption carrying the transport
	// credentials, or an error if they cannot be constructed.
	getCredentialOption() (grpc.DialOption, error)
}
    72  
    73  func (it *FileProvider) getCredentialOption() (grpc.DialOption, error) {
    74  	caCert, err := os.ReadFile(it.file.CaCert)
    75  	if err != nil {
    76  		return nil, err
    77  	}
    78  	caCertPool := x509.NewCertPool()
    79  	caCertPool.AppendCertsFromPEM(caCert)
    80  
    81  	clientCert, err := tls.LoadX509KeyPair(it.file.Cert, it.file.Key)
    82  	if err != nil {
    83  		return nil, err
    84  	}
    85  
    86  	creds := credentials.NewTLS(&tls.Config{
    87  		Certificates: []tls.Certificate{clientCert},
    88  		RootCAs:      caCertPool,
    89  		ServerName:   ChaosDaemonServerName,
    90  	})
    91  	return grpc.WithTransportCredentials(creds), nil
    92  }
    93  
    94  func (it *RawProvider) getCredentialOption() (grpc.DialOption, error) {
    95  	caCertPool := x509.NewCertPool()
    96  	caCertPool.AppendCertsFromPEM(it.raw.CaCert)
    97  
    98  	clientCert, err := tls.X509KeyPair(it.raw.Cert, it.raw.Key)
    99  	if err != nil {
   100  		return nil, err
   101  	}
   102  
   103  	creds := credentials.NewTLS(&tls.Config{
   104  		Certificates: []tls.Certificate{clientCert},
   105  		RootCAs:      caCertPool,
   106  		ServerName:   ChaosDaemonServerName,
   107  	})
   108  	return grpc.WithTransportCredentials(creds), nil
   109  }
   110  
   111  func (it *InsecureProvider) getCredentialOption() (grpc.DialOption, error) {
   112  	return grpc.WithTransportCredentials(insecure.NewCredentials()), nil
   113  }
   114  
// GrpcBuilder accumulates the settings for a gRPC client connection; Build
// turns them into a *grpc.ClientConn.
type GrpcBuilder struct {
	// options holds the dial options collected by the With* methods.
	options []grpc.DialOption
	// credentialProvider is chosen by Insecure, TLSFromRaw or TLSFromFile;
	// Build fails while it is nil.
	credentialProvider CredentialProvider
	// address and port are joined with net.JoinHostPort to form the dial target.
	address string
	port    int
}
   121  
   122  func Builder(address string, port int) *GrpcBuilder {
   123  	return &GrpcBuilder{options: []grpc.DialOption{}, address: address, port: port}
   124  }
   125  
   126  func (it *GrpcBuilder) WithDefaultTimeout() *GrpcBuilder {
   127  	it.options = append(it.options, grpc.WithUnaryInterceptor(TimeoutClientInterceptor(DefaultRPCTimeout)))
   128  	return it
   129  }
   130  
   131  func (it *GrpcBuilder) WithTimeout(timeout time.Duration) *GrpcBuilder {
   132  	it.options = append(it.options, grpc.WithUnaryInterceptor(TimeoutClientInterceptor(timeout)))
   133  	return it
   134  }
   135  
   136  func namespacedNameInterceptor(id types.NamespacedName) grpc.UnaryClientInterceptor {
   137  	return func(ctx context.Context, method string, req, reply interface{}, conn *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
   138  		ctx = metadata.AppendToOutgoingContext(ctx, string(log.MetaNamespacedName), id.String())
   139  
   140  		return invoker(ctx, method, req, reply, conn, opts...)
   141  	}
   142  }
   143  
   144  func (it *GrpcBuilder) WithNamespacedName(id types.NamespacedName) *GrpcBuilder {
   145  	it.options = append(it.options, grpc.WithUnaryInterceptor(namespacedNameInterceptor(id)))
   146  	return it
   147  }
   148  
   149  func (it *GrpcBuilder) Insecure() *GrpcBuilder {
   150  	it.credentialProvider = &InsecureProvider{}
   151  	return it
   152  }
   153  
   154  func (it *GrpcBuilder) TLSFromRaw(caCert []byte, cert []byte, key []byte) *GrpcBuilder {
   155  	it.credentialProvider = &RawProvider{
   156  		raw: TLSRaw{
   157  			CaCert: caCert,
   158  			Cert:   cert,
   159  			Key:    key,
   160  		},
   161  	}
   162  
   163  	return it
   164  }
   165  
   166  func (it *GrpcBuilder) TLSFromFile(caCertPath string, certPath string, keyPath string) *GrpcBuilder {
   167  	it.credentialProvider = &FileProvider{
   168  		file: TLSFile{
   169  			CaCert: caCertPath,
   170  			Cert:   certPath,
   171  			Key:    keyPath,
   172  		},
   173  	}
   174  	return it
   175  }
   176  
   177  func (it *GrpcBuilder) Build() (*grpc.ClientConn, error) {
   178  	if it.credentialProvider == nil {
   179  		return nil, errors.New("an authorization method must be specified")
   180  	}
   181  	credentialOption, err := it.credentialProvider.getCredentialOption()
   182  	if err != nil {
   183  		return nil, err
   184  	}
   185  	it.options = append(it.options, credentialOption)
   186  	return grpc.Dial(net.JoinHostPort(it.address, strconv.Itoa(it.port)), it.options...)
   187  }
   188  
   189  // TimeoutClientInterceptor wraps the RPC with a timeout.
   190  func TimeoutClientInterceptor(timeout time.Duration) func(context.Context, string, interface{}, interface{},
   191  	*grpc.ClientConn, grpc.UnaryInvoker, ...grpc.CallOption) error {
   192  	return func(ctx context.Context, method string, req, reply interface{},
   193  		cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
   194  		ctx, cancel := context.WithTimeout(ctx, timeout)
   195  		defer cancel()
   196  		return invoker(ctx, method, req, reply, cc, opts...)
   197  	}
   198  }
   199  
   200  // TimeoutServerInterceptor ensures the context is intact before handling over the
   201  // request to application.
   202  func TimeoutServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
   203  	handler grpc.UnaryHandler) (interface{}, error) {
   204  	if ctx.Err() != nil {
   205  		return nil, ctx.Err()
   206  	}
   207  	return handler(ctx, req)
   208  }
   209