...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/chaosctl/common/common.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/chaosctl/common

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package common
    17  
    18  import (
    19  	"bytes"
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"io"
    24  	"regexp"
    25  	"strings"
    26  
    27  	"github.com/fatih/color"
    28  	"github.com/pkg/errors"
    29  	"google.golang.org/grpc"
    30  	v1 "k8s.io/api/core/v1"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/runtime"
    33  	"k8s.io/client-go/kubernetes"
    34  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    35  	"sigs.k8s.io/controller-runtime/pkg/client"
    36  	"sigs.k8s.io/controller-runtime/pkg/client/config"
    37  
    38  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    39  	ctrlconfig "github.com/chaos-mesh/chaos-mesh/controllers/config"
    40  	daemonClient "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/client"
    41  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    42  	grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
    43  	"github.com/chaos-mesh/chaos-mesh/pkg/mock"
    44  	"github.com/chaos-mesh/chaos-mesh/pkg/portforward"
    45  	"github.com/chaos-mesh/chaos-mesh/pkg/selector/pod"
    46  )
    47  
// Color names a text color that PrettyPrint can render its output in.
type Color string

// Color values accepted by PrettyPrint. NoColor is the empty string and makes
// PrettyPrint print plain, uncolored text.
const (
	Blue    Color = "Blue"
	Red     Color = "Red"
	Green   Color = "Green"
	Cyan    Color = "Cyan"
	NoColor Color = ""
)
    57  
var (
	// colorFunc maps each supported Color to the color-package print function
	// used to render it. NoColor deliberately has no entry: PrettyPrint handles
	// it separately with a plain fmt.Print.
	colorFunc = map[Color]func(string, ...interface{}){
		Blue:  color.Blue,
		Red:   color.Red,
		Green: color.Green,
		Cyan:  color.Cyan,
	}
	// scheme is the runtime scheme passed to the controller-runtime client in
	// InitClientSet; init registers the v1alpha1 and client-go types with it.
	scheme = runtime.NewScheme()
)
    67  
// ClientSet bundles the two Kubernetes clients created by InitClientSet.
type ClientSet struct {
	// CtrlCli is the controller-runtime client.
	CtrlCli client.Client
	// KubeCli is the client-go clientset.
	KubeCli *kubernetes.Clientset
}
    73  
// ChaosResult groups the per-pod results that PrintResult prints under a
// single "[Chaos]: <Name>" heading.
type ChaosResult struct {
	// Name is the text printed after "[Chaos]: ".
	Name string
	// Pods holds one PodResult per pod, printed in slice order.
	Pods []PodResult
}
    78  
// PodResult groups the items that PrintResult prints under a single
// "[Pod]: <Name>" heading.
type PodResult struct {
	// Name is the text printed after "[Pod]: ".
	Name string
	// Items are printed in slice order, numbered starting from 1.
	Items []ItemResult
}
    83  
// Values of ItemResult.Status that PrintResult recognizes. They start at 1, so
// the zero value of Status is neither of them; PrintResult then prints no
// status line for the item.
const (
	ItemSuccess = iota + 1
	ItemFailure
)
    88  
    89  const ChaosDaemonClientCert = "chaos-mesh-daemon-client-certs"
    90  const ChaosDaemonNamespace = "chaos-testing"
    91  
    92  var TLSFiles grpcUtils.TLSFile
    93  var Insecure bool
    94  
// ItemResult is a single numbered entry printed by PrintResult for a pod.
type ItemResult struct {
	// Name is printed as the entry heading, in the form "<n>. [Name]".
	Name string
	// Value is printed below the heading without color.
	Value string
	// Status is ItemSuccess, ItemFailure, or zero for "no status line".
	Status int `json:",omitempty"`
	// SucInfo is printed when Status is ItemSuccess; if it is empty a default
	// message is printed instead.
	SucInfo string `json:",omitempty"`
	// ErrInfo is printed after "Failed: " when Status is ItemFailure.
	ErrInfo string `json:",omitempty"`
}
   102  
// init registers the chaos-mesh v1alpha1 types and the client-go types with
// the package-level scheme used by InitClientSet.
func init() {
	// Registration errors are discarded.
	// NOTE(review): presumably registration of these types cannot fail in
	// practice - confirm, or surface the error.
	_ = v1alpha1.AddToScheme(scheme)
	_ = clientgoscheme.AddToScheme(scheme)
}
   107  
   108  func upperCaseChaos(str string) string {
   109  	parts := regexp.MustCompile("(.*)(chaos)").FindStringSubmatch(str)
   110  	return strings.Title(parts[1]) + strings.Title(parts[2])
   111  }
   112  
   113  // PrettyPrint print with tab number and color
   114  func PrettyPrint(s string, indentLevel int, color Color) {
   115  	var tabStr string
   116  	for i := 0; i < indentLevel; i++ {
   117  		tabStr += "\t"
   118  	}
   119  	str := fmt.Sprintf("%s%s\n\n", tabStr, regexp.MustCompile("\n").ReplaceAllString(s, "\n"+tabStr))
   120  	if color != NoColor {
   121  		if cfunc, ok := colorFunc[color]; !ok {
   122  			fmt.Print("COLOR NOT SUPPORTED")
   123  		} else {
   124  			cfunc(str)
   125  		}
   126  	} else {
   127  		fmt.Print(str)
   128  	}
   129  }
   130  
   131  // PrintResult prints result to users in prettier format
   132  func PrintResult(result []ChaosResult) {
   133  	for _, chaos := range result {
   134  		PrettyPrint("[Chaos]: "+chaos.Name, 0, Blue)
   135  		for _, pod := range chaos.Pods {
   136  			PrettyPrint("[Pod]: "+pod.Name, 0, Blue)
   137  			for i, item := range pod.Items {
   138  				PrettyPrint(fmt.Sprintf("%d. [%s]", i+1, item.Name), 1, Cyan)
   139  				PrettyPrint(item.Value, 1, NoColor)
   140  				if item.Status == ItemSuccess {
   141  					if item.SucInfo != "" {
   142  						PrettyPrint(item.SucInfo, 1, Green)
   143  					} else {
   144  						PrettyPrint("Execute as expected", 1, Green)
   145  					}
   146  				} else if item.Status == ItemFailure {
   147  					PrettyPrint(fmt.Sprintf("Failed: %s ", item.ErrInfo), 1, Red)
   148  				}
   149  			}
   150  		}
   151  	}
   152  }
   153  
   154  // MarshalChaos returns json in readable format
   155  func MarshalChaos(s interface{}) (string, error) {
   156  	b, err := json.MarshalIndent(s, "", "  ")
   157  	if err != nil {
   158  		return "", errors.Wrapf(err, "failed to marshal indent")
   159  	}
   160  	return string(b), nil
   161  }
   162  
   163  // InitClientSet inits two different clients that would be used
   164  func InitClientSet() (*ClientSet, error) {
   165  	restconfig, err := config.GetConfig()
   166  	if err != nil {
   167  		return nil, err
   168  	}
   169  	ctrlClient, err := client.New(restconfig, client.Options{Scheme: scheme})
   170  	if err != nil {
   171  		return nil, fmt.Errorf("failed to create client")
   172  	}
   173  	kubeClient, err := kubernetes.NewForConfig(restconfig)
   174  	if err != nil {
   175  		return nil, errors.Wrap(err, "error in getting acess to k8s")
   176  	}
   177  	return &ClientSet{ctrlClient, kubeClient}, nil
   178  }
   179  
   180  // GetPods returns pod list and corresponding chaos daemon
   181  func GetPods(ctx context.Context, chaosName string, status v1alpha1.ChaosStatus, selectorSpec v1alpha1.PodSelectorSpec, c client.Client) ([]v1.Pod, []v1.Pod, error) {
   182  	// get podName
   183  	failedMessage := "" // TODO: fill in message
   184  	if failedMessage != "" {
   185  		PrettyPrint(fmt.Sprintf("chaos %s failed with: %s", chaosName, failedMessage), 0, Red)
   186  	}
   187  
   188  	pods, err := pod.SelectPods(ctx, c, c, selectorSpec, ctrlconfig.ControllerCfg.ClusterScoped, ctrlconfig.ControllerCfg.TargetNamespace, false)
   189  	if err != nil {
   190  		return nil, nil, errors.Wrap(err, "failed to SelectPods")
   191  	}
   192  	L().WithName("GetPods").V(4).Info("select pods for chaos", "chaos", chaosName, "pods", pods)
   193  	if len(pods) == 0 {
   194  		return nil, nil, fmt.Errorf("no pods found for chaos %s, selector: %s", chaosName, selectorSpec)
   195  	}
   196  
   197  	var chaosDaemons []v1.Pod
   198  	// get chaos daemon
   199  	for _, chaosPod := range pods {
   200  		nodeName := chaosPod.Spec.NodeName
   201  		daemonSelector := v1alpha1.PodSelectorSpec{
   202  			Nodes: []string{nodeName},
   203  			GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
   204  				LabelSelectors: map[string]string{"app.kubernetes.io/component": "chaos-daemon"},
   205  			},
   206  		}
   207  		daemons, err := pod.SelectPods(ctx, c, nil, daemonSelector, ctrlconfig.ControllerCfg.ClusterScoped, ctrlconfig.ControllerCfg.TargetNamespace, false)
   208  		if err != nil {
   209  			return nil, nil, errors.Wrap(err, fmt.Sprintf("failed to select daemon pod for pod %s", chaosPod.GetName()))
   210  		}
   211  		if len(daemons) == 0 {
   212  			return nil, nil, fmt.Errorf("no daemons found for pod %s with selector: %s", chaosPod.GetName(), daemonSelector)
   213  		}
   214  		chaosDaemons = append(chaosDaemons, daemons[0])
   215  	}
   216  
   217  	return pods, chaosDaemons, nil
   218  }
   219  
   220  // GetChaosList returns chaos list limited by input
   221  func GetChaosList(ctx context.Context, chaosType string, chaosName string, ns string, c client.Client) ([]runtime.Object, []string, error) {
   222  	chaosType = upperCaseChaos(strings.ToLower(chaosType))
   223  	allKinds := v1alpha1.AllKinds()
   224  	chaosListInterface := allKinds[chaosType].SpawnList()
   225  
   226  	if err := c.List(ctx, chaosListInterface, client.InNamespace(ns)); err != nil {
   227  		return nil, nil, errors.Wrapf(err, "failed to get chaosList with namespace %s", ns)
   228  	}
   229  	chaosList := chaosListInterface.GetItems()
   230  	if len(chaosList) == 0 {
   231  		return nil, nil, fmt.Errorf("no chaos is found, please check your input")
   232  	}
   233  
   234  	var retList []runtime.Object
   235  	var retNameList []string
   236  	for _, ch := range chaosList {
   237  		if chaosName == "" || chaosName == ch.GetName() {
   238  			chaos, err := getChaos(ctx, chaosType, ch.GetName(), ns, c)
   239  			if err != nil {
   240  				return nil, nil, err
   241  			}
   242  			retList = append(retList, chaos)
   243  			retNameList = append(retNameList, ch.GetName())
   244  		}
   245  	}
   246  	if len(retList) == 0 {
   247  		return nil, nil, fmt.Errorf("no chaos is found, please check your input")
   248  	}
   249  
   250  	return retList, retNameList, nil
   251  }
   252  
   253  func getChaos(ctx context.Context, chaosType string, chaosName string, ns string, c client.Client) (runtime.Object, error) {
   254  	allKinds := v1alpha1.AllKinds()
   255  	chaos := allKinds[chaosType].SpawnObject()
   256  	objectKey := client.ObjectKey{
   257  		Namespace: ns,
   258  		Name:      chaosName,
   259  	}
   260  	if err := c.Get(ctx, objectKey, chaos); err != nil {
   261  		return nil, errors.Wrapf(err, "failed to get chaos %s", chaosName)
   262  	}
   263  	return chaos, nil
   264  }
   265  
   266  // GetPidFromPS returns pid-command pairs
   267  func GetPidFromPS(ctx context.Context, pod v1.Pod, daemon v1.Pod, c *kubernetes.Clientset) ([]string, []string, error) {
   268  	cmd := "ps"
   269  	out, err := ExecBypass(ctx, pod, daemon, cmd, c)
   270  	if err != nil {
   271  		return nil, nil, errors.Wrapf(err, "run command %s failed", cmd)
   272  	}
   273  	outLines := strings.Split(string(out), "\n")
   274  	if len(outLines) < 2 {
   275  		return nil, nil, fmt.Errorf("ps returns empty")
   276  	}
   277  	titles := strings.Fields(outLines[0])
   278  	var pidColumn, cmdColumn int
   279  	for i, t := range titles {
   280  		if t == "PID" {
   281  			pidColumn = i
   282  		}
   283  		if t == "COMMAND" || t == "CMD" {
   284  			cmdColumn = i
   285  		}
   286  	}
   287  	if pidColumn == 0 && cmdColumn == 0 {
   288  		return nil, nil, fmt.Errorf("parsing ps error: could not get PID and COMMAND column")
   289  	}
   290  	var pids, commands []string
   291  	for _, line := range outLines[1:] {
   292  		item := strings.Fields(line)
   293  		// break when got empty line
   294  		if len(item) == 0 {
   295  			break
   296  		}
   297  		pids = append(pids, item[pidColumn])
   298  		commands = append(commands, item[cmdColumn])
   299  	}
   300  	return pids, commands, nil
   301  }
   302  
   303  // GetPidFromPod returns pid given containerd ID in pod
   304  func GetPidFromPod(ctx context.Context, pod v1.Pod, daemon v1.Pod) (uint32, error) {
   305  	pfCancel, localPort, err := forwardPorts(ctx, daemon, uint16(ctrlconfig.ControllerCfg.ChaosDaemonPort))
   306  	if err != nil {
   307  		return 0, errors.Wrapf(err, "forward ports for daemon pod %s/%s failed", daemon.Namespace, daemon.Name)
   308  	}
   309  	L().WithName("GetPidFromPod").V(4).Info(fmt.Sprintf("port forwarding 127.0.0.1:%d -> pod/%s/%s:%d", localPort, daemon.Namespace, daemon.Name, ctrlconfig.ControllerCfg.ChaosDaemonPort))
   310  
   311  	defer func() {
   312  		pfCancel()
   313  	}()
   314  
   315  	daemonClient, err := ConnectToLocalChaosDaemon(int(localPort))
   316  	if err != nil {
   317  		return 0, errors.Wrapf(err, "failed to create new chaos daemon client with local port %d", localPort)
   318  	}
   319  	defer daemonClient.Close()
   320  
   321  	if len(pod.Status.ContainerStatuses) == 0 {
   322  		return 0, fmt.Errorf("%s %s can't get the state of container", pod.Namespace, pod.Name)
   323  	}
   324  
   325  	res, err := daemonClient.ContainerGetPid(ctx, &pb.ContainerRequest{
   326  		Action: &pb.ContainerAction{
   327  			Action: pb.ContainerAction_GETPID,
   328  		},
   329  		ContainerId: pod.Status.ContainerStatuses[0].ContainerID,
   330  	})
   331  	if err != nil {
   332  		return 0, errors.Wrapf(err, "failed get pid from pod %s/%s", pod.GetNamespace(), pod.GetName())
   333  	}
   334  	return res.Pid, nil
   335  }
   336  
   337  func forwardPorts(ctx context.Context, pod v1.Pod, port uint16) (context.CancelFunc, uint16, error) {
   338  	commonRestClientGetter := NewCommonRestClientGetter()
   339  	fw, err := portforward.NewPortForwarder(ctx, commonRestClientGetter, false)
   340  	if err != nil {
   341  		return nil, 0, errors.Wrap(err, "failed to create port forwarder")
   342  	}
   343  	_, localPort, pfCancel, err := portforward.ForwardOnePort(fw, pod.Namespace, pod.Name, port)
   344  	return pfCancel, localPort, err
   345  }
   346  
   347  func ForwardSvcPorts(ctx context.Context, ns, svc string, port uint16) (context.CancelFunc, uint16, error) {
   348  	commonRestClientGetter := NewCommonRestClientGetter()
   349  	fw, err := portforward.NewPortForwarder(ctx, commonRestClientGetter, false)
   350  	if err != nil {
   351  		return nil, 0, errors.Wrap(err, "failed to create port forwarder")
   352  	}
   353  	_, localPort, pfCancel, err := portforward.ForwardOnePort(fw, ns, svc, port)
   354  	return pfCancel, localPort, err
   355  }
   356  
   357  // Log print log of pod
   358  func Log(pod v1.Pod, tail int64, c *kubernetes.Clientset) (string, error) {
   359  	podLogOpts := v1.PodLogOptions{}
   360  	//use negative tail to indicate no tail limit is needed
   361  	if tail >= 0 {
   362  		podLogOpts.TailLines = func(i int64) *int64 { return &i }(tail)
   363  	}
   364  
   365  	req := c.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts)
   366  	// FIXME: get context from parameter
   367  	podLogs, err := req.Stream(context.TODO())
   368  	if err != nil {
   369  		return "", errors.Wrapf(err, "failed to open log stream for pod %s/%s", pod.GetNamespace(), pod.GetName())
   370  	}
   371  	defer podLogs.Close()
   372  
   373  	buf := new(bytes.Buffer)
   374  	_, err = io.Copy(buf, podLogs)
   375  	if err != nil {
   376  		return "", errors.Wrapf(err, "failed to copy information from podLogs to buf")
   377  	}
   378  	return buf.String(), nil
   379  }
   380  
   381  // ConnectToLocalChaosDaemon would connect to ChaosDaemon run in localhost
   382  func ConnectToLocalChaosDaemon(port int) (daemonClient.ChaosDaemonClientInterface, error) {
   383  	if cli := mock.On("MockChaosDaemonClient"); cli != nil {
   384  		return cli.(daemonClient.ChaosDaemonClientInterface), nil
   385  	}
   386  	if err := mock.On("NewChaosDaemonClientError"); err != nil {
   387  		return nil, err.(error)
   388  	}
   389  	cc, err := getGrpcClient(port)
   390  
   391  	if err != nil {
   392  		return nil, err
   393  	}
   394  	return daemonClient.New(cc), nil
   395  }
   396  
   397  func getGrpcClient(port int) (*grpc.ClientConn, error) {
   398  	builder := grpcUtils.Builder("localhost", port)
   399  	if Insecure {
   400  		builder.Insecure()
   401  	} else {
   402  		if TLSFiles.CaCert == "" || TLSFiles.Cert == "" || TLSFiles.Key == "" {
   403  			PrettyPrint("TLS Files are not complete, fall back to use secrets.", 0, Green)
   404  			config, err := getTLSConfigFromSecrets()
   405  			if err != nil {
   406  				return nil, err
   407  			}
   408  			builder.TLSFromRaw(config.CaCert, config.Cert, config.Key)
   409  		} else {
   410  			PrettyPrint("Using TLS Files.", 0, Green)
   411  			builder.TLSFromFile(TLSFiles.CaCert, TLSFiles.Cert, TLSFiles.Key)
   412  		}
   413  	}
   414  	return builder.Build()
   415  }
   416  
   417  func getTLSConfigFromSecrets() (*grpcUtils.TLSRaw, error) {
   418  	restconfig, err := config.GetConfig()
   419  	if err != nil {
   420  		return nil, err
   421  	}
   422  	kubeClient, err := kubernetes.NewForConfig(restconfig)
   423  	if err != nil {
   424  		return nil, err
   425  	}
   426  	// FIXME: get context from parameter
   427  	secret, err := kubeClient.CoreV1().Secrets(ChaosDaemonNamespace).Get(context.TODO(), ChaosDaemonClientCert, metav1.GetOptions{})
   428  	if err != nil {
   429  		return nil, err
   430  	}
   431  	cfg := grpcUtils.TLSRaw{
   432  		CaCert: secret.Data["ca.crt"],
   433  		Cert:   secret.Data["tls.crt"],
   434  		Key:    secret.Data["tls.key"],
   435  	}
   436  	return &cfg, nil
   437  }
   438