...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/chaosctl/common/common.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/chaosctl/common

     1  // Copyright 2019 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package common
    15  
    16  import (
    17  	"bytes"
    18  	"context"
    19  	"encoding/json"
    20  	"fmt"
    21  	"io"
    22  	"regexp"
    23  	"strings"
    24  	"time"
    25  
    26  	"google.golang.org/grpc"
    27  
    28  	grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
    29  	"github.com/chaos-mesh/chaos-mesh/pkg/mock"
    30  
    31  	"github.com/fatih/color"
    32  	"github.com/pkg/errors"
    33  	v1 "k8s.io/api/core/v1"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/runtime"
    36  	"k8s.io/client-go/kubernetes"
    37  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    38  	"sigs.k8s.io/controller-runtime/pkg/client"
    39  	"sigs.k8s.io/controller-runtime/pkg/client/config"
    40  
    41  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    42  	ctrlconfig "github.com/chaos-mesh/chaos-mesh/controllers/config"
    43  	daemonClient "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/client"
    44  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    45  	"github.com/chaos-mesh/chaos-mesh/pkg/portforward"
    46  	"github.com/chaos-mesh/chaos-mesh/pkg/selector"
    47  )
    48  
// Color names a text color understood by PrettyPrint.
type Color string

// Color values accepted by PrettyPrint. NoColor is the empty string and
// makes PrettyPrint emit plain, uncolored text.
const (
	Blue    Color = "Blue"
	Red     Color = "Red"
	Green   Color = "Green"
	Cyan    Color = "Cyan"
	NoColor Color = ""
)
    58  
var (
	// colorFunc maps each supported Color to the fatih/color print function
	// that renders it. NoColor has no entry; PrettyPrint handles it separately.
	colorFunc = map[Color]func(string, ...interface{}){
		Blue:  color.Blue,
		Red:   color.Red,
		Green: color.Green,
		Cyan:  color.Cyan,
	}
	// scheme is the runtime scheme passed to the controller-runtime client in
	// InitClientSet; init registers types into it.
	scheme = runtime.NewScheme()
)
    68  
// ClientSet bundles the two Kubernetes clients used by this package.
type ClientSet struct {
	// CtrlCli is the controller-runtime client.
	CtrlCli client.Client
	// KubeCli is the client-go clientset.
	KubeCli *kubernetes.Clientset
}
    74  
// ChaosResult groups the per-pod results collected for one chaos object.
// PrintResult prints Name under a "[Chaos]: " heading followed by every pod.
type ChaosResult struct {
	// Name is the label printed for the chaos.
	Name string
	// Pods holds one entry per pod reported for this chaos.
	Pods []PodResult
}
    79  
// PodResult groups the individual check items collected for one pod.
// PrintResult prints Name under a "[Pod]: " heading followed by every item.
type PodResult struct {
	// Name is the label printed for the pod.
	Name string
	// Items are printed in order, numbered starting from 1.
	Items []ItemResult
}
    84  
    85  const (
    86  	ItemSuccess = iota + 1
    87  	ItemFailure
    88  )
    89  
    90  const ChaosDaemonClientCert = "chaos-mesh-daemon-client-certs"
    91  const ChaosDaemonNamespace = "chaos-testing"
    92  
    93  var TLSFiles TLSFileConfig
    94  var Insecure bool
    95  
// ItemResult is a single named check shown to the user by PrintResult.
type ItemResult struct {
	// Name is printed as the item heading.
	Name string
	// Value is the item body, printed without color.
	Value string
	// Status is ItemSuccess or ItemFailure; any other value prints no verdict.
	// Omitted from JSON when zero.
	Status int `json:",omitempty"`
	// SucInfo is the message printed on success; when empty PrintResult prints
	// a default message instead. Omitted from JSON when empty.
	SucInfo string `json:",omitempty"`
	// ErrInfo is the message printed after "Failed: " on failure. Omitted from
	// JSON when empty.
	ErrInfo string `json:",omitempty"`
}
   103  
// init registers the Chaos Mesh v1alpha1 types and the client-go types into
// the package-level scheme used by InitClientSet.
func init() {
	// The errors returned by AddToScheme are deliberately discarded here.
	// NOTE(review): presumably registration of these generated types cannot
	// fail in practice — confirm, or surface the error.
	_ = v1alpha1.AddToScheme(scheme)
	_ = clientgoscheme.AddToScheme(scheme)
}
   108  
   109  func upperCaseChaos(str string) string {
   110  	parts := regexp.MustCompile("(.*)(chaos)").FindStringSubmatch(str)
   111  	return strings.Title(parts[1]) + strings.Title(parts[2])
   112  }
   113  
   114  // PrettyPrint print with tab number and color
   115  func PrettyPrint(s string, indentLevel int, color Color) {
   116  	var tabStr string
   117  	for i := 0; i < indentLevel; i++ {
   118  		tabStr += "\t"
   119  	}
   120  	str := fmt.Sprintf("%s%s\n\n", tabStr, regexp.MustCompile("\n").ReplaceAllString(s, "\n"+tabStr))
   121  	if color != NoColor {
   122  		if cfunc, ok := colorFunc[color]; !ok {
   123  			fmt.Print("COLOR NOT SUPPORTED")
   124  		} else {
   125  			cfunc(str)
   126  		}
   127  	} else {
   128  		fmt.Print(str)
   129  	}
   130  }
   131  
   132  // PrintResult prints result to users in prettier format
   133  func PrintResult(result []ChaosResult) {
   134  	for _, chaos := range result {
   135  		PrettyPrint("[Chaos]: "+chaos.Name, 0, Blue)
   136  		for _, pod := range chaos.Pods {
   137  			PrettyPrint("[Pod]: "+pod.Name, 0, Blue)
   138  			for i, item := range pod.Items {
   139  				PrettyPrint(fmt.Sprintf("%d. [%s]", i+1, item.Name), 1, Cyan)
   140  				PrettyPrint(item.Value, 1, NoColor)
   141  				if item.Status == ItemSuccess {
   142  					if item.SucInfo != "" {
   143  						PrettyPrint(item.SucInfo, 1, Green)
   144  					} else {
   145  						PrettyPrint("Execute as expected", 1, Green)
   146  					}
   147  				} else if item.Status == ItemFailure {
   148  					PrettyPrint(fmt.Sprintf("Failed: %s ", item.ErrInfo), 1, Red)
   149  				}
   150  			}
   151  		}
   152  	}
   153  }
   154  
   155  // MarshalChaos returns json in readable format
   156  func MarshalChaos(s interface{}) (string, error) {
   157  	b, err := json.MarshalIndent(s, "", "  ")
   158  	if err != nil {
   159  		return "", errors.Wrapf(err, "failed to marshal indent")
   160  	}
   161  	return string(b), nil
   162  }
   163  
   164  // InitClientSet inits two different clients that would be used
   165  func InitClientSet() (*ClientSet, error) {
   166  	restconfig, err := config.GetConfig()
   167  	if err != nil {
   168  		return nil, err
   169  	}
   170  	ctrlClient, err := client.New(restconfig, client.Options{Scheme: scheme})
   171  	if err != nil {
   172  		return nil, fmt.Errorf("failed to create client")
   173  	}
   174  	kubeClient, err := kubernetes.NewForConfig(restconfig)
   175  	if err != nil {
   176  		return nil, errors.Wrap(err, "error in getting acess to k8s")
   177  	}
   178  	return &ClientSet{ctrlClient, kubeClient}, nil
   179  }
   180  
   181  // GetPods returns pod list and corresponding chaos daemon
   182  func GetPods(ctx context.Context, chaosName string, status v1alpha1.ChaosStatus, selectorSpec v1alpha1.SelectorSpec, c client.Client) ([]v1.Pod, []v1.Pod, error) {
   183  	// get podName
   184  	failedMessage := status.FailedMessage
   185  	if failedMessage != "" {
   186  		PrettyPrint(fmt.Sprintf("chaos %s failed with: %s", chaosName, failedMessage), 0, Red)
   187  	}
   188  
   189  	phase := status.Experiment.Phase
   190  	nextStart := status.Scheduler.NextStart
   191  
   192  	if phase == v1alpha1.ExperimentPhaseWaiting {
   193  		waitTime := nextStart.Sub(time.Now())
   194  		L().WithName("GetPods").V(1).Info(fmt.Sprintf("Waiting for chaos %s to start, in %s\n", chaosName, waitTime))
   195  		time.Sleep(waitTime)
   196  	}
   197  
   198  	pods, err := selector.SelectPods(ctx, c, c, selectorSpec, ctrlconfig.ControllerCfg.ClusterScoped, ctrlconfig.ControllerCfg.TargetNamespace, ctrlconfig.ControllerCfg.EnableFilterNamespace)
   199  	if err != nil {
   200  		return nil, nil, errors.Wrap(err, "failed to SelectPods")
   201  	}
   202  	L().WithName("GetPods").V(4).Info("select pods for chaos", "chaos", chaosName, "pods", pods)
   203  	if len(pods) == 0 {
   204  		return nil, nil, fmt.Errorf("no pods found for chaos %s, selector: %s", chaosName, selectorSpec)
   205  	}
   206  
   207  	// TODO: replace select daemon by
   208  	var chaosDaemons []v1.Pod
   209  	// get chaos daemon
   210  	for _, pod := range pods {
   211  		nodeName := pod.Spec.NodeName
   212  		daemonSelector := v1alpha1.SelectorSpec{
   213  			Nodes:          []string{nodeName},
   214  			LabelSelectors: map[string]string{"app.kubernetes.io/component": "chaos-daemon"},
   215  		}
   216  		daemons, err := selector.SelectPods(ctx, c, nil, daemonSelector, ctrlconfig.ControllerCfg.ClusterScoped, ctrlconfig.ControllerCfg.TargetNamespace, ctrlconfig.ControllerCfg.EnableFilterNamespace)
   217  		if err != nil {
   218  			return nil, nil, errors.Wrap(err, fmt.Sprintf("failed to select daemon pod for pod %s", pod.GetName()))
   219  		}
   220  		if len(daemons) == 0 {
   221  			return nil, nil, fmt.Errorf("no daemons found for pod %s with selector: %s", pod.GetName(), daemonSelector)
   222  		}
   223  		chaosDaemons = append(chaosDaemons, daemons[0])
   224  	}
   225  
   226  	return pods, chaosDaemons, nil
   227  }
   228  
   229  // GetChaosList returns chaos list limited by input
   230  func GetChaosList(ctx context.Context, chaosType string, chaosName string, ns string, c client.Client) ([]runtime.Object, []string, error) {
   231  	chaosType = upperCaseChaos(strings.ToLower(chaosType))
   232  	allKinds := v1alpha1.AllKinds()
   233  	chaosListInterface := allKinds[chaosType].ChaosList
   234  
   235  	if err := c.List(ctx, chaosListInterface, client.InNamespace(ns)); err != nil {
   236  		return nil, nil, errors.Wrapf(err, "failed to get chaosList with namespace %s", ns)
   237  	}
   238  	chaosList := chaosListInterface.ListChaos()
   239  	if len(chaosList) == 0 {
   240  		return nil, nil, fmt.Errorf("no chaos is found, please check your input")
   241  	}
   242  
   243  	var retList []runtime.Object
   244  	var retNameList []string
   245  	for _, ch := range chaosList {
   246  		if chaosName == "" || chaosName == ch.Name {
   247  			chaos, err := getChaos(ctx, chaosType, ch.Name, ns, c)
   248  			if err != nil {
   249  				return nil, nil, err
   250  			}
   251  			retList = append(retList, chaos)
   252  			retNameList = append(retNameList, ch.Name)
   253  		}
   254  	}
   255  	if len(retList) == 0 {
   256  		return nil, nil, fmt.Errorf("no chaos is found, please check your input")
   257  	}
   258  
   259  	return retList, retNameList, nil
   260  }
   261  
   262  func getChaos(ctx context.Context, chaosType string, chaosName string, ns string, c client.Client) (runtime.Object, error) {
   263  	allKinds := v1alpha1.AllKinds()
   264  	chaos := allKinds[chaosType].Chaos
   265  	objectKey := client.ObjectKey{
   266  		Namespace: ns,
   267  		Name:      chaosName,
   268  	}
   269  	if err := c.Get(ctx, objectKey, chaos); err != nil {
   270  		return nil, errors.Wrapf(err, "failed to get chaos %s", chaosName)
   271  	}
   272  	return chaos, nil
   273  }
   274  
   275  // GetPidFromPS returns pid-command pairs
   276  func GetPidFromPS(ctx context.Context, pod v1.Pod, daemon v1.Pod, c *kubernetes.Clientset) ([]string, []string, error) {
   277  	cmd := "ps"
   278  	out, err := ExecBypass(ctx, pod, daemon, cmd, c)
   279  	if err != nil {
   280  		return nil, nil, errors.Wrapf(err, "run command %s failed", cmd)
   281  	}
   282  	outLines := strings.Split(string(out), "\n")
   283  	if len(outLines) < 2 {
   284  		return nil, nil, fmt.Errorf("ps returns empty")
   285  	}
   286  	titles := strings.Fields(outLines[0])
   287  	var pidColumn, cmdColumn int
   288  	for i, t := range titles {
   289  		if t == "PID" {
   290  			pidColumn = i
   291  		}
   292  		if t == "COMMAND" || t == "CMD" {
   293  			cmdColumn = i
   294  		}
   295  	}
   296  	if pidColumn == 0 && cmdColumn == 0 {
   297  		return nil, nil, fmt.Errorf("parsing ps error: could not get PID and COMMAND column")
   298  	}
   299  	var pids, commands []string
   300  	for _, line := range outLines[1:] {
   301  		item := strings.Fields(line)
   302  		// break when got empty line
   303  		if len(item) == 0 {
   304  			break
   305  		}
   306  		pids = append(pids, item[pidColumn])
   307  		commands = append(commands, item[cmdColumn])
   308  	}
   309  	return pids, commands, nil
   310  }
   311  
   312  // GetPidFromPod returns pid given containerd ID in pod
   313  func GetPidFromPod(ctx context.Context, pod v1.Pod, daemon v1.Pod) (uint32, error) {
   314  	pfCancel, localPort, err := forwardPorts(ctx, daemon, uint16(ctrlconfig.ControllerCfg.ChaosDaemonPort))
   315  	if err != nil {
   316  		return 0, errors.Wrapf(err, "forward ports for daemon pod %s/%s failed", daemon.Namespace, daemon.Name)
   317  	}
   318  	L().WithName("GetPidFromPod").V(4).Info(fmt.Sprintf("port forwarding 127.0.0.1:%d -> pod/%s/%s:%d", localPort, daemon.Namespace, daemon.Name, ctrlconfig.ControllerCfg.ChaosDaemonPort))
   319  
   320  	defer func() {
   321  		pfCancel()
   322  	}()
   323  
   324  	daemonClient, err := ConnectToLocalChaosDaemon(int(localPort))
   325  	if err != nil {
   326  		return 0, errors.Wrapf(err, "failed to create new chaos daemon client with local port %d", localPort)
   327  	}
   328  	defer daemonClient.Close()
   329  
   330  	if len(pod.Status.ContainerStatuses) == 0 {
   331  		return 0, fmt.Errorf("%s %s can't get the state of container", pod.Namespace, pod.Name)
   332  	}
   333  
   334  	res, err := daemonClient.ContainerGetPid(ctx, &pb.ContainerRequest{
   335  		Action: &pb.ContainerAction{
   336  			Action: pb.ContainerAction_GETPID,
   337  		},
   338  		ContainerId: pod.Status.ContainerStatuses[0].ContainerID,
   339  	})
   340  	if err != nil {
   341  		return 0, errors.Wrapf(err, "failed get pid from pod %s/%s", pod.GetNamespace(), pod.GetName())
   342  	}
   343  	return res.Pid, nil
   344  }
   345  
   346  func forwardPorts(ctx context.Context, pod v1.Pod, port uint16) (context.CancelFunc, uint16, error) {
   347  	commonRestClientGetter := NewCommonRestClientGetter()
   348  	fw, err := portforward.NewPortForwarder(ctx, commonRestClientGetter, false)
   349  	if err != nil {
   350  		return nil, 0, errors.Wrap(err, "failed to create port forwarder")
   351  	}
   352  	_, localPort, pfCancel, err := portforward.ForwardOnePort(fw, pod.Namespace, pod.Name, port)
   353  	return pfCancel, localPort, err
   354  }
   355  
   356  // Log print log of pod
   357  func Log(pod v1.Pod, tail int64, c *kubernetes.Clientset) (string, error) {
   358  	podLogOpts := v1.PodLogOptions{}
   359  	//use negative tail to indicate no tail limit is needed
   360  	if tail >= 0 {
   361  		podLogOpts.TailLines = func(i int64) *int64 { return &i }(tail)
   362  	}
   363  
   364  	req := c.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts)
   365  	podLogs, err := req.Stream()
   366  	if err != nil {
   367  		return "", errors.Wrapf(err, "failed to open log stream for pod %s/%s", pod.GetNamespace(), pod.GetName())
   368  	}
   369  	defer podLogs.Close()
   370  
   371  	buf := new(bytes.Buffer)
   372  	_, err = io.Copy(buf, podLogs)
   373  	if err != nil {
   374  		return "", errors.Wrapf(err, "failed to copy information from podLogs to buf")
   375  	}
   376  	return buf.String(), nil
   377  }
   378  
   379  // CheckFailedMessage provide debug info and suggestions from failed message
   380  func CheckFailedMessage(ctx context.Context, failedMessage string, daemons []v1.Pod, c *ClientSet) error {
   381  	if strings.Contains(failedMessage, "rpc error: code = Unavailable desc = connection error") || strings.Contains(failedMessage, "connect: connection refused") {
   382  		if err := checkConnForCtrlAndDaemon(ctx, daemons, c); err != nil {
   383  			return fmt.Errorf("error occurs when check failed message: %s", err)
   384  		}
   385  	}
   386  	return nil
   387  }
   388  
   389  func checkConnForCtrlAndDaemon(ctx context.Context, daemons []v1.Pod, c *ClientSet) error {
   390  	ctrlSelector := v1alpha1.SelectorSpec{
   391  		LabelSelectors: map[string]string{"app.kubernetes.io/component": "controller-manager"},
   392  	}
   393  	ctrlMgrs, err := selector.SelectPods(ctx, c.CtrlCli, c.CtrlCli, ctrlSelector, ctrlconfig.ControllerCfg.ClusterScoped, ctrlconfig.ControllerCfg.TargetNamespace, ctrlconfig.ControllerCfg.EnableFilterNamespace)
   394  	if err != nil {
   395  		return errors.Wrapf(err, "failed to select pod for controller-manager")
   396  	}
   397  	if len(ctrlMgrs) == 0 {
   398  		return fmt.Errorf("could not found controller manager")
   399  	}
   400  	for _, daemon := range daemons {
   401  		daemonIP := daemon.Status.PodIP
   402  		cmd := fmt.Sprintf("ping -c 1 %s > /dev/null; echo $?", daemonIP)
   403  		out, err := Exec(ctx, ctrlMgrs[0], cmd, c.KubeCli)
   404  		if err != nil {
   405  			return errors.Wrapf(err, "run command %s failed", cmd)
   406  		}
   407  		if string(out) == "0" {
   408  			PrettyPrint(fmt.Sprintf("Connection between Controller-Manager and Daemon %s (ip address: %s) works well", daemon.Name, daemonIP), 0, Green)
   409  		} else {
   410  			PrettyPrint(fmt.Sprintf(`Connection between Controller-Manager and Daemon %s (ip address: %s) is blocked.
   411  Please check network policy / firewall, or see FAQ on website`, daemon.Name, daemonIP), 0, Red)
   412  		}
   413  
   414  	}
   415  	return nil
   416  }
   417  
   418  // ConnectToLocalChaosDaemon would connect to ChaosDaemon run in localhost
   419  func ConnectToLocalChaosDaemon(port int) (daemonClient.ChaosDaemonClientInterface, error) {
   420  	if cli := mock.On("MockChaosDaemonClient"); cli != nil {
   421  		return cli.(daemonClient.ChaosDaemonClientInterface), nil
   422  	}
   423  	if err := mock.On("NewChaosDaemonClientError"); err != nil {
   424  		return nil, err.(error)
   425  	}
   426  	cc, err := getGrpcClient(port)
   427  
   428  	if err != nil {
   429  		return nil, err
   430  	}
   431  	return daemonClient.New(cc), nil
   432  }
   433  
   434  func getGrpcClient(port int) (*grpc.ClientConn, error) {
   435  	if Insecure {
   436  		return grpcUtils.CreateGrpcConnection("localhost", port, "", "", "")
   437  	}
   438  	if TLSFiles.CaCert == "" || TLSFiles.Cert == "" || TLSFiles.Key == "" {
   439  		PrettyPrint("TLS Files are not complete, fall back to use secrets.", 0, Green)
   440  		config, err := getTLSConfigFromSecrets()
   441  		if err != nil {
   442  			return nil, err
   443  		}
   444  		return grpcUtils.CreateGrpcConnectionFromRaw("localhost", port, config.caCert, config.cert, config.key)
   445  	}
   446  	PrettyPrint("Using TLS Files.", 0, Green)
   447  	return grpcUtils.CreateGrpcConnection("localhost", port, TLSFiles.CaCert, TLSFiles.Cert, TLSFiles.Key)
   448  }
   449  
   450  func getTLSConfigFromSecrets() (*rawTLSConfig, error) {
   451  	var cfg rawTLSConfig
   452  	restconfig, err := config.GetConfig()
   453  	if err != nil {
   454  		return nil, err
   455  	}
   456  	kubeClient, err := kubernetes.NewForConfig(restconfig)
   457  	if err != nil {
   458  		return nil, err
   459  	}
   460  	secret, err := kubeClient.CoreV1().Secrets(ChaosDaemonNamespace).Get(ChaosDaemonClientCert, metav1.GetOptions{})
   461  	if err != nil {
   462  		return nil, err
   463  	}
   464  	cfg.caCert = secret.Data["ca.crt"]
   465  	cfg.cert = secret.Data["tls.crt"]
   466  	cfg.key = secret.Data["tls.key"]
   467  	return &cfg, nil
   468  }
   469  
// rawTLSConfig holds TLS material as raw bytes, as read from the secret by
// getTLSConfigFromSecrets.
type rawTLSConfig struct {
	// caCert is the secret's "ca.crt" entry.
	caCert []byte
	// cert is the secret's "tls.crt" entry.
	cert []byte
	// key is the secret's "tls.key" entry.
	key []byte
}
// TLSFileConfig holds the three TLS values that getGrpcClient passes to
// grpcUtils.CreateGrpcConnection. getGrpcClient treats the configuration as
// incomplete if any field is empty.
// NOTE(review): the type name suggests these are file paths — confirm.
type TLSFileConfig struct {
	// CaCert is the CA certificate value.
	CaCert string
	// Cert is the client certificate value.
	Cert string
	// Key is the client key value.
	Key string
}
   480