1
2
3
4
5
6
7
8
9
10
11
12
13
14 package chaosdaemon
15
16 import (
17 "bufio"
18 "bytes"
19 "context"
20 "encoding/json"
21 "fmt"
22 "io/ioutil"
23 "net/http"
24 "os"
25
26 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
27 "github.com/chaos-mesh/chaos-mesh/pkg/bpm"
28 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
29 )
30
const (
	// tproxyBin is the path of the tproxy executable that createHttpChaos
	// passes to the process builder.
	tproxyBin = "/usr/local/bin/tproxy"

	// pathEnv is the name of the environment variable whose value is read
	// from this process and set on the tproxy process.
	pathEnv = "PATH"
)
35
36 type stdioTransport struct {
37 stdio *bpm.Stdio
38 }
39
40 type tproxyConfig struct {
41 ProxyPorts []uint32 `json:"proxy_ports"`
42 Rules []v1alpha1.PodHttpChaosBaseRule `json:"rules"`
43 }
44
45 func (t stdioTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
46 t.stdio.Lock()
47 defer t.stdio.Unlock()
48
49 if t.stdio.Stdin == nil {
50 return nil, fmt.Errorf("fail to get stdin of process")
51 }
52 if t.stdio.Stdout == nil {
53 return nil, fmt.Errorf("fail to get stdout of process")
54 }
55
56 err = req.Write(t.stdio.Stdin)
57 if err != nil {
58 return
59 }
60
61 resp, err = http.ReadResponse(bufio.NewReader(t.stdio.Stdout), req)
62 return
63 }
64
65 func (s *DaemonServer) ApplyHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) (*pb.ApplyHttpChaosResponse, error) {
66 log := log.WithValues("Request", in)
67 log.Info("applying http chaos")
68
69 if in.Instance == 0 {
70 if err := s.createHttpChaos(ctx, in); err != nil {
71 return nil, err
72 }
73 }
74
75 stdio := s.backgroundProcessManager.Stdio(int(in.Instance), in.StartTime)
76 if stdio == nil {
77 return nil, fmt.Errorf("fail to get stdio of process")
78 }
79
80 transport := stdioTransport{stdio: stdio}
81
82 rules := []v1alpha1.PodHttpChaosBaseRule{}
83 err := json.Unmarshal([]byte(in.Rules), &rules)
84 if err != nil {
85 log.Error(err, "error while unmarshal json bytes")
86 return nil, err
87 }
88
89 log.Info("the length of actions", "length", len(rules))
90
91 httpChaosSpec := tproxyConfig{
92 ProxyPorts: append([]uint32{}, in.ProxyPorts...),
93 Rules: rules,
94 }
95
96 config, err := json.Marshal(&httpChaosSpec)
97 if err != nil {
98 return nil, err
99 }
100
101 log.Info("ready to apply", "config", string(config))
102
103 req, err := http.NewRequest(http.MethodPut, "/", bytes.NewReader(config))
104 if err != nil {
105 return nil, err
106 }
107
108 resp, err := transport.RoundTrip(req)
109 if err != nil {
110 return nil, err
111 }
112
113 log.Info("http chaos applied")
114
115 body, err := ioutil.ReadAll(resp.Body)
116 if err != nil {
117 return nil, err
118 }
119
120 return &pb.ApplyHttpChaosResponse{
121 Instance: int64(in.Instance),
122 StartTime: in.StartTime,
123 StatusCode: int32(resp.StatusCode),
124 Error: string(body),
125 }, nil
126 }
127
128 func (s *DaemonServer) createHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) error {
129 pid, err := s.crClient.GetPidFromContainerID(ctx, in.ContainerId)
130 if err != nil {
131 log.Error(err, "error while getting PID")
132 return err
133 }
134 processBuilder := bpm.DefaultProcessBuilder(tproxyBin, "-i", "-vv").
135 EnableLocalMnt().
136 SetIdentifier(in.ContainerId).
137 SetEnv(pathEnv, os.Getenv(pathEnv)).
138 SetStdin(bpm.NewBlockingBuffer()).
139 SetStdout(bpm.NewBlockingBuffer())
140
141 if in.EnterNS {
142 processBuilder = processBuilder.SetNS(pid, bpm.PidNS).SetNS(pid, bpm.NetNS)
143 }
144
145 cmd := processBuilder.Build()
146 cmd.Stderr = os.Stderr
147
148 procState, err := s.backgroundProcessManager.StartProcess(cmd)
149 if err != nil {
150 return err
151 }
152 ct, err := procState.CreateTime()
153 if err != nil {
154 log.Error(err, "get create time failed")
155 if kerr := cmd.Process.Kill(); kerr != nil {
156 log.Error(kerr, "kill tproxy failed", "request", in)
157 }
158 return err
159 }
160
161 in.Instance = int64(cmd.Process.Pid)
162 in.StartTime = ct
163 return nil
164 }
165