...

Source file src/github.com/chaos-mesh/chaos-mesh/e2e-test/e2e/util/util.go

Documentation: github.com/chaos-mesh/chaos-mesh/e2e-test/e2e/util

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package util
    17  
    18  import (
    19  	"bytes"
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"net/http"
    24  	"time"
    25  
    26  	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
    27  	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/labels"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	"k8s.io/client-go/kubernetes"
    33  	apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    34  	aggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
    35  	"k8s.io/kubernetes/test/e2e/framework"
    36  	"sigs.k8s.io/controller-runtime/pkg/client"
    37  
    38  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    39  )
    40  
    41  // WaitForAPIServicesAvailable waits for apiservices to be available
    42  func WaitForAPIServicesAvailable(client aggregatorclientset.Interface, selector labels.Selector) error {
    43  	isAvailable := func(status apiregistrationv1.APIServiceStatus) bool {
    44  		if status.Conditions == nil {
    45  			return false
    46  		}
    47  		for _, condition := range status.Conditions {
    48  			if condition.Type == apiregistrationv1.Available {
    49  				return condition.Status == apiregistrationv1.ConditionTrue
    50  			}
    51  		}
    52  		return false
    53  	}
    54  	return wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) {
    55  		apiServiceList, err := client.ApiregistrationV1().APIServices().List(
    56  			context.TODO(),
    57  			metav1.ListOptions{
    58  				LabelSelector: selector.String(),
    59  			})
    60  		if err != nil {
    61  			return false, err
    62  		}
    63  		for _, apiService := range apiServiceList.Items {
    64  			if !isAvailable(apiService.Status) {
    65  				framework.Logf("APIService %q is not available yet", apiService.Name)
    66  				return false, nil
    67  			}
    68  		}
    69  		for _, apiService := range apiServiceList.Items {
    70  			framework.Logf("APIService %q is available", apiService.Name)
    71  		}
    72  		return true, nil
    73  	})
    74  }
    75  
    76  // WaitForCRDsEstablished waits for all CRDs to be established
    77  func WaitForCRDsEstablished(client apiextensionsclientset.Interface, selector labels.Selector) error {
    78  	isEstablished := func(status apiextensionsv1beta1.CustomResourceDefinitionStatus) bool {
    79  		if status.Conditions == nil {
    80  			return false
    81  		}
    82  		for _, condition := range status.Conditions {
    83  			if condition.Type == apiextensionsv1beta1.Established {
    84  				return condition.Status == apiextensionsv1beta1.ConditionTrue
    85  			}
    86  		}
    87  		return false
    88  	}
    89  	return wait.PollImmediate(5*time.Second, 3*time.Minute, func() (bool, error) {
    90  		crdList, err := client.ApiextensionsV1beta1().CustomResourceDefinitions().List(
    91  			context.TODO(),
    92  			metav1.ListOptions{
    93  				LabelSelector: selector.String(),
    94  			})
    95  		if err != nil {
    96  			return false, err
    97  		}
    98  		for _, crd := range crdList.Items {
    99  			if !isEstablished(crd.Status) {
   100  				framework.Logf("CRD %q is not established yet", crd.Name)
   101  				return false, nil
   102  			}
   103  		}
   104  		for _, crd := range crdList.Items {
   105  			framework.Logf("CRD %q is established", crd.Name)
   106  		}
   107  		return true, nil
   108  	})
   109  }
   110  
   111  // WaitDeploymentReady waits for all pods which controlled by deployment to be ready.
   112  func WaitDeploymentReady(name, namespace string, cli kubernetes.Interface) error {
   113  	return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
   114  		d, err := cli.AppsV1().Deployments(namespace).Get(
   115  			context.TODO(),
   116  			name,
   117  			metav1.GetOptions{},
   118  		)
   119  		if err != nil {
   120  			return false, nil
   121  		}
   122  		if d.Status.AvailableReplicas != *d.Spec.Replicas {
   123  			return false, nil
   124  		}
   125  		if d.Status.UpdatedReplicas != *d.Spec.Replicas {
   126  			return false, nil
   127  		}
   128  		return true, nil
   129  	})
   130  }
   131  
   132  func PauseChaos(ctx context.Context, cli client.Client, chaos client.Object) error {
   133  	var mergePatch []byte
   134  	mergePatch, _ = json.Marshal(map[string]interface{}{
   135  		"metadata": map[string]interface{}{
   136  			"annotations": map[string]string{v1alpha1.PauseAnnotationKey: "true"},
   137  		},
   138  	})
   139  	return cli.Patch(ctx, chaos, client.RawPatch(types.MergePatchType, mergePatch))
   140  }
   141  
   142  func UnPauseChaos(ctx context.Context, cli client.Client, chaos client.Object) error {
   143  	var mergePatch []byte
   144  	mergePatch, _ = json.Marshal(map[string]interface{}{
   145  		"metadata": map[string]interface{}{
   146  			"annotations": map[string]string{v1alpha1.PauseAnnotationKey: "false"},
   147  		},
   148  	})
   149  	return cli.Patch(ctx, chaos, client.RawPatch(types.MergePatchType, mergePatch))
   150  }
   151  
   152  func WaitE2EHelperReady(c http.Client, port uint16) error {
   153  	return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
   154  		if _, err = c.Get(fmt.Sprintf("http://localhost:%d/ping", port)); err != nil {
   155  			return false, nil
   156  		}
   157  		return true, nil
   158  	})
   159  }
   160  
   161  func WaitHTTPE2EHelperReady(c http.Client, ip string, port uint16) error {
   162  	return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
   163  		if _, err = c.Get(fmt.Sprintf("http://%s:%d/ping", ip, port)); err != nil {
   164  			framework.Logf("Err : %v , IP : %s", err, ip)
   165  			return false, nil
   166  		}
   167  		return true, nil
   168  	})
   169  }
   170  
   171  func SetupHTTPE2EHelperTLSConfig(c http.Client, ip string, port uint16, tls_port uint16, body []byte) error {
   172  	return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
   173  		if _, err = c.Post(fmt.Sprintf("http://%s:%d/setup_https", ip, port), "application/json", bytes.NewBuffer(body)); err != nil {
   174  			framework.Logf("Err : %v , IP : %s", err, ip)
   175  			return false, nil
   176  		}
   177  		if _, err = c.Get(fmt.Sprintf("https://%s:%d/ping", ip, tls_port)); err != nil {
   178  			framework.Logf("Err : %v , IP : %s", err, ip)
   179  			return false, nil
   180  		}
   181  		return true, nil
   182  	})
   183  }
   184  
   185  func WaitHTTPE2EHelperTLSReady(c http.Client, ip string, port uint16) error {
   186  	return wait.Poll(2*time.Second, 5*time.Minute, func() (done bool, err error) {
   187  		if _, err = c.Get(fmt.Sprintf("https://%s:%d/ping", ip, port)); err != nil {
   188  			framework.Logf("Err : %v , IP : %s", err, ip)
   189  			return false, nil
   190  		}
   191  		return true, nil
   192  	})
   193  }
   194