1
2
3
4
5
6
7
8
9
10
11
12
13
14 package common
15
16 import (
17 "bytes"
18 "context"
19 "encoding/json"
20 "fmt"
21 "io"
22 "regexp"
23 "strings"
24 "time"
25
26 "google.golang.org/grpc"
27
28 grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
29 "github.com/chaos-mesh/chaos-mesh/pkg/mock"
30
31 "github.com/fatih/color"
32 "github.com/pkg/errors"
33 v1 "k8s.io/api/core/v1"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/runtime"
36 "k8s.io/client-go/kubernetes"
37 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
38 "sigs.k8s.io/controller-runtime/pkg/client"
39 "sigs.k8s.io/controller-runtime/pkg/client/config"
40
41 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
42 ctrlconfig "github.com/chaos-mesh/chaos-mesh/controllers/config"
43 daemonClient "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/client"
44 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
45 "github.com/chaos-mesh/chaos-mesh/pkg/portforward"
46 "github.com/chaos-mesh/chaos-mesh/pkg/selector"
47 )
48
// Color names a terminal color understood by PrettyPrint.
type Color string

// Colors accepted by PrettyPrint. NoColor (the empty string) prints the text
// without any color.
const (
	Blue    Color = "Blue"
	Red     Color = "Red"
	Green   Color = "Green"
	Cyan    Color = "Cyan"
	NoColor Color = ""
)
58
59 var (
60 colorFunc = map[Color]func(string, ...interface{}){
61 Blue: color.Blue,
62 Red: color.Red,
63 Green: color.Green,
64 Cyan: color.Cyan,
65 }
66 scheme = runtime.NewScheme()
67 )
68
69
70 type ClientSet struct {
71 CtrlCli client.Client
72 KubeCli *kubernetes.Clientset
73 }
74
75 type ChaosResult struct {
76 Name string
77 Pods []PodResult
78 }
79
80 type PodResult struct {
81 Name string
82 Items []ItemResult
83 }
84
// Values for ItemResult.Status. The zero value means "no verdict", and
// PrintResult then shows only the item's name and value.
const (
	ItemSuccess = 1
	ItemFailure = 2
)
89
90 const ChaosDaemonClientCert = "chaos-mesh-daemon-client-certs"
91 const ChaosDaemonNamespace = "chaos-testing"
92
93 var TLSFiles TLSFileConfig
94 var Insecure bool
95
// ItemResult is a single check performed on a pod.
type ItemResult struct {
	// Name is the title of the check.
	Name string
	// Value is the collected output printed under the title.
	Value string
	// Status is ItemSuccess, ItemFailure, or 0 when there is no verdict.
	Status int `json:",omitempty"`
	// SucInfo, if set, replaces the default success message.
	SucInfo string `json:",omitempty"`
	// ErrInfo explains a failure.
	ErrInfo string `json:",omitempty"`
}
103
104 func init() {
105 _ = v1alpha1.AddToScheme(scheme)
106 _ = clientgoscheme.AddToScheme(scheme)
107 }
108
// chaosKindPattern splits a lower-cased chaos type such as "networkchaos" into
// everything before the last "chaos" and "chaos" itself. It is compiled once
// rather than on every call.
var chaosKindPattern = regexp.MustCompile("(.*)(chaos)")

// upperCaseChaos turns a lower-cased chaos type into its kind name by
// title-casing the prefix and the "chaos" part separately, e.g.
// "networkchaos" -> "NetworkChaos". Anything after the last "chaos" is
// dropped. Input without "chaos" used to panic (nil submatch slice). It is
// now returned title-cased as a whole (e.g. "pod" -> "Pod"), so a kind lookup
// by the caller reports it as unknown.
func upperCaseChaos(str string) string {
	parts := chaosKindPattern.FindStringSubmatch(str)
	if parts == nil {
		return strings.Title(str)
	}
	return strings.Title(parts[1]) + strings.Title(parts[2])
}
113
114
115 func PrettyPrint(s string, indentLevel int, color Color) {
116 var tabStr string
117 for i := 0; i < indentLevel; i++ {
118 tabStr += "\t"
119 }
120 str := fmt.Sprintf("%s%s\n\n", tabStr, regexp.MustCompile("\n").ReplaceAllString(s, "\n"+tabStr))
121 if color != NoColor {
122 if cfunc, ok := colorFunc[color]; !ok {
123 fmt.Print("COLOR NOT SUPPORTED")
124 } else {
125 cfunc(str)
126 }
127 } else {
128 fmt.Print(str)
129 }
130 }
131
132
// PrintResult prints each chaos, its pods, and every numbered check item.
// Chaos and pod headers are blue and item titles cyan. Item values are
// printed uncolored, successes green, and failures red. Items whose Status is
// neither ItemSuccess nor ItemFailure show only their title and value.
func PrintResult(result []ChaosResult) {
	for _, ch := range result {
		PrettyPrint("[Chaos]: "+ch.Name, 0, Blue)
		for _, p := range ch.Pods {
			PrettyPrint("[Pod]: "+p.Name, 0, Blue)
			for idx := range p.Items {
				it := p.Items[idx]
				PrettyPrint(fmt.Sprintf("%d. [%s]", idx+1, it.Name), 1, Cyan)
				PrettyPrint(it.Value, 1, NoColor)

				switch it.Status {
				case ItemSuccess:
					msg := "Execute as expected"
					if it.SucInfo != "" {
						msg = it.SucInfo
					}
					PrettyPrint(msg, 1, Green)
				case ItemFailure:
					PrettyPrint(fmt.Sprintf("Failed: %s ", it.ErrInfo), 1, Red)
				}
			}
		}
	}
}
154
155
// MarshalChaos returns s encoded as indented JSON (no prefix, one-space
// indent per level).
func MarshalChaos(s interface{}) (string, error) {
	out, marshalErr := json.MarshalIndent(s, "", " ")
	if marshalErr == nil {
		return string(out), nil
	}
	return "", errors.Wrapf(marshalErr, "failed to marshal indent")
}
163
164
165 func InitClientSet() (*ClientSet, error) {
166 restconfig, err := config.GetConfig()
167 if err != nil {
168 return nil, err
169 }
170 ctrlClient, err := client.New(restconfig, client.Options{Scheme: scheme})
171 if err != nil {
172 return nil, fmt.Errorf("failed to create client")
173 }
174 kubeClient, err := kubernetes.NewForConfig(restconfig)
175 if err != nil {
176 return nil, errors.Wrap(err, "error in getting acess to k8s")
177 }
178 return &ClientSet{ctrlClient, kubeClient}, nil
179 }
180
181
182 func GetPods(ctx context.Context, chaosName string, status v1alpha1.ChaosStatus, selectorSpec v1alpha1.SelectorSpec, c client.Client) ([]v1.Pod, []v1.Pod, error) {
183
184 failedMessage := status.FailedMessage
185 if failedMessage != "" {
186 PrettyPrint(fmt.Sprintf("chaos %s failed with: %s", chaosName, failedMessage), 0, Red)
187 }
188
189 phase := status.Experiment.Phase
190 nextStart := status.Scheduler.NextStart
191
192 if phase == v1alpha1.ExperimentPhaseWaiting {
193 waitTime := nextStart.Sub(time.Now())
194 L().WithName("GetPods").V(1).Info(fmt.Sprintf("Waiting for chaos %s to start, in %s\n", chaosName, waitTime))
195 time.Sleep(waitTime)
196 }
197
198 pods, err := selector.SelectPods(ctx, c, c, selectorSpec, ctrlconfig.ControllerCfg.ClusterScoped, ctrlconfig.ControllerCfg.TargetNamespace, ctrlconfig.ControllerCfg.EnableFilterNamespace)
199 if err != nil {
200 return nil, nil, errors.Wrap(err, "failed to SelectPods")
201 }
202 L().WithName("GetPods").V(4).Info("select pods for chaos", "chaos", chaosName, "pods", pods)
203 if len(pods) == 0 {
204 return nil, nil, fmt.Errorf("no pods found for chaos %s, selector: %s", chaosName, selectorSpec)
205 }
206
207
208 var chaosDaemons []v1.Pod
209
210 for _, pod := range pods {
211 nodeName := pod.Spec.NodeName
212 daemonSelector := v1alpha1.SelectorSpec{
213 Nodes: []string{nodeName},
214 LabelSelectors: map[string]string{"app.kubernetes.io/component": "chaos-daemon"},
215 }
216 daemons, err := selector.SelectPods(ctx, c, nil, daemonSelector, ctrlconfig.ControllerCfg.ClusterScoped, ctrlconfig.ControllerCfg.TargetNamespace, ctrlconfig.ControllerCfg.EnableFilterNamespace)
217 if err != nil {
218 return nil, nil, errors.Wrap(err, fmt.Sprintf("failed to select daemon pod for pod %s", pod.GetName()))
219 }
220 if len(daemons) == 0 {
221 return nil, nil, fmt.Errorf("no daemons found for pod %s with selector: %s", pod.GetName(), daemonSelector)
222 }
223 chaosDaemons = append(chaosDaemons, daemons[0])
224 }
225
226 return pods, chaosDaemons, nil
227 }
228
229
// GetChaosList fetches the chaos objects of one type in namespace ns.
//
// chaosType is matched case-insensitively and must contain "chaos" (e.g.
// "NetworkChaos" or "networkchaos"). When chaosName is empty, every chaos of
// that type is returned; otherwise only the one with that name is. The second
// result holds the names, parallel to the objects. An error is returned for an
// unknown type, a failed List/Get, or when nothing matches.
func GetChaosList(ctx context.Context, chaosType string, chaosName string, ns string, c client.Client) ([]runtime.Object, []string, error) {
	lowerType := strings.ToLower(chaosType)
	// upperCaseChaos needs the substring "chaos" to split on, and a kind
	// missing from the registry would otherwise be dereferenced below. Report
	// both cases as errors instead of panicking.
	if !strings.Contains(lowerType, "chaos") {
		return nil, nil, fmt.Errorf("unknown chaos type %q", chaosType)
	}
	kindName := upperCaseChaos(lowerType)
	kind, ok := v1alpha1.AllKinds()[kindName]
	if !ok {
		return nil, nil, fmt.Errorf("unknown chaos type %q (kind %s)", chaosType, kindName)
	}
	chaosListInterface := kind.ChaosList

	if err := c.List(ctx, chaosListInterface, client.InNamespace(ns)); err != nil {
		return nil, nil, errors.Wrapf(err, "failed to get chaosList with namespace %s", ns)
	}
	chaosList := chaosListInterface.ListChaos()
	if len(chaosList) == 0 {
		return nil, nil, fmt.Errorf("no chaos is found, please check your input")
	}

	var retList []runtime.Object
	var retNameList []string
	for _, ch := range chaosList {
		if chaosName != "" && chaosName != ch.Name {
			continue
		}
		chaos, err := getChaos(ctx, kindName, ch.Name, ns, c)
		if err != nil {
			return nil, nil, err
		}
		retList = append(retList, chaos)
		retNameList = append(retNameList, ch.Name)
	}
	if len(retList) == 0 {
		return nil, nil, fmt.Errorf("no chaos is found, please check your input")
	}

	return retList, retNameList, nil
}
261
// getChaos fetches the chaos object named chaosName in namespace ns.
// chaosType must be a kind name registered in v1alpha1.AllKinds (for example
// "NetworkChaos"). Each call returns a distinct object.
func getChaos(ctx context.Context, chaosType string, chaosName string, ns string, c client.Client) (runtime.Object, error) {
	kind, ok := v1alpha1.AllKinds()[chaosType]
	if !ok {
		return nil, fmt.Errorf("unknown chaos type %q", chaosType)
	}
	// kind.Chaos is the prototype object held by the kind registry, presumably
	// one instance handed to every caller. If so, decoding straight into it
	// makes successive results (e.g. GetChaosList's slice) alias one another.
	// Decoding into a deep copy keeps each returned object independent either
	// way.
	chaos := kind.Chaos.DeepCopyObject()
	objectKey := client.ObjectKey{
		Namespace: ns,
		Name:      chaosName,
	}
	if err := c.Get(ctx, objectKey, chaos); err != nil {
		return nil, errors.Wrapf(err, "failed to get chaos %s", chaosName)
	}
	return chaos, nil
}
274
275
// GetPidFromPS runs `ps` for pod through ExecBypass (using daemon). It returns
// two parallel slices: the PID and the first whitespace-separated token of the
// COMMAND/CMD column for every process row.
func GetPidFromPS(ctx context.Context, pod v1.Pod, daemon v1.Pod, c *kubernetes.Clientset) ([]string, []string, error) {
	cmd := "ps"
	out, err := ExecBypass(ctx, pod, daemon, cmd, c)
	if err != nil {
		return nil, nil, errors.Wrapf(err, "run command %s failed", cmd)
	}
	return parsePSOutput(string(out))
}

// parsePSOutput parses the table printed by `ps`. The header row locates the
// PID and COMMAND (or CMD) columns. Every non-blank row after it contributes
// one PID and the first token of its command.
func parsePSOutput(out string) ([]string, []string, error) {
	outLines := strings.Split(out, "\n")
	if len(outLines) < 2 {
		return nil, nil, fmt.Errorf("ps returns empty")
	}

	// Column 0 is a valid position, so it cannot double as "not found" (the
	// old check only failed when *both* columns were missing and otherwise
	// silently read column 0). Use -1 and require each column separately.
	pidColumn, cmdColumn := -1, -1
	for i, title := range strings.Fields(outLines[0]) {
		switch title {
		case "PID":
			pidColumn = i
		case "COMMAND", "CMD":
			cmdColumn = i
		}
	}
	if pidColumn < 0 || cmdColumn < 0 {
		return nil, nil, fmt.Errorf("parsing ps error: could not get PID and COMMAND column")
	}

	// A row must reach the further of the two columns to be indexable.
	minFields := pidColumn + 1
	if cmdColumn >= pidColumn {
		minFields = cmdColumn + 1
	}

	var pids, commands []string
	for _, line := range outLines[1:] {
		fields := strings.Fields(line)
		if len(fields) == 0 {
			continue
		}
		if len(fields) < minFields {
			return nil, nil, fmt.Errorf("parsing ps error: unexpected line %q", line)
		}
		pids = append(pids, fields[pidColumn])
		commands = append(commands, fields[cmdColumn])
	}
	return pids, commands, nil
}
311
312
// GetPidFromPod asks the chaos-daemon pod daemon for the PID of pod's first
// container. It port-forwards to the daemon's ChaosDaemonPort, connects a
// daemon client to the local end of the forward, and sends a ContainerGetPid
// request. The forward and the client are both released before returning.
func GetPidFromPod(ctx context.Context, pod v1.Pod, daemon v1.Pod) (uint32, error) {
	// Without a container status there is no container ID to ask about, so
	// check before paying for a port-forward and a gRPC connection.
	if len(pod.Status.ContainerStatuses) == 0 {
		return 0, fmt.Errorf("%s %s can't get the state of container", pod.Namespace, pod.Name)
	}

	pfCancel, localPort, err := forwardPorts(ctx, daemon, uint16(ctrlconfig.ControllerCfg.ChaosDaemonPort))
	if err != nil {
		return 0, errors.Wrapf(err, "forward ports for daemon pod %s/%s failed", daemon.Namespace, daemon.Name)
	}
	defer pfCancel()
	L().WithName("GetPidFromPod").V(4).Info(fmt.Sprintf("port forwarding 127.0.0.1:%d -> pod/%s/%s:%d", localPort, daemon.Namespace, daemon.Name, ctrlconfig.ControllerCfg.ChaosDaemonPort))

	// Named cli so it does not shadow the imported daemonClient package.
	cli, err := ConnectToLocalChaosDaemon(int(localPort))
	if err != nil {
		return 0, errors.Wrapf(err, "failed to create new chaos daemon client with local port %d", localPort)
	}
	defer cli.Close()

	res, err := cli.ContainerGetPid(ctx, &pb.ContainerRequest{
		Action: &pb.ContainerAction{
			Action: pb.ContainerAction_GETPID,
		},
		ContainerId: pod.Status.ContainerStatuses[0].ContainerID,
	})
	if err != nil {
		return 0, errors.Wrapf(err, "failed get pid from pod %s/%s", pod.GetNamespace(), pod.GetName())
	}
	return res.Pid, nil
}
345
// forwardPorts starts a port-forward from a local port to the given port of
// pod. It returns the cancel function and local port reported by
// portforward.ForwardOnePort, along with its error.
func forwardPorts(ctx context.Context, pod v1.Pod, port uint16) (context.CancelFunc, uint16, error) {
	fw, err := portforward.NewPortForwarder(ctx, NewCommonRestClientGetter(), false)
	if err != nil {
		return nil, 0, errors.Wrap(err, "failed to create port forwarder")
	}
	_, local, cancel, fwdErr := portforward.ForwardOnePort(fw, pod.Namespace, pod.Name, port)
	return cancel, local, fwdErr
}
355
356
357 func Log(pod v1.Pod, tail int64, c *kubernetes.Clientset) (string, error) {
358 podLogOpts := v1.PodLogOptions{}
359
360 if tail >= 0 {
361 podLogOpts.TailLines = func(i int64) *int64 { return &i }(tail)
362 }
363
364 req := c.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts)
365 podLogs, err := req.Stream()
366 if err != nil {
367 return "", errors.Wrapf(err, "failed to open log stream for pod %s/%s", pod.GetNamespace(), pod.GetName())
368 }
369 defer podLogs.Close()
370
371 buf := new(bytes.Buffer)
372 _, err = io.Copy(buf, podLogs)
373 if err != nil {
374 return "", errors.Wrapf(err, "failed to copy information from podLogs to buf")
375 }
376 return buf.String(), nil
377 }
378
379
// CheckFailedMessage inspects a chaos failure message. If it looks like a
// gRPC connection failure ("connection error" / "connection refused"), it runs
// checkConnForCtrlAndDaemon against daemons, which prints per-daemon
// reachability. Any other message is ignored. The returned error wraps the
// check's error, keeping the original message text and the error chain
// (fmt.Errorf with %s used to flatten it).
func CheckFailedMessage(ctx context.Context, failedMessage string, daemons []v1.Pod, c *ClientSet) error {
	connectionFailure := strings.Contains(failedMessage, "rpc error: code = Unavailable desc = connection error") ||
		strings.Contains(failedMessage, "connect: connection refused")
	if !connectionFailure {
		return nil
	}
	if err := checkConnForCtrlAndDaemon(ctx, daemons, c); err != nil {
		return errors.Wrap(err, "error occurs when check failed message")
	}
	return nil
}
388
// checkConnForCtrlAndDaemon pings each daemon pod's IP from inside the first
// controller-manager pod it finds and prints, per daemon, whether the ping
// succeeded. Reachability is only reported, never returned. An error is
// returned when the controller-manager cannot be selected or found, or when
// the ping command cannot be executed.
func checkConnForCtrlAndDaemon(ctx context.Context, daemons []v1.Pod, c *ClientSet) error {
	ctrlSelector := v1alpha1.SelectorSpec{
		LabelSelectors: map[string]string{"app.kubernetes.io/component": "controller-manager"},
	}
	ctrlMgrs, err := selector.SelectPods(ctx, c.CtrlCli, c.CtrlCli, ctrlSelector, ctrlconfig.ControllerCfg.ClusterScoped, ctrlconfig.ControllerCfg.TargetNamespace, ctrlconfig.ControllerCfg.EnableFilterNamespace)
	if err != nil {
		return errors.Wrapf(err, "failed to select pod for controller-manager")
	}
	if len(ctrlMgrs) == 0 {
		return fmt.Errorf("could not found controller manager")
	}

	for _, daemon := range daemons {
		daemonIP := daemon.Status.PodIP
		cmd := fmt.Sprintf("ping -c 1 %s > /dev/null; echo $?", daemonIP)
		out, err := Exec(ctx, ctrlMgrs[0], cmd, c.KubeCli)
		if err != nil {
			return errors.Wrapf(err, "run command %s failed", cmd)
		}
		// `echo $?` terminates its output with a newline; compare only the
		// exit code itself, otherwise a working link is reported as blocked.
		if strings.TrimSpace(string(out)) == "0" {
			PrettyPrint(fmt.Sprintf("Connection between Controller-Manager and Daemon %s (ip address: %s) works well", daemon.Name, daemonIP), 0, Green)
		} else {
			PrettyPrint(fmt.Sprintf(`Connection between Controller-Manager and Daemon %s (ip address: %s) is blocked.
Please check network policy / firewall, or see FAQ on website`, daemon.Name, daemonIP), 0, Red)
		}
	}
	return nil
}
417
418
419 func ConnectToLocalChaosDaemon(port int) (daemonClient.ChaosDaemonClientInterface, error) {
420 if cli := mock.On("MockChaosDaemonClient"); cli != nil {
421 return cli.(daemonClient.ChaosDaemonClientInterface), nil
422 }
423 if err := mock.On("NewChaosDaemonClientError"); err != nil {
424 return nil, err.(error)
425 }
426 cc, err := getGrpcClient(port)
427
428 if err != nil {
429 return nil, err
430 }
431 return daemonClient.New(cc), nil
432 }
433
434 func getGrpcClient(port int) (*grpc.ClientConn, error) {
435 if Insecure {
436 return grpcUtils.CreateGrpcConnection("localhost", port, "", "", "")
437 }
438 if TLSFiles.CaCert == "" || TLSFiles.Cert == "" || TLSFiles.Key == "" {
439 PrettyPrint("TLS Files are not complete, fall back to use secrets.", 0, Green)
440 config, err := getTLSConfigFromSecrets()
441 if err != nil {
442 return nil, err
443 }
444 return grpcUtils.CreateGrpcConnectionFromRaw("localhost", port, config.caCert, config.cert, config.key)
445 }
446 PrettyPrint("Using TLS Files.", 0, Green)
447 return grpcUtils.CreateGrpcConnection("localhost", port, TLSFiles.CaCert, TLSFiles.Cert, TLSFiles.Key)
448 }
449
450 func getTLSConfigFromSecrets() (*rawTLSConfig, error) {
451 var cfg rawTLSConfig
452 restconfig, err := config.GetConfig()
453 if err != nil {
454 return nil, err
455 }
456 kubeClient, err := kubernetes.NewForConfig(restconfig)
457 if err != nil {
458 return nil, err
459 }
460 secret, err := kubeClient.CoreV1().Secrets(ChaosDaemonNamespace).Get(ChaosDaemonClientCert, metav1.GetOptions{})
461 if err != nil {
462 return nil, err
463 }
464 cfg.caCert = secret.Data["ca.crt"]
465 cfg.cert = secret.Data["tls.crt"]
466 cfg.key = secret.Data["tls.key"]
467 return &cfg, nil
468 }
469
// rawTLSConfig holds the certificate material read from the chaos-daemon
// client certificate secret, as raw bytes.
type rawTLSConfig struct {
	caCert []byte // secret entry "ca.crt"
	cert   []byte // secret entry "tls.crt"
	key    []byte // secret entry "tls.key"
}

// TLSFileConfig names the TLS files (CA certificate, client certificate,
// client key) that getGrpcClient uses when all three are non-empty.
type TLSFileConfig struct {
	CaCert string
	Cert   string
	Key    string
}
480