1
2
3
4
5
6
7
8
9
10
11
12
13
14 package chaosdaemon
15
16 import (
17 "bufio"
18 "context"
19 "fmt"
20 "io"
21 "os"
22 "path/filepath"
23 "strconv"
24 "strings"
25 "syscall"
26 "time"
27
28 "github.com/containerd/cgroups"
29 "github.com/golang/protobuf/ptypes/empty"
30 "github.com/pkg/errors"
31 "github.com/shirou/gopsutil/process"
32
33 "github.com/chaos-mesh/chaos-mesh/pkg/bpm"
34 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
35 )
36
// cgroupSubsys lists the cgroup v1 subsystems probed, in this order, when
// searching for the cgroup that belongs to a target container. The order
// matters: findValidCgroup returns the first subsystem whose path matches.
var cgroupSubsys = []string{
	"cpu",
	"memory",
	"systemd",
	"net_cls",
	"net_prio",
	"freezer",
	"blkio",
	"perf_event",
	"devices",
	"cpuset",
	"cpuacct",
	"pids",
	"hugetlb",
}
43
44 func (s *DaemonServer) ExecStressors(ctx context.Context,
45 req *pb.ExecStressRequest) (*pb.ExecStressResponse, error) {
46 log.Info("Executing stressors", "request", req)
47 pid, err := s.crClient.GetPidFromContainerID(ctx, req.Target)
48 if err != nil {
49 return nil, err
50 }
51 path := pidPath(int(pid))
52 id, err := s.crClient.FormatContainerID(ctx, req.Target)
53 if err != nil {
54 return nil, err
55 }
56 cgroup, err := findValidCgroup(path, id)
57 if err != nil {
58 return nil, err
59 }
60 if req.Scope == pb.ExecStressRequest_POD {
61 cgroup, _ = filepath.Split(cgroup)
62 }
63 control, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(cgroup))
64 if err != nil {
65 return nil, err
66 }
67
68 processBuilder := bpm.DefaultProcessBuilder("stress-ng", strings.Fields(req.Stressors)...).
69 EnablePause()
70 if req.EnterNS {
71 processBuilder = processBuilder.SetNS(pid, bpm.PidNS)
72 }
73 cmd := processBuilder.Build()
74
75 err = s.backgroundProcessManager.StartProcess(cmd)
76 if err != nil {
77 return nil, err
78 }
79 log.Info("Start process successfully")
80
81 procState, err := process.NewProcess(int32(cmd.Process.Pid))
82 if err != nil {
83 return nil, err
84 }
85 ct, err := procState.CreateTime()
86 if err != nil {
87 return nil, err
88 }
89
90 if err = control.Add(cgroups.Process{Pid: cmd.Process.Pid}); err != nil {
91 if kerr := cmd.Process.Kill(); kerr != nil {
92 log.Error(kerr, "kill stressors failed", "request", req)
93 }
94 return nil, err
95 }
96
97 for {
98
99 if err := cmd.Process.Signal(syscall.SIGCONT); err != nil {
100 return nil, err
101 }
102
103 log.Info("send signal to resume process")
104 time.Sleep(time.Millisecond)
105
106 comm, err := ReadCommName(cmd.Process.Pid)
107 if err != nil {
108 return nil, err
109 }
110 if comm != "pause\n" {
111 log.Info("pause has been resumed", "comm", comm)
112 break
113 }
114 log.Info("the process hasn't resumed, step into the following loop", "comm", comm)
115 }
116
117 return &pb.ExecStressResponse{
118 Instance: strconv.Itoa(cmd.Process.Pid),
119 StartTime: ct,
120 }, nil
121 }
122
var (
	// errFinished holds the message text "os: process already finished".
	// In Go 1.16+ this is the text of os.ErrProcessDone, returned by
	// os.Process methods after the process has exited. It is not referenced
	// in this part of the file; presumably callers elsewhere compare it
	// against err.Error() — verify before changing.
	errFinished = "os: process already finished"
)
124
125 func (s *DaemonServer) CancelStressors(ctx context.Context,
126 req *pb.CancelStressRequest) (*empty.Empty, error) {
127 pid, err := strconv.Atoi(req.Instance)
128 if err != nil {
129 return nil, err
130 }
131 log.Info("Canceling stressors", "request", req)
132
133 err = s.backgroundProcessManager.KillBackgroundProcess(ctx, pid, req.StartTime)
134 if err != nil {
135 return nil, err
136 }
137 log.Info("killing stressor successfully")
138 return &empty.Empty{}, nil
139 }
140
141 func findValidCgroup(path cgroups.Path, target string) (string, error) {
142 for _, subsys := range cgroupSubsys {
143 p, err := path(cgroups.Name(subsys))
144 if err != nil {
145 log.Error(err, "Failed to retrieve the cgroup path", "subsystem", subsys, "target", target)
146 continue
147 }
148 if strings.Contains(p, target) {
149 return p, nil
150 }
151 }
152 return "", fmt.Errorf("never found valid cgroup for %s", target)
153 }
154
155
156
157
158
159
160 func pidPath(pid int) cgroups.Path {
161 p := fmt.Sprintf("/proc/%d/cgroup", pid)
162 paths, err := parseCgroupFile(p)
163 if err != nil {
164 return errorPath(errors.Wrapf(err, "parse cgroup file %s", p))
165 }
166 return existingPath(paths, pid, "")
167 }
168
169 func errorPath(err error) cgroups.Path {
170 return func(_ cgroups.Name) (string, error) {
171 return "", err
172 }
173 }
174
175 func existingPath(paths map[string]string, pid int, suffix string) cgroups.Path {
176
177 for n, p := range paths {
178 dest, err := getCgroupDestination(pid, string(n))
179 if err != nil {
180 return errorPath(err)
181 }
182 rel, err := filepath.Rel(dest, p)
183 if err != nil {
184 return errorPath(err)
185 }
186 if rel == "." {
187 rel = dest
188 }
189 paths[n] = filepath.Join("/", rel)
190 }
191 return func(name cgroups.Name) (string, error) {
192 root, ok := paths[string(name)]
193 if !ok {
194 if root, ok = paths[fmt.Sprintf("name=%s", name)]; !ok {
195 return "", cgroups.ErrControllerNotActive
196 }
197 }
198 if suffix != "" {
199 return filepath.Join(root, suffix), nil
200 }
201 return root, nil
202 }
203 }
204
205 func parseCgroupFile(path string) (map[string]string, error) {
206 f, err := os.Open(path)
207 if err != nil {
208 return nil, err
209 }
210 defer f.Close()
211 return parseCgroupFromReader(f)
212 }
213
// parseCgroupFromReader parses lines of the form
// "<id>:<comma-separated controllers>:<cgroup path>" and maps every
// non-empty controller name to that line's cgroup path.
//
// The path part may itself contain colons. Lines whose controller list is
// empty contribute nothing. A line with fewer than three colon-separated
// parts yields an "invalid cgroup entry" error, as does any read error from r.
func parseCgroupFromReader(r io.Reader) (map[string]string, error) {
	result := make(map[string]string)
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := scanner.Text()
		fields := strings.SplitN(line, ":", 3)
		if len(fields) != 3 {
			return nil, fmt.Errorf("invalid cgroup entry: %q", line)
		}
		controllers, cgroupPath := fields[1], fields[2]
		for _, controller := range strings.Split(controllers, ",") {
			if controller == "" {
				continue
			}
			result[controller] = cgroupPath
		}
	}

	if scanErr := scanner.Err(); scanErr != nil {
		return nil, scanErr
	}
	return result, nil
}
240
// getCgroupDestination scans /proc/<pid>/mountinfo for the first line whose
// last whitespace-separated field contains subsystem as one of its
// comma-separated options. It returns that line's fourth field, which is the
// root of the mount within its filesystem in the mountinfo format.
//
// An error is returned when the file cannot be opened or read, or when no
// line matches.
func getCgroupDestination(pid int, subsystem string) (string, error) {
	p := fmt.Sprintf("/proc/%d/mountinfo", pid)
	f, err := os.Open(p)
	if err != nil {
		return "", err
	}
	defer f.Close()

	s := bufio.NewScanner(f)
	for s.Scan() {
		fields := strings.Fields(s.Text())
		// Skip blank or truncated lines. Indexing fields[3] or
		// fields[len(fields)-1] on them would panic.
		if len(fields) < 4 {
			continue
		}
		for _, opt := range strings.Split(fields[len(fields)-1], ",") {
			if opt == subsystem {
				return fields[3], nil
			}
		}
	}
	if err := s.Err(); err != nil {
		return "", err
	}
	return "", fmt.Errorf("never found dest for %s in %s", subsystem, p)
}
263