1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package common
17
18 import (
19 "bytes"
20 "context"
21 "encoding/json"
22 "fmt"
23 "io"
24 "regexp"
25 "strings"
26
27 "github.com/fatih/color"
28 "github.com/pkg/errors"
29 "google.golang.org/grpc"
30 v1 "k8s.io/api/core/v1"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/runtime"
33 "k8s.io/client-go/kubernetes"
34 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
35 "sigs.k8s.io/controller-runtime/pkg/client"
36 "sigs.k8s.io/controller-runtime/pkg/client/config"
37
38 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
39 ctrlconfig "github.com/chaos-mesh/chaos-mesh/controllers/config"
40 daemonClient "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/client"
41 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
42 grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
43 "github.com/chaos-mesh/chaos-mesh/pkg/mock"
44 "github.com/chaos-mesh/chaos-mesh/pkg/portforward"
45 "github.com/chaos-mesh/chaos-mesh/pkg/selector/pod"
46 )
47
48 type Color string
49
50 const (
51 Blue Color = "Blue"
52 Red Color = "Red"
53 Green Color = "Green"
54 Cyan Color = "Cyan"
55 NoColor Color = ""
56 )
57
58 var (
59 colorFunc = map[Color]func(string, ...interface{}){
60 Blue: color.Blue,
61 Red: color.Red,
62 Green: color.Green,
63 Cyan: color.Cyan,
64 }
65 scheme = runtime.NewScheme()
66 )
67
68
69 type ClientSet struct {
70 CtrlCli client.Client
71 KubeCli *kubernetes.Clientset
72 }
73
74 type ChaosResult struct {
75 Name string
76 Pods []PodResult
77 }
78
79 type PodResult struct {
80 Name string
81 Items []ItemResult
82 }
83
84 const (
85 ItemSuccess = iota + 1
86 ItemFailure
87 )
88
89 const ChaosDaemonClientCert = "chaos-mesh-daemon-client-certs"
90 const ChaosDaemonNamespace = "chaos-testing"
91
92 var TLSFiles grpcUtils.TLSFile
93 var Insecure bool
94
95 type ItemResult struct {
96 Name string
97 Value string
98 Status int `json:",omitempty"`
99 SucInfo string `json:",omitempty"`
100 ErrInfo string `json:",omitempty"`
101 }
102
103 func init() {
104 _ = v1alpha1.AddToScheme(scheme)
105 _ = clientgoscheme.AddToScheme(scheme)
106 }
107
108 func upperCaseChaos(str string) string {
109 parts := regexp.MustCompile("(.*)(chaos)").FindStringSubmatch(str)
110 return strings.Title(parts[1]) + strings.Title(parts[2])
111 }
112
113
114 func PrettyPrint(s string, indentLevel int, color Color) {
115 var tabStr string
116 for i := 0; i < indentLevel; i++ {
117 tabStr += "\t"
118 }
119 str := fmt.Sprintf("%s%s\n\n", tabStr, regexp.MustCompile("\n").ReplaceAllString(s, "\n"+tabStr))
120 if color != NoColor {
121 if cfunc, ok := colorFunc[color]; !ok {
122 fmt.Print("COLOR NOT SUPPORTED")
123 } else {
124 cfunc(str)
125 }
126 } else {
127 fmt.Print(str)
128 }
129 }
130
131
132 func PrintResult(result []ChaosResult) {
133 for _, chaos := range result {
134 PrettyPrint("[Chaos]: "+chaos.Name, 0, Blue)
135 for _, pod := range chaos.Pods {
136 PrettyPrint("[Pod]: "+pod.Name, 0, Blue)
137 for i, item := range pod.Items {
138 PrettyPrint(fmt.Sprintf("%d. [%s]", i+1, item.Name), 1, Cyan)
139 PrettyPrint(item.Value, 1, NoColor)
140 if item.Status == ItemSuccess {
141 if item.SucInfo != "" {
142 PrettyPrint(item.SucInfo, 1, Green)
143 } else {
144 PrettyPrint("Execute as expected", 1, Green)
145 }
146 } else if item.Status == ItemFailure {
147 PrettyPrint(fmt.Sprintf("Failed: %s ", item.ErrInfo), 1, Red)
148 }
149 }
150 }
151 }
152 }
153
154
155 func MarshalChaos(s interface{}) (string, error) {
156 b, err := json.MarshalIndent(s, "", " ")
157 if err != nil {
158 return "", errors.Wrapf(err, "failed to marshal indent")
159 }
160 return string(b), nil
161 }
162
163
164 func InitClientSet() (*ClientSet, error) {
165 restconfig, err := config.GetConfig()
166 if err != nil {
167 return nil, err
168 }
169 ctrlClient, err := client.New(restconfig, client.Options{Scheme: scheme})
170 if err != nil {
171 return nil, fmt.Errorf("failed to create client")
172 }
173 kubeClient, err := kubernetes.NewForConfig(restconfig)
174 if err != nil {
175 return nil, errors.Wrap(err, "error in getting acess to k8s")
176 }
177 return &ClientSet{ctrlClient, kubeClient}, nil
178 }
179
180
181 func GetPods(ctx context.Context, chaosName string, status v1alpha1.ChaosStatus, selectorSpec v1alpha1.PodSelectorSpec, c client.Client) ([]v1.Pod, []v1.Pod, error) {
182
183 failedMessage := ""
184 if failedMessage != "" {
185 PrettyPrint(fmt.Sprintf("chaos %s failed with: %s", chaosName, failedMessage), 0, Red)
186 }
187
188 pods, err := pod.SelectPods(ctx, c, c, selectorSpec, ctrlconfig.ControllerCfg.ClusterScoped, ctrlconfig.ControllerCfg.TargetNamespace, false)
189 if err != nil {
190 return nil, nil, errors.Wrap(err, "failed to SelectPods")
191 }
192 L().WithName("GetPods").V(4).Info("select pods for chaos", "chaos", chaosName, "pods", pods)
193 if len(pods) == 0 {
194 return nil, nil, fmt.Errorf("no pods found for chaos %s, selector: %s", chaosName, selectorSpec)
195 }
196
197 var chaosDaemons []v1.Pod
198
199 for _, chaosPod := range pods {
200 nodeName := chaosPod.Spec.NodeName
201 daemonSelector := v1alpha1.PodSelectorSpec{
202 Nodes: []string{nodeName},
203 GenericSelectorSpec: v1alpha1.GenericSelectorSpec{
204 LabelSelectors: map[string]string{"app.kubernetes.io/component": "chaos-daemon"},
205 },
206 }
207 daemons, err := pod.SelectPods(ctx, c, nil, daemonSelector, ctrlconfig.ControllerCfg.ClusterScoped, ctrlconfig.ControllerCfg.TargetNamespace, false)
208 if err != nil {
209 return nil, nil, errors.Wrap(err, fmt.Sprintf("failed to select daemon pod for pod %s", chaosPod.GetName()))
210 }
211 if len(daemons) == 0 {
212 return nil, nil, fmt.Errorf("no daemons found for pod %s with selector: %s", chaosPod.GetName(), daemonSelector)
213 }
214 chaosDaemons = append(chaosDaemons, daemons[0])
215 }
216
217 return pods, chaosDaemons, nil
218 }
219
220
221 func GetChaosList(ctx context.Context, chaosType string, chaosName string, ns string, c client.Client) ([]runtime.Object, []string, error) {
222 chaosType = upperCaseChaos(strings.ToLower(chaosType))
223 allKinds := v1alpha1.AllKinds()
224 chaosListInterface := allKinds[chaosType].SpawnList()
225
226 if err := c.List(ctx, chaosListInterface, client.InNamespace(ns)); err != nil {
227 return nil, nil, errors.Wrapf(err, "failed to get chaosList with namespace %s", ns)
228 }
229 chaosList := chaosListInterface.GetItems()
230 if len(chaosList) == 0 {
231 return nil, nil, fmt.Errorf("no chaos is found, please check your input")
232 }
233
234 var retList []runtime.Object
235 var retNameList []string
236 for _, ch := range chaosList {
237 if chaosName == "" || chaosName == ch.GetName() {
238 chaos, err := getChaos(ctx, chaosType, ch.GetName(), ns, c)
239 if err != nil {
240 return nil, nil, err
241 }
242 retList = append(retList, chaos)
243 retNameList = append(retNameList, ch.GetName())
244 }
245 }
246 if len(retList) == 0 {
247 return nil, nil, fmt.Errorf("no chaos is found, please check your input")
248 }
249
250 return retList, retNameList, nil
251 }
252
253 func getChaos(ctx context.Context, chaosType string, chaosName string, ns string, c client.Client) (runtime.Object, error) {
254 allKinds := v1alpha1.AllKinds()
255 chaos := allKinds[chaosType].SpawnObject()
256 objectKey := client.ObjectKey{
257 Namespace: ns,
258 Name: chaosName,
259 }
260 if err := c.Get(ctx, objectKey, chaos); err != nil {
261 return nil, errors.Wrapf(err, "failed to get chaos %s", chaosName)
262 }
263 return chaos, nil
264 }
265
266
267 func GetPidFromPS(ctx context.Context, pod v1.Pod, daemon v1.Pod, c *kubernetes.Clientset) ([]string, []string, error) {
268 cmd := "ps"
269 out, err := ExecBypass(ctx, pod, daemon, cmd, c)
270 if err != nil {
271 return nil, nil, errors.Wrapf(err, "run command %s failed", cmd)
272 }
273 outLines := strings.Split(string(out), "\n")
274 if len(outLines) < 2 {
275 return nil, nil, fmt.Errorf("ps returns empty")
276 }
277 titles := strings.Fields(outLines[0])
278 var pidColumn, cmdColumn int
279 for i, t := range titles {
280 if t == "PID" {
281 pidColumn = i
282 }
283 if t == "COMMAND" || t == "CMD" {
284 cmdColumn = i
285 }
286 }
287 if pidColumn == 0 && cmdColumn == 0 {
288 return nil, nil, fmt.Errorf("parsing ps error: could not get PID and COMMAND column")
289 }
290 var pids, commands []string
291 for _, line := range outLines[1:] {
292 item := strings.Fields(line)
293
294 if len(item) == 0 {
295 break
296 }
297 pids = append(pids, item[pidColumn])
298 commands = append(commands, item[cmdColumn])
299 }
300 return pids, commands, nil
301 }
302
303
304 func GetPidFromPod(ctx context.Context, pod v1.Pod, daemon v1.Pod) (uint32, error) {
305 pfCancel, localPort, err := forwardPorts(ctx, daemon, uint16(ctrlconfig.ControllerCfg.ChaosDaemonPort))
306 if err != nil {
307 return 0, errors.Wrapf(err, "forward ports for daemon pod %s/%s failed", daemon.Namespace, daemon.Name)
308 }
309 L().WithName("GetPidFromPod").V(4).Info(fmt.Sprintf("port forwarding 127.0.0.1:%d -> pod/%s/%s:%d", localPort, daemon.Namespace, daemon.Name, ctrlconfig.ControllerCfg.ChaosDaemonPort))
310
311 defer func() {
312 pfCancel()
313 }()
314
315 daemonClient, err := ConnectToLocalChaosDaemon(int(localPort))
316 if err != nil {
317 return 0, errors.Wrapf(err, "failed to create new chaos daemon client with local port %d", localPort)
318 }
319 defer daemonClient.Close()
320
321 if len(pod.Status.ContainerStatuses) == 0 {
322 return 0, fmt.Errorf("%s %s can't get the state of container", pod.Namespace, pod.Name)
323 }
324
325 res, err := daemonClient.ContainerGetPid(ctx, &pb.ContainerRequest{
326 Action: &pb.ContainerAction{
327 Action: pb.ContainerAction_GETPID,
328 },
329 ContainerId: pod.Status.ContainerStatuses[0].ContainerID,
330 })
331 if err != nil {
332 return 0, errors.Wrapf(err, "failed get pid from pod %s/%s", pod.GetNamespace(), pod.GetName())
333 }
334 return res.Pid, nil
335 }
336
337 func forwardPorts(ctx context.Context, pod v1.Pod, port uint16) (context.CancelFunc, uint16, error) {
338 commonRestClientGetter := NewCommonRestClientGetter()
339 fw, err := portforward.NewPortForwarder(ctx, commonRestClientGetter, false)
340 if err != nil {
341 return nil, 0, errors.Wrap(err, "failed to create port forwarder")
342 }
343 _, localPort, pfCancel, err := portforward.ForwardOnePort(fw, pod.Namespace, pod.Name, port)
344 return pfCancel, localPort, err
345 }
346
347 func ForwardSvcPorts(ctx context.Context, ns, svc string, port uint16) (context.CancelFunc, uint16, error) {
348 commonRestClientGetter := NewCommonRestClientGetter()
349 fw, err := portforward.NewPortForwarder(ctx, commonRestClientGetter, false)
350 if err != nil {
351 return nil, 0, errors.Wrap(err, "failed to create port forwarder")
352 }
353 _, localPort, pfCancel, err := portforward.ForwardOnePort(fw, ns, svc, port)
354 return pfCancel, localPort, err
355 }
356
357
358 func Log(pod v1.Pod, tail int64, c *kubernetes.Clientset) (string, error) {
359 podLogOpts := v1.PodLogOptions{}
360
361 if tail >= 0 {
362 podLogOpts.TailLines = func(i int64) *int64 { return &i }(tail)
363 }
364
365 req := c.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts)
366
367 podLogs, err := req.Stream(context.TODO())
368 if err != nil {
369 return "", errors.Wrapf(err, "failed to open log stream for pod %s/%s", pod.GetNamespace(), pod.GetName())
370 }
371 defer podLogs.Close()
372
373 buf := new(bytes.Buffer)
374 _, err = io.Copy(buf, podLogs)
375 if err != nil {
376 return "", errors.Wrapf(err, "failed to copy information from podLogs to buf")
377 }
378 return buf.String(), nil
379 }
380
381
382 func ConnectToLocalChaosDaemon(port int) (daemonClient.ChaosDaemonClientInterface, error) {
383 if cli := mock.On("MockChaosDaemonClient"); cli != nil {
384 return cli.(daemonClient.ChaosDaemonClientInterface), nil
385 }
386 if err := mock.On("NewChaosDaemonClientError"); err != nil {
387 return nil, err.(error)
388 }
389 cc, err := getGrpcClient(port)
390
391 if err != nil {
392 return nil, err
393 }
394 return daemonClient.New(cc), nil
395 }
396
397 func getGrpcClient(port int) (*grpc.ClientConn, error) {
398 builder := grpcUtils.Builder("localhost", port)
399 if Insecure {
400 builder.Insecure()
401 } else {
402 if TLSFiles.CaCert == "" || TLSFiles.Cert == "" || TLSFiles.Key == "" {
403 PrettyPrint("TLS Files are not complete, fall back to use secrets.", 0, Green)
404 config, err := getTLSConfigFromSecrets()
405 if err != nil {
406 return nil, err
407 }
408 builder.TLSFromRaw(config.CaCert, config.Cert, config.Key)
409 } else {
410 PrettyPrint("Using TLS Files.", 0, Green)
411 builder.TLSFromFile(TLSFiles.CaCert, TLSFiles.Cert, TLSFiles.Key)
412 }
413 }
414 return builder.Build()
415 }
416
417 func getTLSConfigFromSecrets() (*grpcUtils.TLSRaw, error) {
418 restconfig, err := config.GetConfig()
419 if err != nil {
420 return nil, err
421 }
422 kubeClient, err := kubernetes.NewForConfig(restconfig)
423 if err != nil {
424 return nil, err
425 }
426
427 secret, err := kubeClient.CoreV1().Secrets(ChaosDaemonNamespace).Get(context.TODO(), ChaosDaemonClientCert, metav1.GetOptions{})
428 if err != nil {
429 return nil, err
430 }
431 cfg := grpcUtils.TLSRaw{
432 CaCert: secret.Data["ca.crt"],
433 Cert: secret.Data["tls.crt"],
434 Key: secret.Data["tls.key"],
435 }
436 return &cfg, nil
437 }
438