1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package stresschaos
17
18 import (
19 "context"
20 "time"
21
22 "github.com/go-logr/logr"
23 "github.com/pkg/errors"
24 "go.uber.org/fx"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "sigs.k8s.io/controller-runtime/pkg/client"
27
28 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
29 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
30 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
31 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
32 )
33
34 var _ impltypes.ChaosImpl = (*Impl)(nil)
35
36 type Impl struct {
37 client.Client
38
39 Log logr.Logger
40
41 decoder *utils.ContainerRecordDecoder
42 }
43
44 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
45 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
46 pbClient := decodedContainer.PbClient
47 containerId := decodedContainer.ContainerId
48 if pbClient != nil {
49 defer pbClient.Close()
50 }
51 if err != nil {
52 return v1alpha1.NotInjected, err
53 }
54
55 stresschaos := obj.(*v1alpha1.StressChaos)
56 if stresschaos.Status.Instances == nil {
57 stresschaos.Status.Instances = make(map[string]v1alpha1.StressInstance)
58 }
59 _, ok := stresschaos.Status.Instances[records[index].Id]
60 if ok {
61 impl.Log.Info("an stress-ng instance is running for this pod")
62 return v1alpha1.Injected, nil
63 }
64
65 stressors := stresschaos.Spec.StressngStressors
66 cpuStressors := ""
67 memoryStressors := ""
68 if len(stressors) == 0 {
69 cpuStressors, memoryStressors, err = stresschaos.Spec.Stressors.Normalize()
70 if err != nil {
71 impl.Log.Info("fail to ")
72
73 return v1alpha1.NotInjected, err
74 }
75 }
76
77 req := pb.ExecStressRequest{
78 Scope: pb.ExecStressRequest_CONTAINER,
79 Target: containerId,
80 CpuStressors: cpuStressors,
81 MemoryStressors: memoryStressors,
82 EnterNS: true,
83 }
84 if stresschaos.Spec.Stressors.MemoryStressor != nil {
85 req.OomScoreAdj = int32(stresschaos.Spec.Stressors.MemoryStressor.OOMScoreAdj)
86 }
87 res, err := pbClient.ExecStressors(ctx, &req)
88
89 if err != nil {
90 return v1alpha1.NotInjected, err
91 }
92
93 stresschaos.Status.Instances[records[index].Id] = v1alpha1.StressInstance{
94 UID: res.CpuInstance,
95 StartTime: &metav1.Time{
96 Time: time.Unix(res.CpuStartTime/1000, (res.CpuStartTime%1000)*int64(time.Millisecond)),
97 },
98 MemoryUID: res.MemoryInstance,
99 MemoryStartTime: &metav1.Time{
100 Time: time.Unix(res.MemoryStartTime/1000, (res.MemoryStartTime%1000)*int64(time.Millisecond)),
101 },
102 }
103
104 return v1alpha1.Injected, nil
105 }
106
107 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
108 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
109 pbClient := decodedContainer.PbClient
110 if pbClient != nil {
111 defer pbClient.Close()
112 }
113 if err != nil {
114 if errors.Is(err, utils.ErrContainerNotFound) {
115
116 return v1alpha1.NotInjected, nil
117 }
118 return v1alpha1.Injected, err
119 }
120
121 stresschaos := obj.(*v1alpha1.StressChaos)
122 if stresschaos.Status.Instances == nil {
123 return v1alpha1.NotInjected, nil
124 }
125 instance, ok := stresschaos.Status.Instances[records[index].Id]
126 if !ok {
127 impl.Log.Info("Pod seems already recovered", "pod", decodedContainer.Pod.UID)
128 return v1alpha1.NotInjected, nil
129 }
130 req := &pb.CancelStressRequest{
131 CpuInstance: instance.UID,
132 MemoryInstance: instance.MemoryUID,
133 }
134 if instance.StartTime != nil {
135 req.CpuStartTime = instance.StartTime.UnixNano() / int64(time.Millisecond)
136 }
137 if instance.MemoryStartTime != nil {
138 req.MemoryStartTime = instance.MemoryStartTime.UnixNano() / int64(time.Millisecond)
139 }
140 if _, err = pbClient.CancelStressors(ctx, req); err != nil {
141 impl.Log.Error(err, "cancel stressors")
142 return v1alpha1.Injected, nil
143 }
144 delete(stresschaos.Status.Instances, records[index].Id)
145 return v1alpha1.NotInjected, nil
146 }
147
148 func NewImpl(c client.Client, log logr.Logger, decoder *utils.ContainerRecordDecoder) *impltypes.ChaosImplPair {
149 return &impltypes.ChaosImplPair{
150 Name: "stresschaos",
151 Object: &v1alpha1.StressChaos{},
152 Impl: &Impl{
153 Client: c,
154 Log: log.WithName("stresschaos"),
155 decoder: decoder,
156 },
157 }
158 }
159
160 var Module = fx.Provide(
161 fx.Annotated{
162 Group: "impl",
163 Target: NewImpl,
164 },
165 )
166