...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/collector/collector.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/collector

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package collector
    17  
    18  import (
    19  	"context"
    20  	"encoding/json"
    21  
    22  	"github.com/go-logr/logr"
    23  	"github.com/jinzhu/gorm"
    24  	"github.com/pkg/errors"
    25  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	ctrl "sigs.k8s.io/controller-runtime"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  
    31  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    32  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
    33  )
    34  
    35  // ChaosCollector represents a collector for Chaos Object.
    36  type ChaosCollector struct {
    37  	client.Client
    38  	Log     logr.Logger
    39  	apiType runtime.Object
    40  	archive core.ExperimentStore
    41  	event   core.EventStore
    42  }
    43  
    44  // Reconcile reconciles a chaos collector.
    45  func (r *ChaosCollector) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    46  	if r.apiType == nil {
    47  		r.Log.Error(nil, "apiType has not been initialized")
    48  		return ctrl.Result{}, nil
    49  	}
    50  
    51  	obj, ok := r.apiType.DeepCopyObject().(v1alpha1.InnerObject)
    52  	if !ok {
    53  		r.Log.Error(nil, "it's not a stateful object")
    54  		return ctrl.Result{}, nil
    55  	}
    56  
    57  	err := r.Get(ctx, req.NamespacedName, obj)
    58  	if apierrors.IsNotFound(err) {
    59  		if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
    60  			r.Log.Error(err, "failed to archive experiment")
    61  		}
    62  
    63  		// If the experiment was created by schedule or workflow,
    64  		// it and its events will be deleted from database.
    65  		if err = r.deleteManagedExperiments(req.Namespace, req.Name); err != nil {
    66  			r.Log.Error(err, "delete managed experiments", "namespace", req.Namespace, "name", req.Name)
    67  		}
    68  
    69  		return ctrl.Result{}, nil
    70  	}
    71  
    72  	if err != nil {
    73  		r.Log.Error(err, "failed to get chaos object", "request", req.NamespacedName)
    74  		return ctrl.Result{}, nil
    75  	}
    76  
    77  	if err := r.setUnarchivedExperiment(req, obj); err != nil {
    78  		r.Log.Error(err, "failed to archive experiment")
    79  		// ignore error here
    80  	}
    81  
    82  	return ctrl.Result{}, nil
    83  }
    84  
    85  // Setup setups collectors by Manager.
    86  func (r *ChaosCollector) Setup(mgr ctrl.Manager, apiType client.Object) error {
    87  	r.apiType = apiType
    88  
    89  	return ctrl.NewControllerManagedBy(mgr).
    90  		For(apiType).
    91  		Complete(r)
    92  }
    93  
    94  func (r *ChaosCollector) setUnarchivedExperiment(req ctrl.Request, obj v1alpha1.InnerObject) error {
    95  	archive, err := convertInnerObjectToExperiment(obj)
    96  	if err != nil {
    97  		r.Log.Error(err, "failed to covert InnerObject")
    98  		return err
    99  	}
   100  
   101  	find, err := r.archive.FindByUID(context.Background(), archive.UID)
   102  	if err != nil && !gorm.IsRecordNotFoundError(err) {
   103  		r.Log.Error(err, "failed to find experiment", "UID", archive.UID)
   104  		return err
   105  	}
   106  
   107  	if find != nil {
   108  		archive.ID = find.ID
   109  		archive.CreatedAt = find.CreatedAt
   110  		archive.UpdatedAt = find.UpdatedAt
   111  	}
   112  
   113  	if err := r.archive.Set(context.Background(), archive); err != nil {
   114  		r.Log.Error(err, "failed to update experiment", "archive", archive)
   115  		return err
   116  	}
   117  
   118  	return nil
   119  }
   120  
   121  func (r *ChaosCollector) archiveExperiment(ns, name string) error {
   122  	if err := r.archive.Archive(context.Background(), ns, name); err != nil {
   123  		r.Log.Error(err, "failed to archive experiment", "namespace", ns, "name", name)
   124  		return err
   125  	}
   126  
   127  	return nil
   128  }
   129  
   130  func (r *ChaosCollector) deleteManagedExperiments(ns, name string) error {
   131  	archives, err := r.archive.FindManagedByNamespaceName(context.Background(), ns, name)
   132  	if gorm.IsRecordNotFoundError(err) {
   133  		return nil
   134  	}
   135  
   136  	if err != nil {
   137  		return err
   138  	}
   139  
   140  	for _, expr := range archives {
   141  		if err = r.event.DeleteByUID(context.Background(), expr.UID); err != nil {
   142  			r.Log.Error(err, "failed to delete experiment related events")
   143  		}
   144  
   145  		if err = r.archive.Delete(context.Background(), expr); err != nil {
   146  			r.Log.Error(err, "failed to delete managed experiment")
   147  		}
   148  	}
   149  
   150  	return nil
   151  }
   152  
   153  func convertInnerObjectToExperiment(obj v1alpha1.InnerObject) (*core.Experiment, error) {
   154  	chaosMeta, ok := obj.(metav1.Object)
   155  	if !ok {
   156  		return nil, errors.New("chaos meta information not found")
   157  	}
   158  	UID := string(chaosMeta.GetUID())
   159  
   160  	archive := &core.Experiment{
   161  		ExperimentMeta: core.ExperimentMeta{
   162  			Namespace: chaosMeta.GetNamespace(),
   163  			Name:      chaosMeta.GetName(),
   164  			Kind:      obj.GetObjectKind().GroupVersionKind().Kind,
   165  			UID:       UID,
   166  			Archived:  false,
   167  		},
   168  	}
   169  
   170  	switch chaos := obj.(type) {
   171  	case *v1alpha1.PodChaos:
   172  		archive.Action = string(chaos.Spec.Action)
   173  	case *v1alpha1.NetworkChaos:
   174  		archive.Action = string(chaos.Spec.Action)
   175  	case *v1alpha1.IOChaos:
   176  		archive.Action = string(chaos.Spec.Action)
   177  	case *v1alpha1.TimeChaos, *v1alpha1.KernelChaos, *v1alpha1.StressChaos, *v1alpha1.HTTPChaos:
   178  		archive.Action = ""
   179  	case *v1alpha1.DNSChaos:
   180  		archive.Action = string(chaos.Spec.Action)
   181  	case *v1alpha1.PhysicalMachineChaos:
   182  		archive.Action = string(chaos.Spec.Action)
   183  	case *v1alpha1.AWSChaos:
   184  		archive.Action = string(chaos.Spec.Action)
   185  	case *v1alpha1.GCPChaos:
   186  		archive.Action = string(chaos.Spec.Action)
   187  	case *v1alpha1.JVMChaos:
   188  		archive.Action = string(chaos.Spec.Action)
   189  	case *v1alpha1.BlockChaos:
   190  		archive.Action = string(chaos.Spec.Action)
   191  	default:
   192  		return nil, errors.New("unsupported chaos type " + archive.Kind)
   193  	}
   194  
   195  	archive.StartTime = chaosMeta.GetCreationTimestamp().Time
   196  	if chaosMeta.GetDeletionTimestamp() != nil {
   197  		archive.FinishTime = &chaosMeta.GetDeletionTimestamp().Time
   198  	}
   199  
   200  	data, err := json.Marshal(chaosMeta)
   201  	if err != nil {
   202  		return nil, err
   203  	}
   204  
   205  	archive.Experiment = string(data)
   206  
   207  	return archive, nil
   208  }
   209