1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package dnschaos
17
18 import (
19 "context"
20 "fmt"
21 "net"
22 "time"
23
24 dnspb "github.com/chaos-mesh/k8s_dns_chaos/pb"
25 "github.com/go-logr/logr"
26 "github.com/pkg/errors"
27 "go.uber.org/fx"
28 "google.golang.org/grpc"
29 v1 "k8s.io/api/core/v1"
30 "k8s.io/apimachinery/pkg/labels"
31 "sigs.k8s.io/controller-runtime/pkg/client"
32
33 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
34 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
35 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
36 "github.com/chaos-mesh/chaos-mesh/controllers/config"
37 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
38 )
39
40 var _ impltypes.ChaosImpl = (*Impl)(nil)
41
42 type Impl struct {
43 client.Client
44 Log logr.Logger
45
46 decoder *utils.ContainerRecordDecoder
47 }
48
49 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
50 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
51 if decodedContainer.PbClient != nil {
52 defer decodedContainer.PbClient.Close()
53 }
54 if err != nil {
55 return v1alpha1.NotInjected, err
56 }
57
58 service, err := impl.getService(ctx, config.ControllerCfg.Namespace, config.ControllerCfg.DNSServiceName)
59 if err != nil {
60 impl.Log.Error(err, "fail to get dns service")
61 return v1alpha1.NotInjected, err
62 }
63
64 dnsPods, err := impl.getPodsFromSelector(ctx, config.ControllerCfg.Namespace, service.Spec.Selector)
65 if err != nil {
66 impl.Log.Error(err, "fail to get pods from selector")
67 return v1alpha1.NotInjected, err
68 }
69
70 dnschaos := obj.(*v1alpha1.DNSChaos)
71 for _, pod := range dnsPods {
72 err = impl.setDNSServerRules(pod.Status.PodIP, config.ControllerCfg.DNSServicePort, dnschaos.Name, decodedContainer.Pod, dnschaos.Spec.Action, dnschaos.Spec.DomainNamePatterns)
73 if err != nil {
74 impl.Log.Error(err, "fail to set DNS server rules")
75 return v1alpha1.NotInjected, err
76 }
77 impl.Log.Info("Apply DNS chaos to DNS pod", "ip", service.Spec.ClusterIP)
78 }
79
80 _, err = decodedContainer.PbClient.SetDNSServer(ctx, &pb.SetDNSServerRequest{
81 ContainerId: decodedContainer.ContainerId,
82 DnsServer: service.Spec.ClusterIP,
83 Enable: true,
84 EnterNS: true,
85 })
86 if err != nil {
87 impl.Log.Error(err, "set dns server")
88 return v1alpha1.NotInjected, err
89 }
90
91 return v1alpha1.Injected, nil
92 }
93
94 func (impl *Impl) setDNSServerRules(dnsServerIP string, port int, name string, pod *v1.Pod, action v1alpha1.DNSChaosAction, patterns []string) error {
95 impl.Log.Info("setDNSServerRules", "name", name)
96
97 pbPods := make([]*dnspb.Pod, 1)
98 pbPods[0] = &dnspb.Pod{
99 Name: pod.Name,
100 Namespace: pod.Namespace,
101 }
102
103 conn, err := grpc.Dial(net.JoinHostPort(dnsServerIP, fmt.Sprintf("%d", port)), grpc.WithInsecure())
104 if err != nil {
105 return err
106 }
107 defer conn.Close()
108
109 c := dnspb.NewDNSClient(conn)
110 request := &dnspb.SetDNSChaosRequest{
111 Name: name,
112 Action: string(action),
113 Pods: pbPods,
114 Patterns: patterns,
115 }
116
117 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
118 defer cancel()
119 response, err := c.SetDNSChaos(ctx, request)
120 if err != nil {
121 return err
122 }
123
124 if !response.Result {
125 return errors.Errorf("set dns chaos to dns server error: %s", response.Msg)
126 }
127
128 return nil
129 }
130
131 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
132 decodedContainer, err := impl.decoder.DecodeContainerRecord(ctx, records[index], obj)
133 if decodedContainer.PbClient != nil {
134 defer decodedContainer.PbClient.Close()
135 }
136 if err != nil {
137 if errors.Is(err, utils.ErrContainerNotFound) {
138
139 return v1alpha1.NotInjected, nil
140 }
141 return v1alpha1.Injected, err
142 }
143
144 dnschaos := obj.(*v1alpha1.DNSChaos)
145
146
147 service, err := impl.getService(ctx, config.ControllerCfg.Namespace, config.ControllerCfg.DNSServiceName)
148 if err != nil {
149 impl.Log.Error(err, "fail to get dns service")
150 return v1alpha1.Injected, err
151 }
152
153 dnsPods, err := impl.getPodsFromSelector(ctx, config.ControllerCfg.Namespace, service.Spec.Selector)
154 if err != nil {
155 impl.Log.Error(err, "fail to get pods from selector")
156 return v1alpha1.NotInjected, err
157 }
158
159 for _, pod := range dnsPods {
160 err = impl.cancelDNSServerRules(pod.Status.PodIP, config.ControllerCfg.DNSServicePort, dnschaos.Name)
161 if err != nil {
162 impl.Log.Error(err, "fail to cancelDNSServerRules")
163 return v1alpha1.Injected, err
164 }
165 impl.Log.Info("Cancel DNS chaos to DNS pod", "ip", service.Spec.ClusterIP)
166 }
167
168 _, err = decodedContainer.PbClient.SetDNSServer(ctx, &pb.SetDNSServerRequest{
169 ContainerId: decodedContainer.ContainerId,
170 Enable: false,
171 EnterNS: true,
172 })
173 if err != nil {
174 impl.Log.Error(err, "recover pod for DNS chaos")
175 return v1alpha1.Injected, err
176 }
177
178 return v1alpha1.NotInjected, err
179 }
180
181 func (impl *Impl) cancelDNSServerRules(dnsServerIP string, port int, name string) error {
182 conn, err := grpc.Dial(net.JoinHostPort(dnsServerIP, fmt.Sprintf("%d", port)), grpc.WithInsecure())
183 if err != nil {
184 return err
185 }
186 defer conn.Close()
187
188 c := dnspb.NewDNSClient(conn)
189 request := &dnspb.CancelDNSChaosRequest{
190 Name: name,
191 }
192
193 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
194 defer cancel()
195 response, err := c.CancelDNSChaos(ctx, request)
196 if err != nil {
197 return err
198 }
199
200 if !response.Result {
201 return errors.Errorf("set dns chaos to dns server error %s", response.Msg)
202 }
203
204 return nil
205 }
206
207
208 func (impl *Impl) getService(ctx context.Context, namespace string, serviceName string) (*v1.Service, error) {
209 service := &v1.Service{}
210 err := impl.Client.Get(ctx, client.ObjectKey{
211 Namespace: namespace,
212 Name: serviceName,
213 }, service)
214 if err != nil {
215 return nil, err
216 }
217
218 return service, nil
219 }
220
221
222 func (impl *Impl) getPodsFromSelector(ctx context.Context, namespace string, labelSelector map[string]string) ([]v1.Pod, error) {
223 lSelector := labels.SelectorFromSet(labelSelector)
224 listOptions := &client.ListOptions{
225 Namespace: namespace,
226 LabelSelector: lSelector,
227 }
228 podsList := &v1.PodList{}
229 err := impl.Client.List(ctx, podsList, listOptions)
230 if err != nil {
231 return nil, err
232 }
233
234 return podsList.Items, nil
235 }
236
237 func NewImpl(c client.Client, log logr.Logger, decoder *utils.ContainerRecordDecoder) *impltypes.ChaosImplPair {
238 return &impltypes.ChaosImplPair{
239 Name: "dnschaos",
240 Object: &v1alpha1.DNSChaos{},
241 Impl: &Impl{
242 Client: c,
243 Log: log.WithName("dnschaos"),
244 decoder: decoder,
245 },
246 }
247 }
248
249 var Module = fx.Provide(
250 fx.Annotated{
251 Group: "impl",
252 Target: NewImpl,
253 },
254 )
255