...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/podiochaos/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/podiochaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package podiochaos
    17  
    18  import (
    19  	"context"
    20  	"encoding/json"
    21  	"strings"
    22  
    23  	"github.com/go-logr/logr"
    24  	"github.com/pkg/errors"
    25  	v1 "k8s.io/api/core/v1"
    26  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    27  	"k8s.io/apimachinery/pkg/types"
    28  	"k8s.io/client-go/tools/record"
    29  	"k8s.io/client-go/util/retry"
    30  	ctrl "sigs.k8s.io/controller-runtime"
    31  	"sigs.k8s.io/controller-runtime/pkg/client"
    32  
    33  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    34  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
    36  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    37  )
    38  
// Reconciler reconciles PodIOChaos objects: it applies the IO chaos actions
// described by a PodIOChaos to the pod of the same name and namespace.
type Reconciler struct {
	// Client reads PodIOChaos and Pod objects and updates PodIOChaos status.
	client.Client
	// Recorder emits events on the PodIOChaos object being reconciled.
	Recorder record.EventRecorder

	// Log is the logger used for reconcile progress and errors.
	Log                      logr.Logger
	// ChaosDaemonClientBuilder builds the chaos daemon client that Reconcile
	// uses to call ApplyIOChaos for the target pod.
	ChaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
}
    47  
    48  func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    49  	obj := &v1alpha1.PodIOChaos{}
    50  
    51  	if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil {
    52  		if apierrors.IsNotFound(err) {
    53  			r.Log.Info("chaos not found")
    54  		} else {
    55  			// TODO: handle this error
    56  			r.Log.Error(err, "unable to get chaos")
    57  		}
    58  		return ctrl.Result{}, nil
    59  	}
    60  
    61  	if obj.ObjectMeta.Generation <= obj.Status.ObservedGeneration && obj.Status.FailedMessage == "" {
    62  		r.Log.Info("the target pod has been up to date", "pod", obj.Namespace+"/"+obj.Name)
    63  		return ctrl.Result{}, nil
    64  	}
    65  
    66  	r.Log.Info("updating io chaos", "pod", obj.Namespace+"/"+obj.Name, "spec", obj.Spec)
    67  
    68  	pod := &v1.Pod{}
    69  
    70  	err := r.Client.Get(ctx, types.NamespacedName{
    71  		Name:      obj.Name,
    72  		Namespace: obj.Namespace,
    73  	}, pod)
    74  	if err != nil {
    75  		r.Log.Error(err, "fail to find pod")
    76  		return ctrl.Result{}, nil
    77  	}
    78  
    79  	failedMessage := ""
    80  	observedGeneration := obj.ObjectMeta.Generation
    81  	pid := obj.Status.Pid
    82  	startTime := obj.Status.StartTime
    83  	defer func() {
    84  		if err != nil {
    85  			failedMessage = err.Error()
    86  		}
    87  
    88  		updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
    89  			obj := &v1alpha1.PodIOChaos{}
    90  
    91  			if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
    92  				r.Log.Error(err, "unable to get chaos")
    93  				return err
    94  			}
    95  
    96  			obj.Status.FailedMessage = failedMessage
    97  			obj.Status.ObservedGeneration = observedGeneration
    98  			obj.Status.Pid = pid
    99  			obj.Status.StartTime = startTime
   100  
   101  			return r.Client.Status().Update(context.TODO(), obj)
   102  		})
   103  
   104  		if updateError != nil {
   105  			r.Log.Error(updateError, "fail to update")
   106  			r.Recorder.Eventf(obj, "Normal", "Failed", "Failed to update status: %s", updateError.Error())
   107  		}
   108  	}()
   109  
   110  	pbClient, err := r.ChaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
   111  		Namespace: obj.Namespace,
   112  		Name:      obj.Name,
   113  	})
   114  	if err != nil {
   115  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   116  		return ctrl.Result{Requeue: true}, nil
   117  	}
   118  	defer pbClient.Close()
   119  
   120  	if len(pod.Status.ContainerStatuses) == 0 {
   121  		err = errors.Wrapf(utils.ErrContainerNotFound, "pod %s/%s has empty container status", pod.Namespace, pod.Name)
   122  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   123  		return ctrl.Result{}, nil
   124  	}
   125  
   126  	containerID := pod.Status.ContainerStatuses[0].ContainerID
   127  	if obj.Spec.Container != nil &&
   128  		len(strings.TrimSpace(*obj.Spec.Container)) != 0 {
   129  		containerID = ""
   130  		for _, container := range pod.Status.ContainerStatuses {
   131  			if container.Name == *obj.Spec.Container {
   132  				containerID = container.ContainerID
   133  				break
   134  			}
   135  		}
   136  		if len(containerID) == 0 {
   137  			err = errors.Errorf("cannot find container with name %s", *obj.Spec.Container)
   138  			r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   139  			return ctrl.Result{}, nil
   140  		}
   141  	}
   142  
   143  	actions, err := json.Marshal(obj.Spec.Actions)
   144  	if err != nil {
   145  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   146  		return ctrl.Result{Requeue: true}, nil
   147  	}
   148  	input := string(actions)
   149  	r.Log.Info("input with", "config", input)
   150  
   151  	res, err := pbClient.ApplyIOChaos(ctx, &pb.ApplyIOChaosRequest{
   152  		Actions:     input,
   153  		Volume:      obj.Spec.VolumeMountPath,
   154  		ContainerId: containerID,
   155  
   156  		Instance:  obj.Status.Pid,
   157  		StartTime: obj.Status.StartTime,
   158  		EnterNS:   true,
   159  	})
   160  	if err != nil {
   161  		err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
   162  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   163  		return ctrl.Result{Requeue: true}, nil
   164  	}
   165  
   166  	startTime = res.StartTime
   167  	pid = res.Instance
   168  
   169  	return ctrl.Result{}, nil
   170  }
   171