...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/common/fx.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/common

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package common
    17  
    18  import (
    19  	"context"
    20  	"reflect"
    21  
    22  	"github.com/go-logr/logr"
    23  	"go.uber.org/fx"
    24  	k8sTypes "k8s.io/apimachinery/pkg/types"
    25  	ctrl "sigs.k8s.io/controller-runtime"
    26  	"sigs.k8s.io/controller-runtime/pkg/client"
    27  	"sigs.k8s.io/controller-runtime/pkg/event"
    28  	"sigs.k8s.io/controller-runtime/pkg/handler"
    29  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    30  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    31  
    32  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    33  	chaosimpltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    34  	"github.com/chaos-mesh/chaos-mesh/controllers/common/pipeline"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    36  	"github.com/chaos-mesh/chaos-mesh/controllers/types"
    37  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
    38  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
    39  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    40  	"github.com/chaos-mesh/chaos-mesh/pkg/selector"
    41  )
    42  
// Params bundles the dependencies consumed by Bootstrap. It embeds fx.In, so
// it is intended to be populated by the fx dependency-injection container.
type Params struct {
	fx.In

	// Mgr is the controller manager each pipeline controller is registered with.
	Mgr             ctrl.Manager
	// Client is used by Bootstrap to list owner objects when mapping child
	// events, and is handed to every pipeline.
	Client          client.Client
	// Logger is the base logger; Bootstrap derives a "setup-common" logger from it.
	Logger          logr.Logger
	// Selector is passed through to the pipeline context.
	Selector        *selector.Selector
	// RecorderBuilder is passed through to the pipeline context.
	RecorderBuilder *recorder.RecorderBuilder
	// Impls holds one entry per chaos kind, collected from the "impl" group.
	Impls           []*chaosimpltypes.ChaosImplPair `group:"impl"`
	// Reader is the reader registered under the name "no-cache".
	// NOTE(review): presumably an API reader that bypasses the informer cache — confirm at the provider.
	Reader          client.Reader                   `name:"no-cache"`
	// Steps are the pipeline steps added, in order, to every pipeline Bootstrap builds.
	Steps           []pipeline.PipelineStep
}
    55  
    56  func Bootstrap(params Params) error {
    57  	logger := params.Logger
    58  	pairs := params.Impls
    59  	mgr := params.Mgr
    60  	kubeclient := params.Client
    61  	reader := params.Reader
    62  	selector := params.Selector
    63  	recorderBuilder := params.RecorderBuilder
    64  
    65  	setupLog := logger.WithName("setup-common")
    66  	for _, pair := range pairs {
    67  		name := pair.Name + "-records"
    68  		if !config.ShouldSpawnController(name) {
    69  			return nil
    70  		}
    71  
    72  		setupLog.Info("setting up controller", "resource-name", pair.Name)
    73  
    74  		builder := builder.Default(mgr).
    75  			For(pair.Object).
    76  			Named(pair.Name + "-pipeline")
    77  
    78  		// for common CRDs, since we don't want to reconcile the object,
    79  		// when we only change the object.status.experiment.records[].events
    80  		predicaters := []predicate.Predicate{StatusRecordEventsChangePredicate{}}
    81  
    82  		// Add owning resources
    83  		if len(pair.Controlls) > 0 {
    84  			pair := pair
    85  			for _, obj := range pair.Controlls {
    86  				builder.Watches(obj,
    87  					handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
    88  						reqs := []reconcile.Request{}
    89  						objName := k8sTypes.NamespacedName{
    90  							Namespace: obj.GetNamespace(),
    91  							Name:      obj.GetName(),
    92  						}
    93  
    94  						list := pair.ObjectList.DeepCopyList()
    95  						err := kubeclient.List(context.TODO(), list)
    96  						if err != nil {
    97  							setupLog.Error(err, "fail to list object")
    98  						}
    99  
   100  						items := reflect.ValueOf(list).Elem().FieldByName("Items")
   101  						for i := 0; i < items.Len(); i++ {
   102  							item := items.Index(i).Addr().Interface().(v1alpha1.InnerObjectWithSelector)
   103  							for _, record := range item.GetStatus().Experiment.Records {
   104  								namespacedName, err := controller.ParseNamespacedName(record.Id)
   105  								if err != nil {
   106  									setupLog.Error(err, "failed to parse record", "record", record.Id)
   107  									continue
   108  								}
   109  								if namespacedName == objName {
   110  									id := k8sTypes.NamespacedName{
   111  										Namespace: item.GetNamespace(),
   112  										Name:      item.GetName(),
   113  									}
   114  									setupLog.Info("mapping requests", "source", objName, "target", id)
   115  									reqs = append(reqs, reconcile.Request{
   116  										NamespacedName: id,
   117  									})
   118  								}
   119  							}
   120  						}
   121  						return reqs
   122  					}),
   123  				)
   124  			}
   125  			predicaters = append(predicaters, PickChildCRDPredicate{})
   126  		}
   127  
   128  		pipe := pipeline.NewPipeline(&pipeline.PipelineContext{
   129  			Logger: logger,
   130  			Object: &types.Object{
   131  				Name:   pair.Name,
   132  				Object: pair.Object,
   133  			},
   134  			Impl:            pair.Impl,
   135  			Mgr:             mgr,
   136  			Client:          kubeclient,
   137  			Reader:          reader,
   138  			RecorderBuilder: recorderBuilder,
   139  			Selector:        selector,
   140  		})
   141  
   142  		pipe.AddSteps(params.Steps...)
   143  		builder = builder.WithEventFilter(predicate.And(predicate.Or(predicaters...), RemoteChaosPredicate{}))
   144  		err := builder.Complete(pipe)
   145  		if err != nil {
   146  			return err
   147  		}
   148  
   149  	}
   150  
   151  	return nil
   152  }
   153  
// PickChildCRDPredicate lets update events on child CRDs trigger the Reconcile
// of the owning Chaos CRD. For example, the Reconcile of an IOChaos can be
// triggered by a change to a PodIOChaos.
// Currently PodHttpChaos, PodIOChaos and PodNetworkChaos follow this pattern.
type PickChildCRDPredicate struct {
	// Funcs supplies the predicate methods this type does not define itself.
	predicate.Funcs
}
   161  
   162  // Update implements UpdateEvent filter for child CRD.
   163  func (PickChildCRDPredicate) Update(e event.UpdateEvent) bool {
   164  	switch e.ObjectNew.(type) {
   165  	case *v1alpha1.PodHttpChaos, *v1alpha1.PodIOChaos, *v1alpha1.PodNetworkChaos:
   166  		return true
   167  	}
   168  	return false
   169  }
   170  
// StatusRecordEventsChangePredicate skips update events in which only
// object.status.experiment.records[].events has changed.
type StatusRecordEventsChangePredicate struct {
	// Funcs supplies the predicate methods this type does not define itself.
	predicate.Funcs
}
   176  
   177  // Update implements UpdateEvent filter for update to filter the events
   178  // which we Only update object.status.experiment.records[].events
   179  func (StatusRecordEventsChangePredicate) Update(e event.UpdateEvent) bool {
   180  	objNew, ok := e.ObjectNew.DeepCopyObject().(v1alpha1.StatefulObject)
   181  	if !ok {
   182  		return false
   183  	}
   184  	objOld, ok := e.ObjectOld.DeepCopyObject().(v1alpha1.StatefulObject)
   185  	if !ok {
   186  		return false
   187  	}
   188  	statusNew := objNew.GetStatus()
   189  	statusOld := objOld.GetStatus()
   190  	if statusNew == nil || statusOld == nil {
   191  		return true
   192  	}
   193  	objNew.SetGeneration(0)
   194  	objOld.SetGeneration(0)
   195  	objNew.SetResourceVersion("")
   196  	objOld.SetResourceVersion("")
   197  	for i := range statusNew.Experiment.Records {
   198  		statusNew.Experiment.Records[i].Events = nil
   199  	}
   200  	for i := range statusOld.Experiment.Records {
   201  		statusOld.Experiment.Records[i].Events = nil
   202  	}
   203  	return !reflect.DeepEqual(objNew, objOld)
   204  }
   205  
   206  type RemoteChaosPredicate struct {
   207  	predicate.Funcs
   208  }
   209  
   210  func (RemoteChaosPredicate) Create(e event.CreateEvent) bool {
   211  	obj, ok := e.Object.DeepCopyObject().(v1alpha1.RemoteObject)
   212  	if !ok {
   213  		return true
   214  	}
   215  
   216  	if obj.GetRemoteCluster() == "" {
   217  		return true
   218  	}
   219  
   220  	return false
   221  }
   222  
   223  func (RemoteChaosPredicate) Update(e event.UpdateEvent) bool {
   224  	obj, ok := e.ObjectNew.DeepCopyObject().(v1alpha1.RemoteObject)
   225  	if !ok {
   226  		return true
   227  	}
   228  
   229  	if obj.GetRemoteCluster() == "" {
   230  		return true
   231  	}
   232  
   233  	return false
   234  }
   235  
   236  func (RemoteChaosPredicate) Delete(e event.DeleteEvent) bool {
   237  	obj, ok := e.Object.DeepCopyObject().(v1alpha1.RemoteObject)
   238  	if !ok {
   239  		return true
   240  	}
   241  
   242  	if obj.GetRemoteCluster() == "" {
   243  		return true
   244  	}
   245  
   246  	return false
   247  }
   248  
   249  func (RemoteChaosPredicate) Generic(e event.GenericEvent) bool {
   250  	obj, ok := e.Object.DeepCopyObject().(v1alpha1.RemoteObject)
   251  	if !ok {
   252  		return true
   253  	}
   254  
   255  	if obj.GetRemoteCluster() == "" {
   256  		return true
   257  	}
   258  
   259  	return false
   260  }
   261