1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package chaosdaemon
17
18 import (
19 "context"
20 "encoding/json"
21 "fmt"
22 "os"
23 "strings"
24 "time"
25
26 jrpc "github.com/ethereum/go-ethereum/rpc"
27 "github.com/pkg/errors"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 "github.com/chaos-mesh/chaos-mesh/pkg/bpm"
31 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
32 )
33
// todaBin is the absolute path of the toda executable that ApplyIOChaos
// launches as a background process.
const todaBin = "/usr/local/bin/toda"
37
38 func (s *DaemonServer) ApplyIOChaos(ctx context.Context, in *pb.ApplyIOChaosRequest) (*pb.ApplyIOChaosResponse, error) {
39 log := s.getLoggerFromContext(ctx)
40 log.Info("applying io chaos", "Request", in)
41
42 if in.InstanceUid == "" {
43 if uid, ok := s.backgroundProcessManager.GetUID(bpm.ProcessPair{Pid: int(in.Instance), CreateTime: in.StartTime}); ok {
44 in.InstanceUid = uid
45 }
46 }
47
48 if in.InstanceUid != "" {
49 err := s.killIOChaos(ctx, in.InstanceUid)
50 if err != nil {
51 return nil, err
52 }
53 }
54
55 actions := []v1alpha1.IOChaosAction{}
56 err := json.Unmarshal([]byte(in.Actions), &actions)
57 if err != nil {
58 return nil, errors.Wrap(err, "unmarshal json bytes")
59 }
60
61 log.Info("the length of actions", "length", len(actions))
62 if len(actions) == 0 {
63 return &pb.ApplyIOChaosResponse{
64 Instance: 0,
65 StartTime: 0,
66 }, nil
67 }
68
69 pid, err := s.crClient.GetPidFromContainerID(ctx, in.ContainerId)
70 if err != nil {
71 return nil, errors.Wrap(err, "getting PID")
72 }
73
74
75 args := fmt.Sprintf("--path %s --verbose info", in.Volume)
76 log.Info("executing", "cmd", todaBin+" "+args)
77
78 processBuilder := bpm.DefaultProcessBuilder(todaBin, strings.Split(args, " ")...).
79 EnableLocalMnt().
80 SetIdentifier(fmt.Sprintf("toda-%s", in.ContainerId))
81
82 if in.EnterNS {
83 processBuilder = processBuilder.SetNS(pid, bpm.MountNS).SetNS(pid, bpm.PidNS)
84 }
85
86 ctx, cancel := context.WithCancel(context.Background())
87 defer cancel()
88 cmd := processBuilder.Build(ctx)
89 cmd.Stderr = os.Stderr
90 proc, err := s.backgroundProcessManager.StartProcess(ctx, cmd)
91 if err != nil {
92 return nil, errors.Wrapf(err, "start process `%s`", cmd)
93 }
94
95 client, err := jrpc.DialIO(ctx, proc.Pipes.Stdout, proc.Pipes.Stdin)
96 if err != nil {
97 return nil, errors.Wrapf(err, "dialing rpc client")
98 }
99
100 var ret string
101 log.Info("Waiting for toda to start")
102 var rpcError error
103 maxWaitTime := time.Millisecond * 2000
104 timeOut, cancel := context.WithTimeout(ctx, maxWaitTime)
105 defer cancel()
106 _ = client.CallContext(timeOut, &ret, "update", actions)
107 rpcError = client.CallContext(timeOut, &ret, "get_status", "ping")
108 if rpcError != nil || ret != "ok" {
109 log.Info("Starting toda takes too long or encounter an error")
110 if kerr := s.killIOChaos(ctx, proc.Uid); kerr != nil {
111 log.Error(kerr, "kill toda", "request", in)
112 }
113 return nil, errors.Errorf("toda startup takes too long or an error occurs: %s", ret)
114 }
115
116 return &pb.ApplyIOChaosResponse{
117 Instance: int64(proc.Pair.Pid),
118 StartTime: proc.Pair.CreateTime,
119 InstanceUid: proc.Uid,
120 }, nil
121 }
122
123 func (s *DaemonServer) killIOChaos(ctx context.Context, uid string) error {
124 log := s.getLoggerFromContext(ctx)
125
126 err := s.backgroundProcessManager.KillBackgroundProcess(ctx, uid)
127 if err != nil {
128 return errors.Wrapf(err, "kill toda %s", uid)
129 }
130 log.Info("kill toda successfully", "uid", uid)
131 return nil
132 }
133