1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package util
17
18 import (
19 "context"
20 "encoding/json"
21 "fmt"
22 "net/http"
23 "time"
24
25 apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
26 apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/labels"
29 "k8s.io/apimachinery/pkg/types"
30 "k8s.io/apimachinery/pkg/util/wait"
31 "k8s.io/client-go/kubernetes"
32 apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
33 aggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
34 "k8s.io/kubernetes/test/e2e/framework"
35 "sigs.k8s.io/controller-runtime/pkg/client"
36
37 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
38 )
39
40
41 func WaitForAPIServicesAvailable(client aggregatorclientset.Interface, selector labels.Selector) error {
42 isAvailable := func(status apiregistrationv1.APIServiceStatus) bool {
43 if status.Conditions == nil {
44 return false
45 }
46 for _, condition := range status.Conditions {
47 if condition.Type == apiregistrationv1.Available {
48 return condition.Status == apiregistrationv1.ConditionTrue
49 }
50 }
51 return false
52 }
53 return wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) {
54 apiServiceList, err := client.ApiregistrationV1().APIServices().List(
55 context.TODO(),
56 metav1.ListOptions{
57 LabelSelector: selector.String(),
58 })
59 if err != nil {
60 return false, err
61 }
62 for _, apiService := range apiServiceList.Items {
63 if !isAvailable(apiService.Status) {
64 framework.Logf("APIService %q is not available yet", apiService.Name)
65 return false, nil
66 }
67 }
68 for _, apiService := range apiServiceList.Items {
69 framework.Logf("APIService %q is available", apiService.Name)
70 }
71 return true, nil
72 })
73 }
74
75
76 func WaitForCRDsEstablished(client apiextensionsclientset.Interface, selector labels.Selector) error {
77 isEstablished := func(status apiextensionsv1beta1.CustomResourceDefinitionStatus) bool {
78 if status.Conditions == nil {
79 return false
80 }
81 for _, condition := range status.Conditions {
82 if condition.Type == apiextensionsv1beta1.Established {
83 return condition.Status == apiextensionsv1beta1.ConditionTrue
84 }
85 }
86 return false
87 }
88 return wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) {
89 crdList, err := client.ApiextensionsV1beta1().CustomResourceDefinitions().List(
90 context.TODO(),
91 metav1.ListOptions{
92 LabelSelector: selector.String(),
93 })
94 if err != nil {
95 return false, err
96 }
97 for _, crd := range crdList.Items {
98 if !isEstablished(crd.Status) {
99 framework.Logf("CRD %q is not established yet", crd.Name)
100 return false, nil
101 }
102 }
103 for _, crd := range crdList.Items {
104 framework.Logf("CRD %q is established", crd.Name)
105 }
106 return true, nil
107 })
108 }
109
110
111 func WaitDeploymentReady(name, namespace string, cli kubernetes.Interface) error {
112 return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
113 d, err := cli.AppsV1().Deployments(namespace).Get(
114 context.TODO(),
115 name,
116 metav1.GetOptions{},
117 )
118 if err != nil {
119 return false, nil
120 }
121 if d.Status.AvailableReplicas != *d.Spec.Replicas {
122 return false, nil
123 }
124 if d.Status.UpdatedReplicas != *d.Spec.Replicas {
125 return false, nil
126 }
127 return true, nil
128 })
129 }
130
131 func PauseChaos(ctx context.Context, cli client.Client, chaos client.Object) error {
132 var mergePatch []byte
133 mergePatch, _ = json.Marshal(map[string]interface{}{
134 "metadata": map[string]interface{}{
135 "annotations": map[string]string{v1alpha1.PauseAnnotationKey: "true"},
136 },
137 })
138 return cli.Patch(ctx, chaos, client.RawPatch(types.MergePatchType, mergePatch))
139 }
140
141 func UnPauseChaos(ctx context.Context, cli client.Client, chaos client.Object) error {
142 var mergePatch []byte
143 mergePatch, _ = json.Marshal(map[string]interface{}{
144 "metadata": map[string]interface{}{
145 "annotations": map[string]string{v1alpha1.PauseAnnotationKey: "false"},
146 },
147 })
148 return cli.Patch(ctx, chaos, client.RawPatch(types.MergePatchType, mergePatch))
149 }
150
151 func WaitE2EHelperReady(c http.Client, port uint16) error {
152 return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
153 if _, err = c.Get(fmt.Sprintf("http://localhost:%d/ping", port)); err != nil {
154 return false, nil
155 }
156 return true, nil
157 })
158 }
159