1
2
3
4
5
6
7
8
9
10
11
12
13
14 package portforward
15
16 import (
17 "bufio"
18 "context"
19 "errors"
20 "fmt"
21 "io"
22 "net/http"
23 "net/url"
24 "time"
25
26 corev1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/cli-runtime/pkg/genericclioptions"
29 "k8s.io/cli-runtime/pkg/resource"
30 "k8s.io/client-go/kubernetes"
31 "k8s.io/client-go/kubernetes/scheme"
32 "k8s.io/client-go/rest"
33 "k8s.io/client-go/tools/portforward"
34 "k8s.io/client-go/transport/spdy"
35 "k8s.io/klog"
36 "k8s.io/kubectl/pkg/polymorphichelpers"
37 )
38
39 const (
40 getPodTimeout = time.Minute
41 )
42
43
44 type PortForward interface {
45 Forward(namespace, resourceName string, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
46 ForwardPod(pod *corev1.Pod, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
47 }
48
49
50 type portForwarder struct {
51 genericclioptions.RESTClientGetter
52 ctx context.Context
53 config *rest.Config
54 client kubernetes.Interface
55 enableLog bool
56 }
57
58 var _ PortForward = &portForwarder{}
59
60 func (f *portForwarder) forwardPorts(podKey, method string, url *url.URL, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
61 transport, upgrader, err := spdy.RoundTripperFor(f.config)
62 if err != nil {
63 return nil, nil, err
64 }
65 dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
66 r, w := io.Pipe()
67 ctx, cancel := context.WithCancel(f.ctx)
68 readyChan := make(chan struct{})
69 fw, err := portforward.NewOnAddresses(dialer, addresses, ports, ctx.Done(), readyChan, w, w)
70 if err != nil {
71 return nil, nil, err
72 }
73
74
75 go func() {
76
77 <-ctx.Done()
78 w.Close()
79 }()
80
81 go func() {
82 lineScanner := bufio.NewScanner(r)
83 for lineScanner.Scan() {
84 if f.enableLog {
85 klog.Infof("log from port forwarding %q: %s", podKey, lineScanner.Text())
86 }
87 }
88 }()
89
90
91 errChan := make(chan error)
92 go func() {
93 errChan <- fw.ForwardPorts()
94 }()
95
96
97 select {
98 case <-readyChan:
99 break
100 case err := <-errChan:
101 cancel()
102 return nil, nil, err
103 }
104
105 forwardedPorts, err = fw.GetPorts()
106 if err != nil {
107 cancel()
108 return nil, nil, err
109 }
110
111 return forwardedPorts, cancel, nil
112 }
113
114
115 func (f *portForwarder) Forward(namespace, resourceName string, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
116 builder := resource.NewBuilder(f).
117 WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
118 ContinueOnError().
119 NamespaceParam(namespace).DefaultNamespace()
120
121 builder.ResourceNames("pods", resourceName)
122
123 obj, err := builder.Do().Object()
124 if err != nil {
125 return nil, nil, err
126 }
127
128 forwardablePod, err := polymorphichelpers.AttachablePodForObjectFn(f, obj, getPodTimeout)
129 if err != nil {
130 return nil, nil, err
131 }
132
133 pod, err := f.client.CoreV1().Pods(namespace).Get(forwardablePod.Name, metav1.GetOptions{})
134 if err != nil {
135 return nil, nil, err
136 }
137 return f.ForwardPod(pod, addresses, ports)
138 }
139
140
141 func (f *portForwarder) ForwardPod(pod *corev1.Pod, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
142 if pod.Status.Phase != corev1.PodRunning {
143 return nil, nil, fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
144 }
145
146 req := f.client.CoreV1().RESTClient().Post().
147 Resource("pods").
148 Namespace(pod.Namespace).
149 Name(pod.Name).
150 SubResource("portforward")
151
152 return f.forwardPorts(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "POST", req.URL(), addresses, ports)
153 }
154
155
156 func NewPortForwarder(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, enableLog bool) (PortForward, error) {
157 config, err := restClientGetter.ToRESTConfig()
158 if err != nil {
159 return nil, err
160 }
161 client, err := kubernetes.NewForConfig(config)
162 if err != nil {
163 return nil, err
164 }
165 f := &portForwarder{
166 RESTClientGetter: restClientGetter,
167 ctx: ctx,
168 config: config,
169 client: client,
170 enableLog: enableLog,
171 }
172 return f, nil
173 }
174
175
176 func ForwardOnePort(fw PortForward, ns, resource string, port uint16) (string, uint16, context.CancelFunc, error) {
177 ports := []string{fmt.Sprintf("0:%d", port)}
178 forwardedPorts, cancel, err := fw.Forward(ns, resource, []string{"127.0.0.1"}, ports)
179 if err != nil {
180 return "", 0, nil, err
181 }
182 var localPort uint16
183 var found bool
184 for _, p := range forwardedPorts {
185 if p.Remote == port {
186 localPort = p.Local
187 found = true
188 }
189 }
190 if !found {
191 cancel()
192 return "", 0, nil, errors.New("unexpected error")
193 }
194 return "127.0.0.1", localPort, cancel, nil
195 }
196