1
2
3
4
5
6
7
8
9
10
11
12
13
14 package router
15
16 import (
17 "context"
18 "fmt"
19 "strings"
20
21 "github.com/pkg/errors"
22 v1 "k8s.io/api/core/v1"
23 apierrors "k8s.io/apimachinery/pkg/api/errors"
24 "k8s.io/apimachinery/pkg/runtime"
25 ctrl "sigs.k8s.io/controller-runtime"
26 "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
27 "sigs.k8s.io/controller-runtime/pkg/event"
28 "sigs.k8s.io/controller-runtime/pkg/predicate"
29 "sigs.k8s.io/controller-runtime/pkg/reconcile"
30
31 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
32 "github.com/chaos-mesh/chaos-mesh/controllers/common"
33 "github.com/chaos-mesh/chaos-mesh/controllers/twophase"
34 "github.com/chaos-mesh/chaos-mesh/pkg/events"
35 ctx "github.com/chaos-mesh/chaos-mesh/pkg/router/context"
36 end "github.com/chaos-mesh/chaos-mesh/pkg/router/endpoint"
37 )
38
39
40 type Reconciler struct {
41 Name string
42 Object runtime.Object
43 Endpoints []routeEndpoint
44 ClusterScoped bool
45 TargetNamespace string
46
47 ctx.Context
48 }
49
50
51 func (r *Reconciler) Reconcile(req ctrl.Request) (result ctrl.Result, err error) {
52 if !r.ClusterScoped && req.Namespace != r.TargetNamespace {
53
54 r.Log.Info("ignore chaos which belongs to an unexpected namespace within namespace scoped mode",
55 "chaosName", req.Name, "expectedNamespace", r.TargetNamespace, "actualNamespace", req.Namespace)
56 return ctrl.Result{}, nil
57 }
58
59 ctx := r.Context.LogWithValues("reconciler", r.Name, "resource name", req.NamespacedName)
60
61 chaos, ok := r.Object.DeepCopyObject().(v1alpha1.InnerSchedulerObject)
62 if !ok {
63 err := errors.New("object is not InnerSchedulerObject")
64 r.Log.Error(err, "object is not InnerSchedulerObject", "object", r.Object.DeepCopyObject())
65 return ctrl.Result{}, err
66 }
67
68 if err := r.Client.Get(context.Background(), req.NamespacedName, chaos); err != nil {
69 if apierrors.IsNotFound(err) {
70 r.Log.Info("chaos not found")
71 } else {
72 r.Log.Error(err, "unable to get chaos")
73 }
74 return ctrl.Result{}, nil
75 }
76
77 scheduler := chaos.GetScheduler()
78 duration, err := chaos.GetDuration()
79 if err != nil {
80 r.Log.Error(err, fmt.Sprintf("unable to get chaos[%s/%s]'s duration", chaos.GetChaos().Namespace, chaos.GetChaos().Name))
81 return ctrl.Result{}, err
82 }
83
84 var controller end.Endpoint
85 for _, end := range r.Endpoints {
86 if end.RouteFunc(chaos.(runtime.Object)) {
87 controller = end.NewEndpoint(ctx)
88 }
89 }
90 if controller == nil {
91 err := errors.Errorf("cannot route object to one of the endpoint")
92 r.Log.Error(err, "fail to route to endpoint", "object", chaos, "endpoints", r.Endpoints)
93 return ctrl.Result{}, err
94 }
95
96 var reconciler reconcile.Reconciler
97 if scheduler == nil && duration == nil {
98 reconciler = common.NewReconciler(req, controller, ctx)
99 } else if scheduler != nil {
100
101
102 reconciler = twophase.NewReconciler(req, controller, ctx)
103 } else {
104 err := errors.Errorf("both scheduler and duration should be nil or not nil")
105 r.Log.Error(err, "fail to construct reconciler", "scheduler", scheduler, "duration", duration)
106 return ctrl.Result{}, err
107 }
108
109 result, err = reconciler.Reconcile(req)
110 if err != nil {
111 if chaos.IsDeleted() || chaos.IsPaused() {
112 r.Event(chaos, v1.EventTypeWarning, events.ChaosRecoverFailed, err.Error())
113 } else {
114 r.Event(chaos, v1.EventTypeWarning, events.ChaosInjectFailed, err.Error())
115 }
116 }
117 return result, nil
118 }
119
120
121 func NewReconciler(name string, object runtime.Object, mgr ctrl.Manager, endpoints []routeEndpoint, clusterScoped bool, targetNamespace string) *Reconciler {
122 return &Reconciler{
123 Name: name,
124 Object: object,
125 Endpoints: endpoints,
126 ClusterScoped: clusterScoped,
127 TargetNamespace: targetNamespace,
128
129 Context: ctx.Context{
130 Client: mgr.GetClient(),
131 Reader: mgr.GetAPIReader(),
132 EventRecorder: mgr.GetEventRecorderFor(name + "-controller"),
133 Log: ctrl.Log.WithName("controllers").WithName(name),
134 },
135 }
136 }
137
138
139 func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
140 err := ctrl.NewControllerManagedBy(mgr).
141 For(r.Object.DeepCopyObject()).
142 WithEventFilter(predicate.Funcs{
143 UpdateFunc: func(e event.UpdateEvent) bool {
144 old, _ := e.ObjectOld.(v1alpha1.InnerObject).GetSpecAndMetaString()
145 new, _ := e.ObjectNew.(v1alpha1.InnerObject).GetSpecAndMetaString()
146
147 return old != new
148 },
149 }).
150 Complete(r)
151
152 if err != nil {
153 return err
154 }
155
156 kind, err := apiutil.GVKForObject(r.Object.DeepCopyObject(), mgr.GetScheme())
157 if err != nil {
158 return err
159 }
160
161 return ctrl.NewControllerManagedBy(mgr).
162 For(r.Object.DeepCopyObject()).
163 Named(strings.ToLower(kind.Kind) + "-scheduler-updater").
164 WithEventFilter(predicate.Funcs{
165 CreateFunc: func(_ event.CreateEvent) bool {
166 return false
167 },
168 DeleteFunc: func(_ event.DeleteEvent) bool {
169 return false
170 },
171 GenericFunc: func(_ event.GenericEvent) bool {
172 return false
173 },
174 UpdateFunc: func(e event.UpdateEvent) bool {
175 old := e.ObjectOld.(v1alpha1.InnerSchedulerObject).GetScheduler()
176 new := e.ObjectNew.(v1alpha1.InnerSchedulerObject).GetScheduler()
177
178 if (old == nil) || (new == nil) {
179 return false
180 }
181
182 return old.Cron != new.Cron
183 },
184 }).
185 Complete(&twophase.SchedulerUpdater{
186 Context: r.Context,
187 Object: r.Object,
188 })
189 }
190