...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/portforward/portforward.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/portforward

     1  // Copyright 2020 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package portforward
    15  
    16  import (
    17  	"bufio"
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"io"
    22  	"net/http"
    23  	"net/url"
    24  	"time"
    25  
    26  	corev1 "k8s.io/api/core/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/cli-runtime/pkg/genericclioptions"
    29  	"k8s.io/cli-runtime/pkg/resource"
    30  	"k8s.io/client-go/kubernetes"
    31  	"k8s.io/client-go/kubernetes/scheme"
    32  	"k8s.io/client-go/rest"
    33  	"k8s.io/client-go/tools/portforward"
    34  	"k8s.io/client-go/transport/spdy"
    35  	"k8s.io/klog"
    36  	"k8s.io/kubectl/pkg/polymorphichelpers"
    37  )
    38  
    39  const (
    40  	getPodTimeout = time.Minute
    41  )
    42  
    43  // PortForward represents an interface which can forward local ports to a pod.
    44  type PortForward interface {
    45  	Forward(namespace, resourceName string, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
    46  	ForwardPod(pod *corev1.Pod, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
    47  }
    48  
// portForwarder implements the PortForward interface.
//
// The embedded RESTClientGetter lets the forwarder itself be handed to
// resource.NewBuilder and polymorphichelpers.AttachablePodForObjectFn (see
// Forward). ctx is the parent of the per-forward contexts created in
// forwardPorts, config is used to build the SPDY round tripper, client issues
// the pod requests, and enableLog controls whether output of the port
// forwarding is written to klog.
type portForwarder struct {
	genericclioptions.RESTClientGetter
	ctx       context.Context
	config    *rest.Config
	client    kubernetes.Interface
	enableLog bool
}
    57  
    58  var _ PortForward = &portForwarder{}
    59  
    60  func (f *portForwarder) forwardPorts(podKey, method string, url *url.URL, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
    61  	transport, upgrader, err := spdy.RoundTripperFor(f.config)
    62  	if err != nil {
    63  		return nil, nil, err
    64  	}
    65  	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
    66  	r, w := io.Pipe()
    67  	ctx, cancel := context.WithCancel(f.ctx)
    68  	readyChan := make(chan struct{})
    69  	fw, err := portforward.NewOnAddresses(dialer, addresses, ports, ctx.Done(), readyChan, w, w)
    70  	if err != nil {
    71  		return nil, nil, err
    72  	}
    73  
    74  	// logging stdout/stderr of port forwarding
    75  	go func() {
    76  		// close pipe if the context is done
    77  		<-ctx.Done()
    78  		w.Close()
    79  	}()
    80  
    81  	go func() {
    82  		lineScanner := bufio.NewScanner(r)
    83  		for lineScanner.Scan() {
    84  			if f.enableLog {
    85  				klog.Infof("log from port forwarding %q: %s", podKey, lineScanner.Text())
    86  			}
    87  		}
    88  	}()
    89  
    90  	// run port forwarding
    91  	errChan := make(chan error)
    92  	go func() {
    93  		errChan <- fw.ForwardPorts()
    94  	}()
    95  
    96  	// wait for ready or error
    97  	select {
    98  	case <-readyChan:
    99  		break
   100  	case err := <-errChan:
   101  		cancel()
   102  		return nil, nil, err
   103  	}
   104  
   105  	forwardedPorts, err = fw.GetPorts()
   106  	if err != nil {
   107  		cancel()
   108  		return nil, nil, err
   109  	}
   110  
   111  	return forwardedPorts, cancel, nil
   112  }
   113  
   114  // Forward would port-forward to target resources
   115  func (f *portForwarder) Forward(namespace, resourceName string, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
   116  	builder := resource.NewBuilder(f).
   117  		WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
   118  		ContinueOnError().
   119  		NamespaceParam(namespace).DefaultNamespace()
   120  
   121  	builder.ResourceNames("pods", resourceName)
   122  
   123  	obj, err := builder.Do().Object()
   124  	if err != nil {
   125  		return nil, nil, err
   126  	}
   127  
   128  	forwardablePod, err := polymorphichelpers.AttachablePodForObjectFn(f, obj, getPodTimeout)
   129  	if err != nil {
   130  		return nil, nil, err
   131  	}
   132  
   133  	pod, err := f.client.CoreV1().Pods(namespace).Get(forwardablePod.Name, metav1.GetOptions{})
   134  	if err != nil {
   135  		return nil, nil, err
   136  	}
   137  	return f.ForwardPod(pod, addresses, ports)
   138  }
   139  
   140  // ForwardPod would port-forward target Pod
   141  func (f *portForwarder) ForwardPod(pod *corev1.Pod, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
   142  	if pod.Status.Phase != corev1.PodRunning {
   143  		return nil, nil, fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
   144  	}
   145  
   146  	req := f.client.CoreV1().RESTClient().Post().
   147  		Resource("pods").
   148  		Namespace(pod.Namespace).
   149  		Name(pod.Name).
   150  		SubResource("portforward")
   151  
   152  	return f.forwardPorts(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "POST", req.URL(), addresses, ports)
   153  }
   154  
   155  // NewPortForwarder would create a new port-forward
   156  func NewPortForwarder(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, enableLog bool) (PortForward, error) {
   157  	config, err := restClientGetter.ToRESTConfig()
   158  	if err != nil {
   159  		return nil, err
   160  	}
   161  	client, err := kubernetes.NewForConfig(config)
   162  	if err != nil {
   163  		return nil, err
   164  	}
   165  	f := &portForwarder{
   166  		RESTClientGetter: restClientGetter,
   167  		ctx:              ctx,
   168  		config:           config,
   169  		client:           client,
   170  		enableLog:        enableLog,
   171  	}
   172  	return f, nil
   173  }
   174  
   175  // ForwardOnePort help to utility to forward one port of Kubernetes resource.
   176  func ForwardOnePort(fw PortForward, ns, resource string, port uint16) (string, uint16, context.CancelFunc, error) {
   177  	ports := []string{fmt.Sprintf("0:%d", port)}
   178  	forwardedPorts, cancel, err := fw.Forward(ns, resource, []string{"127.0.0.1"}, ports)
   179  	if err != nil {
   180  		return "", 0, nil, err
   181  	}
   182  	var localPort uint16
   183  	var found bool
   184  	for _, p := range forwardedPorts {
   185  		if p.Remote == port {
   186  			localPort = p.Local
   187  			found = true
   188  		}
   189  	}
   190  	if !found {
   191  		cancel()
   192  		return "", 0, nil, errors.New("unexpected error")
   193  	}
   194  	return "127.0.0.1", localPort, cancel, nil
   195  }
   196