...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package tasks
17
18 import (
19 "strconv"
20
21 "github.com/go-logr/logr"
22
23 "github.com/chaos-mesh/chaos-mesh/pkg/cerr"
24 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/util"
25 )
26
27 var ErrNotTypeSysID = cerr.NotType[SysPID]()
28 var ErrNotFoundSysID = cerr.NotFoundType[SysPID]()
29
30 type SysPID uint32
31
32 func (s SysPID) ToID() string {
33 return strconv.FormatUint(uint64(s), 10)
34 }
35
36
37
38
39 type ChaosOnProcessGroup interface {
40 Fork() (ChaosOnProcessGroup, error)
41 Assign
42
43 Injectable
44 Recoverable
45 }
46
47
48 type ProcessGroupHandler struct {
49 LeaderProcess ChaosOnProcessGroup
50 childMap map[IsID]ChaosOnProcessGroup
51 Logger logr.Logger
52 }
53
54 func NewProcessGroupHandler(logger logr.Logger, leader ChaosOnProcessGroup) ProcessGroupHandler {
55 return ProcessGroupHandler{
56 LeaderProcess: leader,
57 childMap: make(map[IsID]ChaosOnProcessGroup),
58 Logger: logr.New(logger.GetSink()),
59 }
60 }
61
62
63
64 func (gp *ProcessGroupHandler) Inject(pid IsID) error {
65 sysPID, ok := pid.(SysPID)
66 if !ok {
67 return ErrNotTypeSysID.WrapInput(pid).Err()
68 }
69
70 err := gp.LeaderProcess.Inject(sysPID)
71 if err != nil {
72 return cerr.FromErr(err).Wrapf("inject leader process: %v", sysPID).Err()
73 }
74
75 childPIDs, err := util.GetChildProcesses(uint32(sysPID), gp.Logger)
76 if err != nil {
77 return cerr.NotFound("child process").WrapErr(err).Err()
78 }
79
80 for _, childPID := range childPIDs {
81 childSysPID := SysPID(childPID)
82 if childProcessChaos, ok := gp.childMap[childSysPID]; ok {
83 err := gp.LeaderProcess.Assign(childProcessChaos)
84 if err != nil {
85 gp.Logger.Error(err, "failed to assign old child process")
86 continue
87 }
88 err = childProcessChaos.Inject(childSysPID)
89 if err != nil {
90 gp.Logger.Error(err, "failed to inject old child process")
91 }
92 } else {
93 childProcessChaos, err := gp.LeaderProcess.Fork()
94 if err != nil {
95 gp.Logger.Error(err, "failed to create child process")
96 continue
97 }
98 err = childProcessChaos.Inject(childSysPID)
99 if err != nil {
100 gp.Logger.Error(err, "failed to inject new child process")
101 continue
102 }
103 gp.childMap[childSysPID] = childProcessChaos
104 }
105 }
106 return nil
107 }
108
109
110 func (gp *ProcessGroupHandler) Recover(pid IsID) error {
111 _, ok := pid.(SysPID)
112 if !ok {
113 return ErrNotTypeSysID.WrapInput(pid).Err()
114 }
115 err := gp.LeaderProcess.Recover(pid)
116 if err != nil {
117 return cerr.FromErr(err).Wrapf("recovery leader process : %v", pid).Err()
118 }
119
120 for childID, group := range gp.childMap {
121 childSysPID, ok := childID.(SysPID)
122 if !ok {
123 gp.Logger.Error(cerr.NotType[SysPID]().WrapInput(childID).Err(),
124 "failed to recover old child process")
125 }
126
127 err := group.Recover(childSysPID)
128 if err != nil {
129 gp.Logger.Error(err, "failed to recover old child process")
130 }
131 }
132 return nil
133 }
134