...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package pipeline
17
18 import (
19 "context"
20 "time"
21
22 "github.com/go-logr/logr"
23 ctrl "sigs.k8s.io/controller-runtime"
24 "sigs.k8s.io/controller-runtime/pkg/client"
25 "sigs.k8s.io/controller-runtime/pkg/reconcile"
26
27 chaosimpltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
28 "github.com/chaos-mesh/chaos-mesh/controllers/types"
29 "github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
30 "github.com/chaos-mesh/chaos-mesh/pkg/selector"
31 )
32
33 type Pipeline struct {
34 controllers []reconcile.Reconciler
35 ctx *PipelineContext
36 }
37
38 type PipelineContext struct {
39 Object *types.Object
40 Mgr ctrl.Manager
41 Client client.Client
42 client.Reader
43
44 Logger logr.Logger
45 RecorderBuilder *recorder.RecorderBuilder
46 Impl chaosimpltypes.ChaosImpl
47 Selector *selector.Selector
48 }
49
50 type PipelineStep func(ctx *PipelineContext) reconcile.Reconciler
51
52 func NewPipeline(ctx *PipelineContext) *Pipeline {
53 return &Pipeline{
54 ctx: ctx,
55 }
56 }
57
58 func (p *Pipeline) AddSteps(steps ...PipelineStep) {
59 for _, step := range steps {
60 reconciler := step(p.ctx)
61 if reconciler == nil {
62 return
63 }
64 p.controllers = append(p.controllers, reconciler)
65 }
66 }
67
68
69 func (p *Pipeline) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
70 var deadline *time.Time
71
72 for _, controller := range p.controllers {
73 ret, err := controller.Reconcile(ctx, req)
74 if err != nil {
75 return ctrl.Result{}, err
76 }
77
78 p.ctx.Logger.WithName("pipeline").Info("reconcile result", "result", ret)
79
80 if ret.Requeue || deadline != nil && deadline.Before(time.Now()) {
81 ret.Requeue = true
82 return ret, nil
83 }
84
85 if ret.RequeueAfter != 0 {
86
87
88
89
90 end := time.Now().Add(ret.RequeueAfter)
91 deadline = minTime(deadline, &end)
92 }
93 }
94
95 ret := ctrl.Result{}
96
97 if deadline != nil {
98 if deadline.Before(time.Now()) {
99 ret.Requeue = true
100 } else {
101 ret.RequeueAfter = time.Until(*deadline)
102 }
103 }
104
105 return ret, nil
106 }
107
108 func minTime(d1, d2 *time.Time) *time.Time {
109 if d1 == nil {
110 return d2
111 }
112
113 if d2 == nil {
114 return d1
115 }
116
117 if d1.Before(*d2) {
118 return d1
119 }
120
121 return d2
122 }
123