1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package iochaos
17
18 import (
19 "context"
20 "strings"
21
22 "github.com/go-logr/logr"
23 "github.com/pkg/errors"
24 "go.uber.org/fx"
25 v1 "k8s.io/api/core/v1"
26 k8sError "k8s.io/apimachinery/pkg/api/errors"
27 "k8s.io/apimachinery/pkg/types"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/iochaos/podiochaosmanager"
32 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
33 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
34 )
35
// Compile-time assertion that *Impl satisfies the impltypes.ChaosImpl interface.
var _ impltypes.ChaosImpl = (*Impl)(nil)
37
// Intermediate phases returned by Apply and Recover in addition to
// v1alpha1.NotInjected and v1alpha1.Injected.
const (
	// waitForApplySync is returned by Apply once its Commit succeeded. While a
	// record is in this phase, Apply only inspects the PodIOChaos object and
	// does not commit again.
	waitForApplySync v1alpha1.Phase = "Not Injected/Wait"
	// waitForRecoverSync is returned by Recover once its Commit succeeded.
	// While a record is in this phase, Recover only inspects the PodIOChaos
	// object and does not commit again.
	waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
)
42
// Impl is the IOChaos implementation of impltypes.ChaosImpl; its Apply and
// Recover methods move a single record between phases.
type Impl struct {
	// Client is used to read Pod and PodIOChaos objects.
	client.Client
	// Log is the logger used by Apply.
	Log logr.Logger

	// builder creates, per target pod, the manager on which changes are
	// staged and committed.
	builder *podiochaosmanager.Builder
}
49
50 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
51
52
53 impl.Log.Info("iochaos Apply", "namespace", obj.GetNamespace(), "name", obj.GetName())
54 iochaos := obj.(*v1alpha1.IOChaos)
55 if iochaos.Status.Instances == nil {
56 iochaos.Status.Instances = make(map[string]int64)
57 }
58
59 record := records[index]
60 phase := record.Phase
61
62 if phase == waitForApplySync {
63 podiochaos := &v1alpha1.PodIOChaos{}
64 namespacedName, err := controller.ParseNamespacedName(record.Id)
65 if err != nil {
66 return waitForApplySync, err
67 }
68 err = impl.Client.Get(ctx, namespacedName, podiochaos)
69 if err != nil {
70 if k8sError.IsNotFound(err) {
71 return v1alpha1.NotInjected, nil
72 }
73
74 if k8sError.IsForbidden(err) {
75 if strings.Contains(err.Error(), "because it is being terminated") {
76 return v1alpha1.NotInjected, nil
77 }
78 }
79
80 return waitForApplySync, err
81 }
82
83 if podiochaos.Status.FailedMessage != "" {
84 return waitForApplySync, errors.New(podiochaos.Status.FailedMessage)
85 }
86
87 if podiochaos.Status.ObservedGeneration >= iochaos.Status.Instances[record.Id] {
88 return v1alpha1.Injected, nil
89 }
90
91 return waitForApplySync, nil
92 }
93
94 podId, containerName, err := controller.ParseNamespacedNameContainer(records[index].Id)
95 if err != nil {
96 return v1alpha1.NotInjected, err
97 }
98 var pod v1.Pod
99 err = impl.Client.Get(ctx, podId, &pod)
100 if err != nil {
101 return v1alpha1.NotInjected, err
102 }
103
104 source := iochaos.Namespace + "/" + iochaos.Name
105 m := impl.builder.WithInit(source, types.NamespacedName{
106 Namespace: pod.Namespace,
107 Name: pod.Name,
108 })
109
110 m.T.SetVolumePath(iochaos.Spec.VolumePath)
111 m.T.SetContainer(containerName)
112
113 m.T.Append(v1alpha1.IOChaosAction{
114 Type: iochaos.Spec.Action,
115 Filter: v1alpha1.Filter{
116 Path: iochaos.Spec.Path,
117 Percent: iochaos.Spec.Percent,
118 Methods: iochaos.Spec.Methods,
119 },
120 Faults: []v1alpha1.IoFault{
121 {
122 Errno: iochaos.Spec.Errno,
123 Weight: 1,
124 },
125 },
126 Latency: iochaos.Spec.Delay,
127 AttrOverrideSpec: iochaos.Spec.Attr,
128 MistakeSpec: iochaos.Spec.Mistake,
129 Source: m.Source,
130 })
131 generationNumber, err := m.Commit(ctx, iochaos)
132 if err != nil {
133 return v1alpha1.NotInjected, err
134 }
135
136
137 iochaos.Status.Instances[record.Id] = generationNumber
138 return waitForApplySync, nil
139 }
140
141 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
142
143
144 iochaos := obj.(*v1alpha1.IOChaos)
145 if iochaos.Status.Instances == nil {
146 iochaos.Status.Instances = make(map[string]int64)
147 }
148
149 record := records[index]
150 phase := record.Phase
151 if phase == waitForRecoverSync {
152 podiochaos := &v1alpha1.PodIOChaos{}
153 namespacedName, err := controller.ParseNamespacedName(record.Id)
154 if err != nil {
155
156 return waitForRecoverSync, nil
157 }
158 err = impl.Client.Get(ctx, namespacedName, podiochaos)
159 if err != nil {
160
161 if k8sError.IsNotFound(err) {
162 return v1alpha1.NotInjected, nil
163 }
164 return waitForRecoverSync, err
165 }
166
167 if podiochaos.Status.FailedMessage != "" {
168 return waitForRecoverSync, errors.New(podiochaos.Status.FailedMessage)
169 }
170
171 if podiochaos.Status.ObservedGeneration >= iochaos.Status.Instances[record.Id] {
172 return v1alpha1.NotInjected, nil
173 }
174
175 return waitForRecoverSync, nil
176 }
177
178 podId, _, err := controller.ParseNamespacedNameContainer(records[index].Id)
179 if err != nil {
180
181 return v1alpha1.NotInjected, err
182 }
183 var pod v1.Pod
184 err = impl.Client.Get(ctx, podId, &pod)
185 if err != nil {
186
187 if k8sError.IsNotFound(err) {
188 return v1alpha1.NotInjected, nil
189 }
190 return v1alpha1.Injected, err
191 }
192
193 source := iochaos.Namespace + "/" + iochaos.Name
194 m := impl.builder.WithInit(source, types.NamespacedName{
195 Namespace: pod.Namespace,
196 Name: pod.Name,
197 })
198
199 generationNumber, err := m.Commit(ctx, iochaos)
200 if err != nil {
201 if err == podiochaosmanager.ErrPodNotFound || err == podiochaosmanager.ErrPodNotRunning {
202 return v1alpha1.NotInjected, nil
203 }
204
205 if k8sError.IsForbidden(err) {
206 if strings.Contains(err.Error(), "because it is being terminated") {
207 return v1alpha1.NotInjected, nil
208 }
209 }
210 return v1alpha1.Injected, err
211 }
212
213
214 iochaos.Status.Instances[record.Id] = generationNumber
215 return waitForRecoverSync, nil
216 }
217
218 func NewImpl(c client.Client, b *podiochaosmanager.Builder, log logr.Logger) *impltypes.ChaosImplPair {
219 return &impltypes.ChaosImplPair{
220 Name: "iochaos",
221 Object: &v1alpha1.IOChaos{},
222 Impl: &Impl{
223 Client: c,
224 Log: log.WithName("iochaos"),
225 builder: b,
226 },
227 ObjectList: &v1alpha1.IOChaosList{},
228 Controlls: []client.Object{&v1alpha1.PodIOChaos{}},
229 }
230 }
231
// Module is the fx option of this package: it provides NewImpl as a member of
// the "impl" group and podiochaosmanager.NewBuilder as a plain constructor
// (NewImpl takes a *podiochaosmanager.Builder argument).
var Module = fx.Provide(
	fx.Annotated{
		Group: "impl",
		Target: NewImpl,
	},
	podiochaosmanager.NewBuilder,
)
239