1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package portforward
17
18 import (
19 "bufio"
20 "context"
21 "fmt"
22 "io"
23 "net/http"
24 "net/url"
25 "time"
26
27 "github.com/go-logr/logr"
28 "github.com/pkg/errors"
29 corev1 "k8s.io/api/core/v1"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/cli-runtime/pkg/genericclioptions"
32 "k8s.io/cli-runtime/pkg/resource"
33 "k8s.io/client-go/kubernetes"
34 "k8s.io/client-go/kubernetes/scheme"
35 "k8s.io/client-go/rest"
36 "k8s.io/client-go/tools/portforward"
37 "k8s.io/client-go/transport/spdy"
38 "k8s.io/kubectl/pkg/polymorphichelpers"
39 )
40
// getPodTimeout is the timeout handed to
// polymorphichelpers.AttachablePodForObjectFn when Forward resolves the
// requested resource to a pod.
const getPodTimeout = time.Minute
44
45
46 type PortForward interface {
47 Forward(namespace, resourceName string, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
48 ForwardPod(pod *corev1.Pod, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
49 }
50
51
52 type portForwarder struct {
53 genericclioptions.RESTClientGetter
54 ctx context.Context
55 config *rest.Config
56 client kubernetes.Interface
57 enableLog bool
58 logger logr.Logger
59 }
60
61 var _ PortForward = &portForwarder{}
62
63 func (f *portForwarder) forwardPorts(podKey, method string, url *url.URL, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
64 transport, upgrader, err := spdy.RoundTripperFor(f.config)
65 if err != nil {
66 return nil, nil, err
67 }
68 dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
69 r, w := io.Pipe()
70 ctx, cancel := context.WithCancel(f.ctx)
71 readyChan := make(chan struct{})
72 fw, err := portforward.NewOnAddresses(dialer, addresses, ports, ctx.Done(), readyChan, w, w)
73 if err != nil {
74 cancel()
75 return nil, nil, err
76 }
77
78
79 go func() {
80
81 <-ctx.Done()
82 w.Close()
83 }()
84
85 go func() {
86 lineScanner := bufio.NewScanner(r)
87 for lineScanner.Scan() {
88 if f.enableLog {
89 f.logger.Info(fmt.Sprintf("log from port forwarding %q: %s", podKey, lineScanner.Text()))
90 }
91 }
92 }()
93
94
95 errChan := make(chan error, 1)
96 go func() {
97 errChan <- fw.ForwardPorts()
98 }()
99
100
101 select {
102 case <-readyChan:
103 break
104 case err := <-errChan:
105 cancel()
106 return nil, nil, err
107 }
108
109 forwardedPorts, err = fw.GetPorts()
110 if err != nil {
111 cancel()
112 return nil, nil, err
113 }
114
115 return forwardedPorts, cancel, nil
116 }
117
118
119 func (f *portForwarder) Forward(namespace, resourceName string, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
120 builder := resource.NewBuilder(f).
121 WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
122 ContinueOnError().
123 NamespaceParam(namespace).DefaultNamespace()
124
125 builder.ResourceNames("pods", resourceName)
126
127 obj, err := builder.Do().Object()
128 if err != nil {
129 return nil, nil, err
130 }
131
132 forwardablePod, err := polymorphichelpers.AttachablePodForObjectFn(f, obj, getPodTimeout)
133 if err != nil {
134 return nil, nil, err
135 }
136
137
138 pod, err := f.client.CoreV1().Pods(namespace).Get(context.TODO(), forwardablePod.Name, metav1.GetOptions{})
139 if err != nil {
140 return nil, nil, err
141 }
142 return f.ForwardPod(pod, addresses, ports)
143 }
144
145
146 func (f *portForwarder) ForwardPod(pod *corev1.Pod, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
147 if pod.Status.Phase != corev1.PodRunning {
148 return nil, nil, errors.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
149 }
150
151 req := f.client.CoreV1().RESTClient().Post().
152 Resource("pods").
153 Namespace(pod.Namespace).
154 Name(pod.Name).
155 SubResource("portforward")
156
157 return f.forwardPorts(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "POST", req.URL(), addresses, ports)
158 }
159
160
161 func NewPortForwarder(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, enableLog bool, logger logr.Logger) (PortForward, error) {
162 config, err := restClientGetter.ToRESTConfig()
163 if err != nil {
164 return nil, err
165 }
166 client, err := kubernetes.NewForConfig(config)
167 if err != nil {
168 return nil, err
169 }
170 f := &portForwarder{
171 RESTClientGetter: restClientGetter,
172 ctx: ctx,
173 config: config,
174 client: client,
175 enableLog: enableLog,
176 logger: logger,
177 }
178 return f, nil
179 }
180
181
182 func ForwardOnePort(fw PortForward, ns, resource string, port uint16) (string, uint16, context.CancelFunc, error) {
183 ports := []string{fmt.Sprintf("0:%d", port)}
184 forwardedPorts, cancel, err := fw.Forward(ns, resource, []string{"127.0.0.1"}, ports)
185 if err != nil {
186 return "", 0, nil, err
187 }
188 var localPort uint16
189 var found bool
190 for _, p := range forwardedPorts {
191 if p.Remote == port {
192 localPort = p.Local
193 found = true
194 }
195 }
196 if !found {
197 cancel()
198 return "", 0, nil, errors.New("unexpected error")
199 }
200 return "127.0.0.1", localPort, cancel, nil
201 }
202