1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package test
17
18 import (
19 "context"
20 "fmt"
21 "os/exec"
22 "strings"
23 "time"
24
25 "github.com/pkg/errors"
26 corev1 "k8s.io/api/core/v1"
27 apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/labels"
30 "k8s.io/apimachinery/pkg/util/wait"
31 "k8s.io/client-go/kubernetes"
32 "k8s.io/klog/v2"
33 aggregatorclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
34 "k8s.io/kubernetes/test/e2e/framework"
35
36 "github.com/chaos-mesh/chaos-mesh/e2e-test/e2e/e2econst"
37 e2eutil "github.com/chaos-mesh/chaos-mesh/e2e-test/e2e/util"
38 )
39
// operatorChartName holds the chaos-mesh operator chart name ("chaos-mesh").
const operatorChartName = "chaos-mesh"
43
44
45 type OperatorAction interface {
46 CleanCRDOrDie()
47 DeployOperator(config *OperatorConfig) error
48 UpgradeOperator(config *OperatorConfig) error
49 RestartDaemon(info *OperatorConfig) error
50 RestartControllerManager(info *OperatorConfig) error
51 InstallCRD(config *OperatorConfig) error
52 }
53
54
55 func BuildOperatorActionAndCfg(cfg *Config) (OperatorAction, *OperatorConfig, error) {
56
57 config, err := framework.LoadConfig()
58 if err != nil {
59 return nil, nil, errors.Wrap(err, "load config")
60 }
61 kubeCli, err := kubernetes.NewForConfig(config)
62 if err != nil {
63 return nil, nil, errors.Wrap(err, "create kube client")
64 }
65 aggrCli, err := aggregatorclientset.NewForConfig(config)
66 if err != nil {
67 return nil, nil, errors.Wrap(err, "create aggr client")
68 }
69 apiExtCli, err := apiextensionsclientset.NewForConfig(config)
70 if err != nil {
71 return nil, nil, errors.Wrap(err, "create apiExt clientset")
72 }
73 oa := NewOperatorAction(kubeCli, aggrCli, apiExtCli, cfg)
74 ocfg := NewDefaultOperatorConfig()
75 ocfg.Manager.ImageRegistry = cfg.ManagerImageRegistry
76 ocfg.Manager.ImageRepository = cfg.ManagerImage
77 ocfg.Manager.ImageTag = cfg.ManagerTag
78 ocfg.Daemon.ImageRegistry = cfg.DaemonImageRegistry
79 ocfg.Daemon.ImageRepository = cfg.DaemonImage
80 ocfg.Daemon.ImageTag = cfg.DaemonTag
81 ocfg.DNSImage = cfg.ChaosCoreDNSImage
82 ocfg.EnableDashboard = cfg.EnableDashboard
83
84 return oa, &ocfg, nil
85 }
86
87
88 func NewOperatorAction(
89 kubeCli kubernetes.Interface,
90 aggrCli aggregatorclientset.Interface,
91 apiExtCli apiextensionsclientset.Interface,
92 cfg *Config) OperatorAction {
93
94 oa := &operatorAction{
95 kubeCli: kubeCli,
96 aggrCli: aggrCli,
97 apiExtCli: apiExtCli,
98 cfg: cfg,
99 }
100 return oa
101 }
102
// DeployOperator creates the e2econst.ChaosMeshNamespace namespace with
// kubectl and then installs the chaos-mesh helm release via UpgradeOperator.
//
// It returns an error if the kubectl command fails. `kubectl create` is not
// idempotent, so this presumably also fails when the namespace already exists.
func (oa *operatorAction) DeployOperator(info *OperatorConfig) error {
	ns := e2econst.ChaosMeshNamespace
	klog.Infof("create namespace %s", ns)
	cmd := fmt.Sprintf(`kubectl create ns %s`, ns)
	// Log the command verbatim: it must never be used as a format string.
	klog.Info(cmd)
	output, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
	if err != nil {
		return errors.Errorf("failed to create namespace %s: %v %s", ns, err, string(output))
	}
	return oa.UpgradeOperator(info)
}
113
// UpgradeOperator runs `helm upgrade --install` for the chaos-mesh release
// described by info, skipping CRDs, which InstallCRD handles. It then blocks
// until the release's pods are running and all aggregated API services are
// available.
//
// Pods are matched by app.kubernetes.io/instance=chaos-mesh in info.Namespace.
// The pod wait polls every 5s for up to 5 minutes.
func (oa *operatorAction) UpgradeOperator(info *OperatorConfig) error {
	klog.Infof("deploying chaos-mesh:%v", info.ReleaseName)
	cmd := fmt.Sprintf(`helm upgrade --install %s %s --namespace %s --set %s --skip-crds`,
		info.ReleaseName,
		oa.operatorChartPath(info.Tag),
		info.Namespace,
		info.operatorHelmSetValue())
	klog.Info(cmd)
	res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
	if err != nil {
		return errors.Errorf("failed to deploy operator: %v, %s", err, string(res))
	}

	// The selector never changes, so build it once instead of on every poll.
	selector := labels.SelectorFromSet(labels.Set{"app.kubernetes.io/instance": "chaos-mesh"})
	listOpts := metav1.ListOptions{LabelSelector: selector.String()}

	klog.Infof("start to waiting chaos-mesh ready")
	err = wait.Poll(5*time.Second, 5*time.Minute, func() (bool, error) {
		pods, listErr := oa.kubeCli.CoreV1().Pods(info.Namespace).List(context.TODO(), listOpts)
		if listErr != nil {
			klog.Errorf("failed to get chaos-mesh pods, err:%v", listErr)
			return false, nil
		}
		// Workload controllers create pods asynchronously after helm returns.
		// An empty list means "not created yet", not "ready".
		if len(pods.Items) == 0 {
			return false, nil
		}
		for _, pod := range pods.Items {
			// A pod being replaced by the upgrade may still report phase
			// Running while it terminates, so do not count it as ready.
			if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodRunning {
				return false, nil
			}
		}
		return true, nil
	})
	if err != nil {
		return errors.Wrap(err, "wait for chaos-mesh pods running")
	}
	return e2eutil.WaitForAPIServicesAvailable(oa.aggrCli, labels.Everything())
}
155
// InstallCRD creates the chaos-mesh CRDs from the e2e manifest and waits for
// them to be established. It then deletes kubectl's local cache directories
// (~/.kube/cache and ~/.kube/http-cache) so later kubectl calls re-discover
// the new resource types.
//
// Failures are returned as errors rather than aborting the process, so the
// caller's cleanup still runs.
func (oa *operatorAction) InstallCRD(info *OperatorConfig) error {
	klog.Infof("deploying chaos-mesh crd :%v", info.ReleaseName)
	oa.runKubectlOrDie("create", "-f", oa.manifestPath("e2e/crd.yaml"), "--validate=false")

	// Previously this result was silently dropped, letting the suite continue
	// against CRDs that might never become established.
	if err := e2eutil.WaitForCRDsEstablished(oa.apiExtCli, labels.Everything()); err != nil {
		return errors.Wrap(err, "wait for CRDs established")
	}

	klog.Infof("force sync kubectl cache")
	// A shell is required so that "~" expands to the home directory.
	cmdArgs := []string{"sh", "-c", "rm -rf ~/.kube/cache ~/.kube/http-cache"}
	if output, err := exec.Command(cmdArgs[0], cmdArgs[1:]...).CombinedOutput(); err != nil {
		return errors.Wrapf(err, "run '%s': %s", strings.Join(cmdArgs, " "), string(output))
	}
	return nil
}
170
171 func (oa *operatorAction) RestartDaemon(info *OperatorConfig) error {
172 return oa.restartComponent(info, "chaos-daemon-")
173 }
174
175 func (oa *operatorAction) RestartControllerManager(info *OperatorConfig) error {
176 return oa.restartComponent(info, "chaos-controller-manager-")
177 }
178
// restartComponent deletes every chaos-mesh pod whose name starts with prefix.
// Candidate pods are those labelled app.kubernetes.io/instance=chaos-mesh in
// info.Namespace.
//
// It then waits until all of the following hold:
//   - none of the deleted pods remain;
//   - every matching pod is Running and not terminating;
//   - at least as many prefixed pods are running as were deleted.
//
// Finally it waits for all aggregated API services to be available. The pod
// wait polls every 5s for up to 5 minutes.
func (oa *operatorAction) restartComponent(info *OperatorConfig, prefix string) error {
	klog.Infof("killing component %v", prefix)
	selector := labels.SelectorFromSet(labels.Set{"app.kubernetes.io/instance": "chaos-mesh"})
	listOpts := metav1.ListOptions{LabelSelector: selector.String()}
	podClient := oa.kubeCli.CoreV1().Pods(info.Namespace)

	pods, err := podClient.List(context.TODO(), listOpts)
	if err != nil {
		return errors.Wrap(err, "select pods")
	}

	// Remember exactly which pods were deleted so the wait below cannot
	// mistake a still-terminating old pod for a healthy replacement.
	deletedUIDs := make(map[string]struct{})
	for _, pod := range pods.Items {
		if !strings.HasPrefix(pod.Name, prefix) {
			continue
		}
		if err := podClient.Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
			return errors.Wrapf(err, "delete pod(%s)", pod.Name)
		}
		deletedUIDs[string(pod.UID)] = struct{}{}
	}
	if len(deletedUIDs) == 0 {
		klog.Warningf("no pods with prefix %q found in namespace %s", prefix, info.Namespace)
	}

	klog.Infof("start to waiting chaos-mesh ready")
	err = wait.Poll(5*time.Second, 5*time.Minute, func() (bool, error) {
		current, listErr := podClient.List(context.TODO(), listOpts)
		if listErr != nil {
			klog.Errorf("get chaos-mesh pods: %v", listErr)
			return false, nil
		}
		if len(current.Items) == 0 {
			return false, nil
		}
		running := 0
		for _, pod := range current.Items {
			if _, stale := deletedUIDs[string(pod.UID)]; stale {
				// One of the pods we deleted has not gone away yet.
				return false, nil
			}
			if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodRunning {
				return false, nil
			}
			if strings.HasPrefix(pod.Name, prefix) {
				running++
			}
		}
		// Guard against the gap between an old pod disappearing and its
		// replacement being created (e.g. DaemonSet pods).
		return running >= len(deletedUIDs), nil
	})
	if err != nil {
		return errors.Wrapf(err, "wait for %s pods running", prefix)
	}
	return e2eutil.WaitForAPIServicesAvailable(oa.aggrCli, labels.Everything())
}
224
225 func (oa *operatorAction) CleanCRDOrDie() {
226 oa.runKubectlOrDie("delete", "crds", "--all")
227 }
228