...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/podhttpchaos/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/podhttpchaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package podhttpchaos
    15  
    16  import (
    17  	"context"
    18  	"encoding/json"
    19  	"fmt"
    20  	"net/http"
    21  
    22  	"github.com/go-logr/logr"
    23  	v1 "k8s.io/api/core/v1"
    24  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	"k8s.io/client-go/tools/record"
    27  	"k8s.io/client-go/util/retry"
    28  	ctrl "sigs.k8s.io/controller-runtime"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  
    31  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
    33  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    34  )
    35  
    36  // Reconciler applys podhttpchaos
    37  type Reconciler struct {
    38  	client.Client
    39  
    40  	Recorder                 record.EventRecorder
    41  	Log                      logr.Logger
    42  	ChaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
    43  }
    44  
    45  func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    46  	ctx := context.TODO()
    47  
    48  	obj := &v1alpha1.PodHttpChaos{}
    49  
    50  	if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil {
    51  		if apierrors.IsNotFound(err) {
    52  			r.Log.Info("chaos not found")
    53  		} else {
    54  			// TODO: handle this error
    55  			r.Log.Error(err, "unable to get chaos")
    56  		}
    57  		return ctrl.Result{}, nil
    58  	}
    59  
    60  	r.Log.Info("updating http chaos", "pod", obj.Namespace+"/"+obj.Name, "spec", obj.Spec)
    61  
    62  	pod := &v1.Pod{}
    63  
    64  	err := r.Client.Get(ctx, types.NamespacedName{
    65  		Name:      obj.Name,
    66  		Namespace: obj.Namespace,
    67  	}, pod)
    68  	if err != nil {
    69  		r.Log.Error(err, "fail to find pod")
    70  		return ctrl.Result{}, nil
    71  	}
    72  
    73  	observedGeneration := obj.ObjectMeta.Generation
    74  	pid := obj.Status.Pid
    75  	startTime := obj.Status.StartTime
    76  
    77  	defer func() {
    78  		var failedMessage string
    79  		if err != nil {
    80  			failedMessage = err.Error()
    81  		}
    82  
    83  		updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
    84  			obj := &v1alpha1.PodHttpChaos{}
    85  
    86  			if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
    87  				r.Log.Error(err, "unable to get chaos")
    88  				return err
    89  			}
    90  
    91  			obj.Status.FailedMessage = failedMessage
    92  			obj.Status.ObservedGeneration = observedGeneration
    93  			obj.Status.Pid = pid
    94  			obj.Status.StartTime = startTime
    95  
    96  			return r.Client.Status().Update(context.TODO(), obj)
    97  		})
    98  
    99  		if updateError != nil {
   100  			r.Log.Error(updateError, "fail to update")
   101  			r.Recorder.Eventf(obj, "Normal", "Failed", "Failed to update status: %s", updateError.Error())
   102  		}
   103  	}()
   104  
   105  	pbClient, err := r.ChaosDaemonClientBuilder.Build(ctx, pod)
   106  	if err != nil {
   107  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   108  		return ctrl.Result{Requeue: true}, nil
   109  	}
   110  	defer pbClient.Close()
   111  
   112  	if len(pod.Status.ContainerStatuses) == 0 {
   113  		err = fmt.Errorf("%s %s can't get the state of container", pod.Namespace, pod.Name)
   114  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   115  		return ctrl.Result{}, nil
   116  	}
   117  
   118  	containerID := pod.Status.ContainerStatuses[0].ContainerID
   119  
   120  	rules := make([]v1alpha1.PodHttpChaosBaseRule, 0)
   121  	proxyPortsMap := make(map[uint32]bool)
   122  	proxyPorts := make([]uint32, 0)
   123  
   124  	for _, rule := range obj.Spec.Rules {
   125  		proxyPortsMap[uint32(rule.Port)] = true
   126  		rules = append(rules, rule.PodHttpChaosBaseRule)
   127  	}
   128  
   129  	for port := range proxyPortsMap {
   130  		proxyPorts = append(proxyPorts, port)
   131  	}
   132  
   133  	input, err := json.Marshal(rules)
   134  	if err != nil {
   135  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   136  		return ctrl.Result{}, nil
   137  	}
   138  
   139  	r.Log.Info("input with", "rules", string(input))
   140  
   141  	res, err := pbClient.ApplyHttpChaos(ctx, &pb.ApplyHttpChaosRequest{
   142  		Rules:       string(input),
   143  		ProxyPorts:  proxyPorts,
   144  		ContainerId: containerID,
   145  
   146  		Instance:  obj.Status.Pid,
   147  		StartTime: obj.Status.StartTime,
   148  		EnterNS:   true,
   149  	})
   150  	if err != nil {
   151  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   152  		return ctrl.Result{Requeue: true}, nil
   153  	}
   154  
   155  	if res.StatusCode != http.StatusOK {
   156  		err = fmt.Errorf("status(%d), apply fail: %s", res.StatusCode, res.Error)
   157  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   158  		return ctrl.Result{Requeue: true}, nil
   159  	}
   160  
   161  	pid = res.Instance
   162  	startTime = res.StartTime
   163  
   164  	return ctrl.Result{}, nil
   165  }
   166