...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/chaosfs/server.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/chaosfs

     1  // Copyright 2019 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package chaosfs
    15  
    16  import (
    17  	"context"
    18  	"math/rand"
    19  	"net"
    20  	"os"
    21  	"regexp"
    22  	"sync"
    23  	"syscall"
    24  	"time"
    25  
    26  	"github.com/golang/protobuf/ptypes/empty"
    27  
    28  	pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosfs/pb"
    29  	"github.com/chaos-mesh/chaos-mesh/pkg/utils"
    30  
    31  	"google.golang.org/grpc"
    32  	"google.golang.org/grpc/reflection"
    33  
    34  	ctrl "sigs.k8s.io/controller-runtime"
    35  )
    36  
// log is the package-level logger; it is derived from the controller-runtime
// root logger and named "fuse-server".
var log = ctrl.Log.WithName("fuse-server")
    38  
    39  //go:generate protoc -I pb pb/injure.proto --go_out=plugins=grpc:pb
    40  
var (
	// faultMap maps a method name (string) to the *faultContext that is
	// currently injected for that method. It is read by faultInject and
	// written by the gRPC handlers.
	faultMap sync.Map

	// methods is the set of method names known to this package. It is
	// populated once by initMethods.
	methods map[string]bool
)
    46  
    47  func init() {
    48  	faultMap = sync.Map{}
    49  	initMethods()
    50  }
    51  
// faultContext describes one injected fault. A single instance may be shared
// by several method names in faultMap.
type faultContext struct {
	// errno, when non-nil, is the error faultInject returns for a hit.
	errno error
	// random makes faultInject return a random errno when errno is nil.
	random bool
	// pct is the percentage chance passed to probab for each call.
	pct uint32
	// path, when non-empty, is a regular expression the accessed path must
	// match for the fault to apply.
	path string
	// delay, when positive, is how long faultInject sleeps before returning.
	delay time.Duration
}
    59  
    60  func initMethods() {
    61  	methods = make(map[string]bool)
    62  	methods["open"] = true
    63  	methods["read"] = true
    64  	methods["write"] = true
    65  	methods["mkdir"] = true
    66  	methods["rmdir"] = true
    67  	methods["opendir"] = true
    68  	methods["fsync"] = true
    69  	methods["flush"] = true
    70  	methods["release"] = true
    71  	methods["truncate"] = true
    72  	methods["getattr"] = true
    73  	methods["chown"] = true
    74  	methods["chmod"] = true
    75  	methods["utimens"] = true
    76  	methods["allocate"] = true
    77  	methods["getlk"] = true
    78  	methods["setlk"] = true
    79  	methods["setlkw"] = true
    80  	methods["statfs"] = true
    81  	methods["readlink"] = true
    82  	methods["symlink"] = true
    83  	methods["create"] = true
    84  	methods["access"] = true
    85  	methods["link"] = true
    86  	methods["mknod"] = true
    87  	methods["rename"] = true
    88  	methods["unlink"] = true
    89  	methods["getxattr"] = true
    90  	methods["listxattr"] = true
    91  	methods["removexattr"] = true
    92  	methods["setxattr"] = true
    93  }
    94  
    95  func randomErrno() error {
    96  	// from E2BIG to EXFULL, notice linux only
    97  	return syscall.Errno(rand.Intn(0x36-0x7) + 0x7)
    98  }
    99  
   100  func probab(percentage uint32) bool {
   101  	return rand.Intn(99) < int(percentage)
   102  }
   103  
   104  func faultInject(path, method string) error {
   105  	val, ok := faultMap.Load(method)
   106  	if !ok {
   107  		return nil
   108  	}
   109  
   110  	fc := val.(*faultContext)
   111  	if !probab(fc.pct) {
   112  		return nil
   113  	}
   114  
   115  	if len(fc.path) > 0 {
   116  		re, err := regexp.Compile(fc.path)
   117  		if err != nil {
   118  			log.Error(err, "failed to parse path", "path: ", fc.path)
   119  			return nil
   120  		}
   121  		if !re.MatchString(path) {
   122  			return nil
   123  		}
   124  	}
   125  
   126  	log.V(6).Info("Inject fault", "method", method, "path", path)
   127  	log.V(6).Info("Inject fault", "context", fc)
   128  
   129  	var errno error = nil
   130  	if fc.errno != nil {
   131  		errno = fc.errno
   132  	} else if fc.random {
   133  		errno = randomErrno()
   134  	}
   135  
   136  	if fc.delay > 0 {
   137  		time.Sleep(fc.delay)
   138  	}
   139  
   140  	return errno
   141  }
   142  
   143  type server struct {
   144  }
   145  
   146  func (s *server) methods() []string {
   147  	ms := make([]string, 0)
   148  	for k := range methods {
   149  		ms = append(ms, k)
   150  	}
   151  	return ms
   152  }
   153  
   154  func (s *server) Injected(_ context.Context, _ *empty.Empty) (*pb.InjectedResponse, error) {
   155  	for method := range methods {
   156  		if _, ok := faultMap.Load(method); ok {
   157  			return &pb.InjectedResponse{Injected: true}, nil
   158  		}
   159  	}
   160  	return &pb.InjectedResponse{Injected: false}, nil
   161  }
   162  
   163  func (s *server) Methods(_ context.Context, _ *empty.Empty) (*pb.Response, error) {
   164  	return &pb.Response{Methods: s.methods()}, nil
   165  }
   166  
   167  func (s *server) RecoverAll(_ context.Context, _ *empty.Empty) (*empty.Empty, error) {
   168  	log.Info("Recover all fault")
   169  	faultMap.Range(func(k, v interface{}) bool {
   170  		faultMap.Delete(k)
   171  		return true
   172  	})
   173  	return &empty.Empty{}, nil
   174  }
   175  
   176  func (s *server) RecoverMethod(_ context.Context, in *pb.Request) (*empty.Empty, error) {
   177  	ms := in.GetMethods()
   178  	for _, v := range ms {
   179  		faultMap.Delete(v)
   180  	}
   181  	return &empty.Empty{}, nil
   182  }
   183  
   184  func (s *server) setFault(ms []string, f *faultContext) {
   185  	for _, v := range ms {
   186  		faultMap.Store(v, f)
   187  	}
   188  }
   189  
   190  func (s *server) SetFault(_ context.Context, in *pb.Request) (*empty.Empty, error) {
   191  	// TODO: use Errno(0), and handle Errno(0) in Hook interfaces
   192  	log.Info("Set fault", "request", in)
   193  
   194  	var errno error = nil
   195  	if in.Errno != 0 {
   196  		errno = syscall.Errno(in.Errno)
   197  	}
   198  	f := &faultContext{
   199  		errno:  errno,
   200  		random: in.Random,
   201  		pct:    in.Pct,
   202  		path:   in.Path,
   203  		delay:  time.Duration(in.Delay) * time.Microsecond,
   204  	}
   205  
   206  	s.setFault(in.Methods, f)
   207  	return &empty.Empty{}, nil
   208  }
   209  
   210  func (s *server) SetFaultAll(ctx context.Context, in *pb.Request) (*empty.Empty, error) {
   211  	// TODO: use Errno(0), and handle Errno(0) in Hook interfaces
   212  	log.Info("Set fault all methods", "request", in)
   213  
   214  	var errno error = nil
   215  	if in.Errno != 0 {
   216  		errno = syscall.Errno(in.Errno)
   217  	}
   218  	f := &faultContext{
   219  		errno:  errno,
   220  		random: in.Random,
   221  		pct:    in.Pct,
   222  		path:   in.Path,
   223  		delay:  time.Duration(in.Delay) * time.Microsecond,
   224  	}
   225  
   226  	s.setFault(s.methods(), f)
   227  	return &empty.Empty{}, nil
   228  }
   229  
   230  func StartServer(addr string) {
   231  	lis, err := net.Listen("tcp", addr)
   232  	if err != nil {
   233  		log.Error(err, "failed to listen tcp server", "address", addr)
   234  		os.Exit(1)
   235  	}
   236  	s := grpc.NewServer(grpc.UnaryInterceptor(utils.TimeoutServerInterceptor))
   237  	pb.RegisterInjureServer(s, &server{})
   238  	// Register reflection service on gRPC server.
   239  	reflection.Register(s)
   240  	go func() {
   241  		if err := s.Serve(lis); err != nil {
   242  			log.Error(err, "failed to start serve")
   243  			os.Exit(1)
   244  		}
   245  	}()
   246  }
   247