1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package timechaos
17
18 import (
19 "context"
20 "fmt"
21 "time"
22
23 "github.com/go-logr/logr"
24 "github.com/pkg/errors"
25 "go.uber.org/fx"
26 "sigs.k8s.io/controller-runtime/pkg/client"
27
28 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
29 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
30 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
31 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
32 timeUtils "github.com/chaos-mesh/chaos-mesh/pkg/time/utils"
33 )
34
// Compile-time assertion that *Impl satisfies impltypes.ChaosImpl.
var _ impltypes.ChaosImpl = (*Impl)(nil)

// Impl implements impltypes.ChaosImpl for TimeChaos objects. Its Apply and
// Recover methods send time-offset requests through the PbClient obtained
// from the container record decoder.
type Impl struct {
	// Client is the embedded controller-runtime client.
	client.Client
	// Log is the logger used by Apply and Recover.
	Log logr.Logger
	// decoder turns a record into the container ID, container name, pod and
	// PbClient that Apply and Recover operate on.
	decoder *utils.ContainerRecordDecoder
}
42
43 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
44 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
45 pbClient := decodedContainer.PbClient
46 containerId := decodedContainer.ContainerId
47 if pbClient != nil {
48 defer pbClient.Close()
49 }
50 if err != nil {
51 return v1alpha1.NotInjected, err
52 }
53
54 timechaos := obj.(*v1alpha1.TimeChaos)
55 mask, err := timeUtils.EncodeClkIds(timechaos.Spec.ClockIds)
56 if err != nil {
57 return v1alpha1.NotInjected, err
58 }
59
60 duration, err := time.ParseDuration(timechaos.Spec.TimeOffset)
61 if err != nil {
62 return v1alpha1.NotInjected, err
63 }
64
65 sec, nsec := secAndNSecFromDuration(duration)
66
67 impl.Log.Info("setting time shift", "mask", mask, "sec", sec, "nsec", nsec, "containerId", containerId)
68 _, err = pbClient.SetTimeOffset(ctx, &pb.TimeRequest{
69 ContainerId: containerId,
70 Sec: sec,
71 Nsec: nsec,
72 ClkIdsMask: mask,
73 Uid: string(obj.GetUID()) + string(decodedContainer.Pod.GetUID()),
74 PodContainerName: fmt.Sprintf("%s:%s", decodedContainer.Pod.GetUID(), decodedContainer.ContainerName),
75 })
76 if err != nil {
77 return v1alpha1.NotInjected, err
78 }
79
80 return v1alpha1.Injected, nil
81 }
82
83 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
84 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
85 pbClient := decodedContainer.PbClient
86 containerId := decodedContainer.ContainerId
87 if pbClient != nil {
88 defer pbClient.Close()
89 }
90 if err != nil {
91 if errors.Is(err, utils.ErrContainerNotFound) {
92
93 return v1alpha1.NotInjected, nil
94 }
95 return v1alpha1.Injected, err
96 }
97
98 impl.Log.Info("recover for container", "containerId", containerId)
99 _, err = pbClient.RecoverTimeOffset(ctx, &pb.TimeRequest{
100 ContainerId: containerId,
101 Uid: string(obj.GetUID()) + string(decodedContainer.Pod.GetUID()),
102 PodContainerName: fmt.Sprintf("%s:%s", decodedContainer.Pod.GetUID(), decodedContainer.ContainerName),
103 })
104 if err != nil {
105 return v1alpha1.Injected, err
106 }
107
108 return v1alpha1.NotInjected, nil
109 }
110
// secAndNSecFromDuration splits duration into whole seconds and the leftover
// nanoseconds. Integer division truncates toward zero, so for a negative
// duration both results are zero or negative, and sec*1e9 + nsec always equals
// the original nanosecond count.
func secAndNSecFromDuration(duration time.Duration) (sec int64, nsec int64) {
	const perSecond = int64(time.Second)

	total := int64(duration)
	return total / perSecond, total % perSecond
}
117
118 func NewImpl(c client.Client, log logr.Logger, decoder *utils.ContainerRecordDecoder) *impltypes.ChaosImplPair {
119 return &impltypes.ChaosImplPair{
120 Name: "timechaos",
121 Object: &v1alpha1.TimeChaos{},
122 Impl: &Impl{
123 Client: c,
124 Log: log.WithName("timechaos"),
125 decoder: decoder,
126 },
127 }
128 }
129
// Module registers NewImpl with fx as a provider whose result is placed in
// the "impl" value group.
var Module = fx.Provide(
	fx.Annotated{
		Group:  "impl",
		Target: NewImpl,
	},
)
136