...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package chaosdaemon
17
18 import (
19 "context"
20 "strings"
21
22 "github.com/pkg/errors"
23 "go.uber.org/fx"
24 v1 "k8s.io/api/core/v1"
25 discoveryv1 "k8s.io/api/discovery/v1"
26 "k8s.io/apimachinery/pkg/types"
27 ctrl "sigs.k8s.io/controller-runtime"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29
30 "github.com/chaos-mesh/chaos-mesh/controllers/config"
31 chaosdaemonclient "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/client"
32 grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
33 "github.com/chaos-mesh/chaos-mesh/pkg/mock"
34 )
35
36 var log = ctrl.Log.WithName("controller-chaos-daemon-client-utils")
37
38 func findIPOnEndpointSlice(e *discoveryv1.EndpointSliceList, nodeName string) string {
39 for _, endpointSlice := range e.Items {
40
41 if strings.HasPrefix(endpointSlice.ObjectMeta.Name, "chaos-daemon-") {
42 for _, ep := range endpointSlice.Endpoints {
43 if ep.NodeName != nil && *ep.NodeName == nodeName {
44 return ep.Addresses[0]
45 }
46 }
47 }
48 }
49
50 return ""
51 }
52
53 type ChaosDaemonClientBuilder struct {
54 client.Reader
55 }
56
57 func (b *ChaosDaemonClientBuilder) FindDaemonIP(ctx context.Context, pod *v1.Pod) (string, error) {
58 nodeName := pod.Spec.NodeName
59 log.Info("Creating client to chaos-daemon", "node", nodeName)
60
61 ns := config.ControllerCfg.Namespace
62 var endpointSliceList discoveryv1.EndpointSliceList
63 err := b.Reader.List(ctx, &endpointSliceList, client.InNamespace(ns))
64 if err != nil {
65 return "", err
66 }
67
68 daemonIP := findIPOnEndpointSlice(&endpointSliceList, nodeName)
69 if len(daemonIP) == 0 {
70 return "", errors.Errorf("cannot find daemonIP on node %s in endpointSliceList %v", nodeName, endpointSliceList)
71 }
72
73 return daemonIP, nil
74 }
75
76
77
78
79 func (b *ChaosDaemonClientBuilder) Build(ctx context.Context, pod *v1.Pod, id *types.NamespacedName) (chaosdaemonclient.ChaosDaemonClientInterface, error) {
80 if cli := mock.On("MockChaosDaemonClient"); cli != nil {
81 return cli.(chaosdaemonclient.ChaosDaemonClientInterface), nil
82 }
83 if err := mock.On("NewChaosDaemonClientError"); err != nil {
84 return nil, err.(error)
85 }
86
87 daemonIP, err := b.FindDaemonIP(ctx, pod)
88 if err != nil {
89 return nil, err
90 }
91 builder := grpcUtils.Builder(daemonIP, config.ControllerCfg.ChaosDaemonPort).WithDefaultTimeout()
92 if config.ControllerCfg.TLSConfig.ChaosMeshCACert != "" {
93 builder.TLSFromFile(config.ControllerCfg.TLSConfig.ChaosMeshCACert, config.ControllerCfg.TLSConfig.ChaosDaemonClientCert, config.ControllerCfg.TLSConfig.ChaosDaemonClientKey)
94 } else {
95 builder.Insecure()
96 }
97
98 if id != nil {
99 builder = builder.WithNamespacedName(*id)
100 }
101
102 cc, err := builder.Build()
103 if err != nil {
104 return nil, err
105 }
106 return chaosdaemonclient.New(cc), nil
107 }
108
109 type ChaosDaemonClientBuilderParams struct {
110 fx.In
111
112 NoCacheReader client.Reader `name:"no-cache"`
113 ControlPlaneCacheReader client.Reader `name:"control-plane-cache" optional:"true"`
114 }
115
116 func New(params ChaosDaemonClientBuilderParams) *ChaosDaemonClientBuilder {
117 var reader client.Reader
118 if params.ControlPlaneCacheReader != nil {
119 reader = params.ControlPlaneCacheReader
120 } else {
121 reader = params.NoCacheReader
122 }
123 return &ChaosDaemonClientBuilder{
124 Reader: reader,
125 }
126 }
127