1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package clientpool
17
18 import (
19 "net/http"
20 "strings"
21 "sync"
22
23 lru "github.com/hashicorp/golang-lru/v2"
24 "github.com/pkg/errors"
25 "k8s.io/apimachinery/pkg/runtime"
26 authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
27 "k8s.io/client-go/rest"
28 pkgclient "sigs.k8s.io/controller-runtime/pkg/client"
29
30 "github.com/chaos-mesh/chaos-mesh/pkg/mock"
31 )
32
33
34 var K8sClients Clients
35
36 type Clients interface {
37 Client(token string) (pkgclient.Client, error)
38 AuthClient(token string) (authorizationv1.AuthorizationV1Interface, error)
39 Num() int
40 Contains(token string) bool
41 }
42
43 type LocalClient struct {
44 client pkgclient.Client
45 authClient authorizationv1.AuthorizationV1Interface
46 }
47
48 func NewLocalClient(localConfig *rest.Config, scheme *runtime.Scheme) (Clients, error) {
49 client, err := pkgclient.New(localConfig, pkgclient.Options{
50 Scheme: scheme,
51 })
52 if err != nil {
53 return nil, err
54 }
55
56 authCli, err := authorizationv1.NewForConfig(localConfig)
57 if err != nil {
58 return nil, err
59 }
60
61 return &LocalClient{
62 client: client,
63 authClient: authCli,
64 }, nil
65 }
66
67
68 func (c *LocalClient) Client(token string) (pkgclient.Client, error) {
69 return c.client, nil
70 }
71
72 func (c *LocalClient) AuthClient(token string) (authorizationv1.AuthorizationV1Interface, error) {
73 return c.authClient, nil
74 }
75
76
77 func (c *LocalClient) Num() int {
78 return 1
79 }
80
81
82 func (c *LocalClient) Contains(token string) bool {
83 return false
84 }
85
86
87 type ClientsPool struct {
88 sync.RWMutex
89
90 scheme *runtime.Scheme
91 localConfig *rest.Config
92 clients *lru.Cache[string, pkgclient.Client]
93 authClients *lru.Cache[string, *authorizationv1.AuthorizationV1Client]
94 }
95
96
97 func NewClientPool(localConfig *rest.Config, scheme *runtime.Scheme, maxClientNum int) (Clients, error) {
98 clients, err := lru.New[string, pkgclient.Client](maxClientNum)
99 if err != nil {
100 return nil, err
101 }
102
103 authClients, err := lru.New[string, *authorizationv1.AuthorizationV1Client](maxClientNum)
104 if err != nil {
105 return nil, err
106 }
107
108 return &ClientsPool{
109 localConfig: localConfig,
110 scheme: scheme,
111 clients: clients,
112 authClients: authClients,
113 }, nil
114 }
115
116
117 func (c *ClientsPool) Client(token string) (pkgclient.Client, error) {
118 c.Lock()
119 defer c.Unlock()
120
121 if len(token) == 0 {
122 return nil, errors.New("token is empty")
123 }
124
125 value, ok := c.clients.Get(token)
126 if ok {
127 return value, nil
128 }
129
130 config := rest.CopyConfig(c.localConfig)
131 config.BearerToken = token
132 config.BearerTokenFile = ""
133
134 newFunc := pkgclient.New
135
136 if mockNew := mock.On("MockCreateK8sClient"); mockNew != nil {
137 newFunc = mockNew.(func(config *rest.Config, options pkgclient.Options) (pkgclient.Client, error))
138 }
139
140 client, err := newFunc(config, pkgclient.Options{
141 Scheme: c.scheme,
142 })
143 if err != nil {
144 return nil, err
145 }
146
147 _ = c.clients.Add(token, client)
148
149 return client, nil
150 }
151
152 func (c *ClientsPool) AuthClient(token string) (authorizationv1.AuthorizationV1Interface, error) {
153 c.Lock()
154 defer c.Unlock()
155
156 if len(token) == 0 {
157 return nil, errors.New("token is empty")
158 }
159
160 value, ok := c.authClients.Get(token)
161 if ok {
162 return value, nil
163 }
164
165 config := rest.CopyConfig(c.localConfig)
166 config.BearerToken = token
167 config.BearerTokenFile = ""
168
169 authCli, err := authorizationv1.NewForConfig(config)
170 if err != nil {
171 return nil, err
172 }
173
174 _ = c.authClients.Add(token, authCli)
175
176 return authCli, nil
177 }
178
179
180 func (c *ClientsPool) Num() int {
181 return c.clients.Len()
182 }
183
184
185 func (c *ClientsPool) Contains(token string) bool {
186 c.RLock()
187 defer c.RUnlock()
188
189 _, ok := c.clients.Get(token)
190 return ok
191 }
192
193
// ExtractTokenFromHeader returns the bearer token carried in the Authorization
// header.
//
// The header value must start with the exact, case-sensitive prefix "Bearer ";
// everything after that prefix is returned verbatim. If the header is absent or
// does not start with the prefix, the empty string is returned.
func ExtractTokenFromHeader(header http.Header) string {
	const bearerPrefix = "Bearer "

	value := header.Get("Authorization")
	if !strings.HasPrefix(value, bearerPrefix) {
		return ""
	}

	return value[len(bearerPrefix):]
}
202
203
204 func ExtractTokenAndGetClient(header http.Header) (pkgclient.Client, error) {
205 token := ExtractTokenFromHeader(header)
206 return K8sClients.Client(token)
207 }
208
209
210 func ExtractTokenAndGetAuthClient(header http.Header) (authorizationv1.AuthorizationV1Interface, error) {
211 token := ExtractTokenFromHeader(header)
212 return K8sClients.AuthClient(token)
213 }
214