...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/httpchaos_server.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package chaosdaemon
    15  
    16  import (
    17  	"bufio"
    18  	"bytes"
    19  	"context"
    20  	"encoding/json"
    21  	"fmt"
    22  	"io/ioutil"
    23  	"net/http"
    24  	"os"
    25  
    26  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    27  	"github.com/chaos-mesh/chaos-mesh/pkg/bpm"
    28  	pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    29  )
    30  
    31  const (
    32  	tproxyBin = "/usr/local/bin/tproxy"
    33  	pathEnv   = "PATH"
    34  )
    35  
    36  type stdioTransport struct {
    37  	stdio *bpm.Stdio
    38  }
    39  
    40  type tproxyConfig struct {
    41  	ProxyPorts []uint32                        `json:"proxy_ports"`
    42  	Rules      []v1alpha1.PodHttpChaosBaseRule `json:"rules"`
    43  }
    44  
    45  func (t stdioTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
    46  	t.stdio.Lock()
    47  	defer t.stdio.Unlock()
    48  
    49  	if t.stdio.Stdin == nil {
    50  		return nil, fmt.Errorf("fail to get stdin of process")
    51  	}
    52  	if t.stdio.Stdout == nil {
    53  		return nil, fmt.Errorf("fail to get stdout of process")
    54  	}
    55  
    56  	err = req.Write(t.stdio.Stdin)
    57  	if err != nil {
    58  		return
    59  	}
    60  
    61  	resp, err = http.ReadResponse(bufio.NewReader(t.stdio.Stdout), req)
    62  	return
    63  }
    64  
    65  func (s *DaemonServer) ApplyHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) (*pb.ApplyHttpChaosResponse, error) {
    66  	log := log.WithValues("Request", in)
    67  	log.Info("applying http chaos")
    68  
    69  	if in.Instance == 0 {
    70  		if err := s.createHttpChaos(ctx, in); err != nil {
    71  			return nil, err
    72  		}
    73  	}
    74  
    75  	stdio := s.backgroundProcessManager.Stdio(int(in.Instance), in.StartTime)
    76  	if stdio == nil {
    77  		return nil, fmt.Errorf("fail to get stdio of process")
    78  	}
    79  
    80  	transport := stdioTransport{stdio: stdio}
    81  
    82  	rules := []v1alpha1.PodHttpChaosBaseRule{}
    83  	err := json.Unmarshal([]byte(in.Rules), &rules)
    84  	if err != nil {
    85  		log.Error(err, "error while unmarshal json bytes")
    86  		return nil, err
    87  	}
    88  
    89  	log.Info("the length of actions", "length", len(rules))
    90  
    91  	httpChaosSpec := tproxyConfig{
    92  		ProxyPorts: append([]uint32{}, in.ProxyPorts...),
    93  		Rules:      rules,
    94  	}
    95  
    96  	config, err := json.Marshal(&httpChaosSpec)
    97  	if err != nil {
    98  		return nil, err
    99  	}
   100  
   101  	log.Info("ready to apply", "config", string(config))
   102  
   103  	req, err := http.NewRequest(http.MethodPut, "/", bytes.NewReader(config))
   104  	if err != nil {
   105  		return nil, err
   106  	}
   107  
   108  	resp, err := transport.RoundTrip(req)
   109  	if err != nil {
   110  		return nil, err
   111  	}
   112  
   113  	log.Info("http chaos applied")
   114  
   115  	body, err := ioutil.ReadAll(resp.Body)
   116  	if err != nil {
   117  		return nil, err
   118  	}
   119  
   120  	return &pb.ApplyHttpChaosResponse{
   121  		Instance:   int64(in.Instance),
   122  		StartTime:  in.StartTime,
   123  		StatusCode: int32(resp.StatusCode),
   124  		Error:      string(body),
   125  	}, nil
   126  }
   127  
   128  func (s *DaemonServer) createHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) error {
   129  	pid, err := s.crClient.GetPidFromContainerID(ctx, in.ContainerId)
   130  	if err != nil {
   131  		log.Error(err, "error while getting PID")
   132  		return err
   133  	}
   134  	processBuilder := bpm.DefaultProcessBuilder(tproxyBin, "-i", "-vv").
   135  		EnableLocalMnt().
   136  		SetIdentifier(in.ContainerId).
   137  		SetEnv(pathEnv, os.Getenv(pathEnv)).
   138  		SetStdin(bpm.NewBlockingBuffer()).
   139  		SetStdout(bpm.NewBlockingBuffer())
   140  
   141  	if in.EnterNS {
   142  		processBuilder = processBuilder.SetNS(pid, bpm.PidNS).SetNS(pid, bpm.NetNS)
   143  	}
   144  
   145  	cmd := processBuilder.Build()
   146  	cmd.Stderr = os.Stderr
   147  
   148  	procState, err := s.backgroundProcessManager.StartProcess(cmd)
   149  	if err != nil {
   150  		return err
   151  	}
   152  	ct, err := procState.CreateTime()
   153  	if err != nil {
   154  		log.Error(err, "get create time failed")
   155  		if kerr := cmd.Process.Kill(); kerr != nil {
   156  			log.Error(kerr, "kill tproxy failed", "request", in)
   157  		}
   158  		return err
   159  	}
   160  
   161  	in.Instance = int64(cmd.Process.Pid)
   162  	in.StartTime = ct
   163  	return nil
   164  }
   165