1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package chaosdaemon
17
18 import (
19 "context"
20 "os"
21 "strings"
22
23 "github.com/chaos-mesh/chaos-driver/pkg/client"
24 "github.com/golang/protobuf/ptypes/empty"
25 "github.com/pkg/errors"
26
27 "github.com/chaos-mesh/chaos-mesh/pkg/bpm"
28 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
29 )
30
31 const chaosDaemonHelperCommand = "cdh"
32
33 func (s *DaemonServer) ApplyBlockChaos(ctx context.Context, req *pb.ApplyBlockChaosRequest) (*pb.ApplyBlockChaosResponse, error) {
34 log := s.getLoggerFromContext(ctx)
35
36 volumeName, err := normalizeVolumeName(ctx, req.VolumePath)
37 if err != nil {
38 log.Error(err, "normalize volume name", "volumePath", req.VolumePath)
39 return nil, err
40 }
41
42 err = enableIOEMElevator(volumeName)
43 if err != nil {
44 log.Error(err, "error while enabling ioem elevator", "volumeName", volumeName)
45 return nil, errors.Wrapf(err, "enable ioem elevator for volume %s", volumeName)
46 }
47
48 volumePath := "/dev/" + volumeName
49 if _, err := os.Stat(volumePath); err != nil {
50 log.Error(err, "error while getting stat of volume", "volumePath", volumePath)
51 return nil, errors.Wrapf(err, "volume path %s does not exist", volumePath)
52 }
53
54 pid, err := s.crClient.GetPidFromContainerID(ctx, req.ContainerId)
55 if err != nil {
56 log.Error(err, "error while getting PID")
57 return nil, err
58 }
59
60 c, err := client.New()
61 if err != nil {
62 log.Error(err, "create chaos-driver client")
63 return nil, err
64 }
65 defer c.Close()
66
67 if req.Action == pb.ApplyBlockChaosRequest_Delay {
68 log.Info("Injecting IOEM Delay", "delay", req.Delay.Delay, "jitter", req.Delay.Jitter, "corr", req.Delay.Correlation)
69
70 id, err := c.InjectIOEMDelay(volumePath, 0, uint(pid), req.Delay.Delay, req.Delay.Jitter, float64(req.Delay.Correlation))
71 if err != nil {
72 log.Error(err, "inject ioem delay")
73 return nil, err
74 }
75 return &pb.ApplyBlockChaosResponse{
76 InjectionId: int32(id),
77 }, nil
78 }
79
80 return nil, errors.New("unknown action")
81 }
82
83 func normalizeVolumeName(ctx context.Context, volumePath string) (string, error) {
84 volumeName, err := bpm.DefaultProcessBuilder(chaosDaemonHelperCommand, "normalize-volume-name", volumePath).
85 SetContext(ctx).
86 SetNS(1, bpm.MountNS).
87 EnableLocalMnt().
88 Build(ctx).
89 Output()
90 if err != nil {
91 return "", errors.Wrapf(err, "normalize volume name %s", volumePath)
92 }
93
94 return strings.Trim(string(volumeName), "\n\x00"), nil
95 }
96
97 func enableIOEMElevator(volumeName string) error {
98 schedulerPath := "/sys/block/" + volumeName + "/queue/scheduler"
99 rawSchedulers, err := os.ReadFile(schedulerPath)
100 if err != nil {
101 return errors.Wrapf(err, "reading schedulers %s", schedulerPath)
102 }
103
104 schedulers := strings.Split(strings.Trim(string(rawSchedulers), " \x00\n"), " ")
105 choosenScheduler := ""
106 for _, scheduler := range schedulers {
107
108
109
110
111
112
113
114
115
116 if strings.Contains(scheduler, "ioem") {
117 choosenScheduler = scheduler
118 }
119 }
120
121 if len(choosenScheduler) == 0 {
122 return errors.New("ioem scheduler not found")
123 }
124
125 if choosenScheduler[0] == '[' && choosenScheduler[len(choosenScheduler)-1] == ']' {
126
127 return nil
128 }
129
130
131 err = os.WriteFile(schedulerPath, []byte(choosenScheduler), 0000)
132 if err != nil {
133 return errors.Wrapf(err, "writing %s to %s", choosenScheduler, schedulerPath)
134 }
135
136 return nil
137 }
138
139 func (s *DaemonServer) RecoverBlockChaos(ctx context.Context, req *pb.RecoverBlockChaosRequest) (*empty.Empty, error) {
140 log := s.getLoggerFromContext(ctx)
141
142 c, err := client.New()
143 if err != nil {
144 log.Error(err, "create chaos-driver client")
145 return nil, err
146 }
147 defer c.Close()
148
149 log.Info("Recovering IOEM", "injectionId", req.InjectionId)
150 err = c.Recover(int(req.InjectionId))
151 if err != nil {
152 log.Error(err, "recover injection", "id", req.InjectionId)
153 return nil, err
154 }
155
156 return &empty.Empty{}, nil
157 }
158