...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/httpchaos_server.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package chaosdaemon
    17  
    18  import (
    19  	"bufio"
    20  	"bytes"
    21  	"context"
    22  	"encoding/json"
    23  	"fmt"
    24  	"io"
    25  	"net/http"
    26  	"os"
    27  	"sync"
    28  
    29  	"github.com/pkg/errors"
    30  
    31  	"github.com/chaos-mesh/chaos-mesh/pkg/bpm"
    32  	pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    33  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/tproxyconfig"
    34  )
    35  
    36  const (
    37  	tproxyBin = "/usr/local/bin/tproxy"
    38  	pathEnv   = "PATH"
    39  )
    40  
// stdioTransport exchanges HTTP messages with a background process over its
// standard input and output pipes instead of a network connection.
type stdioTransport struct {
	// uid is the key stored in locker while an exchange is in flight.
	uid string
	// locker records the uids that currently have an exchange in flight;
	// an entry is added on entry to RoundTrip and removed when it returns.
	locker *sync.Map
	// pipes holds the process's stdio: requests are written to Stdin and
	// responses are read from Stdout.
	pipes bpm.Pipes
}
    46  
    47  func (t *stdioTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
    48  	if _, loaded := t.locker.LoadOrStore(t.uid, true); loaded {
    49  		return &http.Response{
    50  			StatusCode: http.StatusLocked,
    51  			Status:     http.StatusText(http.StatusLocked),
    52  			Body:       io.NopCloser(bytes.NewBufferString("")),
    53  			Request:    req,
    54  		}, nil
    55  	}
    56  	defer t.locker.Delete(t.uid)
    57  	if t.pipes.Stdin == nil {
    58  		return nil, errors.New("fail to get stdin of process")
    59  	}
    60  	if t.pipes.Stdout == nil {
    61  		return nil, errors.New("fail to get stdout of process")
    62  	}
    63  
    64  	err = req.Write(t.pipes.Stdin)
    65  	if err != nil {
    66  		return
    67  	}
    68  
    69  	resp, err = http.ReadResponse(bufio.NewReader(t.pipes.Stdout), req)
    70  	return
    71  }
    72  
    73  func (s *DaemonServer) ApplyHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) (*pb.ApplyHttpChaosResponse, error) {
    74  	log := s.getLoggerFromContext(ctx)
    75  	log.Info("applying http chaos")
    76  
    77  	if in.InstanceUid == "" {
    78  		if uid, ok := s.backgroundProcessManager.GetUID(bpm.ProcessPair{Pid: int(in.Instance), CreateTime: in.StartTime}); ok {
    79  			in.InstanceUid = uid
    80  		}
    81  	}
    82  
    83  	if _, ok := s.backgroundProcessManager.GetPipes(in.InstanceUid); !ok {
    84  		if in.InstanceUid != "" {
    85  			// chaos daemon may restart, create another tproxy instance
    86  			if err := s.backgroundProcessManager.KillBackgroundProcess(ctx, in.InstanceUid); err != nil {
    87  				// ignore this error
    88  				log.Error(err, "kill background process", "uid", in.InstanceUid)
    89  			}
    90  		}
    91  
    92  		// set uid internally
    93  		if err := s.createHttpChaos(ctx, in); err != nil {
    94  			return nil, errors.Wrap(err, "create http chaos")
    95  		}
    96  	}
    97  
    98  	resp, err := s.applyHttpChaos(ctx, in)
    99  	if err != nil {
   100  		if killError := s.backgroundProcessManager.KillBackgroundProcess(ctx, in.InstanceUid); killError != nil {
   101  			log.Error(killError, "kill tproxy", "uid", in.InstanceUid)
   102  		}
   103  		return nil, errors.Wrap(err, "apply config")
   104  	}
   105  	return resp, err
   106  }
   107  
   108  func (s *DaemonServer) applyHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) (*pb.ApplyHttpChaosResponse, error) {
   109  	log := s.getLoggerFromContext(ctx)
   110  
   111  	pipes, ok := s.backgroundProcessManager.GetPipes(in.InstanceUid)
   112  	if !ok {
   113  		return nil, errors.Errorf("fail to get process(%s)", in.InstanceUid)
   114  	}
   115  
   116  	transport := &stdioTransport{
   117  		uid:    in.InstanceUid,
   118  		locker: s.tproxyLocker,
   119  		pipes:  pipes,
   120  	}
   121  
   122  	var rules []tproxyconfig.PodHttpChaosBaseRule
   123  	err := json.Unmarshal([]byte(in.Rules), &rules)
   124  	if err != nil {
   125  		return nil, errors.Wrap(err, "unmarshal rules")
   126  	}
   127  
   128  	log.Info("the length of actions", "length", len(rules))
   129  
   130  	httpChaosSpec := tproxyconfig.Config{
   131  		ProxyPorts: in.ProxyPorts,
   132  		Rules:      rules,
   133  	}
   134  
   135  	if len(in.Tls) != 0 {
   136  		httpChaosSpec.TLS = new(tproxyconfig.TLSConfig)
   137  		err = json.Unmarshal([]byte(in.Tls), httpChaosSpec.TLS)
   138  		if err != nil {
   139  			return nil, errors.Wrap(err, "unmarshal tls config")
   140  		}
   141  	}
   142  
   143  	config, err := json.Marshal(&httpChaosSpec)
   144  	if err != nil {
   145  		return nil, err
   146  	}
   147  
   148  	log.Info("ready to apply", "config", string(config))
   149  
   150  	req, err := http.NewRequest(http.MethodPut, "/", bytes.NewReader(config))
   151  	if err != nil {
   152  		return nil, errors.Wrap(err, "create http request")
   153  	}
   154  
   155  	resp, err := transport.RoundTrip(req)
   156  	if err != nil {
   157  		return nil, errors.Wrap(err, "send http request")
   158  	}
   159  
   160  	log.Info("http chaos applied")
   161  
   162  	body, err := io.ReadAll(resp.Body)
   163  	if err != nil {
   164  		return nil, errors.Wrap(err, "read response body")
   165  	}
   166  
   167  	return &pb.ApplyHttpChaosResponse{
   168  		Instance:    int64(in.Instance),
   169  		InstanceUid: in.InstanceUid,
   170  		StartTime:   in.StartTime,
   171  		StatusCode:  int32(resp.StatusCode),
   172  		Error:       string(body),
   173  	}, nil
   174  }
   175  
   176  func (s *DaemonServer) createHttpChaos(ctx context.Context, in *pb.ApplyHttpChaosRequest) error {
   177  	pid, err := s.crClient.GetPidFromContainerID(ctx, in.ContainerId)
   178  	if err != nil {
   179  		return errors.Wrapf(err, "get PID of container(%s)", in.ContainerId)
   180  	}
   181  	processBuilder := bpm.DefaultProcessBuilder(tproxyBin, "-i", "-vv").
   182  		EnableLocalMnt().
   183  		SetIdentifier(fmt.Sprintf("tproxy-%s", in.ContainerId)).
   184  		SetEnv(pathEnv, os.Getenv(pathEnv))
   185  
   186  	if in.EnterNS {
   187  		processBuilder = processBuilder.SetNS(pid, bpm.PidNS).SetNS(pid, bpm.NetNS)
   188  	}
   189  
   190  	cmd := processBuilder.Build(ctx)
   191  	cmd.Stderr = os.Stderr
   192  
   193  	proc, err := s.backgroundProcessManager.StartProcess(ctx, cmd)
   194  	if err != nil {
   195  		return errors.Wrapf(err, "execute command(%s)", cmd)
   196  	}
   197  
   198  	in.Instance = int64(proc.Pair.Pid)
   199  	in.StartTime = proc.Pair.CreateTime
   200  	in.InstanceUid = proc.Uid
   201  	return nil
   202  }
   203