...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/portforward/portforward.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/portforward

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package portforward
    17  
    18  import (
    19  	"bufio"
    20  	"context"
    21  	"fmt"
    22  	"io"
    23  	"net/http"
    24  	"net/url"
    25  	"time"
    26  
    27  	"github.com/go-logr/logr"
    28  	"github.com/pkg/errors"
    29  	corev1 "k8s.io/api/core/v1"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/cli-runtime/pkg/genericclioptions"
    32  	"k8s.io/cli-runtime/pkg/resource"
    33  	"k8s.io/client-go/kubernetes"
    34  	"k8s.io/client-go/kubernetes/scheme"
    35  	"k8s.io/client-go/rest"
    36  	"k8s.io/client-go/tools/portforward"
    37  	"k8s.io/client-go/transport/spdy"
    38  	"k8s.io/kubectl/pkg/polymorphichelpers"
    39  )
    40  
    41  const (
    42  	getPodTimeout = time.Minute
    43  )
    44  
    45  // PortForward represents an interface which can forward local ports to a pod.
    46  type PortForward interface {
    47  	Forward(namespace, resourceName string, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
    48  	ForwardPod(pod *corev1.Pod, addresses []string, ports []string) ([]portforward.ForwardedPort, context.CancelFunc, error)
    49  }
    50  
    51  // portForwarder implements PortForward interface
    52  type portForwarder struct {
    53  	genericclioptions.RESTClientGetter
    54  	ctx       context.Context
    55  	config    *rest.Config
    56  	client    kubernetes.Interface
    57  	enableLog bool
    58  	logger    logr.Logger
    59  }
    60  
    61  var _ PortForward = &portForwarder{}
    62  
    63  func (f *portForwarder) forwardPorts(podKey, method string, url *url.URL, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
    64  	transport, upgrader, err := spdy.RoundTripperFor(f.config)
    65  	if err != nil {
    66  		return nil, nil, err
    67  	}
    68  	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
    69  	r, w := io.Pipe()
    70  	ctx, cancel := context.WithCancel(f.ctx)
    71  	readyChan := make(chan struct{})
    72  	fw, err := portforward.NewOnAddresses(dialer, addresses, ports, ctx.Done(), readyChan, w, w)
    73  	if err != nil {
    74  		cancel()
    75  		return nil, nil, err
    76  	}
    77  
    78  	// logging stdout/stderr of port forwarding
    79  	go func() {
    80  		// close pipe if the context is done
    81  		<-ctx.Done()
    82  		w.Close()
    83  	}()
    84  
    85  	go func() {
    86  		lineScanner := bufio.NewScanner(r)
    87  		for lineScanner.Scan() {
    88  			if f.enableLog {
    89  				f.logger.Info(fmt.Sprintf("log from port forwarding %q: %s", podKey, lineScanner.Text()))
    90  			}
    91  		}
    92  	}()
    93  
    94  	// run port forwarding
    95  	errChan := make(chan error, 1)
    96  	go func() {
    97  		errChan <- fw.ForwardPorts()
    98  	}()
    99  
   100  	// wait for ready or error
   101  	select {
   102  	case <-readyChan:
   103  		break
   104  	case err := <-errChan:
   105  		cancel()
   106  		return nil, nil, err
   107  	}
   108  
   109  	forwardedPorts, err = fw.GetPorts()
   110  	if err != nil {
   111  		cancel()
   112  		return nil, nil, err
   113  	}
   114  
   115  	return forwardedPorts, cancel, nil
   116  }
   117  
   118  // Forward would port-forward to target resources
   119  func (f *portForwarder) Forward(namespace, resourceName string, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
   120  	builder := resource.NewBuilder(f).
   121  		WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
   122  		ContinueOnError().
   123  		NamespaceParam(namespace).DefaultNamespace()
   124  
   125  	builder.ResourceNames("pods", resourceName)
   126  
   127  	obj, err := builder.Do().Object()
   128  	if err != nil {
   129  		return nil, nil, err
   130  	}
   131  
   132  	forwardablePod, err := polymorphichelpers.AttachablePodForObjectFn(f, obj, getPodTimeout)
   133  	if err != nil {
   134  		return nil, nil, err
   135  	}
   136  
   137  	// FIXME: get context from parameter
   138  	pod, err := f.client.CoreV1().Pods(namespace).Get(context.TODO(), forwardablePod.Name, metav1.GetOptions{})
   139  	if err != nil {
   140  		return nil, nil, err
   141  	}
   142  	return f.ForwardPod(pod, addresses, ports)
   143  }
   144  
   145  // ForwardPod would port-forward target Pod
   146  func (f *portForwarder) ForwardPod(pod *corev1.Pod, addresses []string, ports []string) (forwardedPorts []portforward.ForwardedPort, cancel context.CancelFunc, err error) {
   147  	if pod.Status.Phase != corev1.PodRunning {
   148  		return nil, nil, errors.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
   149  	}
   150  
   151  	req := f.client.CoreV1().RESTClient().Post().
   152  		Resource("pods").
   153  		Namespace(pod.Namespace).
   154  		Name(pod.Name).
   155  		SubResource("portforward")
   156  
   157  	return f.forwardPorts(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "POST", req.URL(), addresses, ports)
   158  }
   159  
   160  // NewPortForwarder would create a new port-forward
   161  func NewPortForwarder(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, enableLog bool, logger logr.Logger) (PortForward, error) {
   162  	config, err := restClientGetter.ToRESTConfig()
   163  	if err != nil {
   164  		return nil, err
   165  	}
   166  	client, err := kubernetes.NewForConfig(config)
   167  	if err != nil {
   168  		return nil, err
   169  	}
   170  	f := &portForwarder{
   171  		RESTClientGetter: restClientGetter,
   172  		ctx:              ctx,
   173  		config:           config,
   174  		client:           client,
   175  		enableLog:        enableLog,
   176  		logger:           logger,
   177  	}
   178  	return f, nil
   179  }
   180  
   181  // ForwardOnePort help to utility to forward one port of Kubernetes resource.
   182  func ForwardOnePort(fw PortForward, ns, resource string, port uint16) (string, uint16, context.CancelFunc, error) {
   183  	ports := []string{fmt.Sprintf("0:%d", port)}
   184  	forwardedPorts, cancel, err := fw.Forward(ns, resource, []string{"127.0.0.1"}, ports)
   185  	if err != nil {
   186  		return "", 0, nil, err
   187  	}
   188  	var localPort uint16
   189  	var found bool
   190  	for _, p := range forwardedPorts {
   191  		if p.Remote == port {
   192  			localPort = p.Local
   193  			found = true
   194  		}
   195  	}
   196  	if !found {
   197  		cancel()
   198  		return "", 0, nil, errors.New("unexpected error")
   199  	}
   200  	return "127.0.0.1", localPort, cancel, nil
   201  }
   202