
Source file src/github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    16  package podnetworkchaos
    18  import (
    19  	"context"
    21  	"github.com/go-logr/logr"
    22  	"github.com/pkg/errors"
    23  	corev1 "k8s.io/api/core/v1"
    24  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	"k8s.io/client-go/util/retry"
    27  	ctrl "sigs.k8s.io/controller-runtime"
    28  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    31  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/iptable"
    33  	tcpkg "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/tc"
    34  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    36  	chaosdaemonclient "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/client"
    37  	pbutils "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/netem"
    38  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    39  	"github.com/chaos-mesh/chaos-mesh/pkg/netem"
    40  )
    42  const (
    43  	invalidNetemSpecMsg = "invalid spec for netem action, at least one is required from delay, loss, duplicate, corrupt"
    44  )
    46  // Reconciler applys podnetworkchaos
    47  type Reconciler struct {
    48  	client.Client
    49  	Recorder recorder.ChaosRecorder
    51  	Log                      logr.Logger
    52  	AllowHostNetworkTesting  bool
    53  	ChaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
    54  }
    56  func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    57  	obj := &v1alpha1.PodNetworkChaos{}
    59  	if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil {
    60  		if apierrors.IsNotFound(err) {
    61  			r.Log.Info("chaos not found")
    62  		} else {
    63  			// TODO: handle this error
    64  			r.Log.Error(err, "unable to get chaos")
    65  		}
    66  		return ctrl.Result{}, nil
    67  	}
    69  	if obj.ObjectMeta.Generation <= obj.Status.ObservedGeneration && obj.Status.FailedMessage == "" {
    70  		r.Log.Info("the target pod has been up to date", "pod", obj.Namespace+"/"+obj.Name)
    71  		return ctrl.Result{}, nil
    72  	}
    74  	r.Log.Info("updating podnetworkchaos", "pod", obj.Namespace+"/"+obj.Name, "spec", obj.Spec)
    76  	pod := &corev1.Pod{}
    78  	err := r.Client.Get(ctx, types.NamespacedName{
    79  		Name:      obj.Name,
    80  		Namespace: obj.Namespace,
    81  	}, pod)
    82  	if err != nil {
    83  		r.Log.Error(err, "fail to find pod")
    84  		return ctrl.Result{}, nil
    85  	}
    87  	failedMessage := ""
    88  	observedGeneration := obj.ObjectMeta.Generation
    89  	defer func() {
    90  		if err != nil {
    91  			failedMessage = err.Error()
    92  		}
    94  		updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
    95  			obj := &v1alpha1.PodNetworkChaos{}
    97  			if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
    98  				r.Log.Error(err, "unable to get chaos")
    99  				return err
   100  			}
   102  			obj.Status.FailedMessage = failedMessage
   103  			obj.Status.ObservedGeneration = observedGeneration
   105  			return r.Client.Status().Update(context.TODO(), obj)
   106  		})
   108  		if updateError != nil {
   109  			r.Log.Error(updateError, "fail to update")
   110  			r.Recorder.Event(obj, recorder.Failed{
   111  				Activity: "update status",
   112  				Err:      updateError.Error(),
   113  			})
   114  		}
   116  		r.Recorder.Event(obj, recorder.Updated{
   117  			Field: "ObservedGeneration and FailedMessage",
   118  		})
   119  	}()
   121  	if !r.AllowHostNetworkTesting {
   122  		if pod.Spec.HostNetwork {
   123  			err = errors.Errorf("It's dangerous to inject network chaos on a pod(%s/%s) with `hostNetwork`", pod.Namespace, pod.Name)
   124  			r.Log.Error(err, "fail to inject network chaos")
   125  			r.Recorder.Event(obj, recorder.Failed{
   126  				Activity: "inject network chaos",
   127  				Err:      err.Error(),
   128  			})
   129  			return ctrl.Result{}, nil
   130  		}
   131  	}
   133  	pbClient, err := r.ChaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
   134  		Name:      obj.Name,
   135  		Namespace: obj.Namespace,
   136  	})
   137  	if err != nil {
   138  		r.Recorder.Event(obj, recorder.Failed{
   139  			Activity: "create chaos daemon client",
   140  			Err:      err.Error(),
   141  		})
   142  		return ctrl.Result{Requeue: true}, nil
   143  	}
   144  	defer pbClient.Close()
   146  	err = r.SetIPSets(ctx, pod, obj, pbClient)
   147  	if err != nil {
   148  		err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
   149  		r.Log.Error(err, "fail to set ipsets")
   150  		r.Recorder.Event(obj, recorder.Failed{
   151  			Activity: "set ipsets",
   152  			Err:      err.Error(),
   153  		})
   154  		return ctrl.Result{Requeue: true}, nil
   155  	}
   157  	err = r.SetIptables(ctx, pod, obj, pbClient)
   158  	if err != nil {
   159  		err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
   160  		r.Log.Error(err, "fail to set iptables")
   161  		r.Recorder.Event(obj, recorder.Failed{
   162  			Activity: "set iptables",
   163  			Err:      err.Error(),
   164  		})
   165  		return ctrl.Result{Requeue: true}, nil
   166  	}
   168  	err = r.SetTcs(ctx, pod, obj, pbClient)
   169  	if err != nil {
   170  		err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
   171  		r.Recorder.Event(obj, recorder.Failed{
   172  			Activity: "set tc",
   173  			Err:      err.Error(),
   174  		})
   175  		return ctrl.Result{Requeue: true}, nil
   176  	}
   178  	return ctrl.Result{}, nil
   179  }
   181  // SetIPSets sets ipset on pod
   182  func (r *Reconciler) SetIPSets(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
   183  	ipsets := []*pb.IPSet{}
   184  	for _, ipset := range chaos.Spec.IPSets {
   185  		cidrAndPorts := []*pb.CidrAndPort{}
   186  		for _, cidrAndPort := range ipset.CidrAndPorts {
   187  			cidrAndPorts = append(cidrAndPorts, &pb.CidrAndPort{
   188  				Cidr: cidrAndPort.Cidr,
   189  				Port: uint32(cidrAndPort.Port),
   190  			})
   191  		}
   192  		ipsets = append(ipsets, &pb.IPSet{
   193  			Name:         ipset.Name,
   194  			Type:         string(ipset.IPSetType),
   195  			Cidrs:        ipset.Cidrs,
   196  			CidrAndPorts: cidrAndPorts,
   197  			SetNames:     ipset.SetNames,
   198  		})
   199  	}
   200  	return ipset.FlushIPSets(ctx, chaosdaemonClient, pod, ipsets)
   201  }
   203  // SetIptables sets iptables on pod
   204  func (r *Reconciler) SetIptables(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
   205  	chains := []*pb.Chain{}
   206  	for _, chain := range chaos.Spec.Iptables {
   207  		var direction pb.Chain_Direction
   208  		if chain.Direction == v1alpha1.Input {
   209  			direction = pb.Chain_INPUT
   210  		} else if chain.Direction == v1alpha1.Output {
   211  			direction = pb.Chain_OUTPUT
   212  		} else {
   213  			err := errors.Errorf("unknown direction %s", string(chain.Direction))
   214  			r.Log.Error(err, "unknown direction")
   215  			return err
   216  		}
   217  		chains = append(chains, &pb.Chain{
   218  			Name:      chain.Name,
   219  			Ipsets:    chain.IPSets,
   220  			Direction: direction,
   221  			Target:    "DROP",
   222  			Device:    chain.Device,
   223  		})
   224  	}
   225  	return iptable.SetIptablesChains(ctx, chaosdaemonClient, pod, chains)
   226  }
   228  // SetTcs sets traffic control related chaos on pod
   229  func (r *Reconciler) SetTcs(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
   230  	tcs := []*pb.Tc{}
   231  	for _, tc := range chaos.Spec.TrafficControls {
   232  		if tc.Type == v1alpha1.Bandwidth {
   233  			tbf, err := netem.FromBandwidth(tc.Bandwidth)
   234  			if err != nil {
   235  				return err
   236  			}
   237  			tcs = append(tcs, &pb.Tc{
   238  				Type:   pb.Tc_BANDWIDTH,
   239  				Tbf:    tbf,
   240  				Ipset:  tc.IPSet,
   241  				Device: tc.Device,
   242  			})
   243  		} else if tc.Type == v1alpha1.Netem {
   244  			netem, err := mergeNetem(tc.TcParameter)
   245  			if err != nil {
   246  				return err
   247  			}
   248  			tcs = append(tcs, &pb.Tc{
   249  				Type:   pb.Tc_NETEM,
   250  				Netem:  netem,
   251  				Ipset:  tc.IPSet,
   252  				Device: tc.Device,
   253  			})
   254  		} else {
   255  			return errors.New("unknown tc type")
   256  		}
   257  	}
   259  	r.Log.Info("setting tcs", "tcs", tcs)
   260  	return tcpkg.SetTcs(ctx, chaosdaemonClient, pod, tcs)
   261  }
   263  // NetemSpec defines the interface to convert to a Netem protobuf
   264  type NetemSpec interface {
   265  	ToNetem() (*pb.Netem, error)
   266  }
   268  // mergeNetem calls ToNetem on all non nil network emulation specs and merges them into one request.
   269  func mergeNetem(spec v1alpha1.TcParameter) (*pb.Netem, error) {
   270  	// NOTE: a cleaner way like
   271  	// emSpecs = []NetemSpec{spec.Delay, spec.Loss} won't work.
   272  	// Because in the for _, spec := range emSpecs loop,
   273  	// spec != nil would always be true.
   274  	// See https://stackoverflow.com/questions/13476349/check-for-nil-and-nil-interface-in-go
   275  	// And https://groups.google.com/forum/#!topic/golang-nuts/wnH302gBa4I/discussion
   276  	// > In short: If you never store (*T)(nil) in an interface, then you can reliably use comparison against nil
   277  	var emSpecs []*pb.Netem
   278  	if spec.Delay != nil {
   279  		em, err := netem.FromDelay(spec.Delay)
   280  		if err != nil {
   281  			return nil, err
   282  		}
   283  		emSpecs = append(emSpecs, em)
   284  	}
   285  	if spec.Loss != nil {
   286  		em, err := netem.FromLoss(spec.Loss)
   287  		if err != nil {
   288  			return nil, err
   289  		}
   290  		emSpecs = append(emSpecs, em)
   291  	}
   292  	if spec.Duplicate != nil {
   293  		em, err := netem.FromDuplicate(spec.Duplicate)
   294  		if err != nil {
   295  			return nil, err
   296  		}
   297  		emSpecs = append(emSpecs, em)
   298  	}
   299  	if spec.Corrupt != nil {
   300  		em, err := netem.FromCorrupt(spec.Corrupt)
   301  		if err != nil {
   302  			return nil, err
   303  		}
   304  		emSpecs = append(emSpecs, em)
   305  	}
   306  	if spec.Rate != nil {
   307  		em, err := netem.FromRate(spec.Rate)
   308  		if err != nil {
   309  			return nil, err
   310  		}
   311  		emSpecs = append(emSpecs, em)
   312  	}
   313  	if len(emSpecs) == 0 {
   314  		return nil, errors.New(invalidNetemSpecMsg)
   315  	}
   317  	merged := &pb.Netem{}
   318  	for _, em := range emSpecs {
   319  		merged = pbutils.MergeNetem(merged, em)
   320  	}
   321  	return merged, nil
   322  }