...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package chaosdaemon
17
18 import (
19 "context"
20
21 "github.com/go-logr/logr"
22 "github.com/golang/protobuf/ptypes/empty"
23 "github.com/pkg/errors"
24
25 "github.com/chaos-mesh/chaos-mesh/pkg/cerr"
26 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
27 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/tasks"
28 "github.com/chaos-mesh/chaos-mesh/pkg/time"
29 )
30
31 type TimeChaosServer struct {
32 podContainerNameProcessMap tasks.PodContainerNameProcessMap
33 manager tasks.TaskManager
34
35 nameLocker tasks.LockMap[tasks.PodContainerName]
36 logger logr.Logger
37 }
38
39 func (s *TimeChaosServer) SetPodContainerNameProcess(idName tasks.PodContainerName, sysID tasks.SysPID) {
40 s.podContainerNameProcessMap.Write(idName, sysID)
41 }
42
43 func (s *TimeChaosServer) DelPodContainerNameProcess(idName tasks.PodContainerName) {
44 s.podContainerNameProcessMap.Delete(idName)
45 }
46
47 func (s *TimeChaosServer) SetTimeOffset(uid tasks.TaskID, id tasks.PodContainerName, config time.Config) error {
48 paras := time.ConfigCreatorParas{
49 Logger: s.logger,
50 Config: config,
51 PodProcessMap: &s.podContainerNameProcessMap,
52 }
53
54 unlock := s.nameLocker.Lock(id)
55 defer unlock()
56
57
58
59
60
61 err := s.manager.Create(uid, id, &config, paras)
62 if err != nil {
63 if errors.Cause(err) == cerr.ErrDuplicateEntity {
64 err := s.manager.Apply(uid, id, &config)
65 if err != nil {
66 return err
67 }
68 } else {
69 return err
70 }
71 }
72 return nil
73 }
74
75 func (s *DaemonServer) SetTimeOffset(ctx context.Context, req *pb.TimeRequest) (*empty.Empty, error) {
76 logger := s.timeChaosServer.logger
77
78 logger.Info("Shift time", "Request", req)
79
80 pid, err := s.crClient.GetPidFromContainerID(ctx, req.ContainerId)
81 if err != nil {
82 logger.Error(err, "error while getting IsID")
83 return nil, err
84 }
85
86 s.timeChaosServer.SetPodContainerNameProcess(tasks.PodContainerName(req.PodContainerName), tasks.SysPID(pid))
87 err = s.timeChaosServer.SetTimeOffset(req.Uid, tasks.PodContainerName(req.PodContainerName),
88 time.NewConfig(req.Sec, req.Nsec, req.ClkIdsMask))
89 if err != nil {
90 logger.Error(err, "error while applying chaos")
91 return nil, err
92 }
93 return &empty.Empty{}, nil
94 }
95
96 func (s *DaemonServer) RecoverTimeOffset(ctx context.Context, req *pb.TimeRequest) (*empty.Empty, error) {
97 logger := s.timeChaosServer.logger
98
99 logger.Info("Recover time", "Request", req)
100
101 pid, err := s.crClient.GetPidFromContainerID(ctx, req.ContainerId)
102 if err != nil {
103 logger.Error(err, "error while getting IsID")
104 return nil, err
105 }
106
107 nameID := tasks.PodContainerName(req.PodContainerName)
108
109 s.timeChaosServer.SetPodContainerNameProcess(nameID, tasks.SysPID(pid))
110
111 unlock := s.timeChaosServer.nameLocker.Lock(nameID)
112 defer unlock()
113
114 err = s.timeChaosServer.manager.Recover(req.Uid, nameID)
115 if err != nil {
116 logger.Error(err, "error while recovering chaos")
117 return nil, err
118 }
119
120 if len(s.timeChaosServer.manager.GetUIDsWithPID(nameID)) == 0 {
121 s.timeChaosServer.DelPodContainerNameProcess(nameID)
122 s.timeChaosServer.nameLocker.Del(nameID)
123 }
124
125 return &empty.Empty{}, nil
126 }
127