...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/dnschaos/types.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/dnschaos

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package dnschaos
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"net"
    22  	"time"
    23  
    24  	"google.golang.org/grpc/credentials/insecure"
    25  
    26  	dnspb "github.com/chaos-mesh/k8s_dns_chaos/pb"
    27  	"github.com/go-logr/logr"
    28  	"github.com/pkg/errors"
    29  	"go.uber.org/fx"
    30  	"google.golang.org/grpc"
    31  	v1 "k8s.io/api/core/v1"
    32  	"k8s.io/apimachinery/pkg/labels"
    33  	"sigs.k8s.io/controller-runtime/pkg/client"
    34  
    35  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    36  	impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    37  	"github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
    38  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    39  	"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
    40  )
    41  
    42  var _ impltypes.ChaosImpl = (*Impl)(nil)
    43  
    44  type Impl struct {
    45  	client.Client
    46  	Log logr.Logger
    47  
    48  	decoder *utils.ContainerRecordDecoder
    49  }
    50  
    51  func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
    52  	decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
    53  	if decodedContainer.PbClient != nil {
    54  		defer decodedContainer.PbClient.Close()
    55  	}
    56  	if err != nil {
    57  		return v1alpha1.NotInjected, err
    58  	}
    59  
    60  	service, err := impl.getService(ctx, config.ControllerCfg.Namespace, config.ControllerCfg.DNSServiceName)
    61  	if err != nil {
    62  		impl.Log.Error(err, "fail to get dns service")
    63  		return v1alpha1.NotInjected, err
    64  	}
    65  
    66  	dnsPods, err := impl.getPodsFromSelector(ctx, config.ControllerCfg.Namespace, service.Spec.Selector)
    67  	if err != nil {
    68  		impl.Log.Error(err, "fail to get pods from selector")
    69  		return v1alpha1.NotInjected, err
    70  	}
    71  
    72  	dnschaos := obj.(*v1alpha1.DNSChaos)
    73  	for _, pod := range dnsPods {
    74  		err = impl.setDNSServerRules(pod.Status.PodIP, config.ControllerCfg.DNSServicePort, dnschaos.Name, decodedContainer.Pod, dnschaos.Spec.Action, dnschaos.Spec.DomainNamePatterns)
    75  		if err != nil {
    76  			impl.Log.Error(err, "fail to set DNS server rules")
    77  			return v1alpha1.NotInjected, err
    78  		}
    79  		impl.Log.Info("Apply DNS chaos to DNS pod", "ip", service.Spec.ClusterIP)
    80  	}
    81  
    82  	_, err = decodedContainer.PbClient.SetDNSServer(ctx, &pb.SetDNSServerRequest{
    83  		ContainerId: decodedContainer.ContainerId,
    84  		DnsServer:   service.Spec.ClusterIP,
    85  		Enable:      true,
    86  		EnterNS:     true,
    87  	})
    88  	if err != nil {
    89  		impl.Log.Error(err, "set dns server")
    90  		return v1alpha1.NotInjected, err
    91  	}
    92  
    93  	return v1alpha1.Injected, nil
    94  }
    95  
    96  func (impl *Impl) setDNSServerRules(dnsServerIP string, port int, name string, pod *v1.Pod, action v1alpha1.DNSChaosAction, patterns []string) error {
    97  	impl.Log.Info("setDNSServerRules", "name", name)
    98  
    99  	pbPods := make([]*dnspb.Pod, 1)
   100  	pbPods[0] = &dnspb.Pod{
   101  		Name:      pod.Name,
   102  		Namespace: pod.Namespace,
   103  	}
   104  
   105  	conn, err := grpc.Dial(net.JoinHostPort(dnsServerIP, fmt.Sprintf("%d", port)), grpc.WithTransportCredentials(insecure.NewCredentials()))
   106  	if err != nil {
   107  		return err
   108  	}
   109  	defer conn.Close()
   110  
   111  	c := dnspb.NewDNSClient(conn)
   112  	request := &dnspb.SetDNSChaosRequest{
   113  		Name:     name,
   114  		Action:   string(action),
   115  		Pods:     pbPods,
   116  		Patterns: patterns,
   117  	}
   118  
   119  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   120  	defer cancel()
   121  	response, err := c.SetDNSChaos(ctx, request)
   122  	if err != nil {
   123  		return err
   124  	}
   125  
   126  	if !response.Result {
   127  		return errors.Errorf("set dns chaos to dns server error: %s", response.Msg)
   128  	}
   129  
   130  	return nil
   131  }
   132  
   133  func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
   134  	decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
   135  	if decodedContainer.PbClient != nil {
   136  		defer decodedContainer.PbClient.Close()
   137  	}
   138  	if err != nil {
   139  		if errors.Is(err, utils.ErrContainerNotFound) {
   140  			// pretend the disappeared container has been recovered
   141  			return v1alpha1.NotInjected, nil
   142  		}
   143  		return v1alpha1.Injected, err
   144  	}
   145  
   146  	dnschaos := obj.(*v1alpha1.DNSChaos)
   147  
   148  	// get dns server's ip used for chaos
   149  	service, err := impl.getService(ctx, config.ControllerCfg.Namespace, config.ControllerCfg.DNSServiceName)
   150  	if err != nil {
   151  		impl.Log.Error(err, "fail to get dns service")
   152  		return v1alpha1.Injected, err
   153  	}
   154  
   155  	dnsPods, err := impl.getPodsFromSelector(ctx, config.ControllerCfg.Namespace, service.Spec.Selector)
   156  	if err != nil {
   157  		impl.Log.Error(err, "fail to get pods from selector")
   158  		return v1alpha1.NotInjected, err
   159  	}
   160  
   161  	for _, pod := range dnsPods {
   162  		err = impl.cancelDNSServerRules(pod.Status.PodIP, config.ControllerCfg.DNSServicePort, dnschaos.Name)
   163  		if err != nil {
   164  			impl.Log.Error(err, "fail to cancelDNSServerRules")
   165  			return v1alpha1.Injected, err
   166  		}
   167  		impl.Log.Info("Cancel DNS chaos to DNS pod", "ip", service.Spec.ClusterIP)
   168  	}
   169  
   170  	_, err = decodedContainer.PbClient.SetDNSServer(ctx, &pb.SetDNSServerRequest{
   171  		ContainerId: decodedContainer.ContainerId,
   172  		Enable:      false,
   173  		EnterNS:     true,
   174  	})
   175  	if err != nil {
   176  		impl.Log.Error(err, "recover pod for DNS chaos")
   177  		return v1alpha1.Injected, err
   178  	}
   179  
   180  	return v1alpha1.NotInjected, err
   181  }
   182  
   183  func (impl *Impl) cancelDNSServerRules(dnsServerIP string, port int, name string) error {
   184  	conn, err := grpc.Dial(net.JoinHostPort(dnsServerIP, fmt.Sprintf("%d", port)), grpc.WithTransportCredentials(insecure.NewCredentials()))
   185  	if err != nil {
   186  		return err
   187  	}
   188  	defer conn.Close()
   189  
   190  	c := dnspb.NewDNSClient(conn)
   191  	request := &dnspb.CancelDNSChaosRequest{
   192  		Name: name,
   193  	}
   194  
   195  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   196  	defer cancel()
   197  	response, err := c.CancelDNSChaos(ctx, request)
   198  	if err != nil {
   199  		return err
   200  	}
   201  
   202  	if !response.Result {
   203  		return errors.Errorf("set dns chaos to dns server error %s", response.Msg)
   204  	}
   205  
   206  	return nil
   207  }
   208  
   209  // getService get k8s service by service name
   210  func (impl *Impl) getService(ctx context.Context, namespace string, serviceName string) (*v1.Service, error) {
   211  	service := &v1.Service{}
   212  	err := impl.Client.Get(ctx, client.ObjectKey{
   213  		Namespace: namespace,
   214  		Name:      serviceName,
   215  	}, service)
   216  	if err != nil {
   217  		return nil, err
   218  	}
   219  
   220  	return service, nil
   221  }
   222  
   223  // getPodsFromSelector returns the pods assiocated to a given service
   224  func (impl *Impl) getPodsFromSelector(ctx context.Context, namespace string, labelSelector map[string]string) ([]v1.Pod, error) {
   225  	lSelector := labels.SelectorFromSet(labelSelector)
   226  	listOptions := &client.ListOptions{
   227  		Namespace:     namespace,
   228  		LabelSelector: lSelector,
   229  	}
   230  	podsList := &v1.PodList{}
   231  	err := impl.Client.List(ctx, podsList, listOptions)
   232  	if err != nil {
   233  		return nil, err
   234  	}
   235  
   236  	return podsList.Items, nil
   237  }
   238  
   239  func NewImpl(c client.Client, log logr.Logger, decoder *utils.ContainerRecordDecoder) *impltypes.ChaosImplPair {
   240  	return &impltypes.ChaosImplPair{
   241  		Name:   "dnschaos",
   242  		Object: &v1alpha1.DNSChaos{},
   243  		Impl: &Impl{
   244  			Client:  c,
   245  			Log:     log.WithName("dnschaos"),
   246  			decoder: decoder,
   247  		},
   248  	}
   249  }
   250  
   251  var Module = fx.Provide(
   252  	fx.Annotated{
   253  		Group:  "impl",
   254  		Target: NewImpl,
   255  	},
   256  )
   257