...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/physicalmachinechaos/impl.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/physicalmachinechaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package physicalmachinechaos
    17  
    18  import (
    19  	"bytes"
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"io"
    24  	"net/http"
    25  	"strings"
    26  
    27  	"github.com/go-logr/logr"
    28  	"github.com/pkg/errors"
    29  	"go.uber.org/fx"
    30  	"sigs.k8s.io/controller-runtime/pkg/client"
    31  
    32  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    33  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    34  )
    35  
    36  var _ impltypes.ChaosImpl = (*Impl)(nil)
    37  
    38  type Impl struct {
    39  	client.Client
    40  	Log logr.Logger
    41  }
    42  
    43  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    44  	impl.Log.Info("apply physical machine chaos")
    45  
    46  	physicalMachinechaos := obj.(*v1alpha1.PhysicalMachineChaos)
    47  	address := records[index].Id
    48  
    49  	// for example, physicalMachinechaos.Spec.Action is 'network-delay', action is 'network', subAction is 'delay'
    50  	// notice: 'process' and 'clock' action has no subAction, set subAction to ""
    51  	actions := strings.SplitN(string(physicalMachinechaos.Spec.Action), "-", 2)
    52  	if len(actions) == 1 {
    53  		actions = append(actions, "")
    54  	} else if len(actions) != 2 {
    55  		err := errors.New("action invalid")
    56  		return v1alpha1.NotInjected, err
    57  	}
    58  	action, subAction := actions[0], actions[1]
    59  	physicalMachinechaos.Spec.ExpInfo.Action = subAction
    60  
    61  	/*
    62  		transform ExpInfo in PhysicalMachineChaos to json data required by chaosd
    63  		for example:
    64  		    ExpInfo: &ExpInfo {
    65  			    UID: "123",
    66  				Action: "cpu",
    67  				StressCPU: &StressCPU {
    68  					Load: 1,
    69  					Workers: 1,
    70  				}
    71  			}
    72  
    73  			transform to json data: "{\"uid\":\"123\",\"action\":\"cpu\",\"load\":1,\"workers\":1}
    74  	*/
    75  	var expInfoMap map[string]interface{}
    76  	expInfoBytes, _ := json.Marshal(physicalMachinechaos.Spec.ExpInfo)
    77  	err := json.Unmarshal(expInfoBytes, &expInfoMap)
    78  	if err != nil {
    79  		impl.Log.Error(err, "fail to unmarshal experiment info")
    80  		return v1alpha1.NotInjected, err
    81  	}
    82  	configKV, ok := expInfoMap[string(physicalMachinechaos.Spec.Action)].(map[string]interface{})
    83  	if !ok {
    84  		err = errors.New("transform action config to map failed")
    85  		impl.Log.Error(err, "")
    86  		return v1alpha1.NotInjected, err
    87  	}
    88  	delete(expInfoMap, string(physicalMachinechaos.Spec.Action))
    89  	for k, v := range configKV {
    90  		expInfoMap[k] = v
    91  	}
    92  
    93  	expInfoBytes, err = json.Marshal(expInfoMap)
    94  	if err != nil {
    95  		impl.Log.Error(err, "fail to marshal experiment info")
    96  		return v1alpha1.NotInjected, err
    97  	}
    98  
    99  	url := fmt.Sprintf("%s/api/attack/%s", address, action)
   100  	impl.Log.Info("HTTP request", "address", address, "data", string(expInfoBytes))
   101  
   102  	statusCode, body, err := impl.doHttpRequest("POST", url, bytes.NewBuffer(expInfoBytes))
   103  	if err != nil {
   104  		return v1alpha1.NotInjected, errors.Wrap(err, body)
   105  	}
   106  
   107  	if statusCode != http.StatusOK {
   108  		err = errors.New("HTTP status is not OK")
   109  		impl.Log.Error(err, body)
   110  		return v1alpha1.NotInjected, errors.Wrap(err, body)
   111  	}
   112  
   113  	return v1alpha1.Injected, nil
   114  }
   115  
   116  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   117  	impl.Log.Info("recover physical machine chaos")
   118  
   119  	physicalMachinechaos := obj.(*v1alpha1.PhysicalMachineChaos)
   120  	address := records[index].Id
   121  
   122  	url := fmt.Sprintf("%s/api/attack/%s", address, physicalMachinechaos.Spec.ExpInfo.UID)
   123  	statusCode, body, err := impl.doHttpRequest("DELETE", url, nil)
   124  	if err != nil {
   125  		return v1alpha1.Injected, errors.Wrap(err, body)
   126  	}
   127  
   128  	if statusCode == http.StatusNotFound {
   129  		impl.Log.Info("experiment not found", "uid", physicalMachinechaos.Spec.ExpInfo.UID)
   130  	} else if statusCode != http.StatusOK {
   131  		err = errors.New("HTTP status is not OK")
   132  		impl.Log.Error(err, body)
   133  		return v1alpha1.Injected, errors.Wrap(err, body)
   134  	}
   135  
   136  	return v1alpha1.NotInjected, nil
   137  }
   138  
   139  func (impl *Impl) doHttpRequest(method, url string, data io.Reader) (int, string, error) {
   140  	req, err := http.NewRequest(method, url, data)
   141  	if err != nil {
   142  		impl.Log.Error(err, "fail to generate HTTP request")
   143  		return 0, "", err
   144  	}
   145  	req.Header.Set("Content-Type", "application/json")
   146  
   147  	httpClient := &http.Client{}
   148  	resp, err := httpClient.Do(req)
   149  	if err != nil {
   150  		impl.Log.Error(err, "do HTTP request")
   151  		return 0, "", err
   152  	}
   153  	defer resp.Body.Close()
   154  
   155  	body, err := io.ReadAll(resp.Body)
   156  	if err != nil {
   157  		return 0, "", err
   158  	}
   159  	impl.Log.Info("HTTP response", "url", url, "status", resp.Status, "body", string(body))
   160  
   161  	return resp.StatusCode, string(body), nil
   162  }
   163  
   164  func NewImpl(c client.Client, log logr.Logger) *impltypes.ChaosImplPair {
   165  	return &impltypes.ChaosImplPair{
   166  		Name:   "physicalmachinechaos",
   167  		Object: &v1alpha1.PhysicalMachineChaos{},
   168  		Impl: &Impl{
   169  			Client: c,
   170  			Log:    log.WithName("physicalmachinechaos"),
   171  		},
   172  	}
   173  }
   174  
   175  var Module = fx.Provide(
   176  	fx.Annotated{
   177  		Group:  "impl",
   178  		Target: NewImpl,
   179  	},
   180  )
   181