...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package podnetworkchaos
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  
    22  	"github.com/go-logr/logr"
    23  	"github.com/pkg/errors"
    24  	corev1 "k8s.io/api/core/v1"
    25  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    26  	"k8s.io/apimachinery/pkg/types"
    27  	"k8s.io/client-go/util/retry"
    28  	ctrl "sigs.k8s.io/controller-runtime"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  
    31  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/ipset"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/iptable"
    34  	tcpkg "github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/tc"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
    36  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    37  	chaosdaemonclient "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/client"
    38  	pbutils "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/netem"
    39  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    40  	"github.com/chaos-mesh/chaos-mesh/pkg/netem"
    41  )
    42  
const (
	// invalidNetemSpecMsg is the error text returned by mergeNetem when a netem
	// traffic-control entry sets none of delay, loss, duplicate or corrupt.
	invalidNetemSpecMsg = "invalid spec for netem action, at least one is required from delay, loss, duplicate, corrupt"
)
    46  
// Reconciler applies PodNetworkChaos objects to their target pods.
//
// The embedded client.Client is used to read PodNetworkChaos objects and pods
// and to update the PodNetworkChaos status. Recorder emits events about the
// outcome of each step and Log receives the corresponding log lines. When
// AllowHostNetworkTesting is false, Reconcile refuses to act on pods whose
// spec has HostNetwork set. ChaosDaemonClientBuilder builds the chaos daemon
// client through which ipsets, iptables chains and tc rules are set.
type Reconciler struct {
	client.Client
	Recorder recorder.ChaosRecorder

	Log                      logr.Logger
	AllowHostNetworkTesting  bool
	ChaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
}
    56  
    57  func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    58  	obj := &v1alpha1.PodNetworkChaos{}
    59  
    60  	if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil {
    61  		if apierrors.IsNotFound(err) {
    62  			r.Log.Info("chaos not found")
    63  		} else {
    64  			// TODO: handle this error
    65  			r.Log.Error(err, "unable to get chaos")
    66  		}
    67  		return ctrl.Result{}, nil
    68  	}
    69  
    70  	if obj.ObjectMeta.Generation <= obj.Status.ObservedGeneration && obj.Status.FailedMessage == "" {
    71  		r.Log.Info("the target pod has been up to date", "pod", obj.Namespace+"/"+obj.Name)
    72  		return ctrl.Result{}, nil
    73  	}
    74  
    75  	r.Log.Info("updating podnetworkchaos", "pod", obj.Namespace+"/"+obj.Name, "spec", obj.Spec)
    76  
    77  	pod := &corev1.Pod{}
    78  
    79  	err := r.Client.Get(ctx, types.NamespacedName{
    80  		Name:      obj.Name,
    81  		Namespace: obj.Namespace,
    82  	}, pod)
    83  	if err != nil {
    84  		r.Log.Error(err, "fail to find pod")
    85  		return ctrl.Result{}, nil
    86  	}
    87  
    88  	failedMessage := ""
    89  	observedGeneration := obj.ObjectMeta.Generation
    90  	defer func() {
    91  		if err != nil {
    92  			failedMessage = err.Error()
    93  		}
    94  
    95  		updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
    96  			obj := &v1alpha1.PodNetworkChaos{}
    97  
    98  			if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
    99  				r.Log.Error(err, "unable to get chaos")
   100  				return err
   101  			}
   102  
   103  			obj.Status.FailedMessage = failedMessage
   104  			obj.Status.ObservedGeneration = observedGeneration
   105  
   106  			return r.Client.Status().Update(context.TODO(), obj)
   107  		})
   108  
   109  		if updateError != nil {
   110  			r.Log.Error(updateError, "fail to update")
   111  			r.Recorder.Event(obj, recorder.Failed{
   112  				Activity: "update status",
   113  				Err:      updateError.Error(),
   114  			})
   115  		}
   116  
   117  		r.Recorder.Event(obj, recorder.Updated{
   118  			Field: "ObservedGeneration and FailedMessage",
   119  		})
   120  	}()
   121  
   122  	if !r.AllowHostNetworkTesting {
   123  		if pod.Spec.HostNetwork {
   124  			err = errors.Errorf("It's dangerous to inject network chaos on a pod(%s/%s) with `hostNetwork`", pod.Namespace, pod.Name)
   125  			r.Log.Error(err, "fail to inject network chaos")
   126  			r.Recorder.Event(obj, recorder.Failed{
   127  				Activity: "inject network chaos",
   128  				Err:      err.Error(),
   129  			})
   130  			return ctrl.Result{}, nil
   131  		}
   132  	}
   133  
   134  	pbClient, err := r.ChaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
   135  		Name:      obj.Name,
   136  		Namespace: obj.Namespace,
   137  	})
   138  	if err != nil {
   139  		r.Recorder.Event(obj, recorder.Failed{
   140  			Activity: "create chaos daemon client",
   141  			Err:      err.Error(),
   142  		})
   143  		return ctrl.Result{Requeue: true}, nil
   144  	}
   145  	defer pbClient.Close()
   146  
   147  	err = r.SetIPSets(ctx, pod, obj, pbClient)
   148  	if err != nil {
   149  		r.Log.Error(err, "fail to set ipsets")
   150  		r.Recorder.Event(obj, recorder.Failed{
   151  			Activity: "set ipsets",
   152  			Err:      err.Error(),
   153  		})
   154  		return ctrl.Result{Requeue: true}, nil
   155  	}
   156  
   157  	err = r.SetIptables(ctx, pod, obj, pbClient)
   158  	if err != nil {
   159  		r.Log.Error(err, "fail to set iptables")
   160  		r.Recorder.Event(obj, recorder.Failed{
   161  			Activity: "set iptables",
   162  			Err:      err.Error(),
   163  		})
   164  		return ctrl.Result{Requeue: true}, nil
   165  	}
   166  
   167  	err = r.SetTcs(ctx, pod, obj, pbClient)
   168  	if err != nil {
   169  		r.Recorder.Event(obj, recorder.Failed{
   170  			Activity: "set tc",
   171  			Err:      err.Error(),
   172  		})
   173  		return ctrl.Result{Requeue: true}, nil
   174  	}
   175  
   176  	return ctrl.Result{}, nil
   177  }
   178  
   179  // SetIPSets sets ipset on pod
   180  func (r *Reconciler) SetIPSets(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
   181  	ipsets := []*pb.IPSet{}
   182  	for _, ipset := range chaos.Spec.IPSets {
   183  		ipsets = append(ipsets, &pb.IPSet{
   184  			Name:  ipset.Name,
   185  			Cidrs: ipset.Cidrs,
   186  		})
   187  	}
   188  	return ipset.FlushIPSets(ctx, chaosdaemonClient, pod, ipsets)
   189  }
   190  
   191  // SetIptables sets iptables on pod
   192  func (r *Reconciler) SetIptables(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
   193  	chains := []*pb.Chain{}
   194  	for _, chain := range chaos.Spec.Iptables {
   195  		var direction pb.Chain_Direction
   196  		if chain.Direction == v1alpha1.Input {
   197  			direction = pb.Chain_INPUT
   198  		} else if chain.Direction == v1alpha1.Output {
   199  			direction = pb.Chain_OUTPUT
   200  		} else {
   201  			err := fmt.Errorf("unknown direction %s", string(chain.Direction))
   202  			r.Log.Error(err, "unknown direction")
   203  			return err
   204  		}
   205  		chains = append(chains, &pb.Chain{
   206  			Name:      chain.Name,
   207  			Ipsets:    chain.IPSets,
   208  			Direction: direction,
   209  			Target:    "DROP",
   210  			Device:    chain.Device,
   211  		})
   212  	}
   213  	return iptable.SetIptablesChains(ctx, chaosdaemonClient, pod, chains)
   214  }
   215  
   216  // SetTcs sets traffic control related chaos on pod
   217  func (r *Reconciler) SetTcs(ctx context.Context, pod *corev1.Pod, chaos *v1alpha1.PodNetworkChaos, chaosdaemonClient chaosdaemonclient.ChaosDaemonClientInterface) error {
   218  	tcs := []*pb.Tc{}
   219  	for _, tc := range chaos.Spec.TrafficControls {
   220  		if tc.Type == v1alpha1.Bandwidth {
   221  			tbf, err := netem.FromBandwidth(tc.Bandwidth)
   222  			if err != nil {
   223  				return err
   224  			}
   225  			tcs = append(tcs, &pb.Tc{
   226  				Type:   pb.Tc_BANDWIDTH,
   227  				Tbf:    tbf,
   228  				Ipset:  tc.IPSet,
   229  				Device: tc.Device,
   230  			})
   231  		} else if tc.Type == v1alpha1.Netem {
   232  			netem, err := mergeNetem(tc.TcParameter)
   233  			if err != nil {
   234  				return err
   235  			}
   236  			tcs = append(tcs, &pb.Tc{
   237  				Type:   pb.Tc_NETEM,
   238  				Netem:  netem,
   239  				Ipset:  tc.IPSet,
   240  				Device: tc.Device,
   241  			})
   242  		} else {
   243  			return fmt.Errorf("unknown tc type")
   244  		}
   245  	}
   246  
   247  	r.Log.Info("setting tcs", "tcs", tcs)
   248  	return tcpkg.SetTcs(ctx, chaosdaemonClient, pod, tcs)
   249  }
   250  
// NetemSpec defines the interface to convert to a Netem protobuf.
type NetemSpec interface {
	// ToNetem returns the Netem protobuf message for the spec, or an error.
	ToNetem() (*pb.Netem, error)
}
   255  
   256  // mergeNetem calls ToNetem on all non nil network emulation specs and merges them into one request.
   257  func mergeNetem(spec v1alpha1.TcParameter) (*pb.Netem, error) {
   258  	// NOTE: a cleaner way like
   259  	// emSpecs = []NetemSpec{spec.Delay, spec.Loss} won't work.
   260  	// Because in the for _, spec := range emSpecs loop,
   261  	// spec != nil would always be true.
   262  	// See https://stackoverflow.com/questions/13476349/check-for-nil-and-nil-interface-in-go
   263  	// And https://groups.google.com/forum/#!topic/golang-nuts/wnH302gBa4I/discussion
   264  	// > In short: If you never store (*T)(nil) in an interface, then you can reliably use comparison against nil
   265  	var emSpecs []*pb.Netem
   266  	if spec.Delay != nil {
   267  		em, err := netem.FromDelay(spec.Delay)
   268  		if err != nil {
   269  			return nil, err
   270  		}
   271  		emSpecs = append(emSpecs, em)
   272  	}
   273  	if spec.Loss != nil {
   274  		em, err := netem.FromLoss(spec.Loss)
   275  		if err != nil {
   276  			return nil, err
   277  		}
   278  		emSpecs = append(emSpecs, em)
   279  	}
   280  	if spec.Duplicate != nil {
   281  		em, err := netem.FromDuplicate(spec.Duplicate)
   282  		if err != nil {
   283  			return nil, err
   284  		}
   285  		emSpecs = append(emSpecs, em)
   286  	}
   287  	if spec.Corrupt != nil {
   288  		em, err := netem.FromCorrupt(spec.Corrupt)
   289  		if err != nil {
   290  			return nil, err
   291  		}
   292  		emSpecs = append(emSpecs, em)
   293  	}
   294  	if len(emSpecs) == 0 {
   295  		return nil, errors.New(invalidNetemSpecMsg)
   296  	}
   297  
   298  	merged := &pb.Netem{}
   299  	for _, em := range emSpecs {
   300  		merged = pbutils.MergeNetem(merged, em)
   301  	}
   302  	return merged, nil
   303  }
   304