1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package jvmchaos
17
18 import (
19 "bytes"
20 "context"
21 "fmt"
22 "strings"
23 "text/template"
24
25 "github.com/go-logr/logr"
26 "github.com/pingcap/errors"
27 "go.uber.org/fx"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
32 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
33 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
34 )
35
36 var (
37 errNilDecoder error = errors.New("impl decoder is nil")
38 )
39
40 var _ impltypes.ChaosImpl = (*Impl)(nil)
41
// Byteman rule templates. Both are text/template sources that are executed
// with a BytemanTemplateSpec.
const (
	// SimpleRuleTemplate renders a rule that fires unconditionally at method
	// entry and runs a single action.
	SimpleRuleTemplate = `
RULE {{.Name}}
CLASS {{.Class}}
METHOD {{.Method}}
AT ENTRY
IF true
DO
{{.Do}};
ENDRULE
`

	// CompleteRuleTemplate renders a rule that additionally declares a helper
	// class, a binding and a condition.
	CompleteRuleTemplate = `
RULE {{.Name}}
CLASS {{.Class}}
METHOD {{.Method}}
HELPER {{.Helper}}
AT ENTRY
BIND {{.Bind}};
IF {{.Condition}}
DO
{{.Do}};
ENDRULE
`
)

// Fully qualified names of the helper classes substituted into the HELPER
// line of CompleteRuleTemplate.
const (
	SQLHelper    = "org.chaos_mesh.byteman.helper.SQLHelper"
	GCHelper     = "org.chaos_mesh.byteman.helper.GCHelper"
	StressHelper = "org.chaos_mesh.byteman.helper.StressHelper"
)

// Injection targets used in place of a user supplied class and method.
const (
	// TriggerClass and TriggerMethod are the target of stress and GC rules.
	TriggerClass  = "org.chaos_mesh.chaos_agent.TriggerThread"
	TriggerMethod = "triggerFunc"

	// MySQL connector version "5" target; MySQL5Exception is a format string
	// taking the exception message.
	MySQL5InjectClass  = "com.mysql.jdbc.MysqlIO"
	MySQL5InjectMethod = "sqlQueryDirect"
	MySQL5Exception    = "java.sql.SQLException(\"%s\")"

	// MySQL connector version "8" target; MySQL8Exception is a format string
	// taking the exception message.
	MySQL8InjectClass  = "com.mysql.cj.NativeSession"
	MySQL8InjectMethod = "execSQL"
	MySQL8Exception    = "com.mysql.cj.exceptions.CJException(\"%s\")"
)
85
86
// BytemanTemplateSpec carries the values substituted into SimpleRuleTemplate
// and CompleteRuleTemplate when a Byteman rule is rendered.
type BytemanTemplateSpec struct {
	// Name fills the RULE line.
	Name string
	// Class fills the CLASS line.
	Class string
	// Method fills the METHOD line.
	Method string
	// Helper fills the HELPER line (CompleteRuleTemplate only).
	Helper string
	// Bind fills the BIND line (CompleteRuleTemplate only).
	Bind string
	// Condition fills the IF line (CompleteRuleTemplate only).
	Condition string
	// Do is the action emitted after DO; the templates append the trailing ";".
	Do string

	// The stress fields below are not referenced by either template.
	// NOTE(review): presumably consumed elsewhere or kept for compatibility;
	// confirm before removing.
	StressType      string
	StressValueName string
	StressValue     string
}
101
102 type Impl struct {
103 client.Client
104 Log logr.Logger
105
106 decoder *utils.ContainerRecordDecoder
107 }
108
109
110 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
111 impl.Log.Info("jvm chaos apply", "record", records[index])
112 if impl.decoder == nil {
113 return v1alpha1.NotInjected, errors.WithStack(errNilDecoder)
114 }
115 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
116 if decodedContainer.PbClient != nil {
117 defer func() {
118 err := decodedContainer.PbClient.Close()
119 if err != nil {
120 impl.Log.Error(err, "fail to close pb client")
121 }
122 }()
123 }
124 if err != nil {
125 return v1alpha1.NotInjected, err
126 }
127
128 jvmChaos := obj.(*v1alpha1.JVMChaos)
129 err = generateRuleData(&jvmChaos.Spec)
130 if err != nil {
131 impl.Log.Error(err, "fail to generate rule data")
132
133 return v1alpha1.Injected, err
134 }
135
136 _, err = decodedContainer.PbClient.InstallJVMRules(ctx, &pb.InstallJVMRulesRequest{
137 ContainerId: decodedContainer.ContainerId,
138 Rule: jvmChaos.Spec.RuleData,
139 Port: jvmChaos.Spec.Port,
140 EnterNS: true,
141 })
142 if err != nil {
143 impl.Log.Error(err, "install jvm rules")
144 return v1alpha1.NotInjected, err
145 }
146
147 return v1alpha1.Injected, nil
148 }
149
150
151 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
152 if impl.decoder == nil {
153 return v1alpha1.Injected, errors.WithStack(errNilDecoder)
154 }
155 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
156 if decodedContainer.PbClient != nil {
157 defer func() {
158 err := decodedContainer.PbClient.Close()
159 if err != nil {
160 impl.Log.Error(err, "fail to close pb client")
161 }
162 }()
163 }
164 if err != nil && strings.Contains(err.Error(), "container not found") {
165
166 impl.Log.Error(err, "finding container")
167 return v1alpha1.NotInjected, nil
168 }
169 if err != nil {
170 return v1alpha1.Injected, err
171 }
172
173 jvmChaos := obj.(*v1alpha1.JVMChaos)
174 err = generateRuleData(&jvmChaos.Spec)
175 if err != nil {
176 impl.Log.Error(err, "fail to generate rule data")
177
178 return v1alpha1.Injected, err
179 }
180
181 _, err = decodedContainer.PbClient.UninstallJVMRules(ctx, &pb.UninstallJVMRulesRequest{
182 ContainerId: decodedContainer.ContainerId,
183 Rule: jvmChaos.Spec.RuleData,
184 Port: jvmChaos.Spec.Port,
185 EnterNS: true,
186 })
187 if err != nil && strings.Contains(err.Error(), "Connection refused") {
188
189 impl.Log.Error(err, "uninstall jvm rules (possible restart of jvm process)")
190 return v1alpha1.NotInjected, nil
191 }
192 if err != nil {
193 impl.Log.Error(err, "uninstall jvm rules")
194 return v1alpha1.Injected, err
195 }
196
197 return v1alpha1.NotInjected, nil
198 }
199
200
201 type JVMRuleParameter struct {
202 v1alpha1.JVMParameter
203
204 StressType string
205 StressValue string
206 StressValueName string
207 Do string
208 }
209
210 func generateRuleData(spec *v1alpha1.JVMChaosSpec) error {
211 if len(spec.RuleData) != 0 {
212 return nil
213 }
214
215 bytemanTemplateSpec := BytemanTemplateSpec{
216 Name: spec.Name,
217 Class: spec.Class,
218 Method: spec.Method,
219 }
220
221 switch spec.Action {
222 case v1alpha1.JVMLatencyAction:
223 bytemanTemplateSpec.Do = fmt.Sprintf("Thread.sleep(%d)", spec.LatencyDuration)
224 case v1alpha1.JVMExceptionAction:
225 bytemanTemplateSpec.Do = fmt.Sprintf("throw new %s", spec.ThrowException)
226 case v1alpha1.JVMReturnAction:
227 bytemanTemplateSpec.Do = fmt.Sprintf("return %s", spec.ReturnValue)
228 case v1alpha1.JVMStressAction:
229 bytemanTemplateSpec.Helper = StressHelper
230 bytemanTemplateSpec.Class = TriggerClass
231 bytemanTemplateSpec.Method = TriggerMethod
232
233 bytemanTemplateSpec.Bind = "flag:boolean=true"
234 bytemanTemplateSpec.Condition = "true"
235 if spec.CPUCount > 0 {
236 bytemanTemplateSpec.Do = fmt.Sprintf("injectCPUStress(\"%s\", %d)", spec.Name, spec.CPUCount)
237 } else {
238 bytemanTemplateSpec.Do = fmt.Sprintf("injectMemStress(\"%s\", \"%s\")", spec.Name, spec.MemoryType)
239 }
240 case v1alpha1.JVMGCAction:
241 bytemanTemplateSpec.Helper = GCHelper
242 bytemanTemplateSpec.Class = TriggerClass
243 bytemanTemplateSpec.Method = TriggerMethod
244
245 bytemanTemplateSpec.Bind = "flag:boolean=true"
246 bytemanTemplateSpec.Condition = "true"
247 bytemanTemplateSpec.Do = "gc()"
248 case v1alpha1.JVMMySQLAction:
249 var mysqlException string
250 bytemanTemplateSpec.Helper = SQLHelper
251
252
253
254 bytemanTemplateSpec.Bind = fmt.Sprintf("flag:boolean=matchDBTable(\"\", $2, \"%s\", \"%s\", \"%s\")", spec.Database, spec.Table, spec.SQLType)
255 bytemanTemplateSpec.Condition = "flag"
256 if spec.MySQLConnectorVersion == "5" {
257 bytemanTemplateSpec.Class = MySQL5InjectClass
258 bytemanTemplateSpec.Method = MySQL5InjectMethod
259 mysqlException = MySQL5Exception
260 } else if spec.MySQLConnectorVersion == "8" {
261 bytemanTemplateSpec.Class = MySQL8InjectClass
262 bytemanTemplateSpec.Method = MySQL8InjectMethod
263 mysqlException = MySQL8Exception
264 } else {
265 return errors.Errorf("mysql connector version %s is not supported", spec.MySQLConnectorVersion)
266 }
267
268 if len(spec.ThrowException) > 0 {
269 exception := fmt.Sprintf(mysqlException, spec.ThrowException)
270 bytemanTemplateSpec.Do = fmt.Sprintf("throw new %s", exception)
271 } else if spec.LatencyDuration > 0 {
272 bytemanTemplateSpec.Do = fmt.Sprintf("Thread.sleep(%d)", spec.LatencyDuration)
273 }
274 }
275
276 buf := new(bytes.Buffer)
277 var t *template.Template
278 switch spec.Action {
279 case v1alpha1.JVMStressAction, v1alpha1.JVMGCAction, v1alpha1.JVMMySQLAction:
280 t = template.Must(template.New("byteman rule").Parse(CompleteRuleTemplate))
281 case v1alpha1.JVMExceptionAction, v1alpha1.JVMLatencyAction, v1alpha1.JVMReturnAction:
282 t = template.Must(template.New("byteman rule").Parse(SimpleRuleTemplate))
283 default:
284 return errors.Errorf("jvm action %s not supported", spec.Action)
285 }
286 if t == nil {
287 return errors.Errorf("parse byeman rule template failed")
288 }
289 err := t.Execute(buf, bytemanTemplateSpec)
290 if err != nil {
291 return err
292 }
293
294 spec.RuleData = buf.String()
295 return nil
296 }
297
298
299 func NewImpl(c client.Client, decoder *utils.ContainerRecordDecoder, log logr.Logger) *impltypes.ChaosImplPair {
300 return &impltypes.ChaosImplPair{
301 Name: "jvmchaos",
302 Object: &v1alpha1.JVMChaos{},
303 Impl: &Impl{
304 Client: c,
305 Log: log.WithName("jvmchaos"),
306 decoder: decoder,
307 },
308 }
309 }
310
311 var Module = fx.Provide(
312 fx.Annotated{
313 Group: "impl",
314 Target: NewImpl,
315 },
316 )
317