1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package statuscheck
17
18 import (
19 "sync"
20
21 "github.com/go-logr/logr"
22 "github.com/pkg/errors"
23 "k8s.io/apimachinery/pkg/types"
24
25 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
26 "github.com/chaos-mesh/chaos-mesh/controllers/statuscheck/http"
27 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
28 )
29
30 type Manager interface {
31
32 Add(statusCheck v1alpha1.StatusCheck) error
33
34 Get(statusCheck v1alpha1.StatusCheck) (Result, bool)
35
36
37
38 Delete(key types.NamespacedName)
39
40
41 Complete(statusCheck v1alpha1.StatusCheck)
42 }
43
44 type manager struct {
45 logger logr.Logger
46 eventRecorder recorder.ChaosRecorder
47
48 workers workerCache
49 results resultCache
50 newExecutor newExecutorFunc
51 }
52
53 type newExecutorFunc func(logger logr.Logger, statusCheck v1alpha1.StatusCheck) (Executor, error)
54
55 func NewManager(logger logr.Logger, eventRecorder recorder.ChaosRecorder, newExecutorFunc newExecutorFunc) Manager {
56 return &manager{
57 logger: logger,
58 eventRecorder: eventRecorder,
59 workers: workerCache{workers: sync.Map{}},
60 results: resultCache{results: make(map[types.NamespacedName]Result)},
61 newExecutor: newExecutorFunc,
62 }
63 }
64
65 func (m *manager) Add(statusCheck v1alpha1.StatusCheck) error {
66 key := types.NamespacedName{Namespace: statusCheck.Namespace, Name: statusCheck.Name}
67 if _, ok := m.results.get(key); ok {
68 return nil
69 }
70 m.results.init(key, statusCheck.Status.Records, statusCheck.Status.Count, uint(statusCheck.Spec.RecordsHistoryLimit))
71
72 if statusCheck.IsCompleted() {
73
74 return errors.New("status check is completed")
75 }
76
77 executor, err := m.newExecutor(m.logger, statusCheck)
78 if err != nil {
79 return errors.Wrap(err, "new executor")
80 }
81 worker := newWorker(m.logger.WithName("worker").WithValues("statuscheck", key), m.eventRecorder, m, statusCheck, executor)
82 m.workers.add(key, worker)
83 return nil
84 }
85
86 func (m *manager) Get(statusCheck v1alpha1.StatusCheck) (Result, bool) {
87 key := types.NamespacedName{Namespace: statusCheck.Namespace, Name: statusCheck.Name}
88 result, ok := m.results.get(key)
89 if !ok {
90 return Result{}, false
91 }
92 return result, true
93 }
94
95 func (m *manager) Delete(key types.NamespacedName) {
96 m.results.delete(key)
97 m.workers.delete(key)
98 }
99
100 func (m *manager) Complete(statusCheck v1alpha1.StatusCheck) {
101 key := types.NamespacedName{Namespace: statusCheck.Namespace, Name: statusCheck.Name}
102 m.workers.delete(key)
103 }
104
105
106 type workerCache struct {
107
108 workers sync.Map
109 }
110
111 func (c *workerCache) add(key types.NamespacedName, worker *worker) {
112 _, ok := c.workers.LoadOrStore(key, worker)
113 if !ok {
114 go worker.run()
115 }
116 }
117
118 func (c *workerCache) delete(key types.NamespacedName) {
119 obj, ok := c.workers.LoadAndDelete(key)
120 if !ok {
121 return
122 }
123 worker := obj.(*worker)
124 worker.stop()
125 }
126
127
128 type resultCache struct {
129
130 results map[types.NamespacedName]Result
131 lock sync.RWMutex
132 }
133
134 type Result struct {
135 Records []v1alpha1.StatusCheckRecord
136 Count int64
137
138 recordsHistoryLimit uint
139 }
140
141
142 func (c *resultCache) init(key types.NamespacedName, obj []v1alpha1.StatusCheckRecord, count int64, limit uint) {
143 c.lock.Lock()
144 defer c.lock.Unlock()
145
146 if _, ok := c.results[key]; ok {
147 return
148 }
149 if len(obj) == 0 {
150 obj = make([]v1alpha1.StatusCheckRecord, 0)
151 count = 0
152 }
153 c.results[key] = Result{
154 Records: limitRecords(obj, limit),
155 Count: count,
156 recordsHistoryLimit: limit,
157 }
158 }
159
160
161
162 func (c *resultCache) append(key types.NamespacedName, obj v1alpha1.StatusCheckRecord) {
163 c.lock.Lock()
164 defer c.lock.Unlock()
165
166 result := c.results[key]
167 result.Records = append(result.Records, obj)
168 result.Records = limitRecords(result.Records, result.recordsHistoryLimit)
169 result.Count++
170 c.results[key] = result
171 }
172
173 func (c *resultCache) delete(key types.NamespacedName) {
174 c.lock.Lock()
175 defer c.lock.Unlock()
176
177 delete(c.results, key)
178 }
179
180 func (c *resultCache) get(key types.NamespacedName) (Result, bool) {
181 c.lock.RLock()
182 defer c.lock.RUnlock()
183 result, ok := c.results[key]
184 return result, ok
185 }
186
187 func limitRecords(records []v1alpha1.StatusCheckRecord, limit uint) []v1alpha1.StatusCheckRecord {
188 length := len(records)
189 if length < int(limit) {
190 return records
191 }
192 return records[length-int(limit):]
193 }
194
195 func newExecutor(logger logr.Logger, statusCheck v1alpha1.StatusCheck) (Executor, error) {
196 var executor Executor
197 switch statusCheck.Spec.Type {
198 case v1alpha1.TypeHTTP:
199 if statusCheck.Spec.EmbedStatusCheck == nil || statusCheck.Spec.HTTPStatusCheck == nil {
200
201 return nil, errors.New("illegal status check, http should not be empty")
202 }
203 executor = http.NewExecutor(
204 logger.WithName("http-executor").WithValues("url", statusCheck.Spec.HTTPStatusCheck.RequestUrl),
205 statusCheck.Spec.TimeoutSeconds, *statusCheck.Spec.HTTPStatusCheck)
206 default:
207 return nil, errors.Errorf("unsupported type '%s'", statusCheck.Spec.Type)
208 }
209 return executor, nil
210 }
211