...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/schedule/pause/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/schedule/pause

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package pause
    15  
    16  import (
    17  	"context"
    18  	"fmt"
    19  	"reflect"
    20  	"strconv"
    21  
    22  	"github.com/go-logr/logr"
    23  	k8sError "k8s.io/apimachinery/pkg/api/errors"
    24  	k8sTypes "k8s.io/apimachinery/pkg/types"
    25  	"k8s.io/client-go/util/retry"
    26  	ctrl "sigs.k8s.io/controller-runtime"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  
    29  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    30  	"github.com/chaos-mesh/chaos-mesh/controllers/schedule/utils"
    31  	"github.com/chaos-mesh/chaos-mesh/controllers/types"
    32  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
    33  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    34  )
    35  
// Reconciler reconciles v1alpha1.Schedule objects: its Reconcile method copies
// a schedule's pause state onto the active jobs listed for that schedule.
//
// Field order is significant to any unkeyed composite literal of this type;
// keep it stable when adding fields.
type Reconciler struct {
	// Client is the embedded Kubernetes client used to get and update objects.
	client.Client
	// Log is the logger used for reconcile diagnostics.
	Log          logr.Logger
	// ActiveLister lists the active jobs belonging to a schedule.
	ActiveLister *utils.ActiveLister

	// Recorder emits events (e.g. Failed, NotSupported) on the schedule.
	Recorder recorder.ChaosRecorder
}
    43  
    44  func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    45  	ctx := context.Background()
    46  
    47  	schedule := &v1alpha1.Schedule{}
    48  	err := r.Get(ctx, req.NamespacedName, schedule)
    49  	if err != nil {
    50  		if !k8sError.IsNotFound(err) {
    51  			r.Log.Error(err, "unable to get chaos")
    52  		}
    53  		return ctrl.Result{}, nil
    54  	}
    55  
    56  	if schedule.Spec.Type == v1alpha1.ScheduleTypeWorkflow {
    57  		if schedule.IsPaused() {
    58  			r.Recorder.Event(schedule, recorder.NotSupported{
    59  				Activity: "pausing a workflow schedule",
    60  			})
    61  		}
    62  		return ctrl.Result{}, nil
    63  	}
    64  
    65  	list, err := r.ActiveLister.ListActiveJobs(ctx, schedule)
    66  	if err != nil {
    67  		r.Recorder.Event(schedule, recorder.Failed{
    68  			Activity: "list active jobs",
    69  			Err:      err.Error(),
    70  		})
    71  		return ctrl.Result{}, nil
    72  	}
    73  
    74  	items := reflect.ValueOf(list).Elem().FieldByName("Items")
    75  	for i := 0; i < items.Len(); i++ {
    76  		item := items.Index(i).Addr().Interface().(v1alpha1.InnerObject)
    77  		if item.IsPaused() != schedule.IsPaused() {
    78  			key := k8sTypes.NamespacedName{
    79  				Namespace: item.GetObjectMeta().GetNamespace(),
    80  				Name:      item.GetObjectMeta().GetName(),
    81  			}
    82  			pause := strconv.FormatBool(schedule.IsPaused())
    83  
    84  			updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
    85  				r.Log.Info("updating object", "pause", schedule.IsPaused())
    86  
    87  				if err := r.Client.Get(ctx, key, item); err != nil {
    88  					r.Log.Error(err, "unable to get schedule")
    89  					return err
    90  				}
    91  				if item.GetObjectMeta().Annotations == nil {
    92  					item.GetObjectMeta().Annotations = make(map[string]string)
    93  				}
    94  				item.GetObjectMeta().Annotations[v1alpha1.PauseAnnotationKey] = pause
    95  
    96  				return r.Client.Update(ctx, item)
    97  			})
    98  			if updateError != nil {
    99  				r.Log.Error(updateError, "fail to update")
   100  				r.Recorder.Event(schedule, recorder.Failed{
   101  					Activity: fmt.Sprintf("set pause to %s for %s", pause, key),
   102  					Err:      updateError.Error(),
   103  				})
   104  				return ctrl.Result{}, nil
   105  			}
   106  		}
   107  	}
   108  
   109  	return ctrl.Result{}, nil
   110  }
   111  
   112  func NewController(mgr ctrl.Manager, client client.Client, log logr.Logger, lister *utils.ActiveLister, recorderBuilder *recorder.RecorderBuilder) (types.Controller, error) {
   113  	builder.Default(mgr).
   114  		For(&v1alpha1.Schedule{}).
   115  		Named("schedule-pause").
   116  		Complete(&Reconciler{
   117  			client,
   118  			log.WithName("schedule-pause"),
   119  			lister,
   120  			recorderBuilder.Build("schedule-pause"),
   121  		})
   122  	return "schedule-pause", nil
   123  }
   124