...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/physicalmachinechaos/impl.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/physicalmachinechaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package physicalmachinechaos
    17  
    18  import (
    19  	"bytes"
    20  	"context"
    21  	"crypto/tls"
    22  	"crypto/x509"
    23  	"encoding/json"
    24  	"fmt"
    25  	"io"
    26  	"net/http"
    27  	"os"
    28  	"strings"
    29  	"time"
    30  
    31  	"github.com/go-logr/logr"
    32  	"github.com/pkg/errors"
    33  	"go.uber.org/fx"
    34  	"sigs.k8s.io/controller-runtime/pkg/client"
    35  
    36  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    37  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    38  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    39  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
    40  )
    41  
    42  var _ impltypes.ChaosImpl = (*Impl)(nil)
    43  
    44  type Impl struct {
    45  	client.Client
    46  	Log logr.Logger
    47  }
    48  
    49  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    50  	impl.Log.Info("apply physical machine chaos")
    51  
    52  	physicalMachineChaos := obj.(*v1alpha1.PhysicalMachineChaos)
    53  	var address string
    54  	// For compatibility with older versions, we now have two ways to select the address
    55  	// of the physical machine, so there will be two possible values for the records:
    56  	//
    57  	// 1. when using address directly, values in records are IP
    58  	// 2. when using selector, values in records are NamespacedName
    59  	if len(physicalMachineChaos.Spec.Address) > 0 {
    60  		address = records[index].Id
    61  	} else {
    62  		var physicalMachine v1alpha1.PhysicalMachine
    63  		namespacedName, err := controller.ParseNamespacedName(records[index].Id)
    64  		if err != nil {
    65  			return v1alpha1.NotInjected, err
    66  		}
    67  		err = impl.Get(ctx, namespacedName, &physicalMachine)
    68  		if err != nil {
    69  			// TODO: handle this error
    70  			return v1alpha1.NotInjected, err
    71  		}
    72  		address = physicalMachine.Spec.Address
    73  	}
    74  
    75  	// for example, physicalMachinechaos.Spec.Action is 'network-delay', action is 'network', subAction is 'delay'
    76  	// notice: 'process', 'vm', 'clock' and 'user_defined' action has no subAction, set subAction to ""
    77  	actions := strings.SplitN(string(physicalMachineChaos.Spec.Action), "-", 2)
    78  	if len(actions) == 1 {
    79  		actions = append(actions, "")
    80  	} else if len(actions) != 2 {
    81  		err := errors.New("action invalid")
    82  		return v1alpha1.NotInjected, err
    83  	}
    84  	action, subAction := actions[0], actions[1]
    85  	physicalMachineChaos.Spec.ExpInfo.Action = subAction
    86  
    87  	/*
    88  		transform ExpInfo in PhysicalMachineChaos to json data required by chaosd
    89  		for example:
    90  		    ExpInfo: &ExpInfo {
    91  			    UID: "123",
    92  				Action: "cpu",
    93  				StressCPU: &StressCPU {
    94  					Load: 1,
    95  					Workers: 1,
    96  				}
    97  			}
    98  
    99  			transform to json data: "{\"uid\":\"123\",\"action\":\"cpu\",\"load\":1,\"workers\":1}
   100  	*/
   101  	var expInfoMap map[string]interface{}
   102  	expInfoBytes, _ := json.Marshal(physicalMachineChaos.Spec.ExpInfo)
   103  	err := json.Unmarshal(expInfoBytes, &expInfoMap)
   104  	if err != nil {
   105  		impl.Log.Error(err, "fail to unmarshal experiment info")
   106  		return v1alpha1.NotInjected, err
   107  	}
   108  	configKV, ok := expInfoMap[string(physicalMachineChaos.Spec.Action)].(map[string]interface{})
   109  	if !ok {
   110  		err = errors.New("transform action config to map failed")
   111  		impl.Log.Error(err, "")
   112  		return v1alpha1.NotInjected, err
   113  	}
   114  	delete(expInfoMap, string(physicalMachineChaos.Spec.Action))
   115  	for k, v := range configKV {
   116  		expInfoMap[k] = v
   117  	}
   118  
   119  	expInfoBytes, err = json.Marshal(expInfoMap)
   120  	if err != nil {
   121  		impl.Log.Error(err, "fail to marshal experiment info")
   122  		return v1alpha1.NotInjected, err
   123  	}
   124  
   125  	url := fmt.Sprintf("%s/api/attack/%s", address, action)
   126  	impl.Log.Info("HTTP request", "address", address, "data", string(expInfoBytes))
   127  
   128  	statusCode, body, err := impl.doHttpRequest("POST", url, bytes.NewBuffer(expInfoBytes))
   129  	if err != nil {
   130  		return v1alpha1.NotInjected, errors.Wrap(err, body)
   131  	}
   132  
   133  	if statusCode != http.StatusOK {
   134  		err = errors.New("HTTP status is not OK")
   135  		impl.Log.Error(err, body)
   136  		return v1alpha1.NotInjected, errors.Wrap(err, body)
   137  	}
   138  
   139  	return v1alpha1.Injected, nil
   140  }
   141  
   142  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   143  	impl.Log.Info("recover physical machine chaos")
   144  
   145  	physicalMachineChaos := obj.(*v1alpha1.PhysicalMachineChaos)
   146  	var address string
   147  	if len(physicalMachineChaos.Spec.Address) > 0 {
   148  		address = records[index].Id
   149  	} else {
   150  		var physicalMachine v1alpha1.PhysicalMachine
   151  		namespacedName, err := controller.ParseNamespacedName(records[index].Id)
   152  		if err != nil {
   153  			return v1alpha1.Injected, err
   154  		}
   155  		err = impl.Get(ctx, namespacedName, &physicalMachine)
   156  		if err != nil {
   157  			// TODO: handle this error
   158  			return v1alpha1.Injected, err
   159  		}
   160  		address = physicalMachine.Spec.Address
   161  	}
   162  
   163  	url := fmt.Sprintf("%s/api/attack/%s", address, physicalMachineChaos.Spec.ExpInfo.UID)
   164  	statusCode, body, err := impl.doHttpRequest("DELETE", url, nil)
   165  	if err != nil {
   166  		return v1alpha1.Injected, errors.Wrap(err, body)
   167  	}
   168  
   169  	if statusCode == http.StatusNotFound {
   170  		impl.Log.Info("experiment not found", "uid", physicalMachineChaos.Spec.ExpInfo.UID)
   171  	} else if statusCode != http.StatusOK {
   172  		err = errors.New("HTTP status is not OK")
   173  		impl.Log.Error(err, body)
   174  		return v1alpha1.Injected, errors.Wrap(err, body)
   175  	}
   176  
   177  	return v1alpha1.NotInjected, nil
   178  }
   179  
   180  func (impl *Impl) doHttpRequest(method, url string, data io.Reader) (int, string, error) {
   181  	req, err := http.NewRequest(method, url, data)
   182  	if err != nil {
   183  		impl.Log.Error(err, "fail to generate HTTP request")
   184  		return 0, "", err
   185  	}
   186  	req.Header.Set("Content-Type", "application/json")
   187  
   188  	var httpClient *http.Client
   189  	if config.ControllerCfg.ChaosdSecurityMode {
   190  		httpClient, err = securityHTTPClient(url)
   191  		if err != nil {
   192  			impl.Log.Error(err, "generate HTTPS client")
   193  			return 0, "", err
   194  		}
   195  	} else {
   196  		httpClient = &http.Client{Timeout: 5 * time.Second}
   197  	}
   198  
   199  	resp, err := httpClient.Do(req)
   200  	if err != nil {
   201  		impl.Log.Error(err, "do HTTP request")
   202  		return 0, "", err
   203  	}
   204  	defer resp.Body.Close()
   205  
   206  	body, err := io.ReadAll(resp.Body)
   207  	if err != nil {
   208  		return 0, "", err
   209  	}
   210  	impl.Log.Info("HTTP response", "url", url, "status", resp.Status, "body", string(body))
   211  
   212  	return resp.StatusCode, string(body), nil
   213  }
   214  
   215  func securityHTTPClient(url string) (*http.Client, error) {
   216  	if !strings.Contains(url, "https") {
   217  		return nil, errors.Errorf("a secure url should begin with `https` rather than `http`, url: %s", url)
   218  	}
   219  
   220  	pair, err := tls.LoadX509KeyPair(config.ControllerCfg.ChaosdClientCert, config.ControllerCfg.ChaosdClientKey)
   221  	if err != nil {
   222  		return nil, errors.Wrap(err, "load x509 key pair failed")
   223  	}
   224  
   225  	pool := x509.NewCertPool()
   226  	ca, err := os.ReadFile(config.ControllerCfg.ChaosdCACert)
   227  	if err != nil {
   228  		return nil, errors.Wrap(err, "read ChaosdCACert file failed")
   229  	}
   230  	pool.AppendCertsFromPEM(ca)
   231  
   232  	return &http.Client{
   233  		Transport: &http.Transport{
   234  			TLSClientConfig: &tls.Config{
   235  				RootCAs:      pool,
   236  				Certificates: []tls.Certificate{pair},
   237  				ServerName:   "chaosd.chaos-mesh.org",
   238  			},
   239  		},
   240  		Timeout: 5 * time.Second,
   241  	}, nil
   242  }
   243  
   244  func NewImpl(c client.Client, log logr.Logger) *impltypes.ChaosImplPair {
   245  	return &impltypes.ChaosImplPair{
   246  		Name:   "physicalmachinechaos",
   247  		Object: &v1alpha1.PhysicalMachineChaos{},
   248  		Impl: &Impl{
   249  			Client: c,
   250  			Log:    log.WithName("physicalmachinechaos"),
   251  		},
   252  	}
   253  }
   254  
   255  var Module = fx.Provide(
   256  	fx.Annotated{
   257  		Group:  "impl",
   258  		Target: NewImpl,
   259  	},
   260  )
   261