...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/schedule/active/controller.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/schedule/active

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package active
    17  
    18  import (
    19  	"context"
    20  	"reflect"
    21  	"sort"
    22  
    23  	"github.com/go-logr/logr"
    24  	"go.uber.org/fx"
    25  	v1 "k8s.io/api/core/v1"
    26  	k8sError "k8s.io/apimachinery/pkg/api/errors"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	"k8s.io/client-go/tools/reference"
    29  	"k8s.io/client-go/util/retry"
    30  	ctrl "sigs.k8s.io/controller-runtime"
    31  	"sigs.k8s.io/controller-runtime/pkg/client"
    32  
    33  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    34  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/schedule/utils"
    36  	"github.com/chaos-mesh/chaos-mesh/controllers/types"
    37  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
    38  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    39  )
    40  
    41  type Reconciler struct {
    42  	scheme *runtime.Scheme
    43  
    44  	client.Client
    45  	Log logr.Logger
    46  
    47  	ActiveLister *utils.ActiveLister
    48  
    49  	Recorder recorder.ChaosRecorder
    50  }
    51  
    52  func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    53  	schedule := &v1alpha1.Schedule{}
    54  	err := r.Get(ctx, req.NamespacedName, schedule)
    55  	if err != nil {
    56  		if !k8sError.IsNotFound(err) {
    57  			r.Log.Error(err, "unable to get chaos")
    58  		}
    59  		return ctrl.Result{}, nil
    60  	}
    61  
    62  	list, err := r.ActiveLister.ListActiveJobs(ctx, schedule)
    63  	if err != nil {
    64  		r.Recorder.Event(schedule, recorder.Failed{
    65  			Activity: "list active jobs",
    66  			Err:      err.Error(),
    67  		})
    68  		return ctrl.Result{}, nil
    69  	}
    70  
    71  	active := []v1.ObjectReference{}
    72  	items := reflect.ValueOf(list).Elem().FieldByName("Items")
    73  	for i := 0; i < items.Len(); i++ {
    74  		item := items.Index(i).Addr().Interface().(runtime.Object)
    75  
    76  		ref, err := reference.GetReference(r.scheme, item)
    77  		if err != nil {
    78  			r.Log.Error(err, "fail to get reference")
    79  			r.Recorder.Event(schedule, recorder.Failed{
    80  				Activity: "get reference from object",
    81  				Err:      err.Error(),
    82  			})
    83  			return ctrl.Result{}, nil
    84  		}
    85  
    86  		active = append(active, *ref)
    87  	}
    88  	sort.Slice(active, func(i, j int) bool {
    89  		return active[i].Name < active[j].Name
    90  	})
    91  	if reflect.DeepEqual(active, schedule.Status.Active) {
    92  		r.Log.Info("don't need to update active")
    93  		return ctrl.Result{}, nil
    94  	}
    95  
    96  	updateError := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
    97  		r.Log.Info("updating active", "active", active)
    98  		schedule = schedule.DeepCopyObject().(*v1alpha1.Schedule)
    99  
   100  		if err := r.Client.Get(ctx, req.NamespacedName, schedule); err != nil {
   101  			r.Log.Error(err, "unable to get schedule")
   102  			return err
   103  		}
   104  
   105  		schedule.Status.Active = active
   106  		return r.Client.Update(ctx, schedule)
   107  	})
   108  	if updateError != nil {
   109  		r.Log.Error(updateError, "fail to update")
   110  		r.Recorder.Event(schedule, recorder.Failed{
   111  			Activity: "update active",
   112  			Err:      updateError.Error(),
   113  		})
   114  		return ctrl.Result{}, nil
   115  	}
   116  
   117  	return ctrl.Result{}, nil
   118  }
   119  
   120  type Objs struct {
   121  	fx.In
   122  
   123  	Objs []types.Object `group:"objs"`
   124  }
   125  
   126  const controllerName = "schedule-active"
   127  
   128  func Bootstrap(mgr ctrl.Manager, client client.Client, log logr.Logger, objs Objs, scheme *runtime.Scheme, lister *utils.ActiveLister, recorderBuilder *recorder.RecorderBuilder) error {
   129  	if !config.ShouldSpawnController(controllerName) {
   130  		return nil
   131  	}
   132  	builder := builder.Default(mgr).
   133  		For(&v1alpha1.Schedule{}).
   134  		Named(controllerName)
   135  
   136  	for _, obj := range objs.Objs {
   137  		// TODO: support workflow
   138  		builder = builder.Owns(obj.Object)
   139  	}
   140  	builder = builder.Owns(&v1alpha1.Workflow{})
   141  
   142  	return builder.Complete(&Reconciler{
   143  		scheme,
   144  		client,
   145  		log.WithName(controllerName),
   146  		lister,
   147  		recorderBuilder.Build(controllerName),
   148  	})
   149  }
   150