1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package util
17
18 import (
19 "bytes"
20 "context"
21 "encoding/json"
22 "fmt"
23 "net/http"
24 "time"
25
26 apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
27 apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/labels"
30 "k8s.io/apimachinery/pkg/types"
31 "k8s.io/apimachinery/pkg/util/wait"
32 "k8s.io/client-go/kubernetes"
33 apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
34 aggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
35 "k8s.io/kubernetes/test/e2e/framework"
36 "sigs.k8s.io/controller-runtime/pkg/client"
37
38 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
39 )
40
41
42 func WaitForAPIServicesAvailable(client aggregatorclientset.Interface, selector labels.Selector) error {
43 isAvailable := func(status apiregistrationv1.APIServiceStatus) bool {
44 if status.Conditions == nil {
45 return false
46 }
47 for _, condition := range status.Conditions {
48 if condition.Type == apiregistrationv1.Available {
49 return condition.Status == apiregistrationv1.ConditionTrue
50 }
51 }
52 return false
53 }
54 return wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) {
55 apiServiceList, err := client.ApiregistrationV1().APIServices().List(
56 context.TODO(),
57 metav1.ListOptions{
58 LabelSelector: selector.String(),
59 })
60 if err != nil {
61 return false, err
62 }
63 for _, apiService := range apiServiceList.Items {
64 if !isAvailable(apiService.Status) {
65 framework.Logf("APIService %q is not available yet", apiService.Name)
66 return false, nil
67 }
68 }
69 for _, apiService := range apiServiceList.Items {
70 framework.Logf("APIService %q is available", apiService.Name)
71 }
72 return true, nil
73 })
74 }
75
76
77 func WaitForCRDsEstablished(client apiextensionsclientset.Interface, selector labels.Selector) error {
78 isEstablished := func(status apiextensionsv1beta1.CustomResourceDefinitionStatus) bool {
79 if status.Conditions == nil {
80 return false
81 }
82 for _, condition := range status.Conditions {
83 if condition.Type == apiextensionsv1beta1.Established {
84 return condition.Status == apiextensionsv1beta1.ConditionTrue
85 }
86 }
87 return false
88 }
89 return wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) {
90 crdList, err := client.ApiextensionsV1beta1().CustomResourceDefinitions().List(
91 context.TODO(),
92 metav1.ListOptions{
93 LabelSelector: selector.String(),
94 })
95 if err != nil {
96 return false, err
97 }
98 for _, crd := range crdList.Items {
99 if !isEstablished(crd.Status) {
100 framework.Logf("CRD %q is not established yet", crd.Name)
101 return false, nil
102 }
103 }
104 for _, crd := range crdList.Items {
105 framework.Logf("CRD %q is established", crd.Name)
106 }
107 return true, nil
108 })
109 }
110
111
112 func WaitDeploymentReady(name, namespace string, cli kubernetes.Interface) error {
113 return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
114 d, err := cli.AppsV1().Deployments(namespace).Get(
115 context.TODO(),
116 name,
117 metav1.GetOptions{},
118 )
119 if err != nil {
120 return false, nil
121 }
122 if d.Status.AvailableReplicas != *d.Spec.Replicas {
123 return false, nil
124 }
125 if d.Status.UpdatedReplicas != *d.Spec.Replicas {
126 return false, nil
127 }
128 return true, nil
129 })
130 }
131
132 func PauseChaos(ctx context.Context, cli client.Client, chaos client.Object) error {
133 var mergePatch []byte
134 mergePatch, _ = json.Marshal(map[string]interface{}{
135 "metadata": map[string]interface{}{
136 "annotations": map[string]string{v1alpha1.PauseAnnotationKey: "true"},
137 },
138 })
139 return cli.Patch(ctx, chaos, client.RawPatch(types.MergePatchType, mergePatch))
140 }
141
142 func UnPauseChaos(ctx context.Context, cli client.Client, chaos client.Object) error {
143 var mergePatch []byte
144 mergePatch, _ = json.Marshal(map[string]interface{}{
145 "metadata": map[string]interface{}{
146 "annotations": map[string]string{v1alpha1.PauseAnnotationKey: "false"},
147 },
148 })
149 return cli.Patch(ctx, chaos, client.RawPatch(types.MergePatchType, mergePatch))
150 }
151
152 func WaitE2EHelperReady(c http.Client, port uint16) error {
153 return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
154 if _, err = c.Get(fmt.Sprintf("http://localhost:%d/ping", port)); err != nil {
155 return false, nil
156 }
157 return true, nil
158 })
159 }
160
161 func WaitHTTPE2EHelperReady(c http.Client, ip string, port uint16) error {
162 return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
163 if _, err = c.Get(fmt.Sprintf("http://%s:%d/ping", ip, port)); err != nil {
164 framework.Logf("Err : %v , IP : %s", err, ip)
165 return false, nil
166 }
167 return true, nil
168 })
169 }
170
171 func SetupHTTPE2EHelperTLSConfig(c http.Client, ip string, port uint16, tls_port uint16, body []byte) error {
172 return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
173 if _, err = c.Post(fmt.Sprintf("http://%s:%d/setup_https", ip, port), "application/json", bytes.NewBuffer(body)); err != nil {
174 framework.Logf("Err : %v , IP : %s", err, ip)
175 return false, nil
176 }
177 if _, err = c.Get(fmt.Sprintf("https://%s:%d/ping", ip, tls_port)); err != nil {
178 framework.Logf("Err : %v , IP : %s", err, ip)
179 return false, nil
180 }
181 return true, nil
182 })
183 }
184
185 func WaitHTTPE2EHelperTLSReady(c http.Client, ip string, port uint16) error {
186 return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
187 if _, err = c.Get(fmt.Sprintf("https://%s:%d/ping", ip, port)); err != nil {
188 framework.Logf("Err : %v , IP : %s", err, ip)
189 return false, nil
190 }
191 return true, nil
192 })
193 }
194