...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/httpchaos_server.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package chaosdaemon
    15  
    16  import (
    17  	"bufio"
    18  	"bytes"
    19  	"context"
    20  	"encoding/json"
    21  	"fmt"
    22  	"io/ioutil"
    23  	"net/http"
    24  	"os"
    25  
    26  	"github.com/go-logr/logr"
    27  	"github.com/pkg/errors"
    28  
    29  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    30  	"github.com/chaos-mesh/chaos-mesh/pkg/bpm"
    31  	pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    32  )
    33  
const (
	// tproxyBin is the path of the tproxy executable that createHttpChaos
	// launches as a background process.
	tproxyBin = "/usr/local/bin/tproxy"
	// pathEnv is the name of the environment variable whose value is copied
	// from the daemon's own environment into the tproxy process.
	pathEnv = "PATH"
)
    38  
// stdioTransport carries HTTP requests over the standard streams of a
// background process: its RoundTrip method writes the request to the
// process' stdin and reads the response from its stdout.
type stdioTransport struct {
	// stdio holds the stdin/stdout handles and the lock that RoundTrip takes
	// while exchanging a request/response pair.
	stdio *bpm.Stdio
}
    42  
// tproxyConfig is the JSON document that applyHttpChaos marshals and sends
// as the body of the PUT request to the tproxy process.
type tproxyConfig struct {
	// ProxyPorts is serialized under the "proxy_ports" key.
	ProxyPorts []uint32 `json:"proxy_ports"`
	// Rules is serialized under the "rules" key.
	Rules []v1alpha1.PodHttpChaosBaseRule `json:"rules"`
}
    47  
    48  func (t stdioTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
    49  	t.stdio.Lock()
    50  	defer t.stdio.Unlock()
    51  
    52  	if t.stdio.Stdin == nil {
    53  		return nil, errors.New("fail to get stdin of process")
    54  	}
    55  	if t.stdio.Stdout == nil {
    56  		return nil, errors.New("fail to get stdout of process")
    57  	}
    58  
    59  	err = req.Write(t.stdio.Stdin)
    60  	if err != nil {
    61  		return
    62  	}
    63  
    64  	resp, err = http.ReadResponse(bufio.NewReader(t.stdio.Stdout), req)
    65  	return
    66  }
    67  
    68  func (s *DaemonServer) ApplyHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) (*pb.ApplyHttpChaosResponse, error) {
    69  	log := log.WithValues("Request", in)
    70  	log.Info("applying http chaos")
    71  
    72  	if s.backgroundProcessManager.Stdio(int(in.Instance), in.StartTime) == nil {
    73  		// chaos daemon may restart, create another tproxy instance
    74  		if err := s.backgroundProcessManager.KillBackgroundProcess(ctx, int(in.Instance), in.StartTime); err != nil {
    75  			return nil, errors.Wrapf(err, "kill background process(%d)", in.Instance)
    76  		}
    77  		if err := s.createHttpChaos(ctx, in); err != nil {
    78  			return nil, err
    79  		}
    80  	}
    81  
    82  	resp, err := s.applyHttpChaos(ctx, log, in)
    83  	if err != nil {
    84  		killError := s.backgroundProcessManager.KillBackgroundProcess(ctx, int(in.Instance), in.StartTime)
    85  		log.Error(killError, "kill tproxy", "instance", in.Instance)
    86  		return nil, errors.Wrap(err, "apply config")
    87  	}
    88  	return resp, err
    89  }
    90  
    91  func (s *DaemonServer) applyHttpChaos(ctx context.Context, logger logr.Logger, in *pb.ApplyHttpChaosRequest) (*pb.ApplyHttpChaosResponse, error) {
    92  	stdio := s.backgroundProcessManager.Stdio(int(in.Instance), in.StartTime)
    93  	if stdio == nil {
    94  		return nil, errors.Errorf("fail to get stdio of process")
    95  	}
    96  
    97  	transport := stdioTransport{stdio: stdio}
    98  
    99  	rules := []v1alpha1.PodHttpChaosBaseRule{}
   100  	err := json.Unmarshal([]byte(in.Rules), &rules)
   101  	if err != nil {
   102  		return nil, errors.Wrap(err, "unmarshal rules")
   103  	}
   104  
   105  	log.Info("the length of actions", "length", len(rules))
   106  
   107  	httpChaosSpec := tproxyConfig{
   108  		ProxyPorts: append([]uint32{}, in.ProxyPorts...),
   109  		Rules:      rules,
   110  	}
   111  
   112  	config, err := json.Marshal(&httpChaosSpec)
   113  	if err != nil {
   114  		return nil, err
   115  	}
   116  
   117  	log.Info("ready to apply", "config", string(config))
   118  
   119  	req, err := http.NewRequest(http.MethodPut, "/", bytes.NewReader(config))
   120  	if err != nil {
   121  		return nil, errors.Wrap(err, "create http request")
   122  	}
   123  
   124  	resp, err := transport.RoundTrip(req)
   125  	if err != nil {
   126  		return nil, errors.Wrap(err, "send http request")
   127  	}
   128  
   129  	log.Info("http chaos applied")
   130  
   131  	body, err := ioutil.ReadAll(resp.Body)
   132  	if err != nil {
   133  		return nil, errors.Wrap(err, "read response body")
   134  	}
   135  
   136  	return &pb.ApplyHttpChaosResponse{
   137  		Instance:   int64(in.Instance),
   138  		StartTime:  in.StartTime,
   139  		StatusCode: int32(resp.StatusCode),
   140  		Error:      string(body),
   141  	}, nil
   142  }
   143  
   144  func (s *DaemonServer) createHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) error {
   145  	pid, err := s.crClient.GetPidFromContainerID(ctx, in.ContainerId)
   146  	if err != nil {
   147  		return errors.Wrapf(err, "get PID of container(%s)", in.ContainerId)
   148  	}
   149  	processBuilder := bpm.DefaultProcessBuilder(tproxyBin, "-i", "-vv").
   150  		EnableLocalMnt().
   151  		SetIdentifier(fmt.Sprintf("tproxy-%s", in.ContainerId)).
   152  		SetEnv(pathEnv, os.Getenv(pathEnv)).
   153  		SetStdin(bpm.NewBlockingBuffer()).
   154  		SetStdout(bpm.NewBlockingBuffer())
   155  
   156  	if in.EnterNS {
   157  		processBuilder = processBuilder.SetNS(pid, bpm.PidNS).SetNS(pid, bpm.NetNS)
   158  	}
   159  
   160  	cmd := processBuilder.Build()
   161  	cmd.Stderr = os.Stderr
   162  
   163  	procState, err := s.backgroundProcessManager.StartProcess(cmd)
   164  	if err != nil {
   165  		return errors.Wrapf(err, "execute command(%s)", cmd)
   166  	}
   167  	ct, err := procState.CreateTime()
   168  	if err != nil {
   169  		if kerr := cmd.Process.Kill(); kerr != nil {
   170  			log.Error(kerr, "kill tproxy", "request", in)
   171  		}
   172  		return errors.Wrap(err, "get create time")
   173  	}
   174  
   175  	in.Instance = int64(cmd.Process.Pid)
   176  	in.StartTime = ct
   177  	return nil
   178  }
   179