...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package podnetworkchaos
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  
    22  	"github.com/go-logr/logr"
    23  	"github.com/pkg/errors"
    24  	corev1 "k8s.io/api/core/v1"
    25  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    26  	"k8s.io/apimachinery/pkg/types"
    27  	"k8s.io/client-go/util/retry"
    28  	ctrl "sigs.k8s.io/controller-runtime"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  
    31  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/iptable"
    34  	tcpkg "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/tc"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
    36  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    37  	pbutils "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/netem"
    38  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    39  	"github.com/chaos-mesh/chaos-mesh/pkg/netem"
    40  )
    41  
    42  const (
    43  	invalidNetemSpecMsg = "invalid spec for netem action, at least one is required from delay, loss, duplicate, corrupt"
    44  )
    45  
// Reconciler applies podnetworkchaos.
//
// It reads a PodNetworkChaos object and the pod with the same name and
// namespace, then pushes the object's ipsets, iptables chains and traffic
// control rules to that pod.
//
// Fields, as used in this file:
//   - the embedded client.Client fetches the PodNetworkChaos and Pod objects
//     and writes the PodNetworkChaos status;
//   - Recorder receives Failed/Updated events emitted during reconciliation;
//   - Log is the logger used by every method of the reconciler;
//   - AllowHostNetworkTesting, when false, makes Reconcile refuse pods whose
//     spec has hostNetwork set;
//   - ChaosDaemonClientBuilder is handed to the ipset, iptable and tc helpers
//     (presumably to reach the chaos daemon serving the pod — confirm against
//     those packages).
type Reconciler struct {
	client.Client
	Recorder recorder.ChaosRecorder

	Log                      logr.Logger
	AllowHostNetworkTesting  bool
	ChaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
}
    55  
    56  func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    57  	obj := &v1alpha1.PodNetworkChaos{}
    58  
    59  	if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil {
    60  		if apierrors.IsNotFound(err) {
    61  			r.Log.Info("chaos not found")
    62  		} else {
    63  			// TODO: handle this error
    64  			r.Log.Error(err, "unable to get chaos")
    65  		}
    66  		return ctrl.Result{}, nil
    67  	}
    68  
    69  	if obj.ObjectMeta.Generation <= obj.Status.ObservedGeneration && obj.Status.FailedMessage == "" {
    70  		r.Log.Info("the target pod has been up to date", "pod", obj.Namespace+"/"+obj.Name)
    71  		return ctrl.Result{}, nil
    72  	}
    73  
    74  	r.Log.Info("updating podnetworkchaos", "pod", obj.Namespace+"/"+obj.Name, "spec", obj.Spec)
    75  
    76  	pod := &corev1.Pod{}
    77  
    78  	err := r.Client.Get(ctx, types.NamespacedName{
    79  		Name:      obj.Name,
    80  		Namespace: obj.Namespace,
    81  	}, pod)
    82  	if err != nil {
    83  		r.Log.Error(err, "fail to find pod")
    84  		return ctrl.Result{}, nil
    85  	}
    86  
    87  	failedMessage := ""
    88  	observedGeneration := obj.ObjectMeta.Generation
    89  	defer func() {
    90  		if err != nil {
    91  			failedMessage = err.Error()
    92  		}
    93  
    94  		updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
    95  			obj := &v1alpha1.PodNetworkChaos{}
    96  
    97  			if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
    98  				r.Log.Error(err, "unable to get chaos")
    99  				return err
   100  			}
   101  
   102  			obj.Status.FailedMessage = failedMessage
   103  			obj.Status.ObservedGeneration = observedGeneration
   104  
   105  			return r.Client.Status().Update(context.TODO(), obj)
   106  		})
   107  
   108  		if updateError != nil {
   109  			r.Log.Error(updateError, "fail to update")
   110  			r.Recorder.Event(obj, recorder.Failed{
   111  				Activity: "update status",
   112  				Err:      updateError.Error(),
   113  			})
   114  		}
   115  
   116  		r.Recorder.Event(obj, recorder.Updated{
   117  			Field: "ObservedGeneration and FailedMessage",
   118  		})
   119  	}()
   120  
   121  	if !r.AllowHostNetworkTesting {
   122  		if pod.Spec.HostNetwork {
   123  			err = errors.Errorf("It's dangerous to inject network chaos on a pod(%s/%s) with `hostNetwork`", pod.Namespace, pod.Name)
   124  			r.Log.Error(err, "fail to inject network chaos")
   125  			r.Recorder.Event(obj, recorder.Failed{
   126  				Activity: "inject network chaos",
   127  				Err:      err.Error(),
   128  			})
   129  			return ctrl.Result{}, nil
   130  		}
   131  	}
   132  
   133  	err = r.SetIPSets(ctx, pod, obj)
   134  	if err != nil {
   135  		r.Log.Error(err, "fail to set ipsets")
   136  		r.Recorder.Event(obj, recorder.Failed{
   137  			Activity: "set ipsets",
   138  			Err:      err.Error(),
   139  		})
   140  		return ctrl.Result{Requeue: true}, nil
   141  	}
   142  
   143  	err = r.SetIptables(ctx, pod, obj)
   144  	if err != nil {
   145  		r.Log.Error(err, "fail to set iptables")
   146  		r.Recorder.Event(obj, recorder.Failed{
   147  			Activity: "set iptables",
   148  			Err:      err.Error(),
   149  		})
   150  		return ctrl.Result{Requeue: true}, nil
   151  	}
   152  
   153  	err = r.SetTcs(ctx, pod, obj)
   154  	if err != nil {
   155  		r.Recorder.Event(obj, recorder.Failed{
   156  			Activity: "set tc",
   157  			Err:      err.Error(),
   158  		})
   159  		return ctrl.Result{Requeue: true}, nil
   160  	}
   161  
   162  	return ctrl.Result{}, nil
   163  }
   164  
   165  // SetIPSets sets ipset on pod
   166  func (r *Reconciler) SetIPSets(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos) error {
   167  	ipsets := []*pb.IPSet{}
   168  	for _, ipset := range chaos.Spec.IPSets {
   169  		ipsets = append(ipsets, &pb.IPSet{
   170  			Name:  ipset.Name,
   171  			Cidrs: ipset.Cidrs,
   172  		})
   173  	}
   174  	return ipset.FlushIPSets(ctx, r.ChaosDaemonClientBuilder, pod, ipsets)
   175  }
   176  
   177  // SetIptables sets iptables on pod
   178  func (r *Reconciler) SetIptables(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos) error {
   179  	chains := []*pb.Chain{}
   180  	for _, chain := range chaos.Spec.Iptables {
   181  		var direction pb.Chain_Direction
   182  		if chain.Direction == v1alpha1.Input {
   183  			direction = pb.Chain_INPUT
   184  		} else if chain.Direction == v1alpha1.Output {
   185  			direction = pb.Chain_OUTPUT
   186  		} else {
   187  			err := fmt.Errorf("unknown direction %s", string(chain.Direction))
   188  			r.Log.Error(err, "unknown direction")
   189  			return err
   190  		}
   191  		chains = append(chains, &pb.Chain{
   192  			Name:      chain.Name,
   193  			Ipsets:    chain.IPSets,
   194  			Direction: direction,
   195  			Target:    "DROP",
   196  			Device:    chain.Device,
   197  		})
   198  	}
   199  	return iptable.SetIptablesChains(ctx, r.ChaosDaemonClientBuilder, pod, chains)
   200  }
   201  
   202  // SetTcs sets traffic control related chaos on pod
   203  func (r *Reconciler) SetTcs(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos) error {
   204  	tcs := []*pb.Tc{}
   205  	for _, tc := range chaos.Spec.TrafficControls {
   206  		if tc.Type == v1alpha1.Bandwidth {
   207  			tbf, err := netem.FromBandwidth(tc.Bandwidth)
   208  			if err != nil {
   209  				return err
   210  			}
   211  			tcs = append(tcs, &pb.Tc{
   212  				Type:   pb.Tc_BANDWIDTH,
   213  				Tbf:    tbf,
   214  				Ipset:  tc.IPSet,
   215  				Device: tc.Device,
   216  			})
   217  		} else if tc.Type == v1alpha1.Netem {
   218  			netem, err := mergeNetem(tc.TcParameter)
   219  			if err != nil {
   220  				return err
   221  			}
   222  			tcs = append(tcs, &pb.Tc{
   223  				Type:   pb.Tc_NETEM,
   224  				Netem:  netem,
   225  				Ipset:  tc.IPSet,
   226  				Device: tc.Device,
   227  			})
   228  		} else {
   229  			return fmt.Errorf("unknown tc type")
   230  		}
   231  	}
   232  
   233  	r.Log.Info("setting tcs", "tcs", tcs)
   234  	return tcpkg.SetTcs(ctx, r.ChaosDaemonClientBuilder, pod, tcs)
   235  }
   236  
// NetemSpec defines the interface to convert to a Netem protobuf.
//
// ToNetem returns the protobuf form of the spec, or an error when the spec
// cannot be converted.
type NetemSpec interface {
	ToNetem() (*pb.Netem, error)
}
   241  
   242  // mergeNetem calls ToNetem on all non nil network emulation specs and merges them into one request.
   243  func mergeNetem(spec v1alpha1.TcParameter) (*pb.Netem, error) {
   244  	// NOTE: a cleaner way like
   245  	// emSpecs = []NetemSpec{spec.Delay, spec.Loss} won't work.
   246  	// Because in the for _, spec := range emSpecs loop,
   247  	// spec != nil would always be true.
   248  	// See https://stackoverflow.com/questions/13476349/check-for-nil-and-nil-interface-in-go
   249  	// And https://groups.google.com/forum/#!topic/golang-nuts/wnH302gBa4I/discussion
   250  	// > In short: If you never store (*T)(nil) in an interface, then you can reliably use comparison against nil
   251  	var emSpecs []*pb.Netem
   252  	if spec.Delay != nil {
   253  		em, err := netem.FromDelay(spec.Delay)
   254  		if err != nil {
   255  			return nil, err
   256  		}
   257  		emSpecs = append(emSpecs, em)
   258  	}
   259  	if spec.Loss != nil {
   260  		em, err := netem.FromLoss(spec.Loss)
   261  		if err != nil {
   262  			return nil, err
   263  		}
   264  		emSpecs = append(emSpecs, em)
   265  	}
   266  	if spec.Duplicate != nil {
   267  		em, err := netem.FromDuplicate(spec.Duplicate)
   268  		if err != nil {
   269  			return nil, err
   270  		}
   271  		emSpecs = append(emSpecs, em)
   272  	}
   273  	if spec.Corrupt != nil {
   274  		em, err := netem.FromCorrupt(spec.Corrupt)
   275  		if err != nil {
   276  			return nil, err
   277  		}
   278  		emSpecs = append(emSpecs, em)
   279  	}
   280  	if len(emSpecs) == 0 {
   281  		return nil, errors.New(invalidNetemSpecMsg)
   282  	}
   283  
   284  	merged := &pb.Netem{}
   285  	for _, em := range emSpecs {
   286  		merged = pbutils.MergeNetem(merged, em)
   287  	}
   288  	return merged, nil
   289  }
   290