1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package httpchaos
17
18 import (
19 "context"
20 "strings"
21
22 "github.com/go-logr/logr"
23 "github.com/pkg/errors"
24 "go.uber.org/fx"
25 v1 "k8s.io/api/core/v1"
26 k8sError "k8s.io/apimachinery/pkg/api/errors"
27 "k8s.io/apimachinery/pkg/types"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29
30 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
31 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/httpchaos/podhttpchaosmanager"
32 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/iochaos/podiochaosmanager"
33 impltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
34 "github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
35 )
36
37 var _ impltypes.ChaosImpl = (*Impl)(nil)
38
39 const (
40 waitForApplySync v1alpha1.Phase = "Not Injected/Wait"
41 waitForRecoverSync v1alpha1.Phase = "Injected/Wait"
42 )
43
44 type Impl struct {
45 client.Client
46 Log logr.Logger
47
48 builder *podhttpchaosmanager.Builder
49 }
50
51 func (impl *Impl) Apply(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
52
53
54 impl.Log.Info("httpchaos Apply", "namespace", obj.GetNamespace(), "name", obj.GetName())
55 httpchaos := obj.(*v1alpha1.HTTPChaos)
56 if httpchaos.Status.Instances == nil {
57 httpchaos.Status.Instances = make(map[string]int64)
58 }
59
60 record := records[index]
61 phase := record.Phase
62
63 if phase == waitForApplySync {
64 podhttpchaos := &v1alpha1.PodHttpChaos{}
65 namespacedName, err := controller.ParseNamespacedName(record.Id)
66 if err != nil {
67 return waitForApplySync, err
68 }
69 err = impl.Client.Get(ctx, namespacedName, podhttpchaos)
70 if err != nil {
71 return waitForApplySync, err
72 }
73
74 if podhttpchaos.Status.FailedMessage != "" {
75 return waitForApplySync, errors.New(podhttpchaos.Status.FailedMessage)
76 }
77
78 if podhttpchaos.Status.ObservedGeneration >= httpchaos.Status.Instances[record.Id] {
79 return v1alpha1.Injected, nil
80 }
81
82 return waitForApplySync, nil
83 }
84
85 podId, err := controller.ParseNamespacedName(records[index].Id)
86 if err != nil {
87 return v1alpha1.NotInjected, err
88 }
89 var pod v1.Pod
90 err = impl.Client.Get(ctx, podId, &pod)
91 if err != nil {
92 return v1alpha1.NotInjected, err
93 }
94
95 source := httpchaos.Namespace + "/" + httpchaos.Name
96 m := impl.builder.WithInit(source, types.NamespacedName{
97 Namespace: pod.Namespace,
98 Name: pod.Name,
99 })
100
101 m.T.Append(v1alpha1.PodHttpChaosRule{
102 Source: m.Source,
103 Port: httpchaos.Spec.Port,
104 PodHttpChaosBaseRule: v1alpha1.PodHttpChaosBaseRule{
105 Target: httpchaos.Spec.Target,
106 Selector: v1alpha1.PodHttpChaosSelector{
107 Port: &httpchaos.Spec.Port,
108 Path: httpchaos.Spec.Path,
109 Method: httpchaos.Spec.Method,
110 Code: httpchaos.Spec.Code,
111 RequestHeaders: httpchaos.Spec.RequestHeaders,
112 ResponseHeaders: httpchaos.Spec.ResponseHeaders,
113 },
114 Actions: httpchaos.Spec.PodHttpChaosActions,
115 },
116 })
117
118 if httpchaos.Spec.TLS != nil {
119 m.T.Append(httpchaos.Spec.TLS)
120 }
121
122 generationNumber, err := m.Commit(ctx)
123 if err != nil {
124 return v1alpha1.NotInjected, err
125 }
126
127
128 httpchaos.Status.Instances[record.Id] = generationNumber
129 return waitForApplySync, nil
130 }
131
132 func (impl *Impl) Recover(ctx context.Context, index int, records []*v1alpha1.Record, obj v1alpha1.InnerObject) (v1alpha1.Phase, error) {
133
134
135 httpchaos := obj.(*v1alpha1.HTTPChaos)
136 if httpchaos.Status.Instances == nil {
137 httpchaos.Status.Instances = make(map[string]int64)
138 }
139
140 record := records[index]
141 phase := record.Phase
142 if phase == waitForRecoverSync {
143 podhttpchaos := &v1alpha1.PodHttpChaos{}
144 namespacedName, err := controller.ParseNamespacedName(record.Id)
145 if err != nil {
146
147 return waitForRecoverSync, err
148 }
149 err = impl.Client.Get(ctx, namespacedName, podhttpchaos)
150 if err != nil {
151
152 if k8sError.IsNotFound(err) {
153 return v1alpha1.NotInjected, nil
154 }
155
156 if k8sError.IsForbidden(err) {
157 if strings.Contains(err.Error(), "because it is being terminated") {
158 return v1alpha1.NotInjected, nil
159 }
160 }
161
162 return waitForRecoverSync, err
163 }
164
165 if podhttpchaos.Status.FailedMessage != "" {
166 return waitForRecoverSync, errors.New(podhttpchaos.Status.FailedMessage)
167 }
168
169 if podhttpchaos.Status.ObservedGeneration >= httpchaos.Status.Instances[record.Id] {
170 return v1alpha1.NotInjected, nil
171 }
172
173 return waitForRecoverSync, nil
174 }
175
176 podId, err := controller.ParseNamespacedName(records[index].Id)
177 if err != nil {
178
179 return v1alpha1.NotInjected, err
180 }
181 var pod v1.Pod
182 err = impl.Client.Get(ctx, podId, &pod)
183 if err != nil {
184
185 if k8sError.IsNotFound(err) {
186 return v1alpha1.NotInjected, nil
187 }
188 return v1alpha1.Injected, err
189 }
190
191 source := httpchaos.Namespace + "/" + httpchaos.Name
192 m := impl.builder.WithInit(source, types.NamespacedName{
193 Namespace: pod.Namespace,
194 Name: pod.Name,
195 })
196
197 generationNumber, err := m.Commit(ctx)
198 if err != nil {
199 if err == podiochaosmanager.ErrPodNotFound || err == podiochaosmanager.ErrPodNotRunning {
200 return v1alpha1.NotInjected, nil
201 }
202
203 if k8sError.IsForbidden(err) {
204 if strings.Contains(err.Error(), "because it is being terminated") {
205 return v1alpha1.NotInjected, nil
206 }
207 }
208 return v1alpha1.Injected, err
209 }
210
211
212 httpchaos.Status.Instances[record.Id] = generationNumber
213 return waitForRecoverSync, nil
214 }
215
216 func NewImpl(c client.Client, b *podhttpchaosmanager.Builder, log logr.Logger) *impltypes.ChaosImplPair {
217 return &impltypes.ChaosImplPair{
218 Name: "httpchaos",
219 Object: &v1alpha1.HTTPChaos{},
220 Impl: &Impl{
221 Client: c,
222 Log: log.WithName("httpchaos"),
223 builder: b,
224 },
225 ObjectList: &v1alpha1.HTTPChaosList{},
226 Controlls: []client.Object{&v1alpha1.PodHttpChaos{}},
227 }
228 }
229
230 var Module = fx.Provide(
231 fx.Annotated{
232 Group: "impl",
233 Target: NewImpl,
234 },
235 podhttpchaosmanager.NewBuilder,
236 )
237