...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/dashboard/collector/collector.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/dashboard/collector

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package collector
    17  
    18  import (
    19  	"context"
    20  	"encoding/json"
    21  	"errors"
    22  
    23  	"github.com/go-logr/logr"
    24  	"github.com/jinzhu/gorm"
    25  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	ctrl "sigs.k8s.io/controller-runtime"
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  
    31  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    32  	"github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
    33  )
    34  
    35  // ChaosCollector represents a collector for Chaos Object.
    36  type ChaosCollector struct {
    37  	client.Client
    38  	Log     logr.Logger
    39  	apiType runtime.Object
    40  	archive core.ExperimentStore
    41  	event   core.EventStore
    42  }
    43  
    44  // Reconcile reconciles a chaos collector.
    45  func (r *ChaosCollector) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    46  	var (
    47  		chaosMeta  metav1.Object
    48  		ok         bool
    49  		manageFlag bool
    50  	)
    51  
    52  	if r.apiType == nil {
    53  		r.Log.Error(nil, "apiType has not been initialized")
    54  		return ctrl.Result{}, nil
    55  	}
    56  
    57  	manageFlag = false
    58  
    59  	obj, ok := r.apiType.DeepCopyObject().(v1alpha1.InnerObject)
    60  	if !ok {
    61  		r.Log.Error(nil, "it's not a stateful object")
    62  		return ctrl.Result{}, nil
    63  	}
    64  
    65  	err := r.Get(ctx, req.NamespacedName, obj)
    66  	if apierrors.IsNotFound(err) {
    67  		if chaosMeta, ok = obj.(metav1.Object); !ok {
    68  			r.Log.Error(nil, "failed to get chaos meta information")
    69  		}
    70  		if chaosMeta.GetLabels()[v1alpha1.LabelManagedBy] != "" {
    71  			manageFlag = true
    72  		}
    73  		if !manageFlag {
    74  			if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
    75  				r.Log.Error(err, "failed to archive experiment")
    76  			}
    77  		} else {
    78  			if err = r.event.DeleteByUID(ctx, string(chaosMeta.GetUID())); err != nil {
    79  				r.Log.Error(err, "failed to delete experiment related events")
    80  			}
    81  		}
    82  		return ctrl.Result{}, nil
    83  	}
    84  
    85  	if err != nil {
    86  		r.Log.Error(err, "failed to get chaos object", "request", req.NamespacedName)
    87  		return ctrl.Result{}, nil
    88  	}
    89  
    90  	if chaosMeta, ok = obj.(metav1.Object); !ok {
    91  		r.Log.Error(nil, "failed to get chaos meta information")
    92  	}
    93  
    94  	if chaosMeta.GetLabels()[v1alpha1.LabelManagedBy] != "" {
    95  		manageFlag = true
    96  	}
    97  
    98  	if obj.IsDeleted() {
    99  		if !manageFlag {
   100  			if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
   101  				r.Log.Error(err, "failed to archive experiment")
   102  			}
   103  		} else {
   104  			if err = r.event.DeleteByUID(ctx, string(chaosMeta.GetUID())); err != nil {
   105  				r.Log.Error(err, "failed to delete experiment related events")
   106  			}
   107  		}
   108  		return ctrl.Result{}, nil
   109  	}
   110  
   111  	if err := r.setUnarchivedExperiment(req, obj); err != nil {
   112  		r.Log.Error(err, "failed to archive experiment")
   113  		// ignore error here
   114  	}
   115  
   116  	return ctrl.Result{}, nil
   117  }
   118  
   119  // Setup setups collectors by Manager.
   120  func (r *ChaosCollector) Setup(mgr ctrl.Manager, apiType client.Object) error {
   121  	r.apiType = apiType
   122  
   123  	return ctrl.NewControllerManagedBy(mgr).
   124  		For(apiType).
   125  		Complete(r)
   126  }
   127  
   128  func (r *ChaosCollector) setUnarchivedExperiment(req ctrl.Request, obj v1alpha1.InnerObject) error {
   129  	var (
   130  		chaosMeta metav1.Object
   131  		ok        bool
   132  	)
   133  
   134  	if chaosMeta, ok = obj.(metav1.Object); !ok {
   135  		r.Log.Error(nil, "failed to get chaos meta information")
   136  	}
   137  	UID := string(chaosMeta.GetUID())
   138  
   139  	archive := &core.Experiment{
   140  		ExperimentMeta: core.ExperimentMeta{
   141  			Namespace: req.Namespace,
   142  			Name:      req.Name,
   143  			Kind:      obj.GetObjectKind().GroupVersionKind().Kind,
   144  			UID:       UID,
   145  			Archived:  false,
   146  		},
   147  	}
   148  
   149  	switch chaos := obj.(type) {
   150  	case *v1alpha1.PodChaos:
   151  		archive.Action = string(chaos.Spec.Action)
   152  	case *v1alpha1.NetworkChaos:
   153  		archive.Action = string(chaos.Spec.Action)
   154  	case *v1alpha1.IOChaos:
   155  		archive.Action = string(chaos.Spec.Action)
   156  	case *v1alpha1.TimeChaos, *v1alpha1.KernelChaos, *v1alpha1.StressChaos, *v1alpha1.HTTPChaos:
   157  		archive.Action = ""
   158  	case *v1alpha1.DNSChaos:
   159  		archive.Action = string(chaos.Spec.Action)
   160  	case *v1alpha1.PhysicalMachineChaos:
   161  		archive.Action = string(chaos.Spec.Action)
   162  	case *v1alpha1.AWSChaos:
   163  		archive.Action = string(chaos.Spec.Action)
   164  	case *v1alpha1.GCPChaos:
   165  		archive.Action = string(chaos.Spec.Action)
   166  	case *v1alpha1.JVMChaos:
   167  		archive.Action = string(chaos.Spec.Action)
   168  	default:
   169  		return errors.New("unsupported chaos type " + archive.Kind)
   170  	}
   171  
   172  	archive.StartTime = chaosMeta.GetCreationTimestamp().Time
   173  	if chaosMeta.GetDeletionTimestamp() != nil {
   174  		archive.FinishTime = chaosMeta.GetDeletionTimestamp().Time
   175  	}
   176  
   177  	data, err := json.Marshal(chaosMeta)
   178  	if err != nil {
   179  		r.Log.Error(err, "failed to marshal chaos", "kind", archive.Kind,
   180  			"namespace", archive.Namespace, "name", archive.Name)
   181  		return err
   182  	}
   183  
   184  	archive.Experiment = string(data)
   185  
   186  	find, err := r.archive.FindByUID(context.Background(), UID)
   187  	if err != nil && !gorm.IsRecordNotFoundError(err) {
   188  		r.Log.Error(err, "failed to find experiment", "UID", UID)
   189  		return err
   190  	}
   191  
   192  	if find != nil {
   193  		archive.ID = find.ID
   194  		archive.CreatedAt = find.CreatedAt
   195  		archive.UpdatedAt = find.UpdatedAt
   196  	}
   197  
   198  	if err := r.archive.Set(context.Background(), archive); err != nil {
   199  		r.Log.Error(err, "failed to update experiment", "archive", archive)
   200  		return err
   201  	}
   202  
   203  	return nil
   204  }
   205  
   206  func (r *ChaosCollector) archiveExperiment(ns, name string) error {
   207  	if err := r.archive.Archive(context.Background(), ns, name); err != nil {
   208  		r.Log.Error(err, "failed to archive experiment", "namespace", ns, "name", name)
   209  		return err
   210  	}
   211  
   212  	return nil
   213  }
   214