...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/portforward/portforward.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/portforward

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package portforward
    17  
    18  import (
    19  	"bufio"
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io"
    24  	"net/http"
    25  	"net/url"
    26  	"time"
    27  
    28  	corev1 "k8s.io/api/core/v1"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/cli-runtime/pkg/genericclioptions"
    31  	"k8s.io/cli-runtime/pkg/resource"
    32  	"k8s.io/client-go/kubernetes"
    33  	"k8s.io/client-go/kubernetes/scheme"
    34  	"k8s.io/client-go/rest"
    35  	"k8s.io/client-go/tools/portforward"
    36  	"k8s.io/client-go/transport/spdy"
    37  	"k8s.io/klog/v2"
    38  	"k8s.io/kubectl/pkg/polymorphichelpers"
    39  )
    40  
    41  const (
    42  	getPodTimeout = time.Minute
    43  )
    44  
    45  // PortForward represents an interface which can forward local ports to a pod.
    46  type PortForward interface {
    47  	Forward(namespace, resourceName string, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
    48  	ForwardPod(pod *corev1.Pod, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
    49  }
    50  
    51  // portForwarder implements PortForward interface
    52  type portForwarder struct {
    53  	genericclioptions.RESTClientGetter
    54  	ctx       context.Context
    55  	config    *rest.Config
    56  	client    kubernetes.Interface
    57  	enableLog bool
    58  }
    59  
    60  var _ PortForward = &portForwarder{}
    61  
    62  func (f *portForwarder) forwardPorts(podKey, method string, url *url.URL, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
    63  	transport, upgrader, err := spdy.RoundTripperFor(f.config)
    64  	if err != nil {
    65  		return nil, nil, err
    66  	}
    67  	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
    68  	r, w := io.Pipe()
    69  	ctx, cancel := context.WithCancel(f.ctx)
    70  	readyChan := make(chan struct{})
    71  	fw, err := portforward.NewOnAddresses(dialer, addresses, ports, ctx.Done(), readyChan, w, w)
    72  	if err != nil {
    73  		return nil, nil, err
    74  	}
    75  
    76  	// logging stdout/stderr of port forwarding
    77  	go func() {
    78  		// close pipe if the context is done
    79  		<-ctx.Done()
    80  		w.Close()
    81  	}()
    82  
    83  	go func() {
    84  		lineScanner := bufio.NewScanner(r)
    85  		for lineScanner.Scan() {
    86  			if f.enableLog {
    87  				klog.Infof("log from port forwarding %q: %s", podKey, lineScanner.Text())
    88  			}
    89  		}
    90  	}()
    91  
    92  	// run port forwarding
    93  	errChan := make(chan error)
    94  	go func() {
    95  		errChan <- fw.ForwardPorts()
    96  	}()
    97  
    98  	// wait for ready or error
    99  	select {
   100  	case <-readyChan:
   101  		break
   102  	case err := <-errChan:
   103  		cancel()
   104  		return nil, nil, err
   105  	}
   106  
   107  	forwardedPorts, err = fw.GetPorts()
   108  	if err != nil {
   109  		cancel()
   110  		return nil, nil, err
   111  	}
   112  
   113  	return forwardedPorts, cancel, nil
   114  }
   115  
   116  // Forward would port-forward to target resources
   117  func (f *portForwarder) Forward(namespace, resourceName string, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
   118  	builder := resource.NewBuilder(f).
   119  		WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
   120  		ContinueOnError().
   121  		NamespaceParam(namespace).DefaultNamespace()
   122  
   123  	builder.ResourceNames("pods", resourceName)
   124  
   125  	obj, err := builder.Do().Object()
   126  	if err != nil {
   127  		return nil, nil, err
   128  	}
   129  
   130  	forwardablePod, err := polymorphichelpers.AttachablePodForObjectFn(f, obj, getPodTimeout)
   131  	if err != nil {
   132  		return nil, nil, err
   133  	}
   134  
   135  	// FIXME: get context from parameter
   136  	pod, err := f.client.CoreV1().Pods(namespace).Get(context.TODO(), forwardablePod.Name, metav1.GetOptions{})
   137  	if err != nil {
   138  		return nil, nil, err
   139  	}
   140  	return f.ForwardPod(pod, addresses, ports)
   141  }
   142  
   143  // ForwardPod would port-forward target Pod
   144  func (f *portForwarder) ForwardPod(pod *corev1.Pod, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
   145  	if pod.Status.Phase != corev1.PodRunning {
   146  		return nil, nil, fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
   147  	}
   148  
   149  	req := f.client.CoreV1().RESTClient().Post().
   150  		Resource("pods").
   151  		Namespace(pod.Namespace).
   152  		Name(pod.Name).
   153  		SubResource("portforward")
   154  
   155  	return f.forwardPorts(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "POST", req.URL(), addresses, ports)
   156  }
   157  
   158  // NewPortForwarder would create a new port-forward
   159  func NewPortForwarder(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, enableLog bool) (PortForward, error) {
   160  	config, err := restClientGetter.ToRESTConfig()
   161  	if err != nil {
   162  		return nil, err
   163  	}
   164  	client, err := kubernetes.NewForConfig(config)
   165  	if err != nil {
   166  		return nil, err
   167  	}
   168  	f := &portForwarder{
   169  		RESTClientGetter: restClientGetter,
   170  		ctx:              ctx,
   171  		config:           config,
   172  		client:           client,
   173  		enableLog:        enableLog,
   174  	}
   175  	return f, nil
   176  }
   177  
   178  // ForwardOnePort help to utility to forward one port of Kubernetes resource.
   179  func ForwardOnePort(fw PortForward, ns, resource string, port uint16) (string, uint16, context.CancelFunc, error) {
   180  	ports := []string{fmt.Sprintf("0:%d", port)}
   181  	forwardedPorts, cancel, err := fw.Forward(ns, resource, []string{"127.0.0.1"}, ports)
   182  	if err != nil {
   183  		return "", 0, nil, err
   184  	}
   185  	var localPort uint16
   186  	var found bool
   187  	for _, p := range forwardedPorts {
   188  		if p.Remote == port {
   189  			localPort = p.Local
   190  			found = true
   191  		}
   192  	}
   193  	if !found {
   194  		cancel()
   195  		return "", 0, nil, errors.New("unexpected error")
   196  	}
   197  	return "127.0.0.1", localPort, cancel, nil
   198  }
   199