1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package remotecluster
17
18 import (
19 "context"
20 "encoding/json"
21
22 "github.com/go-logr/logr"
23 "github.com/pkg/errors"
24 "helm.sh/helm/v3/pkg/release"
25 "helm.sh/helm/v3/pkg/storage/driver"
26 corev1 "k8s.io/api/core/v1"
27 apierrors "k8s.io/apimachinery/pkg/api/errors"
28 "k8s.io/apimachinery/pkg/types"
29 "k8s.io/client-go/tools/clientcmd"
30 "k8s.io/client-go/util/retry"
31 ctrl "sigs.k8s.io/controller-runtime"
32 "sigs.k8s.io/controller-runtime/pkg/client"
33
34 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
35 "github.com/chaos-mesh/chaos-mesh/controllers/config"
36 "github.com/chaos-mesh/chaos-mesh/controllers/multicluster/clusterregistry"
37 "github.com/chaos-mesh/chaos-mesh/pkg/helm"
38 )
39
const (
	// remoteClusterControllerFinalizer is the finalizer this controller puts
	// on RemoteCluster objects it has finished setting up.
	remoteClusterControllerFinalizer = "chaos-mesh/remotecluster-controllers"

	// chaosMeshReleaseName is the name of the helm release that is looked up,
	// installed, upgraded and uninstalled in the remote cluster.
	chaosMeshReleaseName = "chaos-mesh"
)
42
43 type Reconciler struct {
44 Log logr.Logger
45 registry *clusterregistry.RemoteClusterRegistry
46
47 client.Client
48 }
49
50 func (r *Reconciler) getRestConfig(ctx context.Context, secretRef v1alpha1.RemoteClusterSecretRef) (clientcmd.ClientConfig, error) {
51 var secret corev1.Secret
52 err := r.Client.Get(ctx, types.NamespacedName{
53 Namespace: secretRef.Namespace,
54 Name: secretRef.Name,
55 }, &secret)
56 if err != nil {
57 return nil, errors.Wrapf(err, "get secret %s/%s", secretRef.Namespace, secretRef.Name)
58 }
59
60 kubeconfig := secret.Data[secretRef.Key]
61
62 config, err := clientcmd.Load(kubeconfig)
63 if err != nil {
64 return nil, errors.Wrap(err, "load kubeconfig")
65 }
66
67 return clientcmd.NewDefaultClientConfig(*config, nil), nil
68 }
69
70 func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
71 var obj v1alpha1.RemoteCluster
72 err := r.Client.Get(ctx, req.NamespacedName, &obj)
73 if err != nil {
74 if apierrors.IsNotFound(err) {
75 r.Log.Info("remote cluster not found", "namespace", req.Namespace, "name", req.Name)
76 } else {
77
78 r.Log.Error(err, "unable to get remote cluster", "namespace", req.Namespace, "name", req.Name)
79 }
80 return ctrl.Result{}, nil
81 }
82
83 r.Log.Info("remote cluster", "Generation:", obj.ObjectMeta.Generation, "ObservedGeneration:", obj.Status.ObservedGeneration)
84
85 if obj.ObjectMeta.Generation <= obj.Status.ObservedGeneration {
86 r.Log.Info("the target remote cluster has been up to date", "remote cluster", obj.Namespace+"/"+obj.Name)
87 return ctrl.Result{}, nil
88 }
89
90 clientConfig, err := r.getRestConfig(ctx, obj.Spec.KubeConfig.SecretRef)
91 if err != nil {
92 r.Log.Error(err, "fail to get clientConfig from secret")
93 return ctrl.Result{Requeue: true}, nil
94 }
95
96
97 if !obj.DeletionTimestamp.IsZero() {
98 err := r.registry.Stop(ctx, obj.Name)
99 if err != nil {
100 if !errors.Is(err, clusterregistry.ErrNotExist) {
101 r.Log.Error(err, "fail to stop cluster")
102 return ctrl.Result{Requeue: true}, nil
103 }
104 }
105
106 err = r.uninstallHelmRelease(ctx, &obj, clientConfig)
107 if err != nil {
108 r.Log.Error(err, "fail to uninstall helm release")
109 return ctrl.Result{Requeue: true}, nil
110 }
111
112 err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
113 var newObj v1alpha1.RemoteCluster
114 r.Client.Get(ctx, req.NamespacedName, &newObj)
115
116 newObj.Finalizers = []string{}
117 setRemoteClusterCondition(&newObj, v1alpha1.RemoteClusterConditionInstalled, corev1.ConditionFalse, "")
118
119 return r.Client.Update(ctx, &newObj)
120 })
121 if err != nil {
122 r.Log.Error(err, "fail to update finalizer", "name", obj.Name)
123 return ctrl.Result{Requeue: true}, nil
124 }
125 return ctrl.Result{}, nil
126 }
127
128 currentVersion, err := r.ensureHelmRelease(ctx, &obj, clientConfig)
129 if err != nil {
130 r.Log.Error(err, "fail to list or install remote helm release")
131 return ctrl.Result{Requeue: true}, nil
132 }
133
134 err = r.ensureClusterControllerManager(ctx, &obj, clientConfig)
135 if err != nil {
136 r.Log.Error(err, "fail to boot remote cluster controller manager")
137 return ctrl.Result{Requeue: true}, nil
138 }
139 obj.Finalizers = []string{remoteClusterControllerFinalizer}
140
141 if err != nil {
142 r.Log.Error(err, "fail to operate the helm release in remote cluster")
143 return ctrl.Result{Requeue: true}, nil
144 }
145
146 observedGeneration := obj.ObjectMeta.Generation
147 err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
148 var newObj v1alpha1.RemoteCluster
149 r.Client.Get(ctx, req.NamespacedName, &newObj)
150
151 newObj.Finalizers = obj.Finalizers
152 setRemoteClusterCondition(&newObj, v1alpha1.RemoteClusterConditionInstalled, corev1.ConditionTrue, "")
153
154 if err = r.Client.Update(ctx, &newObj); err != nil {
155 return err
156 }
157 newObj.Status.CurrentVersion = currentVersion
158 newObj.Status.ObservedGeneration = observedGeneration
159 err = r.Client.Status().Update(ctx, &newObj)
160 return err
161 })
162 if err != nil {
163 r.Log.Error(err, "fail to update finalizer", "name", obj.Name)
164 return ctrl.Result{Requeue: true}, nil
165 }
166
167 return ctrl.Result{}, nil
168 }
169
170 func (r *Reconciler) ensureClusterControllerManager(ctx context.Context, obj *v1alpha1.RemoteCluster, config clientcmd.ClientConfig) error {
171 restConfig, err := config.ClientConfig()
172 if err != nil {
173 return errors.Wrap(err, "get rest config from client config")
174 }
175
176 err = r.registry.Spawn(obj.Name, restConfig)
177 if err != nil {
178 if !errors.Is(err, clusterregistry.ErrAlreadyExist) {
179 return err
180 }
181 }
182
183 return nil
184 }
185
186 func (r *Reconciler) getHelmClient(ctx context.Context, clientConfig clientcmd.ClientConfig) (*helm.HelmClient, error) {
187 restClientGetter := helm.NewRESTClientGetter(clientConfig)
188
189 helmClient, err := helm.NewHelmClient(restClientGetter, r.Log)
190 if err != nil {
191 return nil, err
192 }
193
194 return helmClient, nil
195 }
196
197 func (r *Reconciler) ensureHelmRelease(ctx context.Context, obj *v1alpha1.RemoteCluster, clientConfig clientcmd.ClientConfig) (string, error) {
198 helmClient, err := r.getHelmClient(ctx, clientConfig)
199 if err != nil {
200 return "", err
201 }
202 _, releaseErr := helmClient.GetRelease(obj.Spec.Namespace, chaosMeshReleaseName)
203 if releaseErr != nil && !errors.Is(releaseErr, driver.ErrReleaseNotFound) {
204 return "", releaseErr
205 }
206 chart, err := helm.FetchChaosMeshChart(ctx, obj.Spec.Version, config.ControllerCfg.LocalHelmChartPath)
207 if err != nil {
208 return "", err
209 }
210
211 values := make(map[string]interface{})
212 if obj.Spec.ConfigOverride != nil {
213 err = json.Unmarshal(obj.Spec.ConfigOverride, &values)
214 if err != nil {
215 return "", err
216 }
217 }
218 var release *release.Release
219 if errors.Is(releaseErr, driver.ErrReleaseNotFound) {
220 release, err = helmClient.InstallRelease(obj.Spec.Namespace, chaosMeshReleaseName, chart, values)
221 if err != nil {
222 return "", err
223 }
224 } else {
225 release, err = helmClient.UpgradeRelease(obj.Spec.Namespace, chaosMeshReleaseName, chart, values)
226 if err != nil {
227 return "", err
228 }
229 }
230
231 return release.Chart.AppVersion(), nil
232 }
233
234 func (r *Reconciler) uninstallHelmRelease(ctx context.Context, obj *v1alpha1.RemoteCluster, clientConfig clientcmd.ClientConfig) error {
235 helmClient, err := r.getHelmClient(ctx, clientConfig)
236 if err != nil {
237 return err
238 }
239
240 _, err = helmClient.GetRelease(obj.Spec.Namespace, chaosMeshReleaseName)
241 if err != nil {
242 if errors.Is(err, driver.ErrReleaseNotFound) {
243 return nil
244 }
245
246 return err
247 }
248
249
250 _, err = helmClient.UninstallRelease(obj.Spec.Namespace, chaosMeshReleaseName)
251 if err != nil {
252 if errors.Is(err, driver.ErrReleaseNotFound) {
253 return nil
254 }
255
256 return err
257 }
258
259 return nil
260 }
261