...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package statuscheck
17
18 import (
19 "sync"
20 "time"
21
22 "github.com/go-logr/logr"
23 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24 "k8s.io/apimachinery/pkg/types"
25
26 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
27 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
28 )
29
// Executor performs one status check attempt on behalf of a worker.
type Executor interface {
	// Do runs the check once. The worker interprets the results as follows:
	//   - bool:   true when the check succeeded, false when it failed;
	//   - string: output of the check, logged and attached to failure events;
	//   - error:  a problem inside the executor itself; when non-nil the
	//     worker logs it and does not record the attempt.
	Do() (bool, string, error)

	// Type returns the executor type name reported in recorded events.
	Type() string
}
40
41 type worker struct {
42 logger logr.Logger
43 eventRecorder recorder.ChaosRecorder
44
45
46 stopCh chan struct{}
47 once sync.Once
48
49 manager *manager
50
51 statusCheck v1alpha1.StatusCheck
52 executor Executor
53
54 lastResult bool
55 sameResultCount int
56 }
57
58 func newWorker(logger logr.Logger, eventRecorder recorder.ChaosRecorder,
59 manager *manager, statusCheck v1alpha1.StatusCheck, executor Executor) *worker {
60 return &worker{
61 logger: logger,
62 eventRecorder: eventRecorder,
63 manager: manager,
64 statusCheck: statusCheck,
65 executor: executor,
66 stopCh: make(chan struct{}, 1),
67 }
68 }
69
70
71 func (w *worker) run() {
72 w.logger.V(1).Info("worker start")
73 interval := time.Duration(w.statusCheck.Spec.IntervalSeconds) * time.Second
74 ticker := time.NewTicker(interval)
75 defer func() {
76 w.logger.V(1).Info("worker stop")
77 ticker.Stop()
78 key := types.NamespacedName{Namespace: w.statusCheck.Namespace, Name: w.statusCheck.Name}
79
80 w.manager.workers.delete(key)
81 }()
82
83 for {
84 select {
85 case <-ticker.C:
86 if !w.execute() {
87 return
88 }
89 case <-w.stopCh:
90 return
91 }
92 }
93 }
94
95
96 func (w *worker) stop() {
97 w.once.Do(func() {
98 close(w.stopCh)
99 })
100 }
101
102
103
104 func (w *worker) execute() bool {
105 startTime := time.Now()
106 result, output, err := w.executor.Do()
107 if err != nil {
108
109 w.logger.Error(err, "executor internal error")
110 return true
111 }
112
113 if w.lastResult == result {
114 w.sameResultCount++
115 } else {
116 w.lastResult = result
117 w.sameResultCount = 1
118 }
119
120 key := types.NamespacedName{Namespace: w.statusCheck.Namespace, Name: w.statusCheck.Name}
121 if result {
122 w.logger.V(1).Info("status check execution succeed", "msg", output)
123 w.eventRecorder.Event(&w.statusCheck, recorder.StatusCheckExecutionSucceed{ExecutorType: w.executor.Type()})
124 w.manager.results.append(key, v1alpha1.StatusCheckRecord{
125 StartTime: &metav1.Time{Time: startTime},
126 Outcome: v1alpha1.StatusCheckOutcomeSuccess,
127 })
128
129
130
131
132
133 if w.statusCheck.Spec.Mode == v1alpha1.StatusCheckSynchronous &&
134 w.sameResultCount >= w.statusCheck.Spec.SuccessThreshold {
135 w.logger.Info("exceed the success threshold")
136
137
138 return false
139 }
140 } else {
141 w.logger.Info("status check execution failed", "msg", output)
142 w.eventRecorder.Event(&w.statusCheck, recorder.StatusCheckExecutionFailed{ExecutorType: w.executor.Type(), Msg: output})
143 w.manager.results.append(key, v1alpha1.StatusCheckRecord{
144 StartTime: &metav1.Time{Time: startTime},
145 Outcome: v1alpha1.StatusCheckOutcomeFailure,
146 })
147
148
149
150
151
152 if w.sameResultCount >= w.statusCheck.Spec.FailureThreshold {
153 w.logger.Info("exceed the failure threshold")
154
155 return false
156 }
157 }
158
159 return true
160 }
161