1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package chaosdaemon
17
18 import (
19 "bufio"
20 "bytes"
21 "context"
22 "encoding/json"
23 "fmt"
24 "io"
25 "net/http"
26 "os"
27 "sync"
28
29 "github.com/pkg/errors"
30
31 "github.com/chaos-mesh/chaos-mesh/pkg/bpm"
32 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
33 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/tproxyconfig"
34 )
35
36 const (
37 tproxyBin = "/usr/local/bin/tproxy"
38 pathEnv = "PATH"
39 )
40
41 type stdioTransport struct {
42 uid string
43 locker *sync.Map
44 pipes bpm.Pipes
45 }
46
47 func (t *stdioTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
48 if _, loaded := t.locker.LoadOrStore(t.uid, true); loaded {
49 return &http.Response{
50 StatusCode: http.StatusLocked,
51 Status: http.StatusText(http.StatusLocked),
52 Body: io.NopCloser(bytes.NewBufferString("")),
53 Request: req,
54 }, nil
55 }
56 defer t.locker.Delete(t.uid)
57 if t.pipes.Stdin == nil {
58 return nil, errors.New("fail to get stdin of process")
59 }
60 if t.pipes.Stdout == nil {
61 return nil, errors.New("fail to get stdout of process")
62 }
63
64 err = req.Write(t.pipes.Stdin)
65 if err != nil {
66 return
67 }
68
69 resp, err = http.ReadResponse(bufio.NewReader(t.pipes.Stdout), req)
70 return
71 }
72
73 func (s *DaemonServer) ApplyHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) (*pb.ApplyHttpChaosResponse, error) {
74 log := s.getLoggerFromContext(ctx)
75 log.Info("applying http chaos")
76
77 if in.InstanceUid == "" {
78 if uid, ok := s.backgroundProcessManager.GetUID(bpm.ProcessPair{Pid: int(in.Instance), CreateTime: in.StartTime}); ok {
79 in.InstanceUid = uid
80 }
81 }
82
83 if _, ok := s.backgroundProcessManager.GetPipes(in.InstanceUid); !ok {
84 if in.InstanceUid != "" {
85
86 if err := s.backgroundProcessManager.KillBackgroundProcess(ctx, in.InstanceUid); err != nil {
87
88 log.Error(err, "kill background process", "uid", in.InstanceUid)
89 }
90 }
91
92
93 if err := s.createHttpChaos(ctx, in); err != nil {
94 return nil, errors.Wrap(err, "create http chaos")
95 }
96 }
97
98 resp, err := s.applyHttpChaos(ctx, in)
99 if err != nil {
100 if killError := s.backgroundProcessManager.KillBackgroundProcess(ctx, in.InstanceUid); killError != nil {
101 log.Error(killError, "kill tproxy", "uid", in.InstanceUid)
102 }
103 return nil, errors.Wrap(err, "apply config")
104 }
105 return resp, err
106 }
107
108 func (s *DaemonServer) applyHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) (*pb.ApplyHttpChaosResponse, error) {
109 log := s.getLoggerFromContext(ctx)
110
111 pipes, ok := s.backgroundProcessManager.GetPipes(in.InstanceUid)
112 if !ok {
113 return nil, errors.Errorf("fail to get process(%s)", in.InstanceUid)
114 }
115
116 transport := &stdioTransport{
117 uid: in.InstanceUid,
118 locker: s.tproxyLocker,
119 pipes: pipes,
120 }
121
122 var rules []tproxyconfig.PodHttpChaosBaseRule
123 err := json.Unmarshal([]byte(in.Rules), &rules)
124 if err != nil {
125 return nil, errors.Wrap(err, "unmarshal rules")
126 }
127
128 log.Info("the length of actions", "length", len(rules))
129
130 httpChaosSpec := tproxyconfig.Config{
131 ProxyPorts: in.ProxyPorts,
132 Rules: rules,
133 }
134
135 if len(in.Tls) != 0 {
136 httpChaosSpec.TLS = new(tproxyconfig.TLSConfig)
137 err = json.Unmarshal([]byte(in.Tls), httpChaosSpec.TLS)
138 if err != nil {
139 return nil, errors.Wrap(err, "unmarshal tls config")
140 }
141 }
142
143 config, err := json.Marshal(&httpChaosSpec)
144 if err != nil {
145 return nil, err
146 }
147
148 log.Info("ready to apply", "config", string(config))
149
150 req, err := http.NewRequest(http.MethodPut, "/", bytes.NewReader(config))
151 if err != nil {
152 return nil, errors.Wrap(err, "create http request")
153 }
154
155 resp, err := transport.RoundTrip(req)
156 if err != nil {
157 return nil, errors.Wrap(err, "send http request")
158 }
159
160 log.Info("http chaos applied")
161
162 body, err := io.ReadAll(resp.Body)
163 if err != nil {
164 return nil, errors.Wrap(err, "read response body")
165 }
166
167 return &pb.ApplyHttpChaosResponse{
168 Instance: int64(in.Instance),
169 InstanceUid: in.InstanceUid,
170 StartTime: in.StartTime,
171 StatusCode: int32(resp.StatusCode),
172 Error: string(body),
173 }, nil
174 }
175
176 func (s *DaemonServer) createHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) error {
177 pid, err := s.crClient.GetPidFromContainerID(ctx, in.ContainerId)
178 if err != nil {
179 return errors.Wrapf(err, "get PID of container(%s)", in.ContainerId)
180 }
181 processBuilder := bpm.DefaultProcessBuilder(tproxyBin, "-i", "-vv").
182 EnableLocalMnt().
183 SetIdentifier(fmt.Sprintf("tproxy-%s", in.ContainerId)).
184 SetEnv(pathEnv, os.Getenv(pathEnv))
185
186 if in.EnterNS {
187 processBuilder = processBuilder.SetNS(pid, bpm.PidNS).SetNS(pid, bpm.NetNS)
188 }
189
190 cmd := processBuilder.Build(ctx)
191 cmd.Stderr = os.Stderr
192
193 proc, err := s.backgroundProcessManager.StartProcess(ctx, cmd)
194 if err != nil {
195 return errors.Wrapf(err, "execute command(%s)", cmd)
196 }
197
198 in.Instance = int64(proc.Pair.Pid)
199 in.StartTime = proc.Pair.CreateTime
200 in.InstanceUid = proc.Uid
201 return nil
202 }
203