...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/trafficcontrol/impl.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/trafficcontrol

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package trafficcontrol
    17  
    18  import (
    19  	"context"
    20  	"strings"
    21  
    22  	"github.com/go-logr/logr"
    23  	"github.com/pkg/errors"
    24  	v1 "k8s.io/api/core/v1"
    25  	k8sError "k8s.io/apimachinery/pkg/api/errors"
    26  	"k8s.io/apimachinery/pkg/types"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  
    29  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    30  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/podnetworkchaosmanager"
    31  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
    34  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/netutils"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
    36  )
    37  
    38  var _ impltypes.ChaosImpl = (*Impl)(nil)
    39  
    40  const (
    41  	targetIPSetPostFix = "tgt"
    42  	sourceIPSetPostFix = "src"
    43  )
    44  
    45  const (
    46  	waitForApplySync   v1alpha1.Phase = "Not Injected/Wait"
    47  	waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
    48  )
    49  
    50  type Impl struct {
    51  	client.Client
    52  
    53  	builder *podnetworkchaosmanager.Builder
    54  
    55  	Log logr.Logger
    56  }
    57  
    58  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    59  	// The only possible phase to get in here is "Not Injected" or "Not Injected/Wait"
    60  
    61  	impl.Log.Info("traffic control Apply", "namespace", obj.GetNamespace(), "name", obj.GetName())
    62  	networkchaos := obj.(*v1alpha1.NetworkChaos)
    63  	if networkchaos.Status.Instances == nil {
    64  		networkchaos.Status.Instances = make(map[string]int64)
    65  	}
    66  
    67  	record := records[index]
    68  	phase := record.Phase
    69  
    70  	if phase == waitForApplySync {
    71  		podnetworkchaos := &v1alpha1.PodNetworkChaos{}
    72  		namespacedName, err := controller.ParseNamespacedName(record.Id)
    73  		if err != nil {
    74  			return waitForApplySync, err
    75  		}
    76  		err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
    77  		if err != nil {
    78  			if k8sError.IsNotFound(err) {
    79  				return v1alpha1.NotInjected, nil
    80  			}
    81  
    82  			if k8sError.IsForbidden(err) {
    83  				if strings.Contains(err.Error(), "because it is being terminated") {
    84  					return v1alpha1.NotInjected, nil
    85  				}
    86  			}
    87  
    88  			return waitForApplySync, err
    89  		}
    90  
    91  		if podnetworkchaos.Status.FailedMessage != "" {
    92  			return waitForApplySync, errors.New(podnetworkchaos.Status.FailedMessage)
    93  		}
    94  
    95  		if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
    96  			return v1alpha1.Injected, nil
    97  		}
    98  
    99  		return waitForApplySync, nil
   100  	}
   101  
   102  	var pod v1.Pod
   103  	namespacedName, err := controller.ParseNamespacedName(record.Id)
   104  	if err != nil {
   105  		return v1alpha1.NotInjected, err
   106  	}
   107  	err = impl.Client.Get(ctx, namespacedName, &pod)
   108  	if err != nil {
   109  		// TODO: handle this error
   110  		return v1alpha1.NotInjected, err
   111  	}
   112  
   113  	source := networkchaos.Namespace + "/" + networkchaos.Name
   114  	m := impl.builder.WithInit(source, types.NamespacedName{
   115  		Namespace: pod.Namespace,
   116  		Name:      pod.Name,
   117  	})
   118  
   119  	if record.SelectorKey == "." {
   120  		if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
   121  			var targets []*v1alpha1.Record
   122  			for _, record := range records {
   123  				if record.SelectorKey == ".Target" {
   124  					targets = append(targets, record)
   125  				}
   126  			}
   127  
   128  			err := impl.ApplyTc(ctx, m, targets, networkchaos, targetIPSetPostFix, networkchaos.Spec.Device)
   129  			if err != nil {
   130  				return v1alpha1.NotInjected, err
   131  			}
   132  
   133  			generationNumber, err := m.Commit(ctx, networkchaos)
   134  			if err != nil {
   135  				return v1alpha1.NotInjected, err
   136  			}
   137  
   138  			// modify the custom status
   139  			networkchaos.Status.Instances[record.Id] = generationNumber
   140  			return waitForApplySync, nil
   141  		}
   142  
   143  		return v1alpha1.Injected, nil
   144  	} else if record.SelectorKey == ".Target" {
   145  		if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
   146  			var targets []*v1alpha1.Record
   147  			for _, record := range records {
   148  				if record.SelectorKey == "." {
   149  					targets = append(targets, record)
   150  				}
   151  			}
   152  
   153  			err := impl.ApplyTc(ctx, m, targets, networkchaos, sourceIPSetPostFix, networkchaos.Spec.TargetDevice)
   154  			if err != nil {
   155  				return v1alpha1.NotInjected, err
   156  			}
   157  
   158  			generationNumber, err := m.Commit(ctx, networkchaos)
   159  			if err != nil {
   160  				return v1alpha1.NotInjected, err
   161  			}
   162  
   163  			// modify the custom status
   164  			networkchaos.Status.Instances[record.Id] = generationNumber
   165  			return waitForApplySync, nil
   166  		}
   167  
   168  		return v1alpha1.Injected, nil
   169  	} else {
   170  		impl.Log.Info("unknown selector key", "record", record)
   171  		return v1alpha1.NotInjected, nil
   172  	}
   173  }
   174  
   175  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   176  	// The only possible phase to get in here is "Injected" or "Injected/Wait"
   177  
   178  	networkchaos := obj.(*v1alpha1.NetworkChaos)
   179  	if networkchaos.Status.Instances == nil {
   180  		networkchaos.Status.Instances = make(map[string]int64)
   181  	}
   182  
   183  	record := records[index]
   184  	phase := record.Phase
   185  
   186  	if phase == waitForRecoverSync {
   187  		podnetworkchaos := &v1alpha1.PodNetworkChaos{}
   188  		namespacedName, err := controller.ParseNamespacedName(record.Id)
   189  		if err != nil {
   190  			// This error is not expected to exist
   191  			return waitForRecoverSync, err
   192  		}
   193  		err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
   194  		if err != nil {
   195  			// TODO: handle this error
   196  			if k8sError.IsNotFound(err) {
   197  				return v1alpha1.NotInjected, nil
   198  			}
   199  			return waitForRecoverSync, err
   200  		}
   201  
   202  		if podnetworkchaos.Status.FailedMessage != "" {
   203  			return waitForRecoverSync, errors.New(podnetworkchaos.Status.FailedMessage)
   204  		}
   205  
   206  		if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
   207  			return v1alpha1.NotInjected, nil
   208  		}
   209  
   210  		return waitForRecoverSync, nil
   211  	}
   212  
   213  	var pod v1.Pod
   214  	namespacedName, err := controller.ParseNamespacedName(record.Id)
   215  	if err != nil {
   216  		// This error is not expected to exist
   217  		return v1alpha1.Injected, err
   218  	}
   219  	err = impl.Client.Get(ctx, namespacedName, &pod)
   220  	if err != nil {
   221  		// TODO: handle this error
   222  		if k8sError.IsNotFound(err) {
   223  			return v1alpha1.NotInjected, nil
   224  		}
   225  
   226  		if k8sError.IsForbidden(err) {
   227  			if strings.Contains(err.Error(), "because it is being terminated") {
   228  				return v1alpha1.NotInjected, nil
   229  			}
   230  		}
   231  		return v1alpha1.Injected, err
   232  	}
   233  
   234  	source := networkchaos.Namespace + "/" + networkchaos.Name
   235  	// TODO: use the DI but not construct it manually
   236  	m := impl.builder.WithInit(source, types.NamespacedName{
   237  		Namespace: pod.Namespace,
   238  		Name:      pod.Name,
   239  	})
   240  	generationNumber, err := m.Commit(ctx, networkchaos)
   241  	if err != nil {
   242  		if err == podnetworkchaosmanager.ErrPodNotFound || err == podnetworkchaosmanager.ErrPodNotRunning {
   243  			return v1alpha1.NotInjected, nil
   244  		}
   245  		return v1alpha1.Injected, err
   246  	}
   247  
   248  	// Now modify the custom status and phase
   249  	networkchaos.Status.Instances[record.Id] = generationNumber
   250  	return waitForRecoverSync, nil
   251  }
   252  
   253  func (impl *Impl) ApplyTc(ctx context.Context, m *podnetworkchaosmanager.PodNetworkManager, targets []*v1alpha1.Record, networkchaos *v1alpha1.NetworkChaos, ipSetPostFix string, device string) error {
   254  	spec := networkchaos.Spec
   255  	tcType := v1alpha1.Bandwidth
   256  	switch spec.Action {
   257  	case v1alpha1.NetemAction, v1alpha1.DelayAction, v1alpha1.DuplicateAction, v1alpha1.CorruptAction, v1alpha1.LossAction:
   258  		tcType = v1alpha1.Netem
   259  	case v1alpha1.BandwidthAction:
   260  		tcType = v1alpha1.Bandwidth
   261  	default:
   262  		return errors.Wrapf(utils.ErrUnknownAction, "action: %s", spec.Action)
   263  	}
   264  
   265  	externalCidrs, err := netutils.ResolveCidrs(networkchaos.Spec.ExternalTargets)
   266  	if err != nil {
   267  		return err
   268  	}
   269  
   270  	if len(targets)+len(externalCidrs) == 0 {
   271  		impl.Log.Info("apply traffic control", "sources", m.Source)
   272  		m.T.Append(v1alpha1.RawTrafficControl{
   273  			Type:        tcType,
   274  			TcParameter: spec.TcParameter,
   275  			Source:      m.Source,
   276  			Device:      device,
   277  		})
   278  		return nil
   279  	}
   280  
   281  	targetPods := []v1.Pod{}
   282  	for _, record := range targets {
   283  		var pod v1.Pod
   284  		namespacedName, err := controller.ParseNamespacedName(record.Id)
   285  		if err != nil {
   286  			// TODO: handle this error
   287  			return err
   288  		}
   289  		err = impl.Client.Get(ctx, namespacedName, &pod)
   290  		if err != nil {
   291  			// TODO: handle this error
   292  			return err
   293  		}
   294  		targetPods = append(targetPods, pod)
   295  	}
   296  	ipSetWithTcPostFix := string(tcType[0:2]) + ipSetPostFix
   297  	dstIPSets := ipset.BuildIPSets(targetPods, externalCidrs, networkchaos, ipSetWithTcPostFix, m.Source)
   298  	dstSetIPSet := ipset.BuildSetIPSet(dstIPSets, networkchaos, ipSetWithTcPostFix, m.Source)
   299  	impl.Log.Info("apply traffic control with filter", "sources", m.Source, "setIpset", dstSetIPSet, "ipSets", dstIPSets)
   300  
   301  	for _, ipSet := range dstIPSets {
   302  		m.T.Append(ipSet)
   303  	}
   304  
   305  	m.T.Append(dstSetIPSet)
   306  
   307  	m.T.Append(v1alpha1.RawTrafficControl{
   308  		Type:        tcType,
   309  		TcParameter: spec.TcParameter,
   310  		Source:      m.Source,
   311  		IPSet:       dstSetIPSet.Name,
   312  		Device:      device,
   313  	})
   314  
   315  	return nil
   316  }
   317  
   318  func NewImpl(c client.Client, b *podnetworkchaosmanager.Builder, log logr.Logger) *Impl {
   319  	return &Impl{
   320  		Client:  c,
   321  		builder: b,
   322  		Log:     log.WithName("trafficcontrol"),
   323  	}
   324  }
   325