...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/stresschaos/impl.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/stresschaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package stresschaos
    17  
    18  import (
    19  	"context"
    20  	"time"
    21  
    22  	"github.com/go-logr/logr"
    23  	"github.com/pkg/errors"
    24  	"go.uber.org/fx"
    25  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    26  	"sigs.k8s.io/controller-runtime/pkg/client"
    27  
    28  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    29  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    30  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
    31  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    32  )
    33  
    34  var _ impltypes.ChaosImpl = (*Impl)(nil)
    35  
    36  type Impl struct {
    37  	client.Client
    38  
    39  	Log logr.Logger
    40  
    41  	decoder *utils.ContainerRecordDecoder
    42  }
    43  
    44  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    45  	decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
    46  	pbClient := decodedContainer.PbClient
    47  	containerId := decodedContainer.ContainerId
    48  	if pbClient != nil {
    49  		defer pbClient.Close()
    50  	}
    51  	if err != nil {
    52  		return v1alpha1.NotInjected, err
    53  	}
    54  
    55  	stresschaos := obj.(*v1alpha1.StressChaos)
    56  	if stresschaos.Status.Instances == nil {
    57  		stresschaos.Status.Instances = make(map[string]v1alpha1.StressInstance)
    58  	}
    59  	_, ok := stresschaos.Status.Instances[records[index].Id]
    60  	if ok {
    61  		impl.Log.Info("an stress-ng instance is running for this pod")
    62  		return v1alpha1.Injected, nil
    63  	}
    64  
    65  	stressors := stresschaos.Spec.StressngStressors
    66  	cpuStressors := ""
    67  	memoryStressors := ""
    68  	if len(stressors) == 0 {
    69  		cpuStressors, memoryStressors, err = stresschaos.Spec.Stressors.Normalize()
    70  		if err != nil {
    71  			impl.Log.Info("fail to ")
    72  			// TODO: add an event here
    73  			return v1alpha1.NotInjected, err
    74  		}
    75  	}
    76  
    77  	req := pb.ExecStressRequest{
    78  		Scope:           pb.ExecStressRequest_CONTAINER,
    79  		Target:          containerId,
    80  		CpuStressors:    cpuStressors,
    81  		MemoryStressors: memoryStressors,
    82  		EnterNS:         true,
    83  	}
    84  	if stresschaos.Spec.Stressors.MemoryStressor != nil {
    85  		req.OomScoreAdj = int32(stresschaos.Spec.Stressors.MemoryStressor.OOMScoreAdj)
    86  	}
    87  	res, err := pbClient.ExecStressors(ctx, &req)
    88  
    89  	if err != nil {
    90  		return v1alpha1.NotInjected, err
    91  	}
    92  	// TODO: support custom status
    93  	stresschaos.Status.Instances[records[index].Id] = v1alpha1.StressInstance{
    94  		UID: res.CpuInstance,
    95  		StartTime: &metav1.Time{
    96  			Time: time.Unix(res.CpuStartTime/1000, (res.CpuStartTime%1000)*int64(time.Millisecond)),
    97  		},
    98  		MemoryUID: res.MemoryInstance,
    99  		MemoryStartTime: &metav1.Time{
   100  			Time: time.Unix(res.MemoryStartTime/1000, (res.MemoryStartTime%1000)*int64(time.Millisecond)),
   101  		},
   102  	}
   103  
   104  	return v1alpha1.Injected, nil
   105  }
   106  
   107  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   108  	decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
   109  	pbClient := decodedContainer.PbClient
   110  	if pbClient != nil {
   111  		defer pbClient.Close()
   112  	}
   113  	if err != nil {
   114  		if errors.Is(err, utils.ErrContainerNotFound) {
   115  			// pretend the disappeared container has been recovered
   116  			return v1alpha1.NotInjected, nil
   117  		}
   118  		return v1alpha1.Injected, err
   119  	}
   120  
   121  	stresschaos := obj.(*v1alpha1.StressChaos)
   122  	if stresschaos.Status.Instances == nil {
   123  		return v1alpha1.NotInjected, nil
   124  	}
   125  	instance, ok := stresschaos.Status.Instances[records[index].Id]
   126  	if !ok {
   127  		impl.Log.Info("Pod seems already recovered", "pod", decodedContainer.Pod.UID)
   128  		return v1alpha1.NotInjected, nil
   129  	}
   130  	req := &pb.CancelStressRequest{
   131  		CpuInstance:    instance.UID,
   132  		MemoryInstance: instance.MemoryUID,
   133  	}
   134  	if instance.StartTime != nil {
   135  		req.CpuStartTime = instance.StartTime.UnixNano() / int64(time.Millisecond)
   136  	}
   137  	if instance.MemoryStartTime != nil {
   138  		req.MemoryStartTime = instance.MemoryStartTime.UnixNano() / int64(time.Millisecond)
   139  	}
   140  	if _, err = pbClient.CancelStressors(ctx, req); err != nil {
   141  		impl.Log.Error(err, "cancel stressors")
   142  		return v1alpha1.Injected, nil
   143  	}
   144  	delete(stresschaos.Status.Instances, records[index].Id)
   145  	return v1alpha1.NotInjected, nil
   146  }
   147  
   148  func NewImpl(c client.Client, log logr.Logger, decoder *utils.ContainerRecordDecoder) *impltypes.ChaosImplPair {
   149  	return &impltypes.ChaosImplPair{
   150  		Name:   "stresschaos",
   151  		Object: &v1alpha1.StressChaos{},
   152  		Impl: &Impl{
   153  			Client:  c,
   154  			Log:     log.WithName("stresschaos"),
   155  			decoder: decoder,
   156  		},
   157  	}
   158  }
   159  
   160  var Module = fx.Provide(
   161  	fx.Annotated{
   162  		Group:  "impl",
   163  		Target: NewImpl,
   164  	},
   165  )
   166