1
2
3
4
5
6
7
8
9
10
11
12
13
14 package chaosdaemon
15
16 import (
17 "bufio"
18 "bytes"
19 "context"
20 "encoding/json"
21 "fmt"
22 "io/ioutil"
23 "net/http"
24 "os"
25
26 "github.com/go-logr/logr"
27 "github.com/pkg/errors"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 "github.com/chaos-mesh/chaos-mesh/pkg/bpm"
31 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
32 )
33
34 const (
35 tproxyBin = "/usr/local/bin/tproxy"
36 pathEnv = "PATH"
37 )
38
39 type stdioTransport struct {
40 stdio *bpm.Stdio
41 }
42
43 type tproxyConfig struct {
44 ProxyPorts []uint32 `json:"proxy_ports"`
45 Rules []v1alpha1.PodHttpChaosBaseRule `json:"rules"`
46 }
47
48 func (t stdioTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
49 t.stdio.Lock()
50 defer t.stdio.Unlock()
51
52 if t.stdio.Stdin == nil {
53 return nil, errors.New("fail to get stdin of process")
54 }
55 if t.stdio.Stdout == nil {
56 return nil, errors.New("fail to get stdout of process")
57 }
58
59 err = req.Write(t.stdio.Stdin)
60 if err != nil {
61 return
62 }
63
64 resp, err = http.ReadResponse(bufio.NewReader(t.stdio.Stdout), req)
65 return
66 }
67
68 func (s *DaemonServer) ApplyHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) (*pb.ApplyHttpChaosResponse, error) {
69 log := log.WithValues("Request", in)
70 log.Info("applying http chaos")
71
72 if s.backgroundProcessManager.Stdio(int(in.Instance), in.StartTime) == nil {
73
74 if err := s.backgroundProcessManager.KillBackgroundProcess(ctx, int(in.Instance), in.StartTime); err != nil {
75 return nil, errors.Wrapf(err, "kill background process(%d)", in.Instance)
76 }
77 if err := s.createHttpChaos(ctx, in); err != nil {
78 return nil, err
79 }
80 }
81
82 resp, err := s.applyHttpChaos(ctx, log, in)
83 if err != nil {
84 killError := s.backgroundProcessManager.KillBackgroundProcess(ctx, int(in.Instance), in.StartTime)
85 log.Error(killError, "kill tproxy", "instance", in.Instance)
86 return nil, errors.Wrap(err, "apply config")
87 }
88 return resp, err
89 }
90
91 func (s *DaemonServer) applyHttpChaos(ctx context.Context, logger logr.Logger, in *pb.ApplyHttpChaosRequest) (*pb.ApplyHttpChaosResponse, error) {
92 stdio := s.backgroundProcessManager.Stdio(int(in.Instance), in.StartTime)
93 if stdio == nil {
94 return nil, errors.Errorf("fail to get stdio of process")
95 }
96
97 transport := stdioTransport{stdio: stdio}
98
99 rules := []v1alpha1.PodHttpChaosBaseRule{}
100 err := json.Unmarshal([]byte(in.Rules), &rules)
101 if err != nil {
102 return nil, errors.Wrap(err, "unmarshal rules")
103 }
104
105 log.Info("the length of actions", "length", len(rules))
106
107 httpChaosSpec := tproxyConfig{
108 ProxyPorts: append([]uint32{}, in.ProxyPorts...),
109 Rules: rules,
110 }
111
112 config, err := json.Marshal(&httpChaosSpec)
113 if err != nil {
114 return nil, err
115 }
116
117 log.Info("ready to apply", "config", string(config))
118
119 req, err := http.NewRequest(http.MethodPut, "/", bytes.NewReader(config))
120 if err != nil {
121 return nil, errors.Wrap(err, "create http request")
122 }
123
124 resp, err := transport.RoundTrip(req)
125 if err != nil {
126 return nil, errors.Wrap(err, "send http request")
127 }
128
129 log.Info("http chaos applied")
130
131 body, err := ioutil.ReadAll(resp.Body)
132 if err != nil {
133 return nil, errors.Wrap(err, "read response body")
134 }
135
136 return &pb.ApplyHttpChaosResponse{
137 Instance: int64(in.Instance),
138 StartTime: in.StartTime,
139 StatusCode: int32(resp.StatusCode),
140 Error: string(body),
141 }, nil
142 }
143
144 func (s *DaemonServer) createHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) error {
145 pid, err := s.crClient.GetPidFromContainerID(ctx, in.ContainerId)
146 if err != nil {
147 return errors.Wrapf(err, "get PID of container(%s)", in.ContainerId)
148 }
149 processBuilder := bpm.DefaultProcessBuilder(tproxyBin, "-i", "-vv").
150 EnableLocalMnt().
151 SetIdentifier(fmt.Sprintf("tproxy-%s", in.ContainerId)).
152 SetEnv(pathEnv, os.Getenv(pathEnv)).
153 SetStdin(bpm.NewBlockingBuffer()).
154 SetStdout(bpm.NewBlockingBuffer())
155
156 if in.EnterNS {
157 processBuilder = processBuilder.SetNS(pid, bpm.PidNS).SetNS(pid, bpm.NetNS)
158 }
159
160 cmd := processBuilder.Build()
161 cmd.Stderr = os.Stderr
162
163 procState, err := s.backgroundProcessManager.StartProcess(cmd)
164 if err != nil {
165 return errors.Wrapf(err, "execute command(%s)", cmd)
166 }
167 ct, err := procState.CreateTime()
168 if err != nil {
169 if kerr := cmd.Process.Kill(); kerr != nil {
170 log.Error(kerr, "kill tproxy", "request", in)
171 }
172 return errors.Wrap(err, "get create time")
173 }
174
175 in.Instance = int64(cmd.Process.Pid)
176 in.StartTime = ct
177 return nil
178 }
179