...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/partition/impl.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/partition

     1  // Copyright 2019 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package partition
    15  
    16  import (
    17  	"context"
    18  	"errors"
    19  	"strings"
    20  
    21  	"github.com/go-logr/logr"
    22  	v1 "k8s.io/api/core/v1"
    23  	k8sError "k8s.io/apimachinery/pkg/api/errors"
    24  	"k8s.io/apimachinery/pkg/types"
    25  	"sigs.k8s.io/controller-runtime/pkg/client"
    26  
    27  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    28  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/networkchaos/podnetworkchaosmanager"
    29  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
    30  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/iptable"
    31  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/netutils"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
    33  	pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    34  )
    35  
    36  const (
    37  	sourceIPSetPostFix = "src"
    38  	targetIPSetPostFix = "tgt"
    39  )
    40  
    41  type Impl struct {
    42  	client.Client
    43  
    44  	builder *podnetworkchaosmanager.Builder
    45  
    46  	Log logr.Logger
    47  }
    48  
    49  const (
    50  	waitForApplySync   v1alpha1.Phase = "Not Injected/Wait"
    51  	waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
    52  )
    53  
    54  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    55  	impl.Log.Info("partition Apply", "chaos", obj)
    56  	networkchaos, ok := obj.(*v1alpha1.NetworkChaos)
    57  	if !ok {
    58  		err := errors.New("chaos is not NetworkChaos")
    59  		impl.Log.Error(err, "chaos is not NetworkChaos", "chaos", obj)
    60  		return v1alpha1.NotInjected, err
    61  	}
    62  	if networkchaos.Status.Instances == nil {
    63  		networkchaos.Status.Instances = make(map[string]int64)
    64  	}
    65  
    66  	record := records[index]
    67  	phase := record.Phase
    68  
    69  	if phase == waitForApplySync {
    70  		podnetworkchaos := &v1alpha1.PodNetworkChaos{}
    71  		err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), podnetworkchaos)
    72  		if err != nil {
    73  			if k8sError.IsNotFound(err) {
    74  				return v1alpha1.NotInjected, nil
    75  			}
    76  
    77  			if k8sError.IsForbidden(err) {
    78  				if strings.Contains(err.Error(), "because it is being terminated") {
    79  					return v1alpha1.NotInjected, nil
    80  				}
    81  			}
    82  
    83  			return waitForApplySync, err
    84  		}
    85  
    86  		if podnetworkchaos.Status.FailedMessage != "" {
    87  			return waitForApplySync, errors.New(podnetworkchaos.Status.FailedMessage)
    88  		}
    89  
    90  		if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
    91  			return v1alpha1.Injected, nil
    92  		}
    93  
    94  		return waitForApplySync, nil
    95  	}
    96  
    97  	var pod v1.Pod
    98  	err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
    99  	if err != nil {
   100  		// TODO: handle this error
   101  		return v1alpha1.NotInjected, err
   102  	}
   103  
   104  	source := networkchaos.Namespace + "/" + networkchaos.Name
   105  	m := func() *podnetworkchaosmanager.PodNetworkManager {
   106  		shouldInit := true
   107  
   108  		if record.SelectorKey == ".Target" {
   109  			for _, r := range records {
   110  				if r.Id == record.Id {
   111  					// Only init in the "." selector key so it won't be cleared
   112  					// by another one with the same key in the ".Target"
   113  					shouldInit = false
   114  				}
   115  			}
   116  		}
   117  
   118  		if shouldInit {
   119  			return impl.builder.WithInit(source, types.NamespacedName{
   120  				Namespace: pod.Namespace,
   121  				Name:      pod.Name,
   122  			})
   123  		}
   124  
   125  		return impl.builder.Build(source, types.NamespacedName{
   126  			Namespace: pod.Namespace,
   127  			Name:      pod.Name,
   128  		})
   129  	}()
   130  
   131  	if record.SelectorKey == "." {
   132  		shouldCommit := false
   133  
   134  		if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
   135  			var targets []*v1alpha1.Record
   136  			for _, record := range records {
   137  				if record.SelectorKey == ".Target" {
   138  					targets = append(targets, record)
   139  				}
   140  			}
   141  
   142  			err := impl.SetDrop(ctx, m, targets, networkchaos, targetIPSetPostFix, v1alpha1.Output)
   143  			if err != nil {
   144  				return v1alpha1.NotInjected, err
   145  			}
   146  
   147  			shouldCommit = true
   148  		}
   149  
   150  		if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
   151  			var targets []*v1alpha1.Record
   152  			for _, record := range records {
   153  				if record.SelectorKey == ".Target" {
   154  					targets = append(targets, record)
   155  				}
   156  			}
   157  
   158  			err := impl.SetDrop(ctx, m, targets, networkchaos, targetIPSetPostFix, v1alpha1.Input)
   159  			if err != nil {
   160  				return v1alpha1.NotInjected, err
   161  			}
   162  
   163  			shouldCommit = true
   164  		}
   165  
   166  		if shouldCommit {
   167  			generationNumber, err := m.Commit(ctx, networkchaos)
   168  			if err != nil {
   169  				return v1alpha1.NotInjected, err
   170  			}
   171  
   172  			// modify the custom status
   173  			networkchaos.Status.Instances[record.Id] = generationNumber
   174  			return waitForApplySync, nil
   175  		}
   176  
   177  		return v1alpha1.Injected, nil
   178  	} else if record.SelectorKey == ".Target" {
   179  		shouldCommit := false
   180  
   181  		if networkchaos.Spec.Direction == v1alpha1.From || networkchaos.Spec.Direction == v1alpha1.Both {
   182  			var targets []*v1alpha1.Record
   183  			for _, record := range records {
   184  				if record.SelectorKey == "." {
   185  					targets = append(targets, record)
   186  				}
   187  			}
   188  
   189  			err := impl.SetDrop(ctx, m, targets, networkchaos, sourceIPSetPostFix, v1alpha1.Output)
   190  			if err != nil {
   191  				return v1alpha1.NotInjected, err
   192  			}
   193  
   194  			shouldCommit = true
   195  		}
   196  
   197  		if networkchaos.Spec.Direction == v1alpha1.To || networkchaos.Spec.Direction == v1alpha1.Both {
   198  			var targets []*v1alpha1.Record
   199  			for _, record := range records {
   200  				if record.SelectorKey == "." {
   201  					targets = append(targets, record)
   202  				}
   203  			}
   204  
   205  			err := impl.SetDrop(ctx, m, targets, networkchaos, sourceIPSetPostFix, v1alpha1.Input)
   206  			if err != nil {
   207  				return v1alpha1.NotInjected, err
   208  			}
   209  
   210  			shouldCommit = true
   211  		}
   212  
   213  		if shouldCommit {
   214  			generationNumber, err := m.Commit(ctx, networkchaos)
   215  			if err != nil {
   216  				return v1alpha1.NotInjected, err
   217  			}
   218  
   219  			// modify the custom status
   220  			networkchaos.Status.Instances[record.Id] = generationNumber
   221  			return waitForApplySync, nil
   222  		}
   223  
   224  		return v1alpha1.Injected, nil
   225  	} else {
   226  		impl.Log.Info("unknown selector key", "record", record)
   227  		return v1alpha1.NotInjected, nil
   228  	}
   229  }
   230  
   231  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   232  	networkchaos, ok := obj.(*v1alpha1.NetworkChaos)
   233  	if !ok {
   234  		err := errors.New("chaos is not NetworkChaos")
   235  		impl.Log.Error(err, "chaos is not NetworkChaos", "chaos", obj)
   236  		return v1alpha1.Injected, err
   237  	}
   238  	if networkchaos.Status.Instances == nil {
   239  		networkchaos.Status.Instances = make(map[string]int64)
   240  	}
   241  
   242  	record := records[index]
   243  	phase := record.Phase
   244  
   245  	if phase == waitForRecoverSync {
   246  		podnetworkchaos := &v1alpha1.PodNetworkChaos{}
   247  		err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), podnetworkchaos)
   248  		if err != nil {
   249  			// TODO: handle this error
   250  			if k8sError.IsNotFound(err) {
   251  				return v1alpha1.NotInjected, nil
   252  			}
   253  			return waitForRecoverSync, err
   254  		}
   255  
   256  		if podnetworkchaos.Status.FailedMessage != "" {
   257  			return waitForRecoverSync, errors.New(podnetworkchaos.Status.FailedMessage)
   258  		}
   259  
   260  		if podnetworkchaos.Status.ObservedGeneration >= networkchaos.Status.Instances[record.Id] {
   261  			return v1alpha1.NotInjected, nil
   262  		}
   263  
   264  		return waitForRecoverSync, nil
   265  	}
   266  
   267  	var pod v1.Pod
   268  	err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
   269  	if err != nil {
   270  		// TODO: handle this error
   271  		if k8sError.IsNotFound(err) {
   272  			return v1alpha1.NotInjected, nil
   273  		}
   274  		return v1alpha1.Injected, err
   275  	}
   276  
   277  	source := networkchaos.Namespace + "/" + networkchaos.Name
   278  	m := impl.builder.WithInit(source, types.NamespacedName{
   279  		Namespace: pod.Namespace,
   280  		Name:      pod.Name,
   281  	})
   282  	generationNumber, err := m.Commit(ctx, networkchaos)
   283  	if err != nil {
   284  		if err == podnetworkchaosmanager.ErrPodNotFound || err == podnetworkchaosmanager.ErrPodNotRunning {
   285  			return v1alpha1.NotInjected, nil
   286  		}
   287  
   288  		if k8sError.IsForbidden(err) {
   289  			if strings.Contains(err.Error(), "because it is being terminated") {
   290  				return v1alpha1.NotInjected, nil
   291  			}
   292  		}
   293  		return v1alpha1.Injected, err
   294  	}
   295  
   296  	// Now modify the custom status and phase
   297  	networkchaos.Status.Instances[record.Id] = generationNumber
   298  	return waitForRecoverSync, nil
   299  }
   300  
   301  func (impl *Impl) SetDrop(ctx context.Context, m *podnetworkchaosmanager.PodNetworkManager, targets []*v1alpha1.Record, networkchaos *v1alpha1.NetworkChaos, ipSetPostFix string, chainDirection v1alpha1.ChainDirection) error {
   302  	externalCidrs, err := netutils.ResolveCidrs(networkchaos.Spec.ExternalTargets)
   303  	if err != nil {
   304  		return err
   305  	}
   306  
   307  	pbChainDirection := pb.Chain_OUTPUT
   308  	if chainDirection == v1alpha1.Input {
   309  		pbChainDirection = pb.Chain_INPUT
   310  	}
   311  	if len(targets)+len(externalCidrs) == 0 {
   312  		impl.Log.Info("apply traffic control", "sources", m.Source)
   313  		m.T.Append(v1alpha1.RawIptables{
   314  			Name:      iptable.GenerateName(pbChainDirection, networkchaos),
   315  			Direction: chainDirection,
   316  			IPSets:    nil,
   317  			RawRuleSource: v1alpha1.RawRuleSource{
   318  				Source: m.Source,
   319  			},
   320  		})
   321  		return nil
   322  	}
   323  
   324  	targetPods := []v1.Pod{}
   325  	for _, record := range targets {
   326  		var pod v1.Pod
   327  		err := impl.Client.Get(ctx, controller.ParseNamespacedName(record.Id), &pod)
   328  		if err != nil {
   329  			// TODO: handle this error
   330  			return err
   331  		}
   332  		targetPods = append(targetPods, pod)
   333  	}
   334  	dstIpset := ipset.BuildIPSet(targetPods, externalCidrs, networkchaos, ipSetPostFix, m.Source)
   335  	m.T.Append(dstIpset)
   336  	m.T.Append(v1alpha1.RawIptables{
   337  		Name:      iptable.GenerateName(pbChainDirection, networkchaos),
   338  		Direction: chainDirection,
   339  		IPSets:    []string{dstIpset.Name},
   340  		RawRuleSource: v1alpha1.RawRuleSource{
   341  			Source: m.Source,
   342  		},
   343  	})
   344  
   345  	return nil
   346  }
   347  
   348  func NewImpl(c client.Client, b *podnetworkchaosmanager.Builder, log logr.Logger) *Impl {
   349  	return &Impl{
   350  		Client:  c,
   351  		builder: b,
   352  		Log:     log.WithName("partition"),
   353  	}
   354  }
   355