...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/collector/collector.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/collector

     1  // Copyright 2019 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package collector
    15  
    16  import (
    17  	"context"
    18  	"encoding/json"
    19  	"errors"
    20  
    21  	"github.com/go-logr/logr"
    22  	"github.com/jinzhu/gorm"
    23  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    24  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    25  	"k8s.io/apimachinery/pkg/runtime"
    26  	ctrl "sigs.k8s.io/controller-runtime"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  
    29  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    30  	"github.com/chaos-mesh/chaos-mesh/pkg/core"
    31  )
    32  
    33  // ChaosCollector represents a collector for Chaos Object.
    34  type ChaosCollector struct {
    35  	client.Client
    36  	Log     logr.Logger
    37  	apiType runtime.Object
    38  	archive core.ExperimentStore
    39  	event   core.EventStore
    40  }
    41  
    42  // Reconcile reconciles a chaos collector.
    43  func (r *ChaosCollector) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    44  	var (
    45  		chaosMeta  metav1.Object
    46  		ok         bool
    47  		manageFlag bool
    48  	)
    49  
    50  	if r.apiType == nil {
    51  		r.Log.Error(nil, "apiType has not been initialized")
    52  		return ctrl.Result{}, nil
    53  	}
    54  	ctx := context.Background()
    55  	manageFlag = false
    56  
    57  	obj, ok := r.apiType.DeepCopyObject().(v1alpha1.InnerObject)
    58  	if !ok {
    59  		r.Log.Error(nil, "it's not a stateful object")
    60  		return ctrl.Result{}, nil
    61  	}
    62  
    63  	err := r.Get(ctx, req.NamespacedName, obj)
    64  	if apierrors.IsNotFound(err) {
    65  		if chaosMeta, ok = obj.(metav1.Object); !ok {
    66  			r.Log.Error(nil, "failed to get chaos meta information")
    67  		}
    68  		if chaosMeta.GetLabels()["managed-by"] != "" {
    69  			manageFlag = true
    70  		}
    71  		if !manageFlag {
    72  			if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
    73  				r.Log.Error(err, "failed to archive experiment")
    74  			}
    75  		} else {
    76  			if err = r.event.DeleteByUID(ctx, string(chaosMeta.GetUID())); err != nil {
    77  				r.Log.Error(err, "failed to delete experiment related events")
    78  			}
    79  		}
    80  		return ctrl.Result{}, nil
    81  	}
    82  
    83  	if err != nil {
    84  		r.Log.Error(err, "failed to get chaos object", "request", req.NamespacedName)
    85  		return ctrl.Result{}, nil
    86  	}
    87  
    88  	if chaosMeta, ok = obj.(metav1.Object); !ok {
    89  		r.Log.Error(nil, "failed to get chaos meta information")
    90  	}
    91  
    92  	if chaosMeta.GetLabels()["managed-by"] != "" {
    93  		manageFlag = true
    94  	}
    95  
    96  	if obj.IsDeleted() {
    97  		if !manageFlag {
    98  			if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
    99  				r.Log.Error(err, "failed to archive experiment")
   100  			}
   101  		} else {
   102  			if err = r.event.DeleteByUID(ctx, string(chaosMeta.GetUID())); err != nil {
   103  				r.Log.Error(err, "failed to delete experiment related events")
   104  			}
   105  		}
   106  		return ctrl.Result{}, nil
   107  	}
   108  
   109  	if err := r.setUnarchivedExperiment(req, obj); err != nil {
   110  		r.Log.Error(err, "failed to archive experiment")
   111  		// ignore error here
   112  	}
   113  
   114  	return ctrl.Result{}, nil
   115  }
   116  
   117  // Setup setups collectors by Manager.
   118  func (r *ChaosCollector) Setup(mgr ctrl.Manager, apiType runtime.Object) error {
   119  	r.apiType = apiType
   120  
   121  	return ctrl.NewControllerManagedBy(mgr).
   122  		For(apiType).
   123  		Complete(r)
   124  }
   125  
   126  func (r *ChaosCollector) setUnarchivedExperiment(req ctrl.Request, obj v1alpha1.InnerObject) error {
   127  	var (
   128  		chaosMeta metav1.Object
   129  		ok        bool
   130  	)
   131  
   132  	if chaosMeta, ok = obj.(metav1.Object); !ok {
   133  		r.Log.Error(nil, "failed to get chaos meta information")
   134  	}
   135  	UID := string(chaosMeta.GetUID())
   136  
   137  	archive := &core.Experiment{
   138  		ExperimentMeta: core.ExperimentMeta{
   139  			Namespace: req.Namespace,
   140  			Name:      req.Name,
   141  			Kind:      obj.GetObjectKind().GroupVersionKind().Kind,
   142  			UID:       UID,
   143  			Archived:  false,
   144  		},
   145  	}
   146  
   147  	switch chaos := obj.(type) {
   148  	case *v1alpha1.PodChaos:
   149  		archive.Action = string(chaos.Spec.Action)
   150  	case *v1alpha1.NetworkChaos:
   151  		archive.Action = string(chaos.Spec.Action)
   152  	case *v1alpha1.IOChaos:
   153  		archive.Action = string(chaos.Spec.Action)
   154  	case *v1alpha1.TimeChaos, *v1alpha1.KernelChaos, *v1alpha1.StressChaos:
   155  		archive.Action = ""
   156  	case *v1alpha1.DNSChaos:
   157  		archive.Action = string(chaos.Spec.Action)
   158  	case *v1alpha1.AWSChaos:
   159  		archive.Action = string(chaos.Spec.Action)
   160  	case *v1alpha1.GCPChaos:
   161  		archive.Action = string(chaos.Spec.Action)
   162  	default:
   163  		return errors.New("unsupported chaos type " + archive.Kind)
   164  	}
   165  
   166  	archive.StartTime = chaosMeta.GetCreationTimestamp().Time
   167  	if chaosMeta.GetDeletionTimestamp() != nil {
   168  		archive.FinishTime = chaosMeta.GetDeletionTimestamp().Time
   169  	}
   170  
   171  	data, err := json.Marshal(chaosMeta)
   172  	if err != nil {
   173  		r.Log.Error(err, "failed to marshal chaos", "kind", archive.Kind,
   174  			"namespace", archive.Namespace, "name", archive.Name)
   175  		return err
   176  	}
   177  
   178  	archive.Experiment = string(data)
   179  
   180  	find, err := r.archive.FindByUID(context.Background(), UID)
   181  	if err != nil && !gorm.IsRecordNotFoundError(err) {
   182  		r.Log.Error(err, "failed to find experiment", "UID", UID)
   183  		return err
   184  	}
   185  
   186  	if find != nil {
   187  		archive.ID = find.ID
   188  		archive.CreatedAt = find.CreatedAt
   189  		archive.UpdatedAt = find.UpdatedAt
   190  	}
   191  
   192  	if err := r.archive.Set(context.Background(), archive); err != nil {
   193  		r.Log.Error(err, "failed to update experiment", "archive", archive)
   194  		return err
   195  	}
   196  
   197  	return nil
   198  }
   199  
   200  func (r *ChaosCollector) archiveExperiment(ns, name string) error {
   201  	if err := r.archive.Archive(context.Background(), ns, name); err != nil {
   202  		r.Log.Error(err, "failed to archive experiment", "namespace", ns, "name", name)
   203  		return err
   204  	}
   205  
   206  	return nil
   207  }
   208