1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package physicalmachinechaos
17
18 import (
19 "bytes"
20 "context"
21 "crypto/tls"
22 "crypto/x509"
23 "encoding/json"
24 "fmt"
25 "io"
26 "net/http"
27 "os"
28 "strings"
29 "time"
30
31 "github.com/go-logr/logr"
32 "github.com/pkg/errors"
33 "go.uber.org/fx"
34 "sigs.k8s.io/controller-runtime/pkg/client"
35
36 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
37 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
38 "github.com/chaos-mesh/chaos-mesh/controllers/config"
39 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
40 )
41
42 var _ impltypes.ChaosImpl = (*Impl)(nil)
43
44 type Impl struct {
45 client.Client
46 Log logr.Logger
47 }
48
49 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
50 impl.Log.Info("apply physical machine chaos")
51
52 physicalMachineChaos := obj.(*v1alpha1.PhysicalMachineChaos)
53 var address string
54
55
56
57
58
59 if len(physicalMachineChaos.Spec.Address) > 0 {
60 address = records[index].Id
61 } else {
62 var physicalMachine v1alpha1.PhysicalMachine
63 namespacedName, err := controller.ParseNamespacedName(records[index].Id)
64 if err != nil {
65 return v1alpha1.NotInjected, err
66 }
67 err = impl.Get(ctx, namespacedName, &physicalMachine)
68 if err != nil {
69
70 return v1alpha1.NotInjected, err
71 }
72 address = physicalMachine.Spec.Address
73 }
74
75
76
77 actions := strings.SplitN(string(physicalMachineChaos.Spec.Action), "-", 2)
78 if len(actions) == 1 {
79 actions = append(actions, "")
80 } else if len(actions) != 2 {
81 err := errors.New("action invalid")
82 return v1alpha1.NotInjected, err
83 }
84 action, subAction := actions[0], actions[1]
85 physicalMachineChaos.Spec.ExpInfo.Action = subAction
86
87
101 var expInfoMap map[string]interface{}
102 expInfoBytes, _ := json.Marshal(physicalMachineChaos.Spec.ExpInfo)
103 err := json.Unmarshal(expInfoBytes, &expInfoMap)
104 if err != nil {
105 impl.Log.Error(err, "fail to unmarshal experiment info")
106 return v1alpha1.NotInjected, err
107 }
108 configKV, ok := expInfoMap[string(physicalMachineChaos.Spec.Action)].(map[string]interface{})
109 if !ok {
110 err = errors.New("transform action config to map failed")
111 impl.Log.Error(err, "")
112 return v1alpha1.NotInjected, err
113 }
114 delete(expInfoMap, string(physicalMachineChaos.Spec.Action))
115 for k, v := range configKV {
116 expInfoMap[k] = v
117 }
118
119 expInfoBytes, err = json.Marshal(expInfoMap)
120 if err != nil {
121 impl.Log.Error(err, "fail to marshal experiment info")
122 return v1alpha1.NotInjected, err
123 }
124
125 url := fmt.Sprintf("%s/api/attack/%s", address, action)
126 impl.Log.Info("HTTP request", "address", address, "data", string(expInfoBytes))
127
128 statusCode, body, err := impl.doHttpRequest("POST", url, bytes.NewBuffer(expInfoBytes))
129 if err != nil {
130 return v1alpha1.NotInjected, errors.Wrap(err, body)
131 }
132
133 if statusCode != http.StatusOK {
134 err = errors.New("HTTP status is not OK")
135 impl.Log.Error(err, body)
136 return v1alpha1.NotInjected, errors.Wrap(err, body)
137 }
138
139 return v1alpha1.Injected, nil
140 }
141
142 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
143 impl.Log.Info("recover physical machine chaos")
144
145 physicalMachineChaos := obj.(*v1alpha1.PhysicalMachineChaos)
146 var address string
147 if len(physicalMachineChaos.Spec.Address) > 0 {
148 address = records[index].Id
149 } else {
150 var physicalMachine v1alpha1.PhysicalMachine
151 namespacedName, err := controller.ParseNamespacedName(records[index].Id)
152 if err != nil {
153 return v1alpha1.Injected, err
154 }
155 err = impl.Get(ctx, namespacedName, &physicalMachine)
156 if err != nil {
157
158 return v1alpha1.Injected, err
159 }
160 address = physicalMachine.Spec.Address
161 }
162
163 url := fmt.Sprintf("%s/api/attack/%s", address, physicalMachineChaos.Spec.ExpInfo.UID)
164 statusCode, body, err := impl.doHttpRequest("DELETE", url, nil)
165 if err != nil {
166 return v1alpha1.Injected, errors.Wrap(err, body)
167 }
168
169 if statusCode == http.StatusNotFound {
170 impl.Log.Info("experiment not found", "uid", physicalMachineChaos.Spec.ExpInfo.UID)
171 } else if statusCode != http.StatusOK {
172 err = errors.New("HTTP status is not OK")
173 impl.Log.Error(err, body)
174 return v1alpha1.Injected, errors.Wrap(err, body)
175 }
176
177 return v1alpha1.NotInjected, nil
178 }
179
180 func (impl *Impl) doHttpRequest(method, url string, data io.Reader) (int, string, error) {
181 req, err := http.NewRequest(method, url, data)
182 if err != nil {
183 impl.Log.Error(err, "fail to generate HTTP request")
184 return 0, "", err
185 }
186 req.Header.Set("Content-Type", "application/json")
187
188 var httpClient *http.Client
189 if config.ControllerCfg.ChaosdSecurityMode {
190 httpClient, err = securityHTTPClient(url)
191 if err != nil {
192 impl.Log.Error(err, "generate HTTPS client")
193 return 0, "", err
194 }
195 } else {
196 httpClient = &http.Client{Timeout: 5 * time.Second}
197 }
198
199 resp, err := httpClient.Do(req)
200 if err != nil {
201 impl.Log.Error(err, "do HTTP request")
202 return 0, "", err
203 }
204 defer resp.Body.Close()
205
206 body, err := io.ReadAll(resp.Body)
207 if err != nil {
208 return 0, "", err
209 }
210 impl.Log.Info("HTTP response", "url", url, "status", resp.Status, "body", string(body))
211
212 return resp.StatusCode, string(body), nil
213 }
214
215 func securityHTTPClient(url string) (*http.Client, error) {
216 if !strings.Contains(url, "https") {
217 return nil, errors.Errorf("a secure url should begin with `https` rather than `http`, url: %s", url)
218 }
219
220 pair, err := tls.LoadX509KeyPair(config.ControllerCfg.ChaosdClientCert, config.ControllerCfg.ChaosdClientKey)
221 if err != nil {
222 return nil, errors.Wrap(err, "load x509 key pair failed")
223 }
224
225 pool := x509.NewCertPool()
226 ca, err := os.ReadFile(config.ControllerCfg.ChaosdCACert)
227 if err != nil {
228 return nil, errors.Wrap(err, "read ChaosdCACert file failed")
229 }
230 pool.AppendCertsFromPEM(ca)
231
232 return &http.Client{
233 Transport: &http.Transport{
234 TLSClientConfig: &tls.Config{
235 RootCAs: pool,
236 Certificates: []tls.Certificate{pair},
237 ServerName: "chaosd.chaos-mesh.org",
238 },
239 },
240 Timeout: 5 * time.Second,
241 }, nil
242 }
243
244 func NewImpl(c client.Client, log logr.Logger) *impltypes.ChaosImplPair {
245 return &impltypes.ChaosImplPair{
246 Name: "physicalmachinechaos",
247 Object: &v1alpha1.PhysicalMachineChaos{},
248 Impl: &Impl{
249 Client: c,
250 Log: log.WithName("physicalmachinechaos"),
251 },
252 }
253 }
254
255 var Module = fx.Provide(
256 fx.Annotated{
257 Group: "impl",
258 Target: NewImpl,
259 },
260 )
261