...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/common/pipeline/pipeline.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/common/pipeline

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package pipeline
    17  
    18  import (
    19  	"context"
    20  	"time"
    21  
    22  	"github.com/go-logr/logr"
    23  	ctrl "sigs.k8s.io/controller-runtime"
    24  	"sigs.k8s.io/controller-runtime/pkg/client"
    25  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    26  
    27  	chaosimpltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    28  	"github.com/chaos-mesh/chaos-mesh/controllers/types"
    29  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    30  	"github.com/chaos-mesh/chaos-mesh/pkg/selector"
    31  )
    32  
    33  type Pipeline struct {
    34  	controllers []reconcile.Reconciler
    35  	ctx         *PipelineContext
    36  }
    37  
    38  type PipelineContext struct {
    39  	Object *types.Object
    40  	Mgr    ctrl.Manager
    41  	Client client.Client
    42  	client.Reader
    43  
    44  	Logger          logr.Logger
    45  	RecorderBuilder *recorder.RecorderBuilder
    46  	Impl            chaosimpltypes.ChaosImpl
    47  	Selector        *selector.Selector
    48  }
    49  
    50  type PipelineStep func(ctx *PipelineContext) reconcile.Reconciler
    51  
    52  func NewPipeline(ctx *PipelineContext) *Pipeline {
    53  	return &Pipeline{
    54  		ctx: ctx,
    55  	}
    56  }
    57  
    58  func (p *Pipeline) AddSteps(steps ...PipelineStep) {
    59  	for _, step := range steps {
    60  		reconciler := step(p.ctx)
    61  		if reconciler == nil {
    62  			return
    63  		}
    64  		p.controllers = append(p.controllers, reconciler)
    65  	}
    66  }
    67  
    68  // Reconcile the steps
    69  func (p *Pipeline) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    70  	var deadline *time.Time
    71  
    72  	for _, controller := range p.controllers {
    73  		ret, err := controller.Reconcile(ctx, req)
    74  		if err != nil {
    75  			return ctrl.Result{}, err
    76  		}
    77  
    78  		p.ctx.Logger.WithName("pipeline").Info("reconcile result", "result", ret)
    79  
    80  		if ret.Requeue || deadline != nil && deadline.Before(time.Now()) {
    81  			ret.Requeue = true
    82  			return ret, nil
    83  		}
    84  
    85  		if ret.RequeueAfter != 0 {
    86  			// The controller wants us to re-enqueue after a certain amount of time,
    87  			// and the desiredphase controller will always return a RequeueAfter before the experiment is finished.
    88  			//
    89  			// So, DO NOT re-queue immediately.
    90  			end := time.Now().Add(ret.RequeueAfter)
    91  			deadline = minTime(deadline, &end)
    92  		}
    93  	}
    94  
    95  	ret := ctrl.Result{}
    96  
    97  	if deadline != nil {
    98  		if deadline.Before(time.Now()) {
    99  			ret.Requeue = true
   100  		} else {
   101  			ret.RequeueAfter = time.Until(*deadline)
   102  		}
   103  	}
   104  
   105  	return ret, nil
   106  }
   107  
   108  func minTime(d1, d2 *time.Time) *time.Time {
   109  	if d1 == nil {
   110  		return d2
   111  	}
   112  
   113  	if d2 == nil {
   114  		return d1
   115  	}
   116  
   117  	if d1.Before(*d2) {
   118  		return d1
   119  	}
   120  
   121  	return d2
   122  }
   123