...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/timechaos/impl.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/timechaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package timechaos
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"time"
    22  
    23  	"github.com/go-logr/logr"
    24  	"github.com/pkg/errors"
    25  	"go.uber.org/fx"
    26  	"sigs.k8s.io/controller-runtime/pkg/client"
    27  
    28  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    29  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    30  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
    31  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    32  	timeUtils "github.com/chaos-mesh/chaos-mesh/pkg/time/utils"
    33  )
    34  
// Compile-time assertion that *Impl satisfies the impltypes.ChaosImpl interface.
var _ impltypes.ChaosImpl = (*Impl)(nil)

// Impl is the TimeChaos implementation of impltypes.ChaosImpl.
//
// It embeds a controller-runtime client, carries the logger used by Apply and
// Recover, and keeps the decoder that both methods use to turn a record into a
// container ID, pod and chaos daemon client.
type Impl struct {
	client.Client
	Log     logr.Logger
	decoder *utils.ContainerRecordDecoder
}
    42  
    43  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    44  	decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
    45  	pbClient := decodedContainer.PbClient
    46  	containerId := decodedContainer.ContainerId
    47  	if pbClient != nil {
    48  		defer pbClient.Close()
    49  	}
    50  	if err != nil {
    51  		return v1alpha1.NotInjected, err
    52  	}
    53  
    54  	timechaos := obj.(*v1alpha1.TimeChaos)
    55  	mask, err := timeUtils.EncodeClkIds(timechaos.Spec.ClockIds)
    56  	if err != nil {
    57  		return v1alpha1.NotInjected, err
    58  	}
    59  
    60  	duration, err := time.ParseDuration(timechaos.Spec.TimeOffset)
    61  	if err != nil {
    62  		return v1alpha1.NotInjected, err
    63  	}
    64  
    65  	sec, nsec := secAndNSecFromDuration(duration)
    66  
    67  	impl.Log.Info("setting time shift", "mask", mask, "sec", sec, "nsec", nsec, "containerId", containerId)
    68  	_, err = pbClient.SetTimeOffset(ctx, &pb.TimeRequest{
    69  		ContainerId:      containerId,
    70  		Sec:              sec,
    71  		Nsec:             nsec,
    72  		ClkIdsMask:       mask,
    73  		Uid:              string(obj.GetUID()) + string(decodedContainer.Pod.GetUID()),
    74  		PodContainerName: fmt.Sprintf("%s:%s", decodedContainer.Pod.GetUID(), decodedContainer.ContainerName),
    75  	})
    76  	if err != nil {
    77  		return v1alpha1.NotInjected, err
    78  	}
    79  
    80  	return v1alpha1.Injected, nil
    81  }
    82  
    83  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    84  	decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
    85  	pbClient := decodedContainer.PbClient
    86  	containerId := decodedContainer.ContainerId
    87  	if pbClient != nil {
    88  		defer pbClient.Close()
    89  	}
    90  	if err != nil {
    91  		if errors.Is(err, utils.ErrContainerNotFound) {
    92  			// pretend the disappeared container has been recovered
    93  			return v1alpha1.NotInjected, nil
    94  		}
    95  		return v1alpha1.Injected, err
    96  	}
    97  
    98  	impl.Log.Info("recover for container", "containerId", containerId)
    99  	_, err = pbClient.RecoverTimeOffset(ctx, &pb.TimeRequest{
   100  		ContainerId:      containerId,
   101  		Uid:              string(obj.GetUID()) + string(decodedContainer.Pod.GetUID()),
   102  		PodContainerName: fmt.Sprintf("%s:%s", decodedContainer.Pod.GetUID(), decodedContainer.ContainerName),
   103  	})
   104  	if err != nil {
   105  		return v1alpha1.Injected, err
   106  	}
   107  
   108  	return v1alpha1.NotInjected, nil
   109  }
   110  
   111  func secAndNSecFromDuration(duration time.Duration) (sec int64, nsec int64) {
   112  	sec = duration.Nanoseconds() / 1e9
   113  	nsec = duration.Nanoseconds() - (sec * 1e9)
   114  
   115  	return
   116  }
   117  
   118  func NewImpl(c client.Client, log logr.Logger, decoder *utils.ContainerRecordDecoder) *impltypes.ChaosImplPair {
   119  	return &impltypes.ChaosImplPair{
   120  		Name:   "timechaos",
   121  		Object: &v1alpha1.TimeChaos{},
   122  		Impl: &Impl{
   123  			Client:  c,
   124  			Log:     log.WithName("timechaos"),
   125  			decoder: decoder,
   126  		},
   127  	}
   128  }
   129  
// Module is the fx option that provides NewImpl, annotated so that its result
// is contributed to the "impl" group.
var Module = fx.Provide(
	fx.Annotated{
		Group:  "impl",
		Target: NewImpl,
	},
)
   136