...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/trafficcontrol/impl.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/trafficcontrol

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package trafficcontrol
    17  
    18  import (
    19  	"context"
    20  	"errors"
    21  	"fmt"
    22  	"strings"
    23  
    24  	"github.com/go-logr/logr"
    25  	v1 "k8s.io/api/core/v1"
    26  	k8sError "k8s.io/apimachinery/pkg/api/errors"
    27  	"k8s.io/apimachinery/pkg/types"
    28  	"sigs.k8s.io/controller-runtime/pkg/client"
    29  
    30  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    31  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/podnetworkchaosmanager"
    32  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
    34  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/netutils"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
    36  )
    37  
// Compile-time assertion that *Impl satisfies impltypes.ChaosImpl.
var _ impltypes.ChaosImpl = (*Impl)(nil)
    39  
    40  const (
    41  	targetIPSetPostFix = "tgt"
    42  	sourceIPSetPostFix = "src"
    43  )
    44  
    45  const (
    46  	waitForApplySync   v1alpha1.Phase = "Not Injected/Wait"
    47  	waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
    48  )
    49  
// Impl implements impltypes.ChaosImpl for NetworkChaos objects whose action is
// carried out through traffic control rules (see ApplyTc for the accepted
// actions).
type Impl struct {
	// Client is the embedded Kubernetes client used to fetch Pod and
	// PodNetworkChaos objects.
	client.Client

	// builder creates, via WithInit, the per-pod manager on which rules are
	// appended and committed.
	builder *podnetworkchaosmanager.Builder

	// Log is the logger used by Apply and ApplyTc.
	Log logr.Logger
}
    57  
    58  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    59  	// The only possible phase to get in here is "Not Injected" or "Not Injected/Wait"
    60  
    61  	impl.Log.Info("traffic control Apply", "namespace", obj.GetNamespace(), "name", obj.GetName())
    62  	networkchaos := obj.(*v1alpha1.NetworkChaos)
    63  	if networkchaos.Status.Instances == nil {
    64  		networkchaos.Status.Instances = make(map[string]int64)
    65  	}
    66  	if networkchaos.Status.Instances == nil {
    67  		networkchaos.Status.Instances = make(map[string]int64)
    68  	}
    69  
    70  	record := records[index]
    71  	phase := record.Phase
    72  
    73  	if phase == waitForApplySync {
    74  		podnetworkchaos := &v1alpha1.PodNetworkChaos{}
    75  		namespacedName, err := controller.ParseNamespacedName(record.Id)
    76  		if err != nil {
    77  			return waitForApplySync, err
    78  		}
    79  		err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
    80  		if err != nil {
    81  			if k8sError.IsNotFound(err) {
    82  				return v1alpha1.NotInjected, nil
    83  			}
    84  
    85  			if k8sError.IsForbidden(err) {
    86  				if strings.Contains(err.Error(), "because it is being terminated") {
    87  					return v1alpha1.NotInjected, nil
    88  				}
    89  			}
    90  
    91  			return waitForApplySync, err
    92  		}
    93  
    94  		if podnetworkchaos.Status.FailedMessage != "" {
    95  			return waitForApplySync, errors.New(podnetworkchaos.Status.FailedMessage)
    96  		}
    97  
    98  		if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
    99  			return v1alpha1.Injected, nil
   100  		}
   101  
   102  		return waitForApplySync, nil
   103  	}
   104  
   105  	var pod v1.Pod
   106  	namespacedName, err := controller.ParseNamespacedName(record.Id)
   107  	if err != nil {
   108  		return v1alpha1.NotInjected, err
   109  	}
   110  	err = impl.Client.Get(ctx, namespacedName, &pod)
   111  	if err != nil {
   112  		// TODO: handle this error
   113  		return v1alpha1.NotInjected, err
   114  	}
   115  
   116  	source := networkchaos.Namespace + "/" + networkchaos.Name
   117  	m := impl.builder.WithInit(source, types.NamespacedName{
   118  		Namespace: pod.Namespace,
   119  		Name:      pod.Name,
   120  	})
   121  
   122  	if record.SelectorKey == "." {
   123  		if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
   124  			var targets []*v1alpha1.Record
   125  			for _, record := range records {
   126  				if record.SelectorKey == ".Target" {
   127  					targets = append(targets, record)
   128  				}
   129  			}
   130  
   131  			err := impl.ApplyTc(ctx, m, targets, networkchaos, targetIPSetPostFix, networkchaos.Spec.Device)
   132  			if err != nil {
   133  				return v1alpha1.NotInjected, err
   134  			}
   135  
   136  			generationNumber, err := m.Commit(ctx, networkchaos)
   137  			if err != nil {
   138  				return v1alpha1.NotInjected, err
   139  			}
   140  
   141  			// modify the custom status
   142  			networkchaos.Status.Instances[record.Id] = generationNumber
   143  			return waitForApplySync, nil
   144  		}
   145  
   146  		return v1alpha1.Injected, nil
   147  	} else if record.SelectorKey == ".Target" {
   148  		if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
   149  			var targets []*v1alpha1.Record
   150  			for _, record := range records {
   151  				if record.SelectorKey == "." {
   152  					targets = append(targets, record)
   153  				}
   154  			}
   155  
   156  			err := impl.ApplyTc(ctx, m, targets, networkchaos, sourceIPSetPostFix, networkchaos.Spec.TargetDevice)
   157  			if err != nil {
   158  				return v1alpha1.NotInjected, err
   159  			}
   160  
   161  			generationNumber, err := m.Commit(ctx, networkchaos)
   162  			if err != nil {
   163  				return v1alpha1.NotInjected, err
   164  			}
   165  
   166  			// modify the custom status
   167  			networkchaos.Status.Instances[record.Id] = generationNumber
   168  			return waitForApplySync, nil
   169  		}
   170  
   171  		return v1alpha1.Injected, nil
   172  	} else {
   173  		impl.Log.Info("unknown selector key", "record", record)
   174  		return v1alpha1.NotInjected, nil
   175  	}
   176  }
   177  
   178  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   179  	// The only possible phase to get in here is "Injected" or "Injected/Wait"
   180  
   181  	networkchaos := obj.(*v1alpha1.NetworkChaos)
   182  	if networkchaos.Status.Instances == nil {
   183  		networkchaos.Status.Instances = make(map[string]int64)
   184  	}
   185  	if networkchaos.Status.Instances == nil {
   186  		networkchaos.Status.Instances = make(map[string]int64)
   187  	}
   188  
   189  	record := records[index]
   190  	phase := record.Phase
   191  
   192  	if phase == waitForRecoverSync {
   193  		podnetworkchaos := &v1alpha1.PodNetworkChaos{}
   194  		namespacedName, err := controller.ParseNamespacedName(record.Id)
   195  		if err != nil {
   196  			// This error is not expected to exist
   197  			return waitForRecoverSync, err
   198  		}
   199  		err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
   200  		if err != nil {
   201  			// TODO: handle this error
   202  			if k8sError.IsNotFound(err) {
   203  				return v1alpha1.NotInjected, nil
   204  			}
   205  			return waitForRecoverSync, err
   206  		}
   207  
   208  		if podnetworkchaos.Status.FailedMessage != "" {
   209  			return waitForRecoverSync, errors.New(podnetworkchaos.Status.FailedMessage)
   210  		}
   211  
   212  		if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
   213  			return v1alpha1.NotInjected, nil
   214  		}
   215  
   216  		return waitForRecoverSync, nil
   217  	}
   218  
   219  	var pod v1.Pod
   220  	namespacedName, err := controller.ParseNamespacedName(record.Id)
   221  	if err != nil {
   222  		// This error is not expected to exist
   223  		return v1alpha1.Injected, err
   224  	}
   225  	err = impl.Client.Get(ctx, namespacedName, &pod)
   226  	if err != nil {
   227  		// TODO: handle this error
   228  		if k8sError.IsNotFound(err) {
   229  			return v1alpha1.NotInjected, nil
   230  		}
   231  
   232  		if k8sError.IsForbidden(err) {
   233  			if strings.Contains(err.Error(), "because it is being terminated") {
   234  				return v1alpha1.NotInjected, nil
   235  			}
   236  		}
   237  		return v1alpha1.Injected, err
   238  	}
   239  
   240  	source := networkchaos.Namespace + "/" + networkchaos.Name
   241  	// TODO: use the DI but not construct it manually
   242  	m := impl.builder.WithInit(source, types.NamespacedName{
   243  		Namespace: pod.Namespace,
   244  		Name:      pod.Name,
   245  	})
   246  	generationNumber, err := m.Commit(ctx, networkchaos)
   247  	if err != nil {
   248  		if err == podnetworkchaosmanager.ErrPodNotFound || err == podnetworkchaosmanager.ErrPodNotRunning {
   249  			return v1alpha1.NotInjected, nil
   250  		}
   251  		return v1alpha1.Injected, err
   252  	}
   253  
   254  	// Now modify the custom status and phase
   255  	networkchaos.Status.Instances[record.Id] = generationNumber
   256  	return waitForRecoverSync, nil
   257  }
   258  
   259  func (impl *Impl) ApplyTc(ctx context.Context, m *podnetworkchaosmanager.PodNetworkManager, targets []*v1alpha1.Record, networkchaos *v1alpha1.NetworkChaos, ipSetPostFix string, device string) error {
   260  	spec := networkchaos.Spec
   261  	tcType := v1alpha1.Bandwidth
   262  	switch spec.Action {
   263  	case v1alpha1.NetemAction, v1alpha1.DelayAction, v1alpha1.DuplicateAction, v1alpha1.CorruptAction, v1alpha1.LossAction:
   264  		tcType = v1alpha1.Netem
   265  	case v1alpha1.BandwidthAction:
   266  		tcType = v1alpha1.Bandwidth
   267  	default:
   268  		return fmt.Errorf("unknown action %s", spec.Action)
   269  	}
   270  
   271  	externalCidrs, err := netutils.ResolveCidrs(networkchaos.Spec.ExternalTargets)
   272  	if err != nil {
   273  		return err
   274  	}
   275  
   276  	if len(targets)+len(externalCidrs) == 0 {
   277  		impl.Log.Info("apply traffic control", "sources", m.Source)
   278  		m.T.Append(v1alpha1.RawTrafficControl{
   279  			Type:        tcType,
   280  			TcParameter: spec.TcParameter,
   281  			Source:      m.Source,
   282  			Device:      device,
   283  		})
   284  		return nil
   285  	}
   286  
   287  	targetPods := []v1.Pod{}
   288  	for _, record := range targets {
   289  		var pod v1.Pod
   290  		namespacedName, err := controller.ParseNamespacedName(record.Id)
   291  		if err != nil {
   292  			// TODO: handle this error
   293  			return err
   294  		}
   295  		err = impl.Client.Get(ctx, namespacedName, &pod)
   296  		if err != nil {
   297  			// TODO: handle this error
   298  			return err
   299  		}
   300  		targetPods = append(targetPods, pod)
   301  	}
   302  	dstIpset := ipset.BuildIPSet(targetPods, externalCidrs, networkchaos, string(tcType[0:2])+ipSetPostFix, m.Source)
   303  	impl.Log.Info("apply traffic control with filter", "sources", m.Source, "ipset", dstIpset)
   304  
   305  	m.T.Append(dstIpset)
   306  	m.T.Append(v1alpha1.RawTrafficControl{
   307  		Type:        tcType,
   308  		TcParameter: spec.TcParameter,
   309  		Source:      m.Source,
   310  		IPSet:       dstIpset.Name,
   311  		Device:      device,
   312  	})
   313  
   314  	return nil
   315  }
   316  
   317  func NewImpl(c client.Client, b *podnetworkchaosmanager.Builder, log logr.Logger) *Impl {
   318  	return &Impl{
   319  		Client:  c,
   320  		builder: b,
   321  		Log:     log.WithName("trafficcontrol"),
   322  	}
   323  }
   324