...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/kernelchaos/types.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/kernelchaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package kernelchaos
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  
    22  	"github.com/go-logr/logr"
    23  	"github.com/pkg/errors"
    24  	"go.uber.org/fx"
    25  	"google.golang.org/grpc"
    26  	v1 "k8s.io/api/core/v1"
    27  	k8sError "k8s.io/apimachinery/pkg/api/errors"
    28  	"k8s.io/apimachinery/pkg/types"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  
    31  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    32  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
    34  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
    36  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
    37  	pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    38  	pb_kernel "github.com/chaos-mesh/chaos-mesh/pkg/chaoskernel/pb"
    39  	grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
    40  )
    41  
// Compile-time assertion that *Impl satisfies the impltypes.ChaosImpl interface.
var _ impltypes.ChaosImpl = (*Impl)(nil)
    43  
// Impl implements impltypes.ChaosImpl for KernelChaos objects.
type Impl struct {
	// Client is the embedded Kubernetes client; Apply and Recover use it to
	// fetch the pod referenced by a record.
	client.Client
	// Log is the base logger for this implementation; NewImpl names it
	// "kernelchaos".
	Log logr.Logger

	// chaosDaemonClientBuilder builds chaos-daemon clients for a pod and
	// resolves the daemon IP that CreateBPFKIConnection dials.
	chaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
}
    50  
    51  // Apply applies KernelChaos
    52  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    53  	kernelChaos := obj.(*v1alpha1.KernelChaos)
    54  	record := records[index]
    55  
    56  	log := impl.Log.WithValues("chaos", kernelChaos, "record", record)
    57  	podId, containerID, err := controller.ParseNamespacedNameContainer(record.Id)
    58  	if err != nil {
    59  		return v1alpha1.NotInjected, err
    60  	}
    61  	var pod v1.Pod
    62  	err = impl.Client.Get(ctx, podId, &pod)
    63  	if err != nil {
    64  		log.Error(err, "fail to get pod by record")
    65  		// TODO: handle this error
    66  		if k8sError.IsNotFound(err) {
    67  			return v1alpha1.NotInjected, nil
    68  		}
    69  		return v1alpha1.NotInjected, err
    70  	}
    71  
    72  	log = log.WithValues("pod", pod)
    73  
    74  	if err = impl.applyPod(ctx, &pod, kernelChaos, containerID); err != nil {
    75  		log.Error(err, "failed to apply chaos on pod")
    76  		return v1alpha1.NotInjected, err
    77  	}
    78  
    79  	return v1alpha1.Injected, nil
    80  }
    81  
    82  // Recover means the reconciler recovers the chaos action
    83  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    84  	kernelChaos := obj.(*v1alpha1.KernelChaos)
    85  	record := records[index]
    86  
    87  	log := impl.Log.WithValues("chaos", kernelChaos, "record", record)
    88  	podId, containerID, err := controller.ParseNamespacedNameContainer(record.Id)
    89  	if err != nil {
    90  		errorInfo := fmt.Sprintf("kernelChaos recover error, record ID is %s", record.Id)
    91  		log.Error(err, errorInfo)
    92  		// This error is not expected to exist
    93  		return v1alpha1.Injected, err
    94  	}
    95  	var pod v1.Pod
    96  	err = impl.Client.Get(ctx, podId, &pod)
    97  	if err != nil {
    98  		log.Error(err, "fail to get pod by record")
    99  		// TODO: handle this error
   100  		if k8sError.IsNotFound(err) {
   101  			return v1alpha1.NotInjected, nil
   102  		}
   103  		return v1alpha1.Injected, err
   104  	}
   105  
   106  	log = log.WithValues("pod", pod)
   107  
   108  	if err = impl.recoverPod(ctx, &pod, kernelChaos, containerID); err != nil {
   109  		log.Error(err, "failed to recover chaos on pod")
   110  		return v1alpha1.Injected, err
   111  	}
   112  
   113  	return v1alpha1.NotInjected, nil
   114  }
   115  
   116  func (impl *Impl) recoverPod(ctx context.Context, pod *v1.Pod, chaos *v1alpha1.KernelChaos, containerID string) error {
   117  	impl.Log.Info("try to recover pod", "namespace", pod.Namespace, "name", pod.Name)
   118  
   119  	pbClient, err := impl.chaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
   120  		Namespace: chaos.Namespace,
   121  		Name:      chaos.Name,
   122  	})
   123  	if err != nil {
   124  		return err
   125  	}
   126  	defer pbClient.Close()
   127  
   128  	if len(pod.Status.ContainerStatuses) == 0 {
   129  		err = errors.Wrapf(utils.ErrContainerNotFound, "pod %s/%s has empty container status", pod.Namespace, pod.Name)
   130  		return err
   131  	}
   132  
   133  	containerResponse, err := pbClient.ContainerGetPid(ctx, &pb.ContainerRequest{
   134  		Action: &pb.ContainerAction{
   135  			Action: pb.ContainerAction_GETPID,
   136  		},
   137  		ContainerId: containerID,
   138  	})
   139  
   140  	if err != nil {
   141  		impl.Log.Error(err, "Get container pid error", "namespace", pod.Namespace, "name", pod.Name)
   142  		return err
   143  	}
   144  
   145  	impl.Log.Info("Get container pid", "namespace", pod.Namespace, "name", pod.Name)
   146  	conn, err := impl.CreateBPFKIConnection(ctx, impl.Client, pod)
   147  	if err != nil {
   148  		return err
   149  	}
   150  	defer conn.Close()
   151  
   152  	var callchain []*pb_kernel.FailKernRequestFrame
   153  	for _, frame := range chaos.Spec.FailKernRequest.Callchain {
   154  		callchain = append(callchain, &pb_kernel.FailKernRequestFrame{
   155  			Funcname:   frame.Funcname,
   156  			Parameters: frame.Parameters,
   157  			Predicate:  frame.Predicate,
   158  		})
   159  	}
   160  
   161  	bpfClient := pb_kernel.NewBPFKIServiceClient(conn)
   162  	_, err = bpfClient.RecoverMMOrBIO(ctx, &pb_kernel.FailKernRequest{
   163  		Pid:       containerResponse.Pid,
   164  		Callchain: callchain,
   165  	})
   166  
   167  	return err
   168  }
   169  
   170  func (impl *Impl) applyPod(ctx context.Context, pod *v1.Pod, chaos *v1alpha1.KernelChaos, containerID string) error {
   171  	impl.Log.Info("Try to inject kernel on pod", "namespace", pod.Namespace, "name", pod.Name)
   172  
   173  	pbClient, err := impl.chaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
   174  		Namespace: chaos.Namespace,
   175  		Name:      chaos.Name,
   176  	})
   177  	if err != nil {
   178  		return err
   179  	}
   180  	defer pbClient.Close()
   181  
   182  	if len(pod.Status.ContainerStatuses) == 0 {
   183  		err = errors.Wrapf(utils.ErrContainerNotFound, "pod %s/%s has empty container status", pod.Namespace, pod.Name)
   184  		return err
   185  	}
   186  
   187  	containerResponse, err := pbClient.ContainerGetPid(ctx, &pb.ContainerRequest{
   188  		Action: &pb.ContainerAction{
   189  			Action: pb.ContainerAction_GETPID,
   190  		},
   191  		ContainerId: containerID,
   192  	})
   193  	if err != nil {
   194  		impl.Log.Error(err, "Get container pid error", "namespace", pod.Namespace, "name", pod.Name)
   195  		return err
   196  	}
   197  
   198  	impl.Log.Info("Get container pid", "namespace", pod.Namespace, "name", pod.Name)
   199  	conn, err := impl.CreateBPFKIConnection(ctx, impl.Client, pod)
   200  	if err != nil {
   201  		return err
   202  	}
   203  	defer conn.Close()
   204  
   205  	var callchain []*pb_kernel.FailKernRequestFrame
   206  	for _, frame := range chaos.Spec.FailKernRequest.Callchain {
   207  		callchain = append(callchain, &pb_kernel.FailKernRequestFrame{
   208  			Funcname:   frame.Funcname,
   209  			Parameters: frame.Parameters,
   210  			Predicate:  frame.Predicate,
   211  		})
   212  	}
   213  
   214  	bpfClient := pb_kernel.NewBPFKIServiceClient(conn)
   215  	_, err = bpfClient.FailMMOrBIO(ctx, &pb_kernel.FailKernRequest{
   216  		Pid:         containerResponse.Pid,
   217  		Ftype:       pb_kernel.FailKernRequest_FAILTYPE(chaos.Spec.FailKernRequest.FailType),
   218  		Headers:     chaos.Spec.FailKernRequest.Headers,
   219  		Callchain:   callchain,
   220  		Probability: float32(chaos.Spec.FailKernRequest.Probability) / 100,
   221  		Times:       chaos.Spec.FailKernRequest.Times,
   222  	})
   223  
   224  	return err
   225  }
   226  
   227  // CreateBPFKIConnection create a grpc connection with bpfki
   228  func (impl *Impl) CreateBPFKIConnection(ctx context.Context, c client.Client, pod *v1.Pod) (*grpc.ClientConn, error) {
   229  	daemonIP, err := impl.chaosDaemonClientBuilder.FindDaemonIP(ctx, pod)
   230  	if err != nil {
   231  		return nil, err
   232  	}
   233  	builder := grpcUtils.Builder(daemonIP, config.ControllerCfg.BPFKIPort).
   234  		WithDefaultTimeout().
   235  		Insecure()
   236  	return builder.Build()
   237  }
   238  
   239  func NewImpl(c client.Client, log logr.Logger, builder *chaosdaemon.ChaosDaemonClientBuilder) *impltypes.ChaosImplPair {
   240  	return &impltypes.ChaosImplPair{
   241  		Name:   "kernelchaos",
   242  		Object: &v1alpha1.KernelChaos{},
   243  		Impl: &Impl{
   244  			Client:                   c,
   245  			Log:                      log.WithName("kernelchaos"),
   246  			chaosDaemonClientBuilder: builder,
   247  		},
   248  		ObjectList: &v1alpha1.KernelChaosList{},
   249  	}
   250  }
   251  
// Module provides NewImpl to fx as a member of the "impl" value group.
var Module = fx.Provide(
	fx.Annotated{
		Group:  "impl",
		Target: NewImpl,
	},
)
   258