...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/dnschaos/types.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/dnschaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package dnschaos
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"net"
    22  	"time"
    23  
    24  	dnspb "github.com/chaos-mesh/k8s_dns_chaos/pb"
    25  	"github.com/go-logr/logr"
    26  	"github.com/pkg/errors"
    27  	"go.uber.org/fx"
    28  	"google.golang.org/grpc"
    29  	v1 "k8s.io/api/core/v1"
    30  	"k8s.io/apimachinery/pkg/labels"
    31  	"sigs.k8s.io/controller-runtime/pkg/client"
    32  
    33  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    34  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
    36  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    37  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    38  )
    39  
// Compile-time assertion that *Impl satisfies the impltypes.ChaosImpl interface.
var _ impltypes.ChaosImpl = (*Impl)(nil)
    41  
// Impl is the ChaosImpl implementation for DNSChaos objects. Its Apply and
// Recover methods configure the chaos DNS server pods and the target
// container's DNS server setting.
type Impl struct {
	// Client is the embedded Kubernetes client used to fetch the DNS service
	// and list the pods behind it.
	client.Client
	// Log is the logger used by all methods of this implementation.
	Log logr.Logger

	// decoder turns a record into the decoded container (daemon client,
	// container ID and pod) that Apply and Recover operate on.
	decoder *utils.ContainerRecordDecoder
}
    48  
    49  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    50  	decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
    51  	if decodedContainer.PbClient != nil {
    52  		defer decodedContainer.PbClient.Close()
    53  	}
    54  	if err != nil {
    55  		return v1alpha1.NotInjected, err
    56  	}
    57  
    58  	service, err := impl.getService(ctx, config.ControllerCfg.Namespace, config.ControllerCfg.DNSServiceName)
    59  	if err != nil {
    60  		impl.Log.Error(err, "fail to get dns service")
    61  		return v1alpha1.NotInjected, err
    62  	}
    63  
    64  	dnsPods, err := impl.getPodsFromSelector(ctx, config.ControllerCfg.Namespace, service.Spec.Selector)
    65  	if err != nil {
    66  		impl.Log.Error(err, "fail to get pods from selector")
    67  		return v1alpha1.NotInjected, err
    68  	}
    69  
    70  	dnschaos := obj.(*v1alpha1.DNSChaos)
    71  	for _, pod := range dnsPods {
    72  		err = impl.setDNSServerRules(pod.Status.PodIP, config.ControllerCfg.DNSServicePort, dnschaos.Name, decodedContainer.Pod, dnschaos.Spec.Action, dnschaos.Spec.DomainNamePatterns)
    73  		if err != nil {
    74  			impl.Log.Error(err, "fail to set DNS server rules")
    75  			return v1alpha1.NotInjected, err
    76  		}
    77  		impl.Log.Info("Apply DNS chaos to DNS pod", "ip", service.Spec.ClusterIP)
    78  	}
    79  
    80  	_, err = decodedContainer.PbClient.SetDNSServer(ctx, &pb.SetDNSServerRequest{
    81  		ContainerId: decodedContainer.ContainerId,
    82  		DnsServer:   service.Spec.ClusterIP,
    83  		Enable:      true,
    84  		EnterNS:     true,
    85  	})
    86  	if err != nil {
    87  		impl.Log.Error(err, "set dns server")
    88  		return v1alpha1.NotInjected, err
    89  	}
    90  
    91  	return v1alpha1.Injected, nil
    92  }
    93  
    94  func (impl *Impl) setDNSServerRules(dnsServerIP string, port int, name string, pod *v1.Pod, action v1alpha1.DNSChaosAction, patterns []string) error {
    95  	impl.Log.Info("setDNSServerRules", "name", name)
    96  
    97  	pbPods := make([]*dnspb.Pod, 1)
    98  	pbPods[0] = &dnspb.Pod{
    99  		Name:      pod.Name,
   100  		Namespace: pod.Namespace,
   101  	}
   102  
   103  	conn, err := grpc.Dial(net.JoinHostPort(dnsServerIP, fmt.Sprintf("%d", port)), grpc.WithInsecure())
   104  	if err != nil {
   105  		return err
   106  	}
   107  	defer conn.Close()
   108  
   109  	c := dnspb.NewDNSClient(conn)
   110  	request := &dnspb.SetDNSChaosRequest{
   111  		Name:     name,
   112  		Action:   string(action),
   113  		Pods:     pbPods,
   114  		Patterns: patterns,
   115  	}
   116  
   117  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   118  	defer cancel()
   119  	response, err := c.SetDNSChaos(ctx, request)
   120  	if err != nil {
   121  		return err
   122  	}
   123  
   124  	if !response.Result {
   125  		return errors.Errorf("set dns chaos to dns server error: %s", response.Msg)
   126  	}
   127  
   128  	return nil
   129  }
   130  
   131  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   132  	decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
   133  	if decodedContainer.PbClient != nil {
   134  		defer decodedContainer.PbClient.Close()
   135  	}
   136  	if err != nil {
   137  		if errors.Is(err, utils.ErrContainerNotFound) {
   138  			// pretend the disappeared container has been recovered
   139  			return v1alpha1.NotInjected, nil
   140  		}
   141  		return v1alpha1.Injected, err
   142  	}
   143  
   144  	dnschaos := obj.(*v1alpha1.DNSChaos)
   145  
   146  	// get dns server's ip used for chaos
   147  	service, err := impl.getService(ctx, config.ControllerCfg.Namespace, config.ControllerCfg.DNSServiceName)
   148  	if err != nil {
   149  		impl.Log.Error(err, "fail to get dns service")
   150  		return v1alpha1.Injected, err
   151  	}
   152  
   153  	dnsPods, err := impl.getPodsFromSelector(ctx, config.ControllerCfg.Namespace, service.Spec.Selector)
   154  	if err != nil {
   155  		impl.Log.Error(err, "fail to get pods from selector")
   156  		return v1alpha1.NotInjected, err
   157  	}
   158  
   159  	for _, pod := range dnsPods {
   160  		err = impl.cancelDNSServerRules(pod.Status.PodIP, config.ControllerCfg.DNSServicePort, dnschaos.Name)
   161  		if err != nil {
   162  			impl.Log.Error(err, "fail to cancelDNSServerRules")
   163  			return v1alpha1.Injected, err
   164  		}
   165  		impl.Log.Info("Cancel DNS chaos to DNS pod", "ip", service.Spec.ClusterIP)
   166  	}
   167  
   168  	_, err = decodedContainer.PbClient.SetDNSServer(ctx, &pb.SetDNSServerRequest{
   169  		ContainerId: decodedContainer.ContainerId,
   170  		Enable:      false,
   171  		EnterNS:     true,
   172  	})
   173  	if err != nil {
   174  		impl.Log.Error(err, "recover pod for DNS chaos")
   175  		return v1alpha1.Injected, err
   176  	}
   177  
   178  	return v1alpha1.NotInjected, err
   179  }
   180  
   181  func (impl *Impl) cancelDNSServerRules(dnsServerIP string, port int, name string) error {
   182  	conn, err := grpc.Dial(net.JoinHostPort(dnsServerIP, fmt.Sprintf("%d", port)), grpc.WithInsecure())
   183  	if err != nil {
   184  		return err
   185  	}
   186  	defer conn.Close()
   187  
   188  	c := dnspb.NewDNSClient(conn)
   189  	request := &dnspb.CancelDNSChaosRequest{
   190  		Name: name,
   191  	}
   192  
   193  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   194  	defer cancel()
   195  	response, err := c.CancelDNSChaos(ctx, request)
   196  	if err != nil {
   197  		return err
   198  	}
   199  
   200  	if !response.Result {
   201  		return errors.Errorf("set dns chaos to dns server error %s", response.Msg)
   202  	}
   203  
   204  	return nil
   205  }
   206  
   207  // getService get k8s service by service name
   208  func (impl *Impl) getService(ctx context.Context, namespace string, serviceName string) (*v1.Service, error) {
   209  	service := &v1.Service{}
   210  	err := impl.Client.Get(ctx, client.ObjectKey{
   211  		Namespace: namespace,
   212  		Name:      serviceName,
   213  	}, service)
   214  	if err != nil {
   215  		return nil, err
   216  	}
   217  
   218  	return service, nil
   219  }
   220  
   221  // getPodsFromSelector returns the pods assiocated to a given service
   222  func (impl *Impl) getPodsFromSelector(ctx context.Context, namespace string, labelSelector map[string]string) ([]v1.Pod, error) {
   223  	lSelector := labels.SelectorFromSet(labelSelector)
   224  	listOptions := &client.ListOptions{
   225  		Namespace:     namespace,
   226  		LabelSelector: lSelector,
   227  	}
   228  	podsList := &v1.PodList{}
   229  	err := impl.Client.List(ctx, podsList, listOptions)
   230  	if err != nil {
   231  		return nil, err
   232  	}
   233  
   234  	return podsList.Items, nil
   235  }
   236  
   237  func NewImpl(c client.Client, log logr.Logger, decoder *utils.ContainerRecordDecoder) *impltypes.ChaosImplPair {
   238  	return &impltypes.ChaosImplPair{
   239  		Name:   "dnschaos",
   240  		Object: &v1alpha1.DNSChaos{},
   241  		Impl: &Impl{
   242  			Client:  c,
   243  			Log:     log.WithName("dnschaos"),
   244  			decoder: decoder,
   245  		},
   246  	}
   247  }
   248  
   249  var Module = fx.Provide(
   250  	fx.Annotated{
   251  		Group:  "impl",
   252  		Target: NewImpl,
   253  	},
   254  )
   255