...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/iochaos/impl.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/iochaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package iochaos
    17  
    18  import (
    19  	"context"
    20  	"strings"
    21  
    22  	"github.com/go-logr/logr"
    23  	"github.com/pkg/errors"
    24  	"go.uber.org/fx"
    25  	v1 "k8s.io/api/core/v1"
    26  	k8sError "k8s.io/apimachinery/pkg/api/errors"
    27  	"k8s.io/apimachinery/pkg/types"
    28  	"sigs.k8s.io/controller-runtime/pkg/client"
    29  
    30  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    31  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/iochaos/podiochaosmanager"
    32  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
    34  )
    35  
// Compile-time assertion that *Impl satisfies impltypes.ChaosImpl.
var _ impltypes.ChaosImpl = (*Impl)(nil)
    37  
// Intermediate record phases used only by this implementation.
//
// waitForApplySync is returned by Apply after it has committed its changes and
// for as long as the PodIOChaos has not yet observed the committed generation.
// waitForRecoverSync plays the same role for Recover.
const (
	waitForApplySync   v1alpha1.Phase = "Not Injected/Wait"
	waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
)
    42  
// Impl implements impltypes.ChaosImpl for IOChaos objects.
type Impl struct {
	// Client is the embedded controller-runtime client; Apply and Recover use
	// it to fetch Pod and PodIOChaos objects.
	client.Client
	// Log is the logger used by this implementation.
	Log logr.Logger

	// builder is used by Apply and Recover (via WithInit) to obtain the
	// object on which per-pod changes are staged and committed.
	builder *podiochaosmanager.Builder
}
    49  
    50  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    51  	// The only possible phase to get in here is "Not Injected" or "Not Injected/Wait"
    52  
    53  	impl.Log.Info("iochaos Apply", "namespace", obj.GetNamespace(), "name", obj.GetName())
    54  	iochaos := obj.(*v1alpha1.IOChaos)
    55  	if iochaos.Status.Instances == nil {
    56  		iochaos.Status.Instances = make(map[string]int64)
    57  	}
    58  
    59  	record := records[index]
    60  	phase := record.Phase
    61  
    62  	if phase == waitForApplySync {
    63  		podiochaos := &v1alpha1.PodIOChaos{}
    64  		namespacedName, err := controller.ParseNamespacedName(record.Id)
    65  		if err != nil {
    66  			return waitForApplySync, err
    67  		}
    68  		err = impl.Client.Get(ctx, namespacedName, podiochaos)
    69  		if err != nil {
    70  			if k8sError.IsNotFound(err) {
    71  				return v1alpha1.NotInjected, nil
    72  			}
    73  
    74  			if k8sError.IsForbidden(err) {
    75  				if strings.Contains(err.Error(), "because it is being terminated") {
    76  					return v1alpha1.NotInjected, nil
    77  				}
    78  			}
    79  
    80  			return waitForApplySync, err
    81  		}
    82  
    83  		if podiochaos.Status.FailedMessage != "" {
    84  			return waitForApplySync, errors.New(podiochaos.Status.FailedMessage)
    85  		}
    86  
    87  		if podiochaos.Status.ObservedGeneration >= iochaos.Status.Instances[record.Id] {
    88  			return v1alpha1.Injected, nil
    89  		}
    90  
    91  		return waitForApplySync, nil
    92  	}
    93  
    94  	podId, containerName, err := controller.ParseNamespacedNameContainer(records[index].Id)
    95  	if err != nil {
    96  		return v1alpha1.NotInjected, err
    97  	}
    98  	var pod v1.Pod
    99  	err = impl.Client.Get(ctx, podId, &pod)
   100  	if err != nil {
   101  		return v1alpha1.NotInjected, err
   102  	}
   103  
   104  	source := iochaos.Namespace + "/" + iochaos.Name
   105  	m := impl.builder.WithInit(source, types.NamespacedName{
   106  		Namespace: pod.Namespace,
   107  		Name:      pod.Name,
   108  	})
   109  
   110  	m.T.SetVolumePath(iochaos.Spec.VolumePath)
   111  	m.T.SetContainer(containerName)
   112  
   113  	m.T.Append(v1alpha1.IOChaosAction{
   114  		Type: iochaos.Spec.Action,
   115  		Filter: v1alpha1.Filter{
   116  			Path:    iochaos.Spec.Path,
   117  			Percent: iochaos.Spec.Percent,
   118  			Methods: iochaos.Spec.Methods,
   119  		},
   120  		Faults: []v1alpha1.IoFault{
   121  			{
   122  				Errno:  iochaos.Spec.Errno,
   123  				Weight: 1,
   124  			},
   125  		},
   126  		Latency:          iochaos.Spec.Delay,
   127  		AttrOverrideSpec: iochaos.Spec.Attr,
   128  		MistakeSpec:      iochaos.Spec.Mistake,
   129  		Source:           m.Source,
   130  	})
   131  	generationNumber, err := m.Commit(ctx, iochaos)
   132  	if err != nil {
   133  		return v1alpha1.NotInjected, err
   134  	}
   135  
   136  	// modify the custom status
   137  	iochaos.Status.Instances[record.Id] = generationNumber
   138  	return waitForApplySync, nil
   139  }
   140  
   141  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   142  	// The only possible phase to get in here is "Injected" or "Injected/Wait"
   143  
   144  	iochaos := obj.(*v1alpha1.IOChaos)
   145  	if iochaos.Status.Instances == nil {
   146  		iochaos.Status.Instances = make(map[string]int64)
   147  	}
   148  
   149  	record := records[index]
   150  	phase := record.Phase
   151  	if phase == waitForRecoverSync {
   152  		podiochaos := &v1alpha1.PodIOChaos{}
   153  		namespacedName, err := controller.ParseNamespacedName(record.Id)
   154  		if err != nil {
   155  			// This error is not expected to exist
   156  			return waitForRecoverSync, nil
   157  		}
   158  		err = impl.Client.Get(ctx, namespacedName, podiochaos)
   159  		if err != nil {
   160  			// TODO: handle this error
   161  			if k8sError.IsNotFound(err) {
   162  				return v1alpha1.NotInjected, nil
   163  			}
   164  			return waitForRecoverSync, err
   165  		}
   166  
   167  		if podiochaos.Status.FailedMessage != "" {
   168  			return waitForRecoverSync, errors.New(podiochaos.Status.FailedMessage)
   169  		}
   170  
   171  		if podiochaos.Status.ObservedGeneration >= iochaos.Status.Instances[record.Id] {
   172  			return v1alpha1.NotInjected, nil
   173  		}
   174  
   175  		return waitForRecoverSync, nil
   176  	}
   177  
   178  	podId, _, err := controller.ParseNamespacedNameContainer(records[index].Id)
   179  	if err != nil {
   180  		// This error is not expected to exist
   181  		return v1alpha1.NotInjected, err
   182  	}
   183  	var pod v1.Pod
   184  	err = impl.Client.Get(ctx, podId, &pod)
   185  	if err != nil {
   186  		// TODO: handle this error
   187  		if k8sError.IsNotFound(err) {
   188  			return v1alpha1.NotInjected, nil
   189  		}
   190  		return v1alpha1.Injected, err
   191  	}
   192  
   193  	source := iochaos.Namespace + "/" + iochaos.Name
   194  	m := impl.builder.WithInit(source, types.NamespacedName{
   195  		Namespace: pod.Namespace,
   196  		Name:      pod.Name,
   197  	})
   198  
   199  	generationNumber, err := m.Commit(ctx, iochaos)
   200  	if err != nil {
   201  		if err == podiochaosmanager.ErrPodNotFound || err == podiochaosmanager.ErrPodNotRunning {
   202  			return v1alpha1.NotInjected, nil
   203  		}
   204  
   205  		if k8sError.IsForbidden(err) {
   206  			if strings.Contains(err.Error(), "because it is being terminated") {
   207  				return v1alpha1.NotInjected, nil
   208  			}
   209  		}
   210  		return v1alpha1.Injected, err
   211  	}
   212  
   213  	// Now modify the custom status and phase
   214  	iochaos.Status.Instances[record.Id] = generationNumber
   215  	return waitForRecoverSync, nil
   216  }
   217  
   218  func NewImpl(c client.Client, b *podiochaosmanager.Builder, log logr.Logger) *impltypes.ChaosImplPair {
   219  	return &impltypes.ChaosImplPair{
   220  		Name:   "iochaos",
   221  		Object: &v1alpha1.IOChaos{},
   222  		Impl: &Impl{
   223  			Client:  c,
   224  			Log:     log.WithName("iochaos"),
   225  			builder: b,
   226  		},
   227  		ObjectList: &v1alpha1.IOChaosList{},
   228  		Controlls:  []client.Object{&v1alpha1.PodIOChaos{}},
   229  	}
   230  }
   231  
// Module is the fx option for this package: it provides NewImpl as a member of
// the "impl" group and podiochaosmanager.NewBuilder as a plain constructor.
var Module = fx.Provide(
	fx.Annotated{
		Group:  "impl",
		Target: NewImpl,
	},
	podiochaosmanager.NewBuilder,
)
   239