1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package kernelchaos
17
18 import (
19 "context"
20 "fmt"
21
22 "github.com/go-logr/logr"
23 "github.com/pkg/errors"
24 "go.uber.org/fx"
25 "google.golang.org/grpc"
26 v1 "k8s.io/api/core/v1"
27 k8sError "k8s.io/apimachinery/pkg/api/errors"
28 "k8s.io/apimachinery/pkg/types"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
33 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
34 "github.com/chaos-mesh/chaos-mesh/controllers/config"
35 "github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
36 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
37 pb "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
38 pb_kernel "github.com/chaos-mesh/chaos-mesh/pkg/chaoskernel/pb"
39 grpcUtils "github.com/chaos-mesh/chaos-mesh/pkg/grpc"
40 )
41
42 var _ impltypes.ChaosImpl = (*Impl)(nil)
43
44 type Impl struct {
45 client.Client
46 Log logr.Logger
47
48 chaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
49 }
50
51
52 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
53 kernelChaos := obj.(*v1alpha1.KernelChaos)
54 record := records[index]
55
56 log := impl.Log.WithValues("chaos", kernelChaos, "record", record)
57 podId, containerID, err := controller.ParseNamespacedNameContainer(record.Id)
58 if err != nil {
59 return v1alpha1.NotInjected, err
60 }
61 var pod v1.Pod
62 err = impl.Client.Get(ctx, podId, &pod)
63 if err != nil {
64 log.Error(err, "fail to get pod by record")
65
66 if k8sError.IsNotFound(err) {
67 return v1alpha1.NotInjected, nil
68 }
69 return v1alpha1.NotInjected, err
70 }
71
72 log = log.WithValues("pod", pod)
73
74 if err = impl.applyPod(ctx, &pod, kernelChaos, containerID); err != nil {
75 log.Error(err, "failed to apply chaos on pod")
76 return v1alpha1.NotInjected, err
77 }
78
79 return v1alpha1.Injected, nil
80 }
81
82
83 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
84 kernelChaos := obj.(*v1alpha1.KernelChaos)
85 record := records[index]
86
87 log := impl.Log.WithValues("chaos", kernelChaos, "record", record)
88 podId, containerID, err := controller.ParseNamespacedNameContainer(record.Id)
89 if err != nil {
90 errorInfo := fmt.Sprintf("kernelChaos recover error, record ID is %s", record.Id)
91 log.Error(err, errorInfo)
92
93 return v1alpha1.Injected, err
94 }
95 var pod v1.Pod
96 err = impl.Client.Get(ctx, podId, &pod)
97 if err != nil {
98 log.Error(err, "fail to get pod by record")
99
100 if k8sError.IsNotFound(err) {
101 return v1alpha1.NotInjected, nil
102 }
103 return v1alpha1.Injected, err
104 }
105
106 log = log.WithValues("pod", pod)
107
108 if err = impl.recoverPod(ctx, &pod, kernelChaos, containerID); err != nil {
109 log.Error(err, "failed to recover chaos on pod")
110 return v1alpha1.Injected, err
111 }
112
113 return v1alpha1.NotInjected, nil
114 }
115
116 func (impl *Impl) recoverPod(ctx context.Context, pod *v1.Pod, chaos *v1alpha1.KernelChaos, containerID string) error {
117 impl.Log.Info("try to recover pod", "namespace", pod.Namespace, "name", pod.Name)
118
119 pbClient, err := impl.chaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
120 Namespace: chaos.Namespace,
121 Name: chaos.Name,
122 })
123 if err != nil {
124 return err
125 }
126 defer pbClient.Close()
127
128 if len(pod.Status.ContainerStatuses) == 0 {
129 err = errors.Wrapf(utils.ErrContainerNotFound, "pod %s/%s has empty container status", pod.Namespace, pod.Name)
130 return err
131 }
132
133 containerResponse, err := pbClient.ContainerGetPid(ctx, &pb.ContainerRequest{
134 Action: &pb.ContainerAction{
135 Action: pb.ContainerAction_GETPID,
136 },
137 ContainerId: containerID,
138 })
139
140 if err != nil {
141 impl.Log.Error(err, "Get container pid error", "namespace", pod.Namespace, "name", pod.Name)
142 return err
143 }
144
145 impl.Log.Info("Get container pid", "namespace", pod.Namespace, "name", pod.Name)
146 conn, err := impl.CreateBPFKIConnection(ctx, impl.Client, pod)
147 if err != nil {
148 return err
149 }
150 defer conn.Close()
151
152 var callchain []*pb_kernel.FailKernRequestFrame
153 for _, frame := range chaos.Spec.FailKernRequest.Callchain {
154 callchain = append(callchain, &pb_kernel.FailKernRequestFrame{
155 Funcname: frame.Funcname,
156 Parameters: frame.Parameters,
157 Predicate: frame.Predicate,
158 })
159 }
160
161 bpfClient := pb_kernel.NewBPFKIServiceClient(conn)
162 _, err = bpfClient.RecoverMMOrBIO(ctx, &pb_kernel.FailKernRequest{
163 Pid: containerResponse.Pid,
164 Callchain: callchain,
165 })
166
167 return err
168 }
169
170 func (impl *Impl) applyPod(ctx context.Context, pod *v1.Pod, chaos *v1alpha1.KernelChaos, containerID string) error {
171 impl.Log.Info("Try to inject kernel on pod", "namespace", pod.Namespace, "name", pod.Name)
172
173 pbClient, err := impl.chaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
174 Namespace: chaos.Namespace,
175 Name: chaos.Name,
176 })
177 if err != nil {
178 return err
179 }
180 defer pbClient.Close()
181
182 if len(pod.Status.ContainerStatuses) == 0 {
183 err = errors.Wrapf(utils.ErrContainerNotFound, "pod %s/%s has empty container status", pod.Namespace, pod.Name)
184 return err
185 }
186
187 containerResponse, err := pbClient.ContainerGetPid(ctx, &pb.ContainerRequest{
188 Action: &pb.ContainerAction{
189 Action: pb.ContainerAction_GETPID,
190 },
191 ContainerId: containerID,
192 })
193 if err != nil {
194 impl.Log.Error(err, "Get container pid error", "namespace", pod.Namespace, "name", pod.Name)
195 return err
196 }
197
198 impl.Log.Info("Get container pid", "namespace", pod.Namespace, "name", pod.Name)
199 conn, err := impl.CreateBPFKIConnection(ctx, impl.Client, pod)
200 if err != nil {
201 return err
202 }
203 defer conn.Close()
204
205 var callchain []*pb_kernel.FailKernRequestFrame
206 for _, frame := range chaos.Spec.FailKernRequest.Callchain {
207 callchain = append(callchain, &pb_kernel.FailKernRequestFrame{
208 Funcname: frame.Funcname,
209 Parameters: frame.Parameters,
210 Predicate: frame.Predicate,
211 })
212 }
213
214 bpfClient := pb_kernel.NewBPFKIServiceClient(conn)
215 _, err = bpfClient.FailMMOrBIO(ctx, &pb_kernel.FailKernRequest{
216 Pid: containerResponse.Pid,
217 Ftype: pb_kernel.FailKernRequest_FAILTYPE(chaos.Spec.FailKernRequest.FailType),
218 Headers: chaos.Spec.FailKernRequest.Headers,
219 Callchain: callchain,
220 Probability: float32(chaos.Spec.FailKernRequest.Probability) / 100,
221 Times: chaos.Spec.FailKernRequest.Times,
222 })
223
224 return err
225 }
226
227
228 func (impl *Impl) CreateBPFKIConnection(ctx context.Context, c client.Client, pod *v1.Pod) (*grpc.ClientConn, error) {
229 daemonIP, err := impl.chaosDaemonClientBuilder.FindDaemonIP(ctx, pod)
230 if err != nil {
231 return nil, err
232 }
233 builder := grpcUtils.Builder(daemonIP, config.ControllerCfg.BPFKIPort).
234 WithDefaultTimeout().
235 Insecure()
236 return builder.Build()
237 }
238
239 func NewImpl(c client.Client, log logr.Logger, builder *chaosdaemon.ChaosDaemonClientBuilder) *impltypes.ChaosImplPair {
240 return &impltypes.ChaosImplPair{
241 Name: "kernelchaos",
242 Object: &v1alpha1.KernelChaos{},
243 Impl: &Impl{
244 Client: c,
245 Log: log.WithName("kernelchaos"),
246 chaosDaemonClientBuilder: builder,
247 },
248 ObjectList: &v1alpha1.KernelChaosList{},
249 }
250 }
251
252 var Module = fx.Provide(
253 fx.Annotated{
254 Group: "impl",
255 Target: NewImpl,
256 },
257 )
258