1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package stresschaos
17
18 import (
19 "context"
20 "time"
21
22 "github.com/go-logr/logr"
23 "go.uber.org/fx"
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "sigs.k8s.io/controller-runtime/pkg/client"
26
27 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
28 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
29 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
30 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
31 )
32
// Compile-time check that *Impl satisfies impltypes.ChaosImpl.
var _ impltypes.ChaosImpl = (*Impl)(nil)
34
// Impl is the StressChaos implementation of impltypes.ChaosImpl.
type Impl struct {
	// Client is the embedded controller-runtime client supplied to NewImpl.
	client.Client

	// Log is the logger used by Apply and Recover.
	Log logr.Logger

	// decoder turns a record into the daemon client and container
	// information that Apply and Recover operate on.
	decoder *utils.ContainerRecordDecoder
}
42
43 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
44 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index])
45 pbClient := decodedContainer.PbClient
46 containerId := decodedContainer.ContainerId
47 if pbClient != nil {
48 defer pbClient.Close()
49 }
50 if err != nil {
51 return v1alpha1.NotInjected, err
52 }
53
54 stresschaos := obj.(*v1alpha1.StressChaos)
55 if stresschaos.Status.Instances == nil {
56 stresschaos.Status.Instances = make(map[string]v1alpha1.StressInstance)
57 }
58 _, ok := stresschaos.Status.Instances[records[index].Id]
59 if ok {
60 impl.Log.Info("an stress-ng instance is running for this pod")
61 return v1alpha1.Injected, nil
62 }
63
64 stressors := stresschaos.Spec.StressngStressors
65 if len(stressors) == 0 {
66 stressors, err = stresschaos.Spec.Stressors.Normalize()
67 if err != nil {
68 impl.Log.Info("fail to ")
69
70 return v1alpha1.NotInjected, err
71 }
72 }
73 res, err := pbClient.ExecStressors(ctx, &pb.ExecStressRequest{
74 Scope: pb.ExecStressRequest_CONTAINER,
75 Target: containerId,
76 Stressors: stressors,
77 EnterNS: true,
78 })
79 if err != nil {
80 return v1alpha1.NotInjected, err
81 }
82
83
84 stresschaos.Status.Instances[records[index].Id] = v1alpha1.StressInstance{
85 UID: res.Instance,
86 StartTime: &metav1.Time{
87 Time: time.Unix(res.StartTime/1000, (res.StartTime%1000)*int64(time.Millisecond)),
88 },
89 }
90
91 return v1alpha1.Injected, nil
92 }
93
94 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
95 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index])
96 pbClient := decodedContainer.PbClient
97 if pbClient != nil {
98 defer pbClient.Close()
99 }
100 if err != nil {
101 if utils.IsFailToGet(err) {
102
103 return v1alpha1.NotInjected, nil
104 }
105 return v1alpha1.Injected, err
106 }
107
108 stresschaos := obj.(*v1alpha1.StressChaos)
109 if stresschaos.Status.Instances == nil {
110 return v1alpha1.NotInjected, nil
111 }
112 instance, ok := stresschaos.Status.Instances[records[index].Id]
113 if !ok {
114 impl.Log.Info("Pod seems already recovered", "pod", decodedContainer.Pod.UID)
115 return v1alpha1.NotInjected, nil
116 }
117 if _, err = pbClient.CancelStressors(ctx, &pb.CancelStressRequest{
118 Instance: instance.UID,
119 StartTime: instance.StartTime.UnixNano() / int64(time.Millisecond),
120 }); err != nil {
121
122 return v1alpha1.Injected, nil
123 }
124 delete(stresschaos.Status.Instances, records[index].Id)
125 return v1alpha1.NotInjected, nil
126 }
127
128 func NewImpl(c client.Client, log logr.Logger, decoder *utils.ContainerRecordDecoder) *impltypes.ChaosImplPair {
129 return &impltypes.ChaosImplPair{
130 Name: "stresschaos",
131 Object: &v1alpha1.StressChaos{},
132 Impl: &Impl{
133 Client: c,
134 Log: log.WithName("stresschaos"),
135 decoder: decoder,
136 },
137 }
138 }
139
// Module provides NewImpl to fx as a member of the "impl" group.
var Module = fx.Provide(
	fx.Annotated{
		Group:  "impl",
		Target: NewImpl,
	},
)
146