...

Source file src/github.com/chaos-mesh/chaos-mesh/controllers/common/fx.go

Documentation: github.com/chaos-mesh/chaos-mesh/controllers/common

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package common
    17  
    18  import (
    19  	"context"
    20  	"reflect"
    21  
    22  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    23  
    24  	"github.com/go-logr/logr"
    25  	"go.uber.org/fx"
    26  	k8sTypes "k8s.io/apimachinery/pkg/types"
    27  	ctrl "sigs.k8s.io/controller-runtime"
    28  	"sigs.k8s.io/controller-runtime/pkg/client"
    29  	"sigs.k8s.io/controller-runtime/pkg/handler"
    30  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    31  	"sigs.k8s.io/controller-runtime/pkg/source"
    32  
    33  	chaosimpltypes "github.com/chaos-mesh/chaos-mesh/controllers/chaosimpl/types"
    34  	"github.com/chaos-mesh/chaos-mesh/controllers/common/pipeline"
    35  	"github.com/chaos-mesh/chaos-mesh/controllers/config"
    36  	"github.com/chaos-mesh/chaos-mesh/controllers/types"
    37  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/builder"
    38  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/controller"
    39  	"github.com/chaos-mesh/chaos-mesh/controllers/utils/recorder"
    40  	"github.com/chaos-mesh/chaos-mesh/pkg/selector"
    41  )
    42  
    43  type Params struct {
    44  	fx.In
    45  
    46  	Mgr             ctrl.Manager
    47  	Client          client.Client
    48  	Logger          logr.Logger
    49  	Selector        *selector.Selector
    50  	RecorderBuilder *recorder.RecorderBuilder
    51  	Impls           []*chaosimpltypes.ChaosImplPair `group:"impl"`
    52  	Reader          client.Reader                   `name:"no-cache"`
    53  	Steps           []pipeline.PipelineStep
    54  }
    55  
    56  func Bootstrap(params Params) error {
    57  	logger := params.Logger
    58  	pairs := params.Impls
    59  	mgr := params.Mgr
    60  	kubeclient := params.Client
    61  	reader := params.Reader
    62  	selector := params.Selector
    63  	recorderBuilder := params.RecorderBuilder
    64  
    65  	setupLog := logger.WithName("setup-common")
    66  	for _, pair := range pairs {
    67  		name := pair.Name + "-records"
    68  		if !config.ShouldSpawnController(name) {
    69  			return nil
    70  		}
    71  
    72  		setupLog.Info("setting up controller", "resource-name", pair.Name)
    73  
    74  		builder := builder.Default(mgr).
    75  			For(pair.Object).
    76  			Named(pair.Name + "-pipeline")
    77  
    78  		// Add owning resources
    79  		if len(pair.Controlls) > 0 {
    80  			pair := pair
    81  			for _, obj := range pair.Controlls {
    82  				builder.Watches(&source.Kind{
    83  					Type: obj,
    84  				},
    85  					handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
    86  						reqs := []reconcile.Request{}
    87  						objName := k8sTypes.NamespacedName{
    88  							Namespace: obj.GetNamespace(),
    89  							Name:      obj.GetName(),
    90  						}
    91  
    92  						list := pair.ObjectList.DeepCopyList()
    93  						err := kubeclient.List(context.TODO(), list)
    94  						if err != nil {
    95  							setupLog.Error(err, "fail to list object")
    96  						}
    97  
    98  						items := reflect.ValueOf(list).Elem().FieldByName("Items")
    99  						for i := 0; i < items.Len(); i++ {
   100  							item := items.Index(i).Addr().Interface().(v1alpha1.InnerObjectWithSelector)
   101  							for _, record := range item.GetStatus().Experiment.Records {
   102  								namespacedName, err := controller.ParseNamespacedName(record.Id)
   103  								if err != nil {
   104  									setupLog.Error(err, "failed to parse record", "record", record.Id)
   105  									continue
   106  								}
   107  								if namespacedName == objName {
   108  									id := k8sTypes.NamespacedName{
   109  										Namespace: item.GetNamespace(),
   110  										Name:      item.GetName(),
   111  									}
   112  									setupLog.Info("mapping requests", "source", objName, "target", id)
   113  									reqs = append(reqs, reconcile.Request{
   114  										NamespacedName: id,
   115  									})
   116  								}
   117  							}
   118  						}
   119  						return reqs
   120  					}),
   121  				)
   122  			}
   123  		}
   124  
   125  		pipe := pipeline.NewPipeline(&pipeline.PipelineContext{
   126  			Logger: logger,
   127  			Object: &types.Object{
   128  				Name:   pair.Name,
   129  				Object: pair.Object,
   130  			},
   131  			Impl:            pair.Impl,
   132  			Mgr:             mgr,
   133  			Client:          kubeclient,
   134  			Reader:          reader,
   135  			RecorderBuilder: recorderBuilder,
   136  			Selector:        selector,
   137  		})
   138  
   139  		pipe.AddSteps(params.Steps...)
   140  		err := builder.Complete(pipe)
   141  		if err != nil {
   142  			return err
   143  		}
   144  
   145  	}
   146  
   147  	return nil
   148  }
   149