...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/stress_server_linux.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon

     1  // Copyright 2020 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package chaosdaemon
    15  
    16  import (
    17  	"bufio"
    18  	"context"
    19  	"fmt"
    20  	"io"
    21  	"os"
    22  	"path/filepath"
    23  	"strconv"
    24  	"strings"
    25  	"syscall"
    26  	"time"
    27  
    28  	"github.com/containerd/cgroups"
    29  	"github.com/golang/protobuf/ptypes/empty"
    30  	"github.com/pkg/errors"
    31  	"github.com/shirou/gopsutil/process"
    32  
    33  	"github.com/chaos-mesh/chaos-mesh/pkg/bpm"
    34  	pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    35  )
    36  
    37  var (
    38  	// Possible cgroup subsystems
    39  	cgroupSubsys = []string{"cpu", "memory", "systemd", "net_cls",
    40  		"net_prio", "freezer", "blkio", "perf_event", "devices",
    41  		"cpuset", "cpuacct", "pids", "hugetlb"}
    42  )
    43  
    44  func (s *DaemonServer) ExecStressors(ctx context.Context,
    45  	req *pb.ExecStressRequest) (*pb.ExecStressResponse, error) {
    46  	log.Info("Executing stressors", "request", req)
    47  	pid, err := s.crClient.GetPidFromContainerID(ctx, req.Target)
    48  	if err != nil {
    49  		return nil, err
    50  	}
    51  	path := pidPath(int(pid))
    52  	id, err := s.crClient.FormatContainerID(ctx, req.Target)
    53  	if err != nil {
    54  		return nil, err
    55  	}
    56  	cgroup, err := findValidCgroup(path, id)
    57  	if err != nil {
    58  		return nil, err
    59  	}
    60  	if req.Scope == pb.ExecStressRequest_POD {
    61  		cgroup, _ = filepath.Split(cgroup)
    62  	}
    63  	control, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(cgroup))
    64  	if err != nil {
    65  		return nil, err
    66  	}
    67  
    68  	processBuilder := bpm.DefaultProcessBuilder("stress-ng", strings.Fields(req.Stressors)...).
    69  		EnablePause()
    70  	if req.EnterNS {
    71  		processBuilder = processBuilder.SetNS(pid, bpm.PidNS)
    72  	}
    73  	cmd := processBuilder.Build()
    74  
    75  	err = s.backgroundProcessManager.StartProcess(cmd)
    76  	if err != nil {
    77  		return nil, err
    78  	}
    79  	log.Info("Start process successfully")
    80  
    81  	procState, err := process.NewProcess(int32(cmd.Process.Pid))
    82  	if err != nil {
    83  		return nil, err
    84  	}
    85  	ct, err := procState.CreateTime()
    86  	if err != nil {
    87  		return nil, err
    88  	}
    89  
    90  	if err = control.Add(cgroups.Process{Pid: cmd.Process.Pid}); err != nil {
    91  		if kerr := cmd.Process.Kill(); kerr != nil {
    92  			log.Error(kerr, "kill stressors failed", "request", req)
    93  		}
    94  		return nil, err
    95  	}
    96  
    97  	for {
    98  		// TODO: find a better way to resume pause process
    99  		if err := cmd.Process.Signal(syscall.SIGCONT); err != nil {
   100  			return nil, err
   101  		}
   102  
   103  		log.Info("send signal to resume process")
   104  		time.Sleep(time.Millisecond)
   105  
   106  		comm, err := ReadCommName(cmd.Process.Pid)
   107  		if err != nil {
   108  			return nil, err
   109  		}
   110  		if comm != "pause\n" {
   111  			log.Info("pause has been resumed", "comm", comm)
   112  			break
   113  		}
   114  		log.Info("the process hasn't resumed, step into the following loop", "comm", comm)
   115  	}
   116  
   117  	return &pb.ExecStressResponse{
   118  		Instance:  strconv.Itoa(cmd.Process.Pid),
   119  		StartTime: ct,
   120  	}, nil
   121  }
   122  
// errFinished holds the message text "os: process already finished".
// NOTE(review): it is a plain string rather than an error value and is not
// referenced in the visible part of this file; presumably it is compared
// against error text elsewhere in the package — confirm before removing.
var errFinished = "os: process already finished"
   124  
   125  func (s *DaemonServer) CancelStressors(ctx context.Context,
   126  	req *pb.CancelStressRequest) (*empty.Empty, error) {
   127  	pid, err := strconv.Atoi(req.Instance)
   128  	if err != nil {
   129  		return nil, err
   130  	}
   131  	log.Info("Canceling stressors", "request", req)
   132  
   133  	err = s.backgroundProcessManager.KillBackgroundProcess(ctx, pid, req.StartTime)
   134  	if err != nil {
   135  		return nil, err
   136  	}
   137  	log.Info("killing stressor successfully")
   138  	return &empty.Empty{}, nil
   139  }
   140  
   141  func findValidCgroup(path cgroups.Path, target string) (string, error) {
   142  	for _, subsys := range cgroupSubsys {
   143  		p, err := path(cgroups.Name(subsys))
   144  		if err != nil {
   145  			log.Error(err, "Failed to retrieve the cgroup path", "subsystem", subsys, "target", target)
   146  			continue
   147  		}
   148  		if strings.Contains(p, target) {
   149  			return p, nil
   150  		}
   151  	}
   152  	return "", fmt.Errorf("never found valid cgroup for %s", target)
   153  }
   154  
   155  // pidPath will return the correct cgroup paths for an existing process running inside a cgroup
   156  // This is commonly used for the Load function to restore an existing container.
   157  //
   158  // Note: it is migrated from cgroups.pidPath since it will find mountinfo incorrectly inside
   159  // the daemonset. Hope we can fix it in official cgroups repo to solve it.
   160  func pidPath(pid int) cgroups.Path {
   161  	p := fmt.Sprintf("/proc/%d/cgroup", pid)
   162  	paths, err := parseCgroupFile(p)
   163  	if err != nil {
   164  		return errorPath(errors.Wrapf(err, "parse cgroup file %s", p))
   165  	}
   166  	return existingPath(paths, pid, "")
   167  }
   168  
   169  func errorPath(err error) cgroups.Path {
   170  	return func(_ cgroups.Name) (string, error) {
   171  		return "", err
   172  	}
   173  }
   174  
   175  func existingPath(paths map[string]string, pid int, suffix string) cgroups.Path {
   176  	// localize the paths based on the root mount dest for nested cgroups
   177  	for n, p := range paths {
   178  		dest, err := getCgroupDestination(pid, string(n))
   179  		if err != nil {
   180  			return errorPath(err)
   181  		}
   182  		rel, err := filepath.Rel(dest, p)
   183  		if err != nil {
   184  			return errorPath(err)
   185  		}
   186  		if rel == "." {
   187  			rel = dest
   188  		}
   189  		paths[n] = filepath.Join("/", rel)
   190  	}
   191  	return func(name cgroups.Name) (string, error) {
   192  		root, ok := paths[string(name)]
   193  		if !ok {
   194  			if root, ok = paths[fmt.Sprintf("name=%s", name)]; !ok {
   195  				return "", cgroups.ErrControllerNotActive
   196  			}
   197  		}
   198  		if suffix != "" {
   199  			return filepath.Join(root, suffix), nil
   200  		}
   201  		return root, nil
   202  	}
   203  }
   204  
   205  func parseCgroupFile(path string) (map[string]string, error) {
   206  	f, err := os.Open(path)
   207  	if err != nil {
   208  		return nil, err
   209  	}
   210  	defer f.Close()
   211  	return parseCgroupFromReader(f)
   212  }
   213  
   214  func parseCgroupFromReader(r io.Reader) (map[string]string, error) {
   215  	var (
   216  		cgroups = make(map[string]string)
   217  		s       = bufio.NewScanner(r)
   218  	)
   219  	for s.Scan() {
   220  		var (
   221  			text  = s.Text()
   222  			parts = strings.SplitN(text, ":", 3)
   223  		)
   224  		if len(parts) < 3 {
   225  			return nil, fmt.Errorf("invalid cgroup entry: %q", text)
   226  		}
   227  		for _, subs := range strings.Split(parts[1], ",") {
   228  			if subs != "" {
   229  				cgroups[subs] = parts[2]
   230  			}
   231  		}
   232  	}
   233  
   234  	if err := s.Err(); err != nil {
   235  		return nil, err
   236  	}
   237  
   238  	return cgroups, nil
   239  }
   240  
   241  func getCgroupDestination(pid int, subsystem string) (string, error) {
   242  	// use the process's mount info
   243  	p := fmt.Sprintf("/proc/%d/mountinfo", pid)
   244  	f, err := os.Open(p)
   245  	if err != nil {
   246  		return "", err
   247  	}
   248  	defer f.Close()
   249  	s := bufio.NewScanner(f)
   250  	for s.Scan() {
   251  		fields := strings.Fields(s.Text())
   252  		for _, opt := range strings.Split(fields[len(fields)-1], ",") {
   253  			if opt == subsystem {
   254  				return fields[3], nil
   255  			}
   256  		}
   257  	}
   258  	if err := s.Err(); err != nil {
   259  		return "", err
   260  	}
   261  	return "", fmt.Errorf("never found desct for %s", subsystem)
   262  }
   263