...
1
2
3
4
5
6
7
8
9
10
11
12
13
14 package chaosdaemon
15
16 import (
17 "context"
18
19 "github.com/pkg/errors"
20 "go.uber.org/fx"
21 v1 "k8s.io/api/core/v1"
22 "k8s.io/apimachinery/pkg/types"
23 ctrl "sigs.k8s.io/controller-runtime"
24 "sigs.k8s.io/controller-runtime/pkg/client"
25
26 "github.com/chaos-mesh/chaos-mesh/controllers/config"
27 chaosdaemonclient "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/client"
28 grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
29 "github.com/chaos-mesh/chaos-mesh/pkg/mock"
30 )
31
32 var log = ctrl.Log.WithName("controller-chaos-daemon-client-utils")
33
34 func findIPOnEndpoints(e *v1.Endpoints, nodeName string) string {
35 for _, subset := range e.Subsets {
36 for _, addr := range subset.Addresses {
37 if addr.NodeName != nil && *addr.NodeName == nodeName {
38 return addr.IP
39 }
40 }
41 }
42
43 return ""
44 }
45
46 type ChaosDaemonClientBuilder struct {
47 client.Reader
48 }
49
50 func (b *ChaosDaemonClientBuilder) FindDaemonIP(ctx context.Context, pod *v1.Pod) (string, error) {
51 nodeName := pod.Spec.NodeName
52 log.Info("Creating client to chaos-daemon", "node", nodeName)
53
54 ns := config.ControllerCfg.Namespace
55 var endpoints v1.Endpoints
56 err := b.Reader.Get(ctx, types.NamespacedName{
57 Namespace: ns,
58 Name: "chaos-daemon",
59 }, &endpoints)
60 if err != nil {
61 return "", err
62 }
63
64 daemonIP := findIPOnEndpoints(&endpoints, nodeName)
65 if len(daemonIP) == 0 {
66 return "", errors.Errorf("cannot find daemonIP on node %s in related Endpoints %v", nodeName, endpoints)
67 }
68
69 return daemonIP, nil
70 }
71
72 func (b *ChaosDaemonClientBuilder) Build(ctx context.Context, pod *v1.Pod) (chaosdaemonclient.ChaosDaemonClientInterface, error) {
73 if cli := mock.On("MockChaosDaemonClient"); cli != nil {
74 return cli.(chaosdaemonclient.ChaosDaemonClientInterface), nil
75 }
76 if err := mock.On("NewChaosDaemonClientError"); err != nil {
77 return nil, err.(error)
78 }
79
80 daemonIP, err := b.FindDaemonIP(ctx, pod)
81 if err != nil {
82 return nil, err
83 }
84 builder := grpcUtils.Builder(daemonIP, config.ControllerCfg.ChaosDaemonPort).WithDefaultTimeout()
85 if config.ControllerCfg.TLSConfig.ChaosMeshCACert != "" {
86 builder.TLSFromFile(config.ControllerCfg.TLSConfig.ChaosMeshCACert, config.ControllerCfg.TLSConfig.ChaosDaemonClientCert, config.ControllerCfg.TLSConfig.ChaosDaemonClientKey)
87 } else {
88 builder.Insecure()
89 }
90 cc, err := builder.Build()
91 if err != nil {
92 return nil, err
93 }
94 return chaosdaemonclient.New(cc), nil
95 }
96
97 type ChaosDaemonClientBuilderParams struct {
98 fx.In
99
100 NoCacheReader client.Reader `name:"no-cache"`
101 ControlPlaneCacheReader client.Reader `name:"control-plane-cache" optional:"true"`
102 }
103
104 func New(params ChaosDaemonClientBuilderParams) *ChaosDaemonClientBuilder {
105 var reader client.Reader
106 if params.ControlPlaneCacheReader != nil {
107 reader = params.ControlPlaneCacheReader
108 } else {
109 reader = params.NoCacheReader
110 }
111 return &ChaosDaemonClientBuilder{
112 Reader: reader,
113 }
114 }
115