...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/trafficcontrol/impl.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/trafficcontrol

     1  // Copyright 2020 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package trafficcontrol
    15  
    16  import (
    17  	"context"
    18  	"errors"
    19  	"fmt"
    20  	"strings"
    21  
    22  	"github.com/go-logr/logr"
    23  	v1 "k8s.io/api/core/v1"
    24  	k8sError "k8s.io/apimachinery/pkg/api/errors"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	"sigs.k8s.io/controller-runtime/pkg/client"
    27  
    28  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    29  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/podnetworkchaosmanager"
    30  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
    31  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/netutils"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
    33  )
    34  
    35  const (
    36  	networkTcActionMsg    = "network traffic control action duration %s"
    37  	networkChaosSourceMsg = "This is a source pod."
    38  	networkChaosTargetMsg = "This is a target pod."
    39  
    40  	targetIPSetPostFix = "tgt"
    41  	sourceIPSetPostFix = "src"
    42  )
    43  
    44  const (
    45  	waitForApplySync   v1alpha1.Phase = "Not Injected/Wait"
    46  	waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
    47  )
    48  
    49  type Impl struct {
    50  	client.Client
    51  
    52  	builder *podnetworkchaosmanager.Builder
    53  
    54  	Log logr.Logger
    55  }
    56  
    57  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    58  	// The only possible phase to get in here is "Not Injected" or "Not Injected/Wait"
    59  
    60  	impl.Log.Info("traffic control Apply", "namespace", obj.GetObjectMeta().Namespace, "name", obj.GetObjectMeta().Name)
    61  	networkchaos := obj.(*v1alpha1.NetworkChaos)
    62  	if networkchaos.Status.Instances == nil {
    63  		networkchaos.Status.Instances = make(map[string]int64)
    64  	}
    65  	if networkchaos.Status.Instances == nil {
    66  		networkchaos.Status.Instances = make(map[string]int64)
    67  	}
    68  
    69  	record := records[index]
    70  	phase := record.Phase
    71  
    72  	if phase == waitForApplySync {
    73  		podnetworkchaos := &v1alpha1.PodNetworkChaos{}
    74  		err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), podnetworkchaos)
    75  		if err != nil {
    76  			if k8sError.IsNotFound(err) {
    77  				return v1alpha1.NotInjected, nil
    78  			}
    79  
    80  			if k8sError.IsForbidden(err) {
    81  				if strings.Contains(err.Error(), "because it is being terminated") {
    82  					return v1alpha1.NotInjected, nil
    83  				}
    84  			}
    85  
    86  			return waitForApplySync, err
    87  		}
    88  
    89  		if podnetworkchaos.Status.FailedMessage != "" {
    90  			return waitForApplySync, errors.New(podnetworkchaos.Status.FailedMessage)
    91  		}
    92  
    93  		if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
    94  			return v1alpha1.Injected, nil
    95  		}
    96  
    97  		return waitForApplySync, nil
    98  	}
    99  
   100  	var pod v1.Pod
   101  	err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
   102  	if err != nil {
   103  		// TODO: handle this error
   104  		return v1alpha1.NotInjected, err
   105  	}
   106  
   107  	source := networkchaos.Namespace + "/" + networkchaos.Name
   108  	m := impl.builder.WithInit(source, types.NamespacedName{
   109  		Namespace: pod.Namespace,
   110  		Name:      pod.Name,
   111  	})
   112  
   113  	if record.SelectorKey == "." {
   114  		if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
   115  			var targets []*v1alpha1.Record
   116  			for _, record := range records {
   117  				if record.SelectorKey == ".Target" {
   118  					targets = append(targets, record)
   119  				}
   120  			}
   121  
   122  			err := impl.ApplyTc(ctx, m, targets, networkchaos, targetIPSetPostFix)
   123  			if err != nil {
   124  				return v1alpha1.NotInjected, err
   125  			}
   126  
   127  			generationNumber, err := m.Commit(ctx, networkchaos)
   128  			if err != nil {
   129  				return v1alpha1.NotInjected, err
   130  			}
   131  
   132  			// modify the custom status
   133  			networkchaos.Status.Instances[record.Id] = generationNumber
   134  			return waitForApplySync, nil
   135  		}
   136  
   137  		return v1alpha1.Injected, nil
   138  	} else if record.SelectorKey == ".Target" {
   139  		if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
   140  			var targets []*v1alpha1.Record
   141  			for _, record := range records {
   142  				if record.SelectorKey == "." {
   143  					targets = append(targets, record)
   144  				}
   145  			}
   146  
   147  			err := impl.ApplyTc(ctx, m, targets, networkchaos, sourceIPSetPostFix)
   148  			if err != nil {
   149  				return v1alpha1.NotInjected, err
   150  			}
   151  
   152  			generationNumber, err := m.Commit(ctx, networkchaos)
   153  			if err != nil {
   154  				return v1alpha1.NotInjected, err
   155  			}
   156  
   157  			// modify the custom status
   158  			networkchaos.Status.Instances[record.Id] = generationNumber
   159  			return waitForApplySync, nil
   160  		}
   161  
   162  		return v1alpha1.Injected, nil
   163  	} else {
   164  		impl.Log.Info("unknown selector key", "record", record)
   165  		return v1alpha1.NotInjected, nil
   166  	}
   167  }
   168  
   169  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   170  	// The only possible phase to get in here is "Injected" or "Injected/Wait"
   171  
   172  	networkchaos := obj.(*v1alpha1.NetworkChaos)
   173  	if networkchaos.Status.Instances == nil {
   174  		networkchaos.Status.Instances = make(map[string]int64)
   175  	}
   176  	if networkchaos.Status.Instances == nil {
   177  		networkchaos.Status.Instances = make(map[string]int64)
   178  	}
   179  
   180  	record := records[index]
   181  	phase := record.Phase
   182  
   183  	if phase == waitForRecoverSync {
   184  		podnetworkchaos := &v1alpha1.PodNetworkChaos{}
   185  		err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), podnetworkchaos)
   186  		if err != nil {
   187  			// TODO: handle this error
   188  			if k8sError.IsNotFound(err) {
   189  				return v1alpha1.NotInjected, nil
   190  			}
   191  			return waitForRecoverSync, err
   192  		}
   193  
   194  		if podnetworkchaos.Status.FailedMessage != "" {
   195  			return waitForRecoverSync, errors.New(podnetworkchaos.Status.FailedMessage)
   196  		}
   197  
   198  		if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
   199  			return v1alpha1.NotInjected, nil
   200  		}
   201  
   202  		return waitForRecoverSync, nil
   203  	}
   204  
   205  	var pod v1.Pod
   206  	err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
   207  	if err != nil {
   208  		// TODO: handle this error
   209  		if k8sError.IsNotFound(err) {
   210  			return v1alpha1.NotInjected, nil
   211  		}
   212  
   213  		if k8sError.IsForbidden(err) {
   214  			if strings.Contains(err.Error(), "because it is being terminated") {
   215  				return v1alpha1.NotInjected, nil
   216  			}
   217  		}
   218  		return v1alpha1.Injected, err
   219  	}
   220  
   221  	source := networkchaos.Namespace + "/" + networkchaos.Name
   222  	// TODO: use the DI but not construct it manually
   223  	m := impl.builder.WithInit(source, types.NamespacedName{
   224  		Namespace: pod.Namespace,
   225  		Name:      pod.Name,
   226  	})
   227  	generationNumber, err := m.Commit(ctx, networkchaos)
   228  	if err != nil {
   229  		if err == podnetworkchaosmanager.ErrPodNotFound || err == podnetworkchaosmanager.ErrPodNotRunning {
   230  			return v1alpha1.NotInjected, nil
   231  		}
   232  		return v1alpha1.Injected, err
   233  	}
   234  
   235  	// Now modify the custom status and phase
   236  	networkchaos.Status.Instances[record.Id] = generationNumber
   237  	return waitForRecoverSync, nil
   238  }
   239  
   240  func (impl *Impl) ApplyTc(ctx context.Context, m *podnetworkchaosmanager.PodNetworkManager, targets []*v1alpha1.Record, networkchaos *v1alpha1.NetworkChaos, ipSetPostFix string) error {
   241  	spec := networkchaos.Spec
   242  	tcType := v1alpha1.Bandwidth
   243  	switch spec.Action {
   244  	case v1alpha1.NetemAction, v1alpha1.DelayAction, v1alpha1.DuplicateAction, v1alpha1.CorruptAction, v1alpha1.LossAction:
   245  		tcType = v1alpha1.Netem
   246  	case v1alpha1.BandwidthAction:
   247  		tcType = v1alpha1.Bandwidth
   248  	default:
   249  		return fmt.Errorf("unknown action %s", spec.Action)
   250  	}
   251  
   252  	externalCidrs, err := netutils.ResolveCidrs(networkchaos.Spec.ExternalTargets)
   253  	if err != nil {
   254  		return err
   255  	}
   256  
   257  	if len(targets)+len(externalCidrs) == 0 {
   258  		impl.Log.Info("apply traffic control", "sources", m.Source)
   259  		m.T.Append(v1alpha1.RawTrafficControl{
   260  			Type:        tcType,
   261  			TcParameter: spec.TcParameter,
   262  			Source:      m.Source,
   263  		})
   264  		return nil
   265  	}
   266  
   267  	targetPods := []v1.Pod{}
   268  	for _, record := range targets {
   269  		var pod v1.Pod
   270  		err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
   271  		if err != nil {
   272  			// TODO: handle this error
   273  			return err
   274  		}
   275  		targetPods = append(targetPods, pod)
   276  	}
   277  	dstIpset := ipset.BuildIPSet(targetPods, externalCidrs, networkchaos, string(tcType[0:2])+ipSetPostFix, m.Source)
   278  	impl.Log.Info("apply traffic control with filter", "sources", m.Source, "ipset", dstIpset)
   279  
   280  	m.T.Append(dstIpset)
   281  	m.T.Append(v1alpha1.RawTrafficControl{
   282  		Type:        tcType,
   283  		TcParameter: spec.TcParameter,
   284  		Source:      m.Source,
   285  		IPSet:       dstIpset.Name,
   286  	})
   287  
   288  	return nil
   289  }
   290  
   291  func NewImpl(c client.Client, b *podnetworkchaosmanager.Builder, log logr.Logger) *Impl {
   292  	return &Impl{
   293  		Client:  c,
   294  		builder: b,
   295  		Log:     log.WithName("trafficcontrol"),
   296  	}
   297  }
   298