1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package podhttpchaos
17
18 import (
19 "context"
20 "encoding/json"
21 "fmt"
22 "net/http"
23
24 "github.com/go-logr/logr"
25 "github.com/pkg/errors"
26 v1 "k8s.io/api/core/v1"
27 apierrors "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/types"
30 "k8s.io/client-go/tools/record"
31 "k8s.io/client-go/util/retry"
32 ctrl "sigs.k8s.io/controller-runtime"
33 "sigs.k8s.io/controller-runtime/pkg/client"
34
35 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
36 "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/utils"
37 "github.com/chaos-mesh/chaos-mesh/controllers/utils/chaosdaemon"
38 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
39 "github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/tproxyconfig"
40 )
41
42
43 type Reconciler struct {
44 client.Client
45
46 Recorder record.EventRecorder
47 Log logr.Logger
48 ChaosDaemonClientBuilder *chaosdaemon.ChaosDaemonClientBuilder
49 }
50
51 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
52 obj := &v1alpha1.PodHttpChaos{}
53
54 if err := r.Client.Get(ctx, req.NamespacedName, obj); err != nil {
55 if apierrors.IsNotFound(err) {
56 r.Log.Info("chaos not found")
57 } else {
58
59 r.Log.Error(err, "unable to get chaos")
60 }
61 return ctrl.Result{}, nil
62 }
63
64 if obj.ObjectMeta.Generation <= obj.Status.ObservedGeneration && obj.Status.FailedMessage == "" {
65 r.Log.Info("the target pod has been up to date", "pod", obj.Namespace+"/"+obj.Name)
66 return ctrl.Result{}, nil
67 }
68
69 r.Log.Info("updating http chaos", "pod", obj.Namespace+"/"+obj.Name, "spec", obj.Spec)
70
71 pod := &v1.Pod{}
72
73 err := r.Client.Get(ctx, types.NamespacedName{
74 Name: obj.Name,
75 Namespace: obj.Namespace,
76 }, pod)
77 if err != nil {
78 err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
79 r.Log.Error(err, "fail to find pod")
80 return ctrl.Result{}, nil
81 }
82
83 observedGeneration := obj.ObjectMeta.Generation
84 pid := obj.Status.Pid
85 startTime := obj.Status.StartTime
86
87 defer func() {
88 var failedMessage string
89 if err != nil {
90 failedMessage = err.Error()
91 }
92
93 updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
94 obj := &v1alpha1.PodHttpChaos{}
95
96 if err := r.Client.Get(context.TODO(), req.NamespacedName, obj); err != nil {
97 r.Log.Error(err, "unable to get chaos")
98 return err
99 }
100
101 obj.Status.FailedMessage = failedMessage
102 obj.Status.ObservedGeneration = observedGeneration
103 obj.Status.Pid = pid
104 obj.Status.StartTime = startTime
105
106 return r.Client.Status().Update(context.TODO(), obj)
107 })
108
109 if updateError != nil {
110 updateError = errors.Wrapf(updateError, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
111 r.Log.Error(updateError, "fail to update")
112 r.Recorder.Eventf(obj, "Normal", "Failed", "Failed to update status: %s", updateError.Error())
113 }
114 }()
115
116 pbClient, err := r.ChaosDaemonClientBuilder.Build(ctx, pod, &types.NamespacedName{
117 Namespace: obj.Namespace,
118 Name: obj.Name,
119 })
120 if err != nil {
121 err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
122 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
123 return ctrl.Result{Requeue: true}, nil
124 }
125 defer pbClient.Close()
126
127 if len(pod.Status.ContainerStatuses) == 0 {
128 err = errors.Wrapf(utils.ErrContainerNotFound, "pod %s/%s has empty container status", pod.Namespace, pod.Name)
129 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
130 return ctrl.Result{}, nil
131 }
132
133 containerID := pod.Status.ContainerStatuses[0].ContainerID
134
135 rules := make([]v1alpha1.PodHttpChaosBaseRule, 0)
136 proxyPortsMap := make(map[uint32]bool)
137
138 for _, rule := range obj.Spec.Rules {
139 proxyPortsMap[uint32(rule.Port)] = true
140 rules = append(rules, rule.PodHttpChaosBaseRule)
141 }
142
143 var proxyPorts []uint32
144 for port := range proxyPortsMap {
145 proxyPorts = append(proxyPorts, port)
146 }
147
148 inputRules, err := json.Marshal(rules)
149 if err != nil {
150 err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
151 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
152 return ctrl.Result{}, nil
153 }
154
155 inputTLS := []byte("")
156 if obj.Spec.TLS != nil {
157 tlsKeys := obj.Spec.TLS
158 secret := v1.Secret{
159 ObjectMeta: metav1.ObjectMeta{
160 Name: tlsKeys.SecretName,
161 Namespace: tlsKeys.SecretNamespace,
162 },
163 }
164 if err := r.Client.Get(context.TODO(), req.NamespacedName, &secret); err != nil {
165 r.Log.Error(err, "unable to get secret")
166 return ctrl.Result{}, nil
167 }
168
169 cert, ok := secret.Data[tlsKeys.CertName]
170 if !ok {
171 err = errors.Wrapf(err, "get cert %s", tlsKeys.CertName)
172 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
173 return ctrl.Result{}, nil
174 }
175
176 key, ok := secret.Data[tlsKeys.KeyName]
177 if !ok {
178 err = errors.Wrapf(err, "get key %s", tlsKeys.KeyName)
179 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
180 return ctrl.Result{}, nil
181 }
182
183 var ca []byte
184 if tlsKeys.CAName != nil {
185 ca, ok = secret.Data[*tlsKeys.CAName]
186 if !ok {
187 err = errors.Wrapf(err, "get ca %s", *tlsKeys.CAName)
188 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
189 return ctrl.Result{}, nil
190 }
191 }
192
193 tlsConfig := tproxyconfig.TLSConfig{
194 CertFile: tproxyconfig.TLSConfigItem{
195 Type: "Contents",
196 Value: cert,
197 },
198 KeyFile: tproxyconfig.TLSConfigItem{
199 Type: "Contents",
200 Value: key,
201 },
202 }
203
204 if ca != nil {
205 tlsConfig.CAFile = &tproxyconfig.TLSConfigItem{
206 Type: "Contents",
207 Value: ca,
208 }
209 }
210
211 inputTLS, err = json.Marshal(tlsConfig)
212 if err != nil {
213 err = errors.Wrapf(err, "apply for pod %s/%s", pod.Namespace, pod.Name)
214 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
215 return ctrl.Result{}, nil
216 }
217 }
218
219 r.Log.Info("input with", "rules", string(inputRules))
220
221 res, err := pbClient.ApplyHttpChaos(ctx, &pb.ApplyHttpChaosRequest{
222 Rules: string(inputRules),
223 Tls: string(inputTLS),
224 ProxyPorts: proxyPorts,
225 ContainerId: containerID,
226
227 Instance: obj.Status.Pid,
228 StartTime: obj.Status.StartTime,
229 EnterNS: true,
230 })
231 if err != nil {
232 err = errors.Wrapf(err, "failed to apply for pod %s/%s", pod.Namespace, pod.Name)
233 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
234 return ctrl.Result{Requeue: true}, nil
235 }
236
237 if res.StatusCode != http.StatusOK {
238 err = errors.Wrapf(fmt.Errorf("%s", res.Error),
239 "failed to apply for pod %s/%s, status(%d)",
240 pod.Namespace, pod.Name, res.StatusCode)
241 r.Recorder.Event(obj, "Warning", "Failed", err.Error())
242 return ctrl.Result{Requeue: true}, nil
243 }
244
245 pid = res.Instance
246 startTime = res.StartTime
247
248 return ctrl.Result{}, nil
249 }
250