1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package blockchaos
17
18 import (
19 "context"
20 "strconv"
21 "time"
22
23 "github.com/go-logr/logr"
24 "github.com/pkg/errors"
25 "go.uber.org/fx"
26 "sigs.k8s.io/controller-runtime/pkg/client"
27
28 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
29 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
30 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
31 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
32 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
33 )
34
35 var _ impltypes.ChaosImpl = (*Impl)(nil)
36
37 type Impl struct {
38 client.Client
39 Log logr.Logger
40
41 decoder *utils.ContainerRecordDecoder
42 }
43
44 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
45 impl.Log.Info("blockchaos apply", "record", records[index])
46
47 _, _, volumePath, err := controller.ParseNamespacedNameContainerVolumePath(records[index].Id)
48 if err != nil {
49 return v1alpha1.NotInjected, errors.Wrapf(err, "parse container and volumePath %s", records[index].Id)
50 }
51
52 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
53 pbClient := decodedContainer.PbClient
54 containerId := decodedContainer.ContainerId
55 if pbClient != nil {
56 defer pbClient.Close()
57 }
58 if err != nil {
59 return v1alpha1.NotInjected, err
60 }
61
62 blockchaos := obj.(*v1alpha1.BlockChaos)
63 if blockchaos.Status.InjectionIds == nil {
64 blockchaos.Status.InjectionIds = make(map[string]int)
65 }
66 _, ok := blockchaos.Status.InjectionIds[records[index].Id]
67 if ok {
68 impl.Log.Info("the blockchaos has already been injected")
69 return v1alpha1.Injected, nil
70 }
71
72 var res *pb.ApplyBlockChaosResponse
73 if blockchaos.Spec.Action == v1alpha1.BlockDelay {
74 delay, err := time.ParseDuration(blockchaos.Spec.Delay.Latency)
75 if err != nil {
76 return v1alpha1.NotInjected, errors.Wrapf(err, "parse latency: %s", blockchaos.Spec.Delay.Latency)
77 }
78
79 corr, err := strconv.ParseFloat(blockchaos.Spec.Delay.Correlation, 64)
80 if err != nil {
81 return v1alpha1.NotInjected, errors.Wrapf(err, "parse corr: %s", blockchaos.Spec.Delay.Correlation)
82 }
83
84 jitter, err := time.ParseDuration(blockchaos.Spec.Delay.Jitter)
85 if err != nil {
86 return v1alpha1.NotInjected, errors.Wrapf(err, "parse jitter: %s", blockchaos.Spec.Delay.Jitter)
87 }
88
89 res, err = pbClient.ApplyBlockChaos(ctx, &pb.ApplyBlockChaosRequest{
90 ContainerId: containerId,
91 VolumePath: volumePath,
92 Action: pb.ApplyBlockChaosRequest_Delay,
93 Delay: &pb.BlockDelaySpec{
94 Delay: delay.Nanoseconds(),
95 Correlation: corr,
96 Jitter: jitter.Nanoseconds(),
97 },
98 EnterNS: true,
99 })
100
101 if err != nil {
102 return v1alpha1.NotInjected, err
103 }
104 } else {
105 return v1alpha1.NotInjected, utils.ErrUnknownAction
106 }
107
108 blockchaos.Status.InjectionIds[records[index].Id] = int(res.InjectionId)
109
110 return v1alpha1.Injected, nil
111 }
112
113 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
114 impl.Log.Info("blockchaos recover", "record", records[index])
115
116 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
117 pbClient := decodedContainer.PbClient
118 if pbClient != nil {
119 defer pbClient.Close()
120 }
121 if err != nil {
122 if errors.Is(err, utils.ErrContainerNotFound) {
123
124 return v1alpha1.NotInjected, nil
125 }
126 return v1alpha1.Injected, err
127 }
128
129 blockchaos := obj.(*v1alpha1.BlockChaos)
130 if blockchaos.Status.InjectionIds == nil {
131 blockchaos.Status.InjectionIds = make(map[string]int)
132 }
133 injection_id, ok := blockchaos.Status.InjectionIds[records[index].Id]
134 if !ok {
135 impl.Log.Info("the blockchaos has already been recovered")
136 return v1alpha1.NotInjected, nil
137 }
138
139 if _, err = pbClient.RecoverBlockChaos(ctx, &pb.RecoverBlockChaosRequest{
140 InjectionId: int32(injection_id),
141 }); err != nil {
142
143 return v1alpha1.Injected, err
144 }
145 delete(blockchaos.Status.InjectionIds, records[index].Id)
146 return v1alpha1.NotInjected, nil
147 }
148
149 func NewImpl(c client.Client, log logr.Logger, decoder *utils.ContainerRecordDecoder) *impltypes.ChaosImplPair {
150 return &impltypes.ChaosImplPair{
151 Name: "blockchaos",
152 Object: &v1alpha1.BlockChaos{},
153 Impl: &Impl{
154 Client: c,
155 Log: log.WithName("blockchaos"),
156 decoder: decoder,
157 },
158 }
159 }
160
161 var Module = fx.Provide(
162 fx.Annotated{
163 Group: "impl",
164 Target: NewImpl,
165 },
166 )
167