...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/schedule/pause/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/schedule/pause

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package pause
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"reflect"
    22  	"strconv"
    23  
    24  	"github.com/go-logr/logr"
    25  	k8sError "k8s.io/apimachinery/pkg/api/errors"
    26  	k8sTypes "k8s.io/apimachinery/pkg/types"
    27  	"k8s.io/client-go/util/retry"
    28  	ctrl "sigs.k8s.io/controller-runtime"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  
    31  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/schedule/utils"
    34  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    36  )
    37  
    38  type Reconciler struct {
    39  	client.Client
    40  	Log          logr.Logger
    41  	ActiveLister *utils.ActiveLister
    42  
    43  	Recorder recorder.ChaosRecorder
    44  }
    45  
    46  func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    47  	schedule := &v1alpha1.Schedule{}
    48  	err := r.Get(ctx, req.NamespacedName, schedule)
    49  	if err != nil {
    50  		if !k8sError.IsNotFound(err) {
    51  			r.Log.Error(err, "unable to get chaos")
    52  		}
    53  		return ctrl.Result{}, nil
    54  	}
    55  
    56  	if schedule.Spec.Type == v1alpha1.ScheduleTypeWorkflow {
    57  		if schedule.IsPaused() {
    58  			r.Recorder.Event(schedule, recorder.NotSupported{
    59  				Activity: "pausing a workflow schedule",
    60  			})
    61  		}
    62  		return ctrl.Result{}, nil
    63  	}
    64  
    65  	list, err := r.ActiveLister.ListActiveJobs(ctx, schedule)
    66  	if err != nil {
    67  		r.Recorder.Event(schedule, recorder.Failed{
    68  			Activity: "list active jobs",
    69  			Err:      err.Error(),
    70  		})
    71  		return ctrl.Result{}, nil
    72  	}
    73  
    74  	items := reflect.ValueOf(list).Elem().FieldByName("Items")
    75  	for i := 0; i < items.Len(); i++ {
    76  		item := items.Index(i).Addr().Interface().(v1alpha1.InnerObject)
    77  		if item.IsPaused() != schedule.IsPaused() {
    78  			key := k8sTypes.NamespacedName{
    79  				Namespace: item.GetNamespace(),
    80  				Name:      item.GetName(),
    81  			}
    82  			pause := strconv.FormatBool(schedule.IsPaused())
    83  
    84  			updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
    85  				r.Log.Info("updating object", "pause", schedule.IsPaused())
    86  
    87  				if err := r.Client.Get(ctx, key, item); err != nil {
    88  					r.Log.Error(err, "unable to get schedule")
    89  					return err
    90  				}
    91  				annotations := item.GetAnnotations()
    92  				if annotations == nil {
    93  					annotations = make(map[string]string)
    94  				}
    95  				annotations[v1alpha1.PauseAnnotationKey] = pause
    96  				item.SetAnnotations(annotations)
    97  
    98  				return r.Client.Update(ctx, item)
    99  			})
   100  			if updateError != nil {
   101  				r.Log.Error(updateError, "fail to update")
   102  				r.Recorder.Event(schedule, recorder.Failed{
   103  					Activity: fmt.Sprintf("set pause to %s for %s", pause, key),
   104  					Err:      updateError.Error(),
   105  				})
   106  				return ctrl.Result{}, nil
   107  			}
   108  		}
   109  	}
   110  
   111  	return ctrl.Result{}, nil
   112  }
   113  
   114  const controllerName = "schedule-pause"
   115  
   116  func Bootstrap(mgr ctrl.Manager, client client.Client, log logr.Logger, lister *utils.ActiveLister, recorderBuilder *recorder.RecorderBuilder) error {
   117  	if !config.ShouldSpawnController(controllerName) {
   118  		return nil
   119  	}
   120  	return builder.Default(mgr).
   121  		For(&v1alpha1.Schedule{}).
   122  		Named(controllerName).
   123  		Complete(&Reconciler{
   124  			client,
   125  			log.WithName(controllerName),
   126  			lister,
   127  			recorderBuilder.Build(controllerName),
   128  		})
   129  }
   130