...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/bpm/bpm.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/bpm

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package bpm
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"io"
    22  	"os/exec"
    23  	"sync"
    24  	"syscall"
    25  
    26  	"github.com/go-logr/logr"
    27  	"github.com/google/uuid"
    28  	"github.com/pkg/errors"
    29  	"github.com/prometheus/client_golang/prometheus"
    30  	"github.com/shirou/gopsutil/process"
    31  
    32  	"github.com/chaos-mesh/chaos-mesh/pkg/log"
    33  )
    34  
    35  type NsType string
    36  
    37  const (
    38  	MountNS NsType = "mnt"
    39  	// uts namespace is not supported yet
    40  	// UtsNS   NsType = "uts"
    41  	IpcNS NsType = "ipc"
    42  	NetNS NsType = "net"
    43  	PidNS NsType = "pid"
    44  	// user namespace is not supported yet
    45  	// UserNS  NsType = "user"
    46  )
    47  
    48  var nsArgMap = map[NsType]string{
    49  	MountNS: "m",
    50  	// uts namespace is not supported by nsexec yet
    51  	// UtsNS:   "u",
    52  	IpcNS: "i",
    53  	NetNS: "n",
    54  	PidNS: "p",
    55  	// user namespace is not supported by nsexec yet
    56  	// UserNS:  "U",
    57  }
    58  
    59  const (
    60  	pausePath  = "/usr/local/bin/pause"
    61  	nsexecPath = "/usr/local/bin/nsexec"
    62  
    63  	DefaultProcPrefix = "/proc"
    64  )
    65  
    66  // ProcessPair is an identifier for process
    67  // Keep compatible with v2.x
    68  // TODO: remove in v3.x
    69  //
    70  // Currently, the bpm locate managed processes by both PID and create time, because the OS may reuse PID, we must check the create time to avoid locating the wrong process.
    71  //
    72  // However, the two-step locating is messy and the create time may be imprecise (we have fixed a [relevant bug](https://github.com/shirou/gopsutil/pull/1204)).
    73  // In future version, we should completely remove the two-step locating and identify managed processes by UID only.
    74  type ProcessPair struct {
    75  	Pid        int
    76  	CreateTime int64
    77  }
    78  
    79  type Process struct {
    80  	Uid string
    81  
    82  	// TODO: remove in v3.x
    83  	// store create time, to keep compatible with v2.x
    84  	Pair ProcessPair
    85  
    86  	Cmd   *ManagedCommand
    87  	Pipes Pipes
    88  
    89  	ctx     context.Context
    90  	stopped context.CancelFunc
    91  }
    92  
    93  // pipes that will be connected to the command's stdin/stdout
    94  type Pipes struct {
    95  	Stdin  io.WriteCloser
    96  	Stdout io.ReadCloser
    97  }
    98  
    99  // BackgroundProcessManager manages all background processes
   100  type BackgroundProcessManager struct {
   101  	// deathChannel is a channel to receive Uid of dead processes
   102  	deathChannel chan string
   103  
   104  	// wait group to await all processes exit
   105  	wg *sync.WaitGroup
   106  
   107  	// identifiers is a map to prevent duplicated processes
   108  	identifiers *sync.Map
   109  
   110  	// Uid -> Process
   111  	processes *sync.Map
   112  
   113  	// TODO: remove in v3.x
   114  	// PidPair -> Uid, to keep compatible with v2.x
   115  	pidPairToUid *sync.Map
   116  
   117  	rootLogger logr.Logger
   118  
   119  	metricsCollector *metricsCollector
   120  }
   121  
   122  func startProcess(cmd *ManagedCommand) (*Process, error) {
   123  	stdin, err := cmd.StdinPipe()
   124  	if err != nil {
   125  		return nil, errors.Wrap(err, "create stdin pipe")
   126  	}
   127  
   128  	stdout, err := cmd.StdoutPipe()
   129  	if err != nil {
   130  		return nil, errors.Wrap(err, "create stdout pipe")
   131  	}
   132  
   133  	err = cmd.Start()
   134  	if err != nil {
   135  		return nil, errors.Wrapf(err, "start command `%s`", cmd.String())
   136  	}
   137  
   138  	newProcess := &Process{
   139  		Uid:   uuid.NewString(),
   140  		Cmd:   cmd,
   141  		Pipes: Pipes{Stdin: stdin, Stdout: stdout},
   142  	}
   143  
   144  	newProcess.ctx, newProcess.stopped = context.WithCancel(context.Background())
   145  
   146  	// keep compatible with v2.x
   147  	// TODO: remove in v3.x
   148  	pid := cmd.Process.Pid
   149  	proc, err := process.NewProcess(int32(cmd.Process.Pid))
   150  	if err != nil {
   151  		return nil, errors.Wrapf(err, "get process state for pid %d", pid)
   152  	}
   153  
   154  	ct, err := proc.CreateTime()
   155  	if err != nil {
   156  		return nil, errors.Wrapf(err, "get process create time for pid %d", pid)
   157  	}
   158  
   159  	newProcess.Pair = ProcessPair{
   160  		Pid:        int(proc.Pid),
   161  		CreateTime: ct,
   162  	}
   163  	return newProcess, nil
   164  }
   165  
   166  func (p *Process) Stopped() <-chan struct{} {
   167  	return p.ctx.Done()
   168  }
   169  
   170  // StartBackgroundProcessManager creates a background process manager
   171  func StartBackgroundProcessManager(registry prometheus.Registerer, rootLogger logr.Logger) *BackgroundProcessManager {
   172  	backgroundProcessManager := &BackgroundProcessManager{
   173  		deathChannel:     make(chan string, 1),
   174  		wg:               &sync.WaitGroup{},
   175  		identifiers:      &sync.Map{},
   176  		processes:        &sync.Map{},
   177  		pidPairToUid:     &sync.Map{},
   178  		rootLogger:       rootLogger.WithName("background-process-manager"),
   179  		metricsCollector: nil,
   180  	}
   181  
   182  	go func() {
   183  		// return if deathChannel is closed
   184  		for uid := range backgroundProcessManager.deathChannel {
   185  			process, loaded := backgroundProcessManager.processes.LoadAndDelete(uid)
   186  			if loaded {
   187  				proc := process.(*Process)
   188  				backgroundProcessManager.pidPairToUid.Delete(proc.Pair)
   189  				if proc.Cmd.Identifier != nil {
   190  					backgroundProcessManager.identifiers.Delete(*proc.Cmd.Identifier)
   191  				}
   192  				proc.stopped()
   193  			}
   194  			backgroundProcessManager.wg.Done()
   195  		}
   196  	}()
   197  
   198  	if registry != nil {
   199  		backgroundProcessManager.metricsCollector = newMetricsCollector(backgroundProcessManager, registry)
   200  	}
   201  
   202  	return backgroundProcessManager
   203  }
   204  
   205  func (m *BackgroundProcessManager) recycle(uid string) {
   206  	m.deathChannel <- uid
   207  }
   208  
   209  // StartProcess manages a process in manager
   210  func (m *BackgroundProcessManager) StartProcess(ctx context.Context, cmd *ManagedCommand) (*Process, error) {
   211  	log := m.getLoggerFromContext(ctx)
   212  	if cmd.Identifier != nil {
   213  		_, loaded := m.identifiers.LoadOrStore(*cmd.Identifier, true)
   214  		if loaded {
   215  			return nil, errors.Errorf("process with identifier %s is running", *cmd.Identifier)
   216  		}
   217  	}
   218  
   219  	process, err := startProcess(cmd)
   220  	if err != nil {
   221  		return nil, err
   222  	}
   223  
   224  	m.processes.Store(process.Uid, process)
   225  	m.pidPairToUid.Store(process.Pair, process.Uid)
   226  	// end
   227  
   228  	if m.metricsCollector != nil {
   229  		m.metricsCollector.bpmControlledProcessTotal.Inc()
   230  	}
   231  
   232  	m.wg.Add(1)
   233  	log = log.WithValues("uid", process.Uid, "pid", process.Pair.Pid)
   234  
   235  	go func() {
   236  		err := cmd.Wait()
   237  		if err != nil {
   238  			if exitErr, ok := err.(*exec.ExitError); ok {
   239  				status := exitErr.Sys().(syscall.WaitStatus)
   240  				if status.Signaled() && status.Signal() == syscall.SIGTERM {
   241  					log.Info("process stopped with SIGTERM signal")
   242  				}
   243  			} else {
   244  				log.Error(err, "process exited accidentally")
   245  			}
   246  		}
   247  		log.Info("process stopped")
   248  		m.recycle(process.Uid)
   249  	}()
   250  
   251  	return process, nil
   252  }
   253  
   254  func (m *BackgroundProcessManager) Shutdown(ctx context.Context) {
   255  	log := m.getLoggerFromContext(ctx)
   256  
   257  	m.processes.Range(func(_, value interface{}) bool {
   258  		process := value.(*Process)
   259  		log := log.WithValues("uid", process.Uid, "pid", process.Pair.Pid)
   260  		if err := process.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
   261  			log.Error(err, "send SIGTERM to process")
   262  			return true
   263  		}
   264  		return true
   265  	})
   266  	m.wg.Wait()
   267  	close(m.deathChannel)
   268  }
   269  
   270  func (m *BackgroundProcessManager) GetUID(pair ProcessPair) (string, bool) {
   271  	if uid, loaded := m.pidPairToUid.Load(pair); loaded {
   272  		return uid.(string), true
   273  	}
   274  	return "", false
   275  }
   276  
   277  func (m *BackgroundProcessManager) getProc(uid string) (*Process, bool) {
   278  	if proc, loaded := m.processes.Load(uid); loaded {
   279  		return proc.(*Process), true
   280  	}
   281  	return nil, false
   282  }
   283  
   284  func (m *BackgroundProcessManager) GetPipes(uid string) (Pipes, bool) {
   285  	proc, ok := m.getProc(uid)
   286  	if !ok {
   287  		return Pipes{}, false
   288  	}
   289  	return proc.Pipes, true
   290  }
   291  
   292  // KillBackgroundProcess sends SIGTERM to process
   293  func (m *BackgroundProcessManager) KillBackgroundProcess(ctx context.Context, uid string) error {
   294  	log := m.getLoggerFromContext(ctx)
   295  
   296  	log = log.WithValues("uid", uid)
   297  
   298  	proc, loaded := m.getProc(uid)
   299  	if !loaded {
   300  		return errors.Errorf("failed to find process with uid %s", uid)
   301  	}
   302  
   303  	if err := proc.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
   304  		return errors.Wrap(err, "send SIGTERM to process")
   305  	}
   306  
   307  	select {
   308  	case <-proc.Stopped():
   309  		log.Info("Successfully killed process")
   310  	case <-ctx.Done():
   311  		if err := ctx.Err(); err != nil {
   312  			return errors.Wrap(err, "context closed")
   313  		}
   314  	}
   315  	return nil
   316  }
   317  
   318  // GetIdentifiers finds all identifiers in BPM
   319  func (m *BackgroundProcessManager) GetIdentifiers() []string {
   320  	var identifiers []string
   321  	m.identifiers.Range(func(key, value interface{}) bool {
   322  		identifiers = append(identifiers, key.(string))
   323  		return true
   324  	})
   325  
   326  	return identifiers
   327  }
   328  
   329  func (m *BackgroundProcessManager) getLoggerFromContext(ctx context.Context) logr.Logger {
   330  	return log.EnrichLoggerWithContext(ctx, m.rootLogger)
   331  }
   332  
   333  // DefaultProcessBuilder returns the default process builder
   334  func DefaultProcessBuilder(cmd string, args ...string) *CommandBuilder {
   335  	return &CommandBuilder{
   336  		cmd:        cmd,
   337  		args:       args,
   338  		nsOptions:  []nsOption{},
   339  		pause:      false,
   340  		identifier: nil,
   341  		ctx:        context.Background(),
   342  	}
   343  }
   344  
   345  // CommandBuilder builds a exec.Cmd for daemon
   346  type CommandBuilder struct {
   347  	cmd  string
   348  	args []string
   349  	env  []string
   350  
   351  	nsOptions []nsOption
   352  
   353  	pause    bool
   354  	localMnt bool
   355  
   356  	identifier *string
   357  	stdin      io.ReadWriteCloser
   358  	stdout     io.ReadWriteCloser
   359  	stderr     io.ReadWriteCloser
   360  
   361  	oomScoreAdj int
   362  
   363  	// the context is used to kill the process and will be passed into
   364  	// `exec.CommandContext`
   365  	ctx context.Context
   366  }
   367  
   368  // GetNsPath returns corresponding namespace path
   369  func GetNsPath(pid uint32, typ NsType) string {
   370  	return fmt.Sprintf("%s/%d/ns/%s", DefaultProcPrefix, pid, string(typ))
   371  }
   372  
   373  // SetEnv sets the environment variables of the process
   374  func (b *CommandBuilder) SetEnv(key, value string) *CommandBuilder {
   375  	b.env = append(b.env, fmt.Sprintf("%s=%s", key, value))
   376  	return b
   377  }
   378  
   379  // SetNS sets the namespace of the process
   380  func (b *CommandBuilder) SetNS(pid uint32, typ NsType) *CommandBuilder {
   381  	return b.SetNSOpt([]nsOption{{
   382  		Typ:  typ,
   383  		Path: GetNsPath(pid, typ),
   384  	}})
   385  }
   386  
   387  // SetNSOpt sets the namespace of the process
   388  func (b *CommandBuilder) SetNSOpt(options []nsOption) *CommandBuilder {
   389  	b.nsOptions = append(b.nsOptions, options...)
   390  
   391  	return b
   392  }
   393  
   394  // SetIdentifier sets the identifier of the process
   395  //
   396  // The identifier is used to identify the process in BPM, to confirm only one identified process is running.
   397  // If one identified process is already running, new processes with the same identifier will be blocked by lock.
   398  func (b *CommandBuilder) SetIdentifier(id string) *CommandBuilder {
   399  	b.identifier = &id
   400  
   401  	return b
   402  }
   403  
   404  // EnablePause enables pause for process
   405  func (b *CommandBuilder) EnablePause() *CommandBuilder {
   406  	b.pause = true
   407  
   408  	return b
   409  }
   410  
   411  func (b *CommandBuilder) EnableLocalMnt() *CommandBuilder {
   412  	b.localMnt = true
   413  
   414  	return b
   415  }
   416  
   417  // SetContext sets context for process
   418  func (b *CommandBuilder) SetContext(ctx context.Context) *CommandBuilder {
   419  	b.ctx = ctx
   420  
   421  	return b
   422  }
   423  
   424  // SetStdin sets stdin for process
   425  func (b *CommandBuilder) SetStdin(stdin io.ReadWriteCloser) *CommandBuilder {
   426  	b.stdin = stdin
   427  
   428  	return b
   429  }
   430  
   431  // SetStdout sets stdout for process
   432  func (b *CommandBuilder) SetStdout(stdout io.ReadWriteCloser) *CommandBuilder {
   433  	b.stdout = stdout
   434  
   435  	return b
   436  }
   437  
   438  // SetStderr sets stderr for process
   439  func (b *CommandBuilder) SetStderr(stderr io.ReadWriteCloser) *CommandBuilder {
   440  	b.stderr = stderr
   441  
   442  	return b
   443  }
   444  
   445  // SetOOMScoreAdj sets the oom_score_adj for a process
   446  // oom_score_adj ranges from -1000 to 1000
   447  func (b *CommandBuilder) SetOOMScoreAdj(scoreAdj int) *CommandBuilder {
   448  	b.oomScoreAdj = scoreAdj
   449  	return b
   450  }
   451  
   452  func (b *CommandBuilder) getLoggerFromContext(ctx context.Context) logr.Logger {
   453  	// this logger is inherited from the global one
   454  	// TODO: replace it with a specific logger by passing in one or creating a new one
   455  	logger := log.L().WithName("background-process-manager.process-builder")
   456  	return log.EnrichLoggerWithContext(ctx, logger)
   457  }
   458  
   459  type nsOption struct {
   460  	Typ  NsType
   461  	Path string
   462  }
   463  
   464  // ManagedCommand is a process which can be managed by backgroundProcessManager
   465  type ManagedCommand struct {
   466  	*exec.Cmd
   467  
   468  	// If the identifier is not nil, process manager should make sure no other
   469  	// process with this identifier is running when executing this command
   470  	Identifier *string
   471  }
   472