1
2
3
4
5
6
7
8
9
10
11
12
13
14 package grpc
15
import (
	"context"
	"fmt"
	"net"
	"time"

	"github.com/pkg/errors"
	"google.golang.org/grpc"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/chaos-mesh/chaos-mesh/controllers/config"
)
31
32
// DefaultRPCTimeout is the initial value of RPCTimeout: one minute.
const DefaultRPCTimeout = time.Minute

// RPCTimeout is the per-call deadline that TimeoutClientInterceptor applies
// to unary RPCs. It starts out as DefaultRPCTimeout and, being a package
// variable, can be reassigned.
var RPCTimeout = DefaultRPCTimeout
37
// log is the package-level logger: the controller-runtime root logger
// (ctrl.Log) scoped with the name "util".
var log = ctrl.Log.WithName("util")
39
40
41 func CreateGrpcConnection(ctx context.Context, c client.Client, pod *v1.Pod, port int) (*grpc.ClientConn, error) {
42 nodeName := pod.Spec.NodeName
43 log.Info("Creating client to chaos-daemon", "node", nodeName)
44
45 ns := config.ControllerCfg.Namespace
46 var endpoints v1.Endpoints
47 err := c.Get(ctx, types.NamespacedName{
48 Namespace: ns,
49 Name: "chaos-daemon",
50 }, &endpoints)
51 if err != nil {
52 return nil, err
53 }
54
55 daemonIP := findIPOnEndpoints(&endpoints, nodeName)
56 if len(daemonIP) == 0 {
57 return nil, errors.Errorf("cannot find daemonIP on node %s in related Endpoints %v", nodeName, endpoints)
58 }
59 return CreateGrpcConnectionWithAddress(daemonIP, port)
60 }
61
62
63 func CreateGrpcConnectionWithAddress(address string, port int) (*grpc.ClientConn, error) {
64 conn, err := grpc.Dial(fmt.Sprintf("%s:%d", address, port),
65 grpc.WithInsecure(),
66 grpc.WithUnaryInterceptor(TimeoutClientInterceptor))
67 if err != nil {
68 return nil, err
69 }
70 return conn, nil
71 }
72
73 func findIPOnEndpoints(e *v1.Endpoints, nodeName string) string {
74 for _, subset := range e.Subsets {
75 for _, addr := range subset.Addresses {
76 if addr.NodeName != nil && *addr.NodeName == nodeName {
77 return addr.IP
78 }
79 }
80 }
81
82 return ""
83 }
84
85
86 func TimeoutClientInterceptor(ctx context.Context, method string, req, reply interface{},
87 cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
88 ctx, cancel := context.WithTimeout(ctx, RPCTimeout)
89 defer cancel()
90 return invoker(ctx, method, req, reply, cc, opts...)
91 }
92
93
94
95 func TimeoutServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
96 handler grpc.UnaryHandler) (interface{}, error) {
97 if ctx.Err() != nil {
98 return nil, ctx.Err()
99 }
100 return handler(ctx, req)
101 }
102