1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package dnschaos
17
18 import (
19 "context"
20 "fmt"
21 "net"
22 "time"
23
24 "google.golang.org/grpc/credentials/insecure"
25
26 dnspb "github.com/chaos-mesh/k8s_dns_chaos/pb"
27 "github.com/go-logr/logr"
28 "github.com/pkg/errors"
29 "go.uber.org/fx"
30 "google.golang.org/grpc"
31 v1 "k8s.io/api/core/v1"
32 "k8s.io/apimachinery/pkg/labels"
33 "sigs.k8s.io/controller-runtime/pkg/client"
34
35 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
36 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
37 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
38 "github.com/chaos-mesh/chaos-mesh/controllers/config"
39 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
40 )
41
42 var _ impltypes.ChaosImpl = (*Impl)(nil)
43
44 type Impl struct {
45 client.Client
46 Log logr.Logger
47
48 decoder *utils.ContainerRecordDecoder
49 }
50
51 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
52 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
53 if decodedContainer.PbClient != nil {
54 defer decodedContainer.PbClient.Close()
55 }
56 if err != nil {
57 return v1alpha1.NotInjected, err
58 }
59
60 service, err := impl.getService(ctx, config.ControllerCfg.Namespace, config.ControllerCfg.DNSServiceName)
61 if err != nil {
62 impl.Log.Error(err, "fail to get dns service")
63 return v1alpha1.NotInjected, err
64 }
65
66 dnsPods, err := impl.getPodsFromSelector(ctx, config.ControllerCfg.Namespace, service.Spec.Selector)
67 if err != nil {
68 impl.Log.Error(err, "fail to get pods from selector")
69 return v1alpha1.NotInjected, err
70 }
71
72 dnschaos := obj.(*v1alpha1.DNSChaos)
73 for _, pod := range dnsPods {
74 err = impl.setDNSServerRules(pod.Status.PodIP, config.ControllerCfg.DNSServicePort, dnschaos.Name, decodedContainer.Pod, dnschaos.Spec.Action, dnschaos.Spec.DomainNamePatterns)
75 if err != nil {
76 impl.Log.Error(err, "fail to set DNS server rules")
77 return v1alpha1.NotInjected, err
78 }
79 impl.Log.Info("Apply DNS chaos to DNS pod", "ip", service.Spec.ClusterIP)
80 }
81
82 _, err = decodedContainer.PbClient.SetDNSServer(ctx, &pb.SetDNSServerRequest{
83 ContainerId: decodedContainer.ContainerId,
84 DnsServer: service.Spec.ClusterIP,
85 Enable: true,
86 EnterNS: true,
87 })
88 if err != nil {
89 impl.Log.Error(err, "set dns server")
90 return v1alpha1.NotInjected, err
91 }
92
93 return v1alpha1.Injected, nil
94 }
95
96 func (impl *Impl) setDNSServerRules(dnsServerIP string, port int, name string, pod *v1.Pod, action v1alpha1.DNSChaosAction, patterns []string) error {
97 impl.Log.Info("setDNSServerRules", "name", name)
98
99 pbPods := make([]*dnspb.Pod, 1)
100 pbPods[0] = &dnspb.Pod{
101 Name: pod.Name,
102 Namespace: pod.Namespace,
103 }
104
105 conn, err := grpc.Dial(net.JoinHostPort(dnsServerIP, fmt.Sprintf("%d", port)), grpc.WithTransportCredentials(insecure.NewCredentials()))
106 if err != nil {
107 return err
108 }
109 defer conn.Close()
110
111 c := dnspb.NewDNSClient(conn)
112 request := &dnspb.SetDNSChaosRequest{
113 Name: name,
114 Action: string(action),
115 Pods: pbPods,
116 Patterns: patterns,
117 }
118
119 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
120 defer cancel()
121 response, err := c.SetDNSChaos(ctx, request)
122 if err != nil {
123 return err
124 }
125
126 if !response.Result {
127 return errors.Errorf("set dns chaos to dns server error: %s", response.Msg)
128 }
129
130 return nil
131 }
132
133 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
134 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
135 if decodedContainer.PbClient != nil {
136 defer decodedContainer.PbClient.Close()
137 }
138 if err != nil {
139 if errors.Is(err, utils.ErrContainerNotFound) {
140
141 return v1alpha1.NotInjected, nil
142 }
143 return v1alpha1.Injected, err
144 }
145
146 dnschaos := obj.(*v1alpha1.DNSChaos)
147
148
149 service, err := impl.getService(ctx, config.ControllerCfg.Namespace, config.ControllerCfg.DNSServiceName)
150 if err != nil {
151 impl.Log.Error(err, "fail to get dns service")
152 return v1alpha1.Injected, err
153 }
154
155 dnsPods, err := impl.getPodsFromSelector(ctx, config.ControllerCfg.Namespace, service.Spec.Selector)
156 if err != nil {
157 impl.Log.Error(err, "fail to get pods from selector")
158 return v1alpha1.NotInjected, err
159 }
160
161 for _, pod := range dnsPods {
162 err = impl.cancelDNSServerRules(pod.Status.PodIP, config.ControllerCfg.DNSServicePort, dnschaos.Name)
163 if err != nil {
164 impl.Log.Error(err, "fail to cancelDNSServerRules")
165 return v1alpha1.Injected, err
166 }
167 impl.Log.Info("Cancel DNS chaos to DNS pod", "ip", service.Spec.ClusterIP)
168 }
169
170 _, err = decodedContainer.PbClient.SetDNSServer(ctx, &pb.SetDNSServerRequest{
171 ContainerId: decodedContainer.ContainerId,
172 Enable: false,
173 EnterNS: true,
174 })
175 if err != nil {
176 impl.Log.Error(err, "recover pod for DNS chaos")
177 return v1alpha1.Injected, err
178 }
179
180 return v1alpha1.NotInjected, err
181 }
182
183 func (impl *Impl) cancelDNSServerRules(dnsServerIP string, port int, name string) error {
184 conn, err := grpc.Dial(net.JoinHostPort(dnsServerIP, fmt.Sprintf("%d", port)), grpc.WithTransportCredentials(insecure.NewCredentials()))
185 if err != nil {
186 return err
187 }
188 defer conn.Close()
189
190 c := dnspb.NewDNSClient(conn)
191 request := &dnspb.CancelDNSChaosRequest{
192 Name: name,
193 }
194
195 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
196 defer cancel()
197 response, err := c.CancelDNSChaos(ctx, request)
198 if err != nil {
199 return err
200 }
201
202 if !response.Result {
203 return errors.Errorf("set dns chaos to dns server error %s", response.Msg)
204 }
205
206 return nil
207 }
208
209
210 func (impl *Impl) getService(ctx context.Context, namespace string, serviceName string) (*v1.Service, error) {
211 service := &v1.Service{}
212 err := impl.Client.Get(ctx, client.ObjectKey{
213 Namespace: namespace,
214 Name: serviceName,
215 }, service)
216 if err != nil {
217 return nil, err
218 }
219
220 return service, nil
221 }
222
223
224 func (impl *Impl) getPodsFromSelector(ctx context.Context, namespace string, labelSelector map[string]string) ([]v1.Pod, error) {
225 lSelector := labels.SelectorFromSet(labelSelector)
226 listOptions := &client.ListOptions{
227 Namespace: namespace,
228 LabelSelector: lSelector,
229 }
230 podsList := &v1.PodList{}
231 err := impl.Client.List(ctx, podsList, listOptions)
232 if err != nil {
233 return nil, err
234 }
235
236 return podsList.Items, nil
237 }
238
239 func NewImpl(c client.Client, log logr.Logger, decoder *utils.ContainerRecordDecoder) *impltypes.ChaosImplPair {
240 return &impltypes.ChaosImplPair{
241 Name: "dnschaos",
242 Object: &v1alpha1.DNSChaos{},
243 Impl: &Impl{
244 Client: c,
245 Log: log.WithName("dnschaos"),
246 decoder: decoder,
247 },
248 }
249 }
250
251 var Module = fx.Provide(
252 fx.Annotated{
253 Group: "impl",
254 Target: NewImpl,
255 },
256 )
257