...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/partition/impl.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/partition

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package partition
    17  
    18  import (
    19  	"context"
    20  	"strings"
    21  
    22  	"github.com/go-logr/logr"
    23  	"github.com/pkg/errors"
    24  	v1 "k8s.io/api/core/v1"
    25  	k8sError "k8s.io/apimachinery/pkg/api/errors"
    26  	"k8s.io/apimachinery/pkg/types"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  
    29  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    30  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/podnetworkchaosmanager"
    31  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/iptable"
    34  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/netutils"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
    36  	pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    37  )
    38  
    39  var _ impltypes.ChaosImpl = (*Impl)(nil)
    40  
    41  const (
    42  	sourceIPSetPostFix = "src"
    43  	targetIPSetPostFix = "tgt"
    44  )
    45  
    46  type Impl struct {
    47  	client.Client
    48  
    49  	builder *podnetworkchaosmanager.Builder
    50  
    51  	Log logr.Logger
    52  }
    53  
    54  const (
    55  	waitForApplySync   v1alpha1.Phase = "Not Injected/Wait"
    56  	waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
    57  )
    58  
    59  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    60  	impl.Log.Info("partition Apply", "chaos", obj)
    61  	networkchaos, ok := obj.(*v1alpha1.NetworkChaos)
    62  	if !ok {
    63  		err := errors.New("chaos is not NetworkChaos")
    64  		impl.Log.Error(err, "chaos is not NetworkChaos", "chaos", obj)
    65  		return v1alpha1.NotInjected, err
    66  	}
    67  	if networkchaos.Status.Instances == nil {
    68  		networkchaos.Status.Instances = make(map[string]int64)
    69  	}
    70  
    71  	record := records[index]
    72  	phase := record.Phase
    73  
    74  	if phase == waitForApplySync {
    75  		podnetworkchaos := &v1alpha1.PodNetworkChaos{}
    76  		namespacedName, err := controller.ParseNamespacedName(record.Id)
    77  		if err != nil {
    78  			return waitForApplySync, err
    79  		}
    80  		err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
    81  		if err != nil {
    82  			if k8sError.IsNotFound(err) {
    83  				return v1alpha1.NotInjected, nil
    84  			}
    85  
    86  			if k8sError.IsForbidden(err) {
    87  				if strings.Contains(err.Error(), "because it is being terminated") {
    88  					return v1alpha1.NotInjected, nil
    89  				}
    90  			}
    91  
    92  			return waitForApplySync, err
    93  		}
    94  
    95  		if podnetworkchaos.Status.FailedMessage != "" {
    96  			return waitForApplySync, errors.New(podnetworkchaos.Status.FailedMessage)
    97  		}
    98  
    99  		if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
   100  			return v1alpha1.Injected, nil
   101  		}
   102  
   103  		return waitForApplySync, nil
   104  	}
   105  
   106  	var pod v1.Pod
   107  	namespacedName, err := controller.ParseNamespacedName(record.Id)
   108  	if err != nil {
   109  		return v1alpha1.NotInjected, err
   110  	}
   111  	err = impl.Client.Get(ctx, namespacedName, &pod)
   112  	if err != nil {
   113  		// TODO: handle this error
   114  		return v1alpha1.NotInjected, err
   115  	}
   116  
   117  	source := networkchaos.Namespace + "/" + networkchaos.Name
   118  	m := func() *podnetworkchaosmanager.PodNetworkManager {
   119  		shouldInit := true
   120  
   121  		if record.SelectorKey == ".Target" {
   122  			for _, r := range records {
   123  				if r.Id == record.Id {
   124  					// Only init in the "." selector key so it won't be cleared
   125  					// by another one with the same key in the ".Target"
   126  					shouldInit = false
   127  				}
   128  			}
   129  		}
   130  
   131  		if shouldInit {
   132  			return impl.builder.WithInit(source, types.NamespacedName{
   133  				Namespace: pod.Namespace,
   134  				Name:      pod.Name,
   135  			})
   136  		}
   137  
   138  		return impl.builder.Build(source, types.NamespacedName{
   139  			Namespace: pod.Namespace,
   140  			Name:      pod.Name,
   141  		})
   142  	}()
   143  
   144  	if record.SelectorKey == "." {
   145  		shouldCommit := false
   146  
   147  		if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
   148  			var targets []*v1alpha1.Record
   149  			for _, record := range records {
   150  				if record.SelectorKey == ".Target" {
   151  					targets = append(targets, record)
   152  				}
   153  			}
   154  
   155  			err := impl.SetDrop(ctx, m, targets, networkchaos, targetIPSetPostFix, v1alpha1.Output, networkchaos.Spec.Device)
   156  			if err != nil {
   157  				return v1alpha1.NotInjected, err
   158  			}
   159  
   160  			shouldCommit = true
   161  		}
   162  
   163  		if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
   164  			var targets []*v1alpha1.Record
   165  			for _, record := range records {
   166  				if record.SelectorKey == ".Target" {
   167  					targets = append(targets, record)
   168  				}
   169  			}
   170  
   171  			err := impl.SetDrop(ctx, m, targets, networkchaos, targetIPSetPostFix, v1alpha1.Input, networkchaos.Spec.Device)
   172  			if err != nil {
   173  				return v1alpha1.NotInjected, err
   174  			}
   175  
   176  			shouldCommit = true
   177  		}
   178  
   179  		if shouldCommit {
   180  			generationNumber, err := m.Commit(ctx, networkchaos)
   181  			if err != nil {
   182  				return v1alpha1.NotInjected, err
   183  			}
   184  
   185  			// modify the custom status
   186  			networkchaos.Status.Instances[record.Id] = generationNumber
   187  			return waitForApplySync, nil
   188  		}
   189  
   190  		return v1alpha1.Injected, nil
   191  	} else if record.SelectorKey == ".Target" {
   192  		shouldCommit := false
   193  
   194  		if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
   195  			var targets []*v1alpha1.Record
   196  			for _, record := range records {
   197  				if record.SelectorKey == "." {
   198  					targets = append(targets, record)
   199  				}
   200  			}
   201  
   202  			err := impl.SetDrop(ctx, m, targets, networkchaos, sourceIPSetPostFix, v1alpha1.Output, networkchaos.Spec.TargetDevice)
   203  			if err != nil {
   204  				return v1alpha1.NotInjected, err
   205  			}
   206  
   207  			shouldCommit = true
   208  		}
   209  
   210  		if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
   211  			var targets []*v1alpha1.Record
   212  			for _, record := range records {
   213  				if record.SelectorKey == "." {
   214  					targets = append(targets, record)
   215  				}
   216  			}
   217  
   218  			err := impl.SetDrop(ctx, m, targets, networkchaos, sourceIPSetPostFix, v1alpha1.Input, networkchaos.Spec.TargetDevice)
   219  			if err != nil {
   220  				return v1alpha1.NotInjected, err
   221  			}
   222  
   223  			shouldCommit = true
   224  		}
   225  
   226  		if shouldCommit {
   227  			generationNumber, err := m.Commit(ctx, networkchaos)
   228  			if err != nil {
   229  				return v1alpha1.NotInjected, err
   230  			}
   231  
   232  			// modify the custom status
   233  			networkchaos.Status.Instances[record.Id] = generationNumber
   234  			return waitForApplySync, nil
   235  		}
   236  
   237  		return v1alpha1.Injected, nil
   238  	} else {
   239  		impl.Log.Info("unknown selector key", "record", record)
   240  		return v1alpha1.NotInjected, nil
   241  	}
   242  }
   243  
   244  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   245  	networkchaos, ok := obj.(*v1alpha1.NetworkChaos)
   246  	if !ok {
   247  		err := errors.New("chaos is not NetworkChaos")
   248  		impl.Log.Error(err, "chaos is not NetworkChaos", "chaos", obj)
   249  		return v1alpha1.Injected, err
   250  	}
   251  	if networkchaos.Status.Instances == nil {
   252  		networkchaos.Status.Instances = make(map[string]int64)
   253  	}
   254  
   255  	record := records[index]
   256  	phase := record.Phase
   257  
   258  	if phase == waitForRecoverSync {
   259  		podnetworkchaos := &v1alpha1.PodNetworkChaos{}
   260  		namespacedName, err := controller.ParseNamespacedName(record.Id)
   261  		if err != nil {
   262  			// This error is not expected to exist
   263  			return waitForApplySync, err
   264  		}
   265  		err = impl.Client.Get(ctx, namespacedName, podnetworkchaos)
   266  		if err != nil {
   267  			// TODO: handle this error
   268  			if k8sError.IsNotFound(err) {
   269  				return v1alpha1.NotInjected, nil
   270  			}
   271  			return waitForRecoverSync, err
   272  		}
   273  
   274  		if podnetworkchaos.Status.FailedMessage != "" {
   275  			return waitForRecoverSync, errors.New(podnetworkchaos.Status.FailedMessage)
   276  		}
   277  
   278  		if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
   279  			return v1alpha1.NotInjected, nil
   280  		}
   281  
   282  		return waitForRecoverSync, nil
   283  	}
   284  
   285  	var pod v1.Pod
   286  	namespacedName, err := controller.ParseNamespacedName(record.Id)
   287  	if err != nil {
   288  		// This error is not expected to exist
   289  		return v1alpha1.NotInjected, err
   290  	}
   291  	err = impl.Client.Get(ctx, namespacedName, &pod)
   292  	if err != nil {
   293  		// TODO: handle this error
   294  		if k8sError.IsNotFound(err) {
   295  			return v1alpha1.NotInjected, nil
   296  		}
   297  		return v1alpha1.Injected, err
   298  	}
   299  
   300  	source := networkchaos.Namespace + "/" + networkchaos.Name
   301  	m := impl.builder.WithInit(source, types.NamespacedName{
   302  		Namespace: pod.Namespace,
   303  		Name:      pod.Name,
   304  	})
   305  	generationNumber, err := m.Commit(ctx, networkchaos)
   306  	if err != nil {
   307  		if err == podnetworkchaosmanager.ErrPodNotFound || err == podnetworkchaosmanager.ErrPodNotRunning {
   308  			return v1alpha1.NotInjected, nil
   309  		}
   310  
   311  		if k8sError.IsForbidden(err) {
   312  			if strings.Contains(err.Error(), "because it is being terminated") {
   313  				return v1alpha1.NotInjected, nil
   314  			}
   315  		}
   316  		return v1alpha1.Injected, err
   317  	}
   318  
   319  	// Now modify the custom status and phase
   320  	networkchaos.Status.Instances[record.Id] = generationNumber
   321  	return waitForRecoverSync, nil
   322  }
   323  
   324  func (impl *Impl) SetDrop(ctx context.Context, m *podnetworkchaosmanager.PodNetworkManager, targets []*v1alpha1.Record, networkchaos *v1alpha1.NetworkChaos, ipSetPostFix string, chainDirection v1alpha1.ChainDirection, device string) error {
   325  	externalCidrs, err := netutils.ResolveCidrs(networkchaos.Spec.ExternalTargets)
   326  	if err != nil {
   327  		return err
   328  	}
   329  
   330  	pbChainDirection := pb.Chain_OUTPUT
   331  	if chainDirection == v1alpha1.Input {
   332  		pbChainDirection = pb.Chain_INPUT
   333  	}
   334  	if len(targets)+len(externalCidrs) == 0 {
   335  		impl.Log.Info("apply traffic control", "sources", m.Source)
   336  		m.T.Append(v1alpha1.RawIptables{
   337  			Name:      iptable.GenerateName(pbChainDirection, networkchaos),
   338  			Direction: chainDirection,
   339  			IPSets:    nil,
   340  			RawRuleSource: v1alpha1.RawRuleSource{
   341  				Source: m.Source,
   342  			},
   343  			Device: device,
   344  		})
   345  		return nil
   346  	}
   347  
   348  	targetPods := []v1.Pod{}
   349  	for _, record := range targets {
   350  		var pod v1.Pod
   351  		namespacedName, err := controller.ParseNamespacedName(record.Id)
   352  		if err != nil {
   353  			// TODO: handle this error
   354  			return err
   355  		}
   356  		err = impl.Client.Get(ctx, namespacedName, &pod)
   357  		if err != nil {
   358  			// TODO: handle this error
   359  			return err
   360  		}
   361  		targetPods = append(targetPods, pod)
   362  	}
   363  	dstIPSets := ipset.BuildIPSets(targetPods, externalCidrs, networkchaos, ipSetPostFix, m.Source)
   364  	dstSetIPSet := ipset.BuildSetIPSet(dstIPSets, networkchaos, ipSetPostFix, m.Source)
   365  
   366  	for _, ipSet := range dstIPSets {
   367  		m.T.Append(ipSet)
   368  	}
   369  
   370  	m.T.Append(dstSetIPSet)
   371  
   372  	m.T.Append(v1alpha1.RawIptables{
   373  		Name:      iptable.GenerateName(pbChainDirection, networkchaos),
   374  		Direction: chainDirection,
   375  		IPSets:    []string{dstSetIPSet.Name},
   376  		RawRuleSource: v1alpha1.RawRuleSource{
   377  			Source: m.Source,
   378  		},
   379  		Device: device,
   380  	})
   381  
   382  	return nil
   383  }
   384  
   385  func NewImpl(c client.Client, b *podnetworkchaosmanager.Builder, log logr.Logger) *Impl {
   386  	return &Impl{
   387  		Client:  c,
   388  		builder: b,
   389  		Log:     log.WithName("partition"),
   390  	}
   391  }
   392