1
2
3
4
5
6
7
8
9
10
11
12
13
14 package util
15
16 import (
17 "context"
18 "encoding/json"
19 "fmt"
20 "net/http"
21 "time"
22
23 apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
24 apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/labels"
27 "k8s.io/apimachinery/pkg/runtime"
28 "k8s.io/apimachinery/pkg/types"
29 "k8s.io/apimachinery/pkg/util/wait"
30 "k8s.io/client-go/kubernetes"
31 apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
32 aggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
33 "k8s.io/kubernetes/test/e2e/framework"
34 "sigs.k8s.io/controller-runtime/pkg/client"
35
36 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
37 )
38
39
40 func WaitForAPIServicesAvailable(client aggregatorclientset.Interface, selector labels.Selector) error {
41 isAvailable := func(status apiregistrationv1.APIServiceStatus) bool {
42 if status.Conditions == nil {
43 return false
44 }
45 for _, condition := range status.Conditions {
46 if condition.Type == apiregistrationv1.Available {
47 return condition.Status == apiregistrationv1.ConditionTrue
48 }
49 }
50 return false
51 }
52 return wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) {
53 apiServiceList, err := client.ApiregistrationV1().APIServices().List(metav1.ListOptions{
54 LabelSelector: selector.String(),
55 })
56 if err != nil {
57 return false, err
58 }
59 for _, apiService := range apiServiceList.Items {
60 if !isAvailable(apiService.Status) {
61 framework.Logf("APIService %q is not available yet", apiService.Name)
62 return false, nil
63 }
64 }
65 for _, apiService := range apiServiceList.Items {
66 framework.Logf("APIService %q is available", apiService.Name)
67 }
68 return true, nil
69 })
70 }
71
72
73 func WaitForCRDsEstablished(client apiextensionsclientset.Interface, selector labels.Selector) error {
74 isEstablished := func(status apiextensionsv1beta1.CustomResourceDefinitionStatus) bool {
75 if status.Conditions == nil {
76 return false
77 }
78 for _, condition := range status.Conditions {
79 if condition.Type == apiextensionsv1beta1.Established {
80 return condition.Status == apiextensionsv1beta1.ConditionTrue
81 }
82 }
83 return false
84 }
85 return wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) {
86 crdList, err := client.ApiextensionsV1beta1().CustomResourceDefinitions().List(metav1.ListOptions{
87 LabelSelector: selector.String(),
88 })
89 if err != nil {
90 return false, err
91 }
92 for _, crd := range crdList.Items {
93 if !isEstablished(crd.Status) {
94 framework.Logf("CRD %q is not established yet", crd.Name)
95 return false, nil
96 }
97 }
98 for _, crd := range crdList.Items {
99 framework.Logf("CRD %q is established", crd.Name)
100 }
101 return true, nil
102 })
103 }
104
105
106 func WaitDeploymentReady(name, namespace string, cli kubernetes.Interface) error {
107 return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
108 d, err := cli.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{})
109 if err != nil {
110 return false, nil
111 }
112 if d.Status.AvailableReplicas != *d.Spec.Replicas {
113 return false, nil
114 }
115 if d.Status.UpdatedReplicas != *d.Spec.Replicas {
116 return false, nil
117 }
118 return true, nil
119 })
120 }
121
122 func PauseChaos(ctx context.Context, cli client.Client, chaos runtime.Object) error {
123 var mergePatch []byte
124 mergePatch, _ = json.Marshal(map[string]interface{}{
125 "metadata": map[string]interface{}{
126 "annotations": map[string]string{v1alpha1.PauseAnnotationKey: "true"},
127 },
128 })
129 return cli.Patch(ctx, chaos, client.ConstantPatch(types.MergePatchType, mergePatch))
130 }
131
132 func UnPauseChaos(ctx context.Context, cli client.Client, chaos runtime.Object) error {
133 var mergePatch []byte
134 mergePatch, _ = json.Marshal(map[string]interface{}{
135 "metadata": map[string]interface{}{
136 "annotations": map[string]string{v1alpha1.PauseAnnotationKey: "false"},
137 },
138 })
139 return cli.Patch(ctx, chaos, client.ConstantPatch(types.MergePatchType, mergePatch))
140 }
141
142 func WaitE2EHelperReady(c http.Client, port uint16) error {
143 return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
144 if _, err = c.Get(fmt.Sprintf("http://localhost:%d/ping", port)); err != nil {
145 return false, nil
146 }
147 return true, nil
148 })
149 }
150