...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/blockchaos/impl.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/blockchaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package blockchaos
    17  
    18  import (
    19  	"context"
    20  	"strconv"
    21  	"time"
    22  
    23  	"github.com/go-logr/logr"
    24  	"github.com/pkg/errors"
    25  	"go.uber.org/fx"
    26  	"sigs.k8s.io/controller-runtime/pkg/client"
    27  
    28  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    29  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    30  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
    31  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
    32  	pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    33  )
    34  
    35  var _ impltypes.ChaosImpl = (*Impl)(nil)
    36  
    37  type Impl struct {
    38  	client.Client
    39  	Log logr.Logger
    40  
    41  	decoder *utils.ContainerRecordDecoder
    42  }
    43  
    44  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    45  	impl.Log.Info("blockchaos apply", "record", records[index])
    46  
    47  	_, _, volumePath, err := controller.ParseNamespacedNameContainerVolumePath(records[index].Id)
    48  	if err != nil {
    49  		return v1alpha1.NotInjected, errors.Wrapf(err, "parse container and volumePath %s", records[index].Id)
    50  	}
    51  
    52  	decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
    53  	pbClient := decodedContainer.PbClient
    54  	containerId := decodedContainer.ContainerId
    55  	if pbClient != nil {
    56  		defer pbClient.Close()
    57  	}
    58  	if err != nil {
    59  		return v1alpha1.NotInjected, err
    60  	}
    61  
    62  	blockchaos := obj.(*v1alpha1.BlockChaos)
    63  	if blockchaos.Status.InjectionIds == nil {
    64  		blockchaos.Status.InjectionIds = make(map[string]int)
    65  	}
    66  	_, ok := blockchaos.Status.InjectionIds[records[index].Id]
    67  	if ok {
    68  		impl.Log.Info("the blockchaos has already been injected")
    69  		return v1alpha1.Injected, nil
    70  	}
    71  
    72  	var res *pb.ApplyBlockChaosResponse
    73  	if blockchaos.Spec.Action == v1alpha1.BlockDelay {
    74  		delay, err := time.ParseDuration(blockchaos.Spec.Delay.Latency)
    75  		if err != nil {
    76  			return v1alpha1.NotInjected, errors.Wrapf(err, "parse latency: %s", blockchaos.Spec.Delay.Latency)
    77  		}
    78  
    79  		corr, err := strconv.ParseFloat(blockchaos.Spec.Delay.Correlation, 64)
    80  		if err != nil {
    81  			return v1alpha1.NotInjected, errors.Wrapf(err, "parse corr: %s", blockchaos.Spec.Delay.Correlation)
    82  		}
    83  
    84  		jitter, err := time.ParseDuration(blockchaos.Spec.Delay.Jitter)
    85  		if err != nil {
    86  			return v1alpha1.NotInjected, errors.Wrapf(err, "parse jitter: %s", blockchaos.Spec.Delay.Jitter)
    87  		}
    88  
    89  		res, err = pbClient.ApplyBlockChaos(ctx, &pb.ApplyBlockChaosRequest{
    90  			ContainerId: containerId,
    91  			VolumePath:  volumePath,
    92  			Action:      pb.ApplyBlockChaosRequest_Delay,
    93  			Delay: &pb.BlockDelaySpec{
    94  				Delay:       delay.Nanoseconds(),
    95  				Correlation: corr,
    96  				Jitter:      jitter.Nanoseconds(),
    97  			},
    98  			EnterNS: true,
    99  		})
   100  
   101  		if err != nil {
   102  			return v1alpha1.NotInjected, err
   103  		}
   104  	} else {
   105  		return v1alpha1.NotInjected, utils.ErrUnknownAction
   106  	}
   107  
   108  	blockchaos.Status.InjectionIds[records[index].Id] = int(res.InjectionId)
   109  
   110  	return v1alpha1.Injected, nil
   111  }
   112  
   113  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   114  	impl.Log.Info("blockchaos recover", "record", records[index])
   115  
   116  	decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
   117  	pbClient := decodedContainer.PbClient
   118  	if pbClient != nil {
   119  		defer pbClient.Close()
   120  	}
   121  	if err != nil {
   122  		if errors.Is(err, utils.ErrContainerNotFound) {
   123  			// pretend the disappeared container has been recovered
   124  			return v1alpha1.NotInjected, nil
   125  		}
   126  		return v1alpha1.Injected, err
   127  	}
   128  
   129  	blockchaos := obj.(*v1alpha1.BlockChaos)
   130  	if blockchaos.Status.InjectionIds == nil {
   131  		blockchaos.Status.InjectionIds = make(map[string]int)
   132  	}
   133  	injection_id, ok := blockchaos.Status.InjectionIds[records[index].Id]
   134  	if !ok {
   135  		impl.Log.Info("the blockchaos has already been recovered")
   136  		return v1alpha1.NotInjected, nil
   137  	}
   138  
   139  	if _, err = pbClient.RecoverBlockChaos(ctx, &pb.RecoverBlockChaosRequest{
   140  		InjectionId: int32(injection_id),
   141  	}); err != nil {
   142  		// TODO: check whether the error still exists
   143  		return v1alpha1.Injected, err
   144  	}
   145  	delete(blockchaos.Status.InjectionIds, records[index].Id)
   146  	return v1alpha1.NotInjected, nil
   147  }
   148  
   149  func NewImpl(c client.Client, log logr.Logger, decoder *utils.ContainerRecordDecoder) *impltypes.ChaosImplPair {
   150  	return &impltypes.ChaosImplPair{
   151  		Name:   "blockchaos",
   152  		Object: &v1alpha1.BlockChaos{},
   153  		Impl: &Impl{
   154  			Client:  c,
   155  			Log:     log.WithName("blockchaos"),
   156  			decoder: decoder,
   157  		},
   158  	}
   159  }
   160  
   161  var Module = fx.Provide(
   162  	fx.Annotated{
   163  		Group:  "impl",
   164  		Target: NewImpl,
   165  	},
   166  )
   167