1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package chaosdaemon
17
18 import (
19 "context"
20 "encoding/json"
21 "fmt"
22 "io"
23 "os"
24 "strings"
25 "time"
26
27 jrpc "github.com/ethereum/go-ethereum/rpc"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 "github.com/chaos-mesh/chaos-mesh/pkg/bpm"
31 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
32 )
33
34 const (
35 todaBin = "/usr/local/bin/toda"
36 )
37
38 func (s *DaemonServer) ApplyIOChaos(ctx context.Context, in *pb.ApplyIOChaosRequest) (*pb.ApplyIOChaosResponse, error) {
39 log.Info("applying io chaos", "Request", in)
40
41 if in.Instance != 0 {
42 err := s.killIOChaos(ctx, in.Instance, in.StartTime)
43 if err != nil {
44 return nil, err
45 }
46 }
47
48 actions := []v1alpha1.IOChaosAction{}
49 err := json.Unmarshal([]byte(in.Actions), &actions)
50 if err != nil {
51 log.Error(err, "error while unmarshal json bytes")
52 return nil, err
53 }
54
55 log.Info("the length of actions", "length", len(actions))
56 if len(actions) == 0 {
57 return &pb.ApplyIOChaosResponse{
58 Instance: 0,
59 StartTime: 0,
60 }, nil
61 }
62
63 pid, err := s.crClient.GetPidFromContainerID(ctx, in.ContainerId)
64 if err != nil {
65 log.Error(err, "error while getting PID")
66 return nil, err
67 }
68
69
70 args := fmt.Sprintf("--path %s --verbose info", in.Volume)
71 log.Info("executing", "cmd", todaBin+" "+args)
72
73 processBuilder := bpm.DefaultProcessBuilder(todaBin, strings.Split(args, " ")...).
74 EnableLocalMnt().
75 SetIdentifier(in.ContainerId)
76
77 if in.EnterNS {
78 processBuilder = processBuilder.SetNS(pid, bpm.MountNS).SetNS(pid, bpm.PidNS)
79 }
80
81 ctx, cancel := context.WithCancel(context.Background())
82 defer cancel()
83 caller, receiver := bpm.NewBlockingBuffer(), bpm.NewBlockingBuffer()
84 defer caller.Close()
85 defer receiver.Close()
86 client, err := jrpc.DialIO(ctx, receiver, caller)
87 if err != nil {
88 return nil, err
89 }
90
91 cmd := processBuilder.Build()
92 cmd.Stdin = caller
93 cmd.Stdout = io.MultiWriter(receiver, os.Stdout)
94 cmd.Stderr = os.Stderr
95 procState, err := s.backgroundProcessManager.StartProcess(cmd)
96 if err != nil {
97 return nil, err
98 }
99 var ret string
100 ct, err := procState.CreateTime()
101 if err != nil {
102 log.Error(err, "get create time failed")
103 if kerr := cmd.Process.Kill(); kerr != nil {
104 log.Error(kerr, "kill toda failed", "request", in)
105 }
106 return nil, err
107 }
108
109 log.Info("Waiting for toda to start")
110 var rpcError error
111 maxWaitTime := time.Millisecond * 2000
112 timeOut, cancel := context.WithTimeout(ctx, maxWaitTime)
113 defer cancel()
114 _ = client.CallContext(timeOut, &ret, "update", actions)
115 rpcError = client.CallContext(timeOut, &ret, "get_status", "ping")
116 if rpcError != nil || ret != "ok" {
117 log.Info("Starting toda takes too long or encounter an error")
118 caller.Close()
119 receiver.Close()
120 if kerr := s.killIOChaos(ctx, int64(cmd.Process.Pid), ct); kerr != nil {
121 log.Error(kerr, "kill toda failed", "request", in)
122 }
123 return nil, fmt.Errorf("toda startup takes too long or an error occurs: %s", ret)
124 }
125
126 return &pb.ApplyIOChaosResponse{
127 Instance: int64(cmd.Process.Pid),
128 StartTime: ct,
129 }, nil
130 }
131
132 func (s *DaemonServer) killIOChaos(ctx context.Context, pid int64, startTime int64) error {
133 log.Info("killing toda", "pid", pid)
134
135 err := s.backgroundProcessManager.KillBackgroundProcess(ctx, int(pid), startTime)
136 if err != nil {
137 return err
138 }
139 log.Info("kill toda successfully")
140 return nil
141 }
142