1
2
3
4
5
6
7
8
9
10
11
12
13
14 package chaosdaemon
15
16 import (
17 "context"
18 "encoding/json"
19 "fmt"
20 "io"
21 "os"
22 "strings"
23 "time"
24
25 "github.com/shirou/gopsutil/process"
26
27 jrpc "github.com/ethereum/go-ethereum/rpc"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 "github.com/chaos-mesh/chaos-mesh/pkg/bpm"
31 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
32 )
33
// todaBin is the path of the toda executable that ApplyIoChaos launches.
const todaBin = "/usr/local/bin/toda"
37
38 func (s *DaemonServer) ApplyIoChaos(ctx context.Context, in *pb.ApplyIoChaosRequest) (*pb.ApplyIoChaosResponse, error) {
39 log.Info("applying io chaos", "Request", in)
40
41 if in.Instance != 0 {
42 err := s.killIoChaos(ctx, in.Instance, in.StartTime)
43 if err != nil {
44 return nil, err
45 }
46 }
47
48 actions := []v1alpha1.IoChaosAction{}
49 err := json.Unmarshal([]byte(in.Actions), &actions)
50 if err != nil {
51 log.Error(err, "error while unmarshal json bytes")
52 return nil, err
53 }
54
55 log.Info("the length of actions", "length", len(actions))
56 if len(actions) == 0 {
57 return &pb.ApplyIoChaosResponse{
58 Instance: 0,
59 StartTime: 0,
60 }, nil
61 }
62
63 pid, err := s.crClient.GetPidFromContainerID(ctx, in.ContainerId)
64 if err != nil {
65 log.Error(err, "error while getting PID")
66 return nil, err
67 }
68
69
70 args := fmt.Sprintf("--path %s --verbose info", in.Volume)
71 log.Info("executing", "cmd", todaBin+" "+args)
72
73 processBuilder := bpm.DefaultProcessBuilder(todaBin, strings.Split(args, " ")...).
74 EnableLocalMnt().
75 SetIdentifier(in.ContainerId)
76
77 if in.EnterNS {
78 processBuilder = processBuilder.SetNS(pid, bpm.MountNS).SetNS(pid, bpm.PidNS)
79 }
80
81 ctx, cancel := context.WithCancel(context.Background())
82 defer cancel()
83 caller, receiver := bpm.NewBlockingBuffer(), bpm.NewBlockingBuffer()
84 defer caller.Close()
85 defer receiver.Close()
86 client, err := jrpc.DialIO(ctx, receiver, caller)
87 if err != nil {
88 return nil, err
89 }
90
91 cmd := processBuilder.Build()
92 cmd.Stdin = caller
93 cmd.Stdout = io.MultiWriter(receiver, os.Stdout)
94 cmd.Stderr = os.Stderr
95 err = s.backgroundProcessManager.StartProcess(cmd)
96 if err != nil {
97 log.Error(err, "bpm failed")
98 return nil, err
99 }
100 var ret string
101
102 procState, err := process.NewProcess(int32(cmd.Process.Pid))
103 if err != nil {
104 log.Error(err, "new process failed")
105 return nil, err
106 }
107 ct, err := procState.CreateTime()
108 if err != nil {
109 log.Error(err, "get create time failed")
110 if kerr := cmd.Process.Kill(); kerr != nil {
111 log.Error(kerr, "kill toda failed", "request", in)
112 }
113 return nil, err
114 }
115
116 log.Info("Waiting for toda to start")
117 var rpcError error
118 maxWaitTime := time.Millisecond * 2000
119 timeOut, cancel := context.WithTimeout(ctx, maxWaitTime)
120 defer cancel()
121 _ = client.CallContext(timeOut, &ret, "update", actions)
122 rpcError = client.CallContext(timeOut, &ret, "get_status", "ping")
123 if rpcError != nil || ret != "ok" {
124 log.Info("Starting toda takes too long or encounter an error")
125 caller.Close()
126 receiver.Close()
127 if kerr := s.killIoChaos(ctx, int64(cmd.Process.Pid), ct); kerr != nil {
128 log.Error(kerr, "kill toda failed", "request", in)
129 }
130 return nil, fmt.Errorf("Toda startup takes too long or an error occurs: %s", ret)
131 }
132
133 return &pb.ApplyIoChaosResponse{
134 Instance: int64(cmd.Process.Pid),
135 StartTime: ct,
136 }, nil
137 }
138
139 func (s *DaemonServer) killIoChaos(ctx context.Context, pid int64, startTime int64) error {
140 log.Info("killing toda", "pid", pid)
141
142 err := s.backgroundProcessManager.KillBackgroundProcess(ctx, int(pid), startTime)
143 if err != nil {
144 return err
145 }
146 log.Info("kill toda successfully")
147 return nil
148 }
149