...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/podhttpchaos/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/podhttpchaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package podhttpchaos
    17  
    18  import (
    19  	"context"
    20  	"encoding/json"
    21  	"fmt"
    22  	"net/http"
    23  
    24  	"github.com/go-logr/logr"
    25  	"github.com/pkg/errors"
    26  	v1 "k8s.io/api/core/v1"
    27  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/types"
    30  	"k8s.io/client-go/tools/record"
    31  	"k8s.io/client-go/util/retry"
    32  	ctrl "sigs.k8s.io/controller-runtime"
    33  	"sigs.k8s.io/controller-runtime/pkg/client"
    34  
    35  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    36  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
    37  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
    38  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    39  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/tproxyconfig"
    40  )
    41  
    42  // Reconciler applys podhttpchaos
    43  type Reconciler struct {
    44  	client.Client
    45  
    46  	Recorder                 record.EventRecorder
    47  	Log                      logr.Logger
    48  	ChaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
    49  }
    50  
    51  func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    52  	obj := &v1alpha1.PodHttpChaos{}
    53  
    54  	if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil {
    55  		if apierrors.IsNotFound(err) {
    56  			r.Log.Info("chaos not found")
    57  		} else {
    58  			// TODO: handle this error
    59  			r.Log.Error(err, "unable to get chaos")
    60  		}
    61  		return ctrl.Result{}, nil
    62  	}
    63  
    64  	if obj.ObjectMeta.Generation <= obj.Status.ObservedGeneration && obj.Status.FailedMessage == "" {
    65  		r.Log.Info("the target pod has been up to date", "pod", obj.Namespace+"/"+obj.Name)
    66  		return ctrl.Result{}, nil
    67  	}
    68  
    69  	r.Log.Info("updating http chaos", "pod", obj.Namespace+"/"+obj.Name, "spec", obj.Spec)
    70  
    71  	pod := &v1.Pod{}
    72  
    73  	err := r.Client.Get(ctx, types.NamespacedName{
    74  		Name:      obj.Name,
    75  		Namespace: obj.Namespace,
    76  	}, pod)
    77  	if err != nil {
    78  		err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
    79  		r.Log.Error(err, "fail to find pod")
    80  		return ctrl.Result{}, nil
    81  	}
    82  
    83  	observedGeneration := obj.ObjectMeta.Generation
    84  	pid := obj.Status.Pid
    85  	startTime := obj.Status.StartTime
    86  
    87  	defer func() {
    88  		var failedMessage string
    89  		if err != nil {
    90  			failedMessage = err.Error()
    91  		}
    92  
    93  		updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
    94  			obj := &v1alpha1.PodHttpChaos{}
    95  
    96  			if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
    97  				r.Log.Error(err, "unable to get chaos")
    98  				return err
    99  			}
   100  
   101  			obj.Status.FailedMessage = failedMessage
   102  			obj.Status.ObservedGeneration = observedGeneration
   103  			obj.Status.Pid = pid
   104  			obj.Status.StartTime = startTime
   105  
   106  			return r.Client.Status().Update(context.TODO(), obj)
   107  		})
   108  
   109  		if updateError != nil {
   110  			updateError = errors.Wrapf(updateError, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
   111  			r.Log.Error(updateError, "fail to update")
   112  			r.Recorder.Eventf(obj, "Normal", "Failed", "Failed to update status: %s", updateError.Error())
   113  		}
   114  	}()
   115  
   116  	pbClient, err := r.ChaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
   117  		Namespace: obj.Namespace,
   118  		Name:      obj.Name,
   119  	})
   120  	if err != nil {
   121  		err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
   122  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   123  		return ctrl.Result{Requeue: true}, nil
   124  	}
   125  	defer pbClient.Close()
   126  
   127  	if len(pod.Status.ContainerStatuses) == 0 {
   128  		err = errors.Wrapf(utils.ErrContainerNotFound, "pod %s/%s has empty container status", pod.Namespace, pod.Name)
   129  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   130  		return ctrl.Result{}, nil
   131  	}
   132  
   133  	containerID := pod.Status.ContainerStatuses[0].ContainerID
   134  
   135  	rules := make([]v1alpha1.PodHttpChaosBaseRule, 0)
   136  	proxyPortsMap := make(map[uint32]bool)
   137  
   138  	for _, rule := range obj.Spec.Rules {
   139  		proxyPortsMap[uint32(rule.Port)] = true
   140  		rules = append(rules, rule.PodHttpChaosBaseRule)
   141  	}
   142  
   143  	var proxyPorts []uint32
   144  	for port := range proxyPortsMap {
   145  		proxyPorts = append(proxyPorts, port)
   146  	}
   147  
   148  	inputRules, err := json.Marshal(rules)
   149  	if err != nil {
   150  		err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
   151  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   152  		return ctrl.Result{}, nil
   153  	}
   154  
   155  	inputTLS := []byte("")
   156  	if obj.Spec.TLS != nil {
   157  		tlsKeys := obj.Spec.TLS
   158  		secret := v1.Secret{
   159  			ObjectMeta: metav1.ObjectMeta{
   160  				Name:      tlsKeys.SecretName,
   161  				Namespace: tlsKeys.SecretNamespace,
   162  			},
   163  		}
   164  		if err := r.Client.Get(context.TODO(), req.NamespacedName, &secret); err != nil {
   165  			r.Log.Error(err, "unable to get secret")
   166  			return ctrl.Result{}, nil
   167  		}
   168  
   169  		cert, ok := secret.Data[tlsKeys.CertName]
   170  		if !ok {
   171  			err = errors.Wrapf(err, "get cert %s", tlsKeys.CertName)
   172  			r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   173  			return ctrl.Result{}, nil
   174  		}
   175  
   176  		key, ok := secret.Data[tlsKeys.KeyName]
   177  		if !ok {
   178  			err = errors.Wrapf(err, "get key %s", tlsKeys.KeyName)
   179  			r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   180  			return ctrl.Result{}, nil
   181  		}
   182  
   183  		var ca []byte
   184  		if tlsKeys.CAName != nil {
   185  			ca, ok = secret.Data[*tlsKeys.CAName]
   186  			if !ok {
   187  				err = errors.Wrapf(err, "get ca %s", *tlsKeys.CAName)
   188  				r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   189  				return ctrl.Result{}, nil
   190  			}
   191  		}
   192  
   193  		tlsConfig := tproxyconfig.TLSConfig{
   194  			CertFile: tproxyconfig.TLSConfigItem{
   195  				Type:  "Contents",
   196  				Value: cert,
   197  			},
   198  			KeyFile: tproxyconfig.TLSConfigItem{
   199  				Type:  "Contents",
   200  				Value: key,
   201  			},
   202  		}
   203  
   204  		if ca != nil {
   205  			tlsConfig.CAFile = &tproxyconfig.TLSConfigItem{
   206  				Type:  "Contents",
   207  				Value: ca,
   208  			}
   209  		}
   210  
   211  		inputTLS, err = json.Marshal(tlsConfig)
   212  		if err != nil {
   213  			err = errors.Wrapf(err, "apply for pod %s/%s", pod.Namespace, pod.Name)
   214  			r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   215  			return ctrl.Result{}, nil
   216  		}
   217  	}
   218  
   219  	r.Log.Info("input with", "rules", string(inputRules))
   220  
   221  	res, err := pbClient.ApplyHttpChaos(ctx, &pb.ApplyHttpChaosRequest{
   222  		Rules:       string(inputRules),
   223  		Tls:         string(inputTLS),
   224  		ProxyPorts:  proxyPorts,
   225  		ContainerId: containerID,
   226  
   227  		Instance:  obj.Status.Pid,
   228  		StartTime: obj.Status.StartTime,
   229  		EnterNS:   true,
   230  	})
   231  	if err != nil {
   232  		err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
   233  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   234  		return ctrl.Result{Requeue: true}, nil
   235  	}
   236  
   237  	if res.StatusCode != http.StatusOK {
   238  		err = errors.Wrapf(fmt.Errorf("%s", res.Error),
   239  			"failed to apply for pod %s/%s, status(%d)",
   240  			pod.Namespace, pod.Name, res.StatusCode)
   241  		r.Recorder.Event(obj, "Warning", "Failed", err.Error())
   242  		return ctrl.Result{Requeue: true}, nil
   243  	}
   244  
   245  	pid = res.Instance
   246  	startTime = res.StartTime
   247  
   248  	return ctrl.Result{}, nil
   249  }
   250