1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package portforward
17
18 import (
19 "bufio"
20 "context"
21 "errors"
22 "fmt"
23 "io"
24 "net/http"
25 "net/url"
26 "time"
27
28 corev1 "k8s.io/api/core/v1"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/cli-runtime/pkg/genericclioptions"
31 "k8s.io/cli-runtime/pkg/resource"
32 "k8s.io/client-go/kubernetes"
33 "k8s.io/client-go/kubernetes/scheme"
34 "k8s.io/client-go/rest"
35 "k8s.io/client-go/tools/portforward"
36 "k8s.io/client-go/transport/spdy"
37 "k8s.io/klog/v2"
38 "k8s.io/kubectl/pkg/polymorphichelpers"
39 )
40
41 const (
42 getPodTimeout = time.Minute
43 )
44
45
46 type PortForward interface {
47 Forward(namespace, resourceName string, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
48 ForwardPod(pod *corev1.Pod, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
49 }
50
51
52 type portForwarder struct {
53 genericclioptions.RESTClientGetter
54 ctx context.Context
55 config *rest.Config
56 client kubernetes.Interface
57 enableLog bool
58 }
59
60 var _ PortForward = &portForwarder{}
61
62 func (f *portForwarder) forwardPorts(podKey, method string, url *url.URL, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
63 transport, upgrader, err := spdy.RoundTripperFor(f.config)
64 if err != nil {
65 return nil, nil, err
66 }
67 dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
68 r, w := io.Pipe()
69 ctx, cancel := context.WithCancel(f.ctx)
70 readyChan := make(chan struct{})
71 fw, err := portforward.NewOnAddresses(dialer, addresses, ports, ctx.Done(), readyChan, w, w)
72 if err != nil {
73 return nil, nil, err
74 }
75
76
77 go func() {
78
79 <-ctx.Done()
80 w.Close()
81 }()
82
83 go func() {
84 lineScanner := bufio.NewScanner(r)
85 for lineScanner.Scan() {
86 if f.enableLog {
87 klog.Infof("log from port forwarding %q: %s", podKey, lineScanner.Text())
88 }
89 }
90 }()
91
92
93 errChan := make(chan error)
94 go func() {
95 errChan <- fw.ForwardPorts()
96 }()
97
98
99 select {
100 case <-readyChan:
101 break
102 case err := <-errChan:
103 cancel()
104 return nil, nil, err
105 }
106
107 forwardedPorts, err = fw.GetPorts()
108 if err != nil {
109 cancel()
110 return nil, nil, err
111 }
112
113 return forwardedPorts, cancel, nil
114 }
115
116
117 func (f *portForwarder) Forward(namespace, resourceName string, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
118 builder := resource.NewBuilder(f).
119 WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
120 ContinueOnError().
121 NamespaceParam(namespace).DefaultNamespace()
122
123 builder.ResourceNames("pods", resourceName)
124
125 obj, err := builder.Do().Object()
126 if err != nil {
127 return nil, nil, err
128 }
129
130 forwardablePod, err := polymorphichelpers.AttachablePodForObjectFn(f, obj, getPodTimeout)
131 if err != nil {
132 return nil, nil, err
133 }
134
135
136 pod, err := f.client.CoreV1().Pods(namespace).Get(context.TODO(), forwardablePod.Name, metav1.GetOptions{})
137 if err != nil {
138 return nil, nil, err
139 }
140 return f.ForwardPod(pod, addresses, ports)
141 }
142
143
144 func (f *portForwarder) ForwardPod(pod *corev1.Pod, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
145 if pod.Status.Phase != corev1.PodRunning {
146 return nil, nil, fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
147 }
148
149 req := f.client.CoreV1().RESTClient().Post().
150 Resource("pods").
151 Namespace(pod.Namespace).
152 Name(pod.Name).
153 SubResource("portforward")
154
155 return f.forwardPorts(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "POST", req.URL(), addresses, ports)
156 }
157
158
159 func NewPortForwarder(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, enableLog bool) (PortForward, error) {
160 config, err := restClientGetter.ToRESTConfig()
161 if err != nil {
162 return nil, err
163 }
164 client, err := kubernetes.NewForConfig(config)
165 if err != nil {
166 return nil, err
167 }
168 f := &portForwarder{
169 RESTClientGetter: restClientGetter,
170 ctx: ctx,
171 config: config,
172 client: client,
173 enableLog: enableLog,
174 }
175 return f, nil
176 }
177
178
179 func ForwardOnePort(fw PortForward, ns, resource string, port uint16) (string, uint16, context.CancelFunc, error) {
180 ports := []string{fmt.Sprintf("0:%d", port)}
181 forwardedPorts, cancel, err := fw.Forward(ns, resource, []string{"127.0.0.1"}, ports)
182 if err != nil {
183 return "", 0, nil, err
184 }
185 var localPort uint16
186 var found bool
187 for _, p := range forwardedPorts {
188 if p.Remote == port {
189 localPort = p.Local
190 found = true
191 }
192 }
193 if !found {
194 cancel()
195 return "", 0, nil, errors.New("unexpected error")
196 }
197 return "127.0.0.1", localPort, cancel, nil
198 }
199