1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package networkchaos
17
18 import (
19 "bytes"
20 "fmt"
21 "io"
22 "net/http"
23 "strconv"
24 "strings"
25 "sync"
26 "time"
27
28 "github.com/pkg/errors"
29 corev1 "k8s.io/api/core/v1"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/klog/v2"
32
33 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
34 )
35
// Keys of the map returned by probeNetworkCondition. Each key groups the
// [from, to] peer index pairs that were classified under that condition.
const (
	// networkConditionGood marks pairs that were reachable in both directions
	// and whose measured delay did not exceed the slow threshold.
	networkConditionGood = "good"
	// networkConditionSlow marks pairs that were reachable but whose measured
	// delay exceeded the slow threshold.
	networkConditionSlow = "slow"
	// networkConditionBlocked marks directed pairs for which the connectivity
	// check failed.
	networkConditionBlocked = "blocked"
)
41
42 func recvUDPPacket(c http.Client, port uint16) (string, error) {
43 klog.Infof("sending request to http://localhost:%d/network/recv", port)
44 resp, err := c.Get(fmt.Sprintf("http://localhost:%d/network/recv", port))
45 if err != nil {
46 return "", err
47 }
48
49 out, err := io.ReadAll(resp.Body)
50 defer resp.Body.Close()
51 if err != nil {
52 return "", err
53 }
54
55 result := string(out)
56 return result, nil
57 }
58
59 func sendUDPPacket(c http.Client, port uint16, targetIP string) error {
60 body := []byte(fmt.Sprintf("{\"targetIP\":\"%s\"}", targetIP))
61 klog.Infof("sending request to http://localhost:%d/network/send with body: %s", port, string(body))
62
63 resp, err := c.Post(fmt.Sprintf("http://localhost:%d/network/send", port), "application/json", bytes.NewReader(body))
64 if err != nil {
65 return err
66 }
67
68 out, err := io.ReadAll(resp.Body)
69 defer resp.Body.Close()
70 if err != nil {
71 return err
72 }
73
74 result := string(out)
75 if result != "send successfully\n" {
76 return errors.Errorf("doesn't send successfully: %s", result)
77 }
78
79 klog.Info("send request successfully")
80 return nil
81 }
82
83 func testNetworkDelay(c http.Client, port uint16, targetIP string) (int64, error) {
84 body := []byte(fmt.Sprintf("{\"targetIP\":\"%s\"}", targetIP))
85 klog.Infof("sending request to localhost:%d with body: %s", port, string(body))
86
87 resp, err := c.Post(fmt.Sprintf("http://localhost:%d/network/ping", port), "application/json", bytes.NewReader(body))
88 if err != nil {
89 return 0, err
90 }
91
92 out, err := io.ReadAll(resp.Body)
93 defer resp.Body.Close()
94 if err != nil {
95 return 0, err
96 }
97
98 result := string(out)
99 parts := strings.Split(result, " ")
100 if len(parts) != 2 {
101 return 0, errors.Errorf("the length of parts is not 2 %v", parts)
102 }
103
104 if parts[0] != "OK" {
105 return 0, errors.New("the first part of response is not OK")
106 }
107
108 return strconv.ParseInt(parts[1], 10, 64)
109 }
110
111 func makeNetworkPartitionChaos(
112 namespace, name string, fromLabelSelectors, toLabelSelectors map[string]string,
113 fromPodMode, toPodMode v1alpha1.SelectorMode,
114 direction v1alpha1.Direction,
115 duration *string,
116 ) *v1alpha1.NetworkChaos {
117 var target *v1alpha1.PodSelector
118 if toLabelSelectors != nil {
119 target = &v1alpha1.PodSelector{
120 Selector: v1alpha1.PodSelectorSpec{
121 GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
122 Namespaces: []string{namespace},
123 LabelSelectors: toLabelSelectors,
124 },
125 },
126 Mode: toPodMode,
127 }
128 }
129
130 return &v1alpha1.NetworkChaos{
131 ObjectMeta: metav1.ObjectMeta{
132 Name: name,
133 Namespace: namespace,
134 },
135 Spec: v1alpha1.NetworkChaosSpec{
136 Action: v1alpha1.PartitionAction,
137 Direction: direction,
138 Target: target,
139 Duration: duration,
140 PodSelector: v1alpha1.PodSelector{
141 Selector: v1alpha1.PodSelectorSpec{
142 GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
143 Namespaces: []string{namespace},
144 LabelSelectors: fromLabelSelectors,
145 },
146 },
147 Mode: fromPodMode,
148 },
149 },
150 }
151 }
152
153 func makeNetworkDelayChaos(
154 namespace, name string, fromLabelSelectors, toLabelSelectors map[string]string,
155 fromPodMode, toPodMode v1alpha1.SelectorMode, direction v1alpha1.Direction, tcparam v1alpha1.TcParameter, duration *string,
156 ) *v1alpha1.NetworkChaos {
157 var target *v1alpha1.PodSelector
158 if toLabelSelectors != nil {
159 target = &v1alpha1.PodSelector{
160 Selector: v1alpha1.PodSelectorSpec{
161 GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
162 Namespaces: []string{namespace},
163 LabelSelectors: toLabelSelectors,
164 },
165 },
166 Mode: toPodMode,
167 }
168 }
169
170 return &v1alpha1.NetworkChaos{
171 ObjectMeta: metav1.ObjectMeta{
172 Name: name,
173 Namespace: namespace,
174 },
175 Spec: v1alpha1.NetworkChaosSpec{
176 Action: v1alpha1.DelayAction,
177 TcParameter: tcparam,
178 Duration: duration,
179 Target: target,
180 Direction: direction,
181 PodSelector: v1alpha1.PodSelector{
182 Selector: v1alpha1.PodSelectorSpec{
183 GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
184 Namespaces: []string{namespace},
185 LabelSelectors: fromLabelSelectors,
186 },
187 },
188 Mode: fromPodMode,
189 },
190 },
191 }
192 }
193
194 func probeNetworkCondition(c http.Client, peers []*corev1.Pod, ports []uint16, bidirection bool) map[string][][]int {
195 result := make(map[string][][]int)
196
197 testDelay := func(from int, to int) (int64, error) {
198 return testNetworkDelay(c, ports[from], peers[to].Status.PodIP)
199 }
200
201 for source := 0; source < len(peers); source++ {
202 initialTarget := source + 1
203 if bidirection {
204 initialTarget = 0
205 }
206 for target := initialTarget; target < len(peers); target++ {
207 if target == source {
208 continue
209 }
210
211 connectable := true
212
213 var (
214 wg sync.WaitGroup
215 link1, link2 bool
216 )
217 wg.Add(2)
218 go func() {
219 defer wg.Done()
220
221 klog.Infof("testing connectivity from %s to %s", peers[source].Name, peers[target].Name)
222 link1 = couldConnect(c, ports[source], peers[target].Status.PodIP, ports[target])
223
224 }()
225
226 go func() {
227 defer wg.Done()
228
229 klog.Infof("testing connectivity from %s to %s", peers[target].Name, peers[source].Name)
230 link2 = couldConnect(c, ports[target], peers[source].Status.PodIP, ports[source])
231 }()
232 wg.Wait()
233
234 if !link1 {
235 klog.Infof("%s could not connect to %s", peers[source].Name, peers[target].Name)
236 result[networkConditionBlocked] = append(result[networkConditionBlocked], []int{source, target})
237 connectable = false
238 }
239 if !link2 {
240 klog.Infof("%s could not connect to %s", peers[target].Name, peers[source].Name)
241 result[networkConditionBlocked] = append(result[networkConditionBlocked], []int{target, source})
242 connectable = false
243 }
244
245 if !connectable {
246 continue
247 }
248
249
250 klog.Infof("testing delay from %s to %s", peers[source].Name, peers[target].Name)
251 delay, err := testDelay(source, target)
252 if err != nil {
253 klog.Errorf("error from %d to %d: %v", source, target, err)
254 continue
255 }
256
257 klog.Infof("delay from %d to %d: %d", source, target, delay)
258 if delay > 100*1e6 {
259 klog.Infof("detect slow network from %s to %s", peers[source].Name, peers[target].Name)
260 result[networkConditionSlow] = append(result[networkConditionSlow], []int{source, target})
261 continue
262 }
263
264
265 klog.Infof("good network from %d to %d", source, target)
266 result[networkConditionGood] = append(result[networkConditionGood], []int{source, target})
267 }
268 }
269
270 return result
271 }
272
273 func couldConnect(c http.Client, sourcePort uint16, targetPodIP string, targetPort uint16) bool {
274 err := sendUDPPacket(c, sourcePort, targetPodIP)
275 if err != nil {
276 klog.Infof("Error: %v", err)
277 return false
278 }
279
280 time.Sleep(time.Second)
281
282 data, err := recvUDPPacket(c, targetPort)
283 if err != nil {
284 klog.Infof("Error: %v, Data: %s", err, data)
285 return false
286 }
287
288
289
290
291
292 if data != "ping\n" {
293 klog.Infof("mismatch data return: %s, it may happens under bad network", data)
294 }
295
296 return true
297 }
298