...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/collector/collector.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/collector

     1  // Copyright 2019 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package collector
    15  
    16  import (
    17  	"context"
    18  	"encoding/json"
    19  	"errors"
    20  	"fmt"
    21  
    22  	"github.com/go-logr/logr"
    23  	"github.com/jinzhu/gorm"
    24  
    25  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    26  	"github.com/chaos-mesh/chaos-mesh/pkg/core"
    27  
    28  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  	ctrl "sigs.k8s.io/controller-runtime"
    32  	"sigs.k8s.io/controller-runtime/pkg/client"
    33  )
    34  
    35  // ChaosCollector represents a collector for Chaos Object.
    36  type ChaosCollector struct {
    37  	client.Client
    38  	Log     logr.Logger
    39  	apiType runtime.Object
    40  	archive core.ExperimentStore
    41  	event   core.EventStore
    42  }
    43  
    44  // Reconcile reconciles a chaos collector.
    45  func (r *ChaosCollector) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    46  	if r.apiType == nil {
    47  		r.Log.Error(nil, "apiType has not been initialized")
    48  		return ctrl.Result{}, nil
    49  	}
    50  	ctx := context.Background()
    51  
    52  	obj, ok := r.apiType.DeepCopyObject().(v1alpha1.InnerObject)
    53  	if !ok {
    54  		r.Log.Error(nil, "it's not a stateful object")
    55  		return ctrl.Result{}, nil
    56  	}
    57  
    58  	err := r.Get(ctx, req.NamespacedName, obj)
    59  	if apierrors.IsNotFound(err) {
    60  		if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
    61  			r.Log.Error(err, "failed to archive experiment")
    62  		}
    63  		return ctrl.Result{}, nil
    64  	}
    65  
    66  	if err != nil {
    67  		r.Log.Error(err, "failed to get chaos object", "request", req.NamespacedName)
    68  		return ctrl.Result{}, nil
    69  	}
    70  
    71  	if obj.IsDeleted() {
    72  		if err = r.archiveExperiment(req.Namespace, req.Name); err != nil {
    73  			r.Log.Error(err, "failed to archive experiment")
    74  		}
    75  		return ctrl.Result{}, nil
    76  	}
    77  
    78  	if err := r.setUnarchivedExperiment(req, obj); err != nil {
    79  		r.Log.Error(err, "failed to archive experiment")
    80  		// ignore error here
    81  	}
    82  
    83  	if err := r.recordEvent(req, obj); err != nil {
    84  		r.Log.Error(err, "failed to record event")
    85  	}
    86  
    87  	return ctrl.Result{}, nil
    88  }
    89  
    90  // Setup setups collectors by Manager.
    91  func (r *ChaosCollector) Setup(mgr ctrl.Manager, apiType runtime.Object) error {
    92  	r.apiType = apiType
    93  
    94  	return ctrl.NewControllerManagedBy(mgr).
    95  		For(apiType).
    96  		Complete(r)
    97  }
    98  
    99  func (r *ChaosCollector) recordEvent(req ctrl.Request, obj v1alpha1.InnerObject) error {
   100  	var (
   101  		chaosMeta metav1.Object
   102  		ok        bool
   103  	)
   104  
   105  	if chaosMeta, ok = obj.(metav1.Object); !ok {
   106  		return errors.New("failed to get chaos meta information")
   107  	}
   108  
   109  	UID := chaosMeta.GetUID()
   110  	status := obj.GetStatus()
   111  	kind := obj.GetObjectKind().GroupVersionKind().Kind
   112  
   113  	switch status.Experiment.Phase {
   114  	case v1alpha1.ExperimentPhaseRunning:
   115  		return r.createEvent(req, kind, status, string(UID))
   116  	case v1alpha1.ExperimentPhaseFinished, v1alpha1.ExperimentPhasePaused, v1alpha1.ExperimentPhaseWaiting:
   117  		return r.updateOrCreateEvent(req, kind, status, string(UID))
   118  	}
   119  
   120  	return nil
   121  }
   122  
   123  func (r *ChaosCollector) createEvent(req ctrl.Request, kind string, status *v1alpha1.ChaosStatus, UID string) error {
   124  	if status.Experiment.StartTime == nil {
   125  		r.Log.Info("failed to create event, because experiment startTime is empty")
   126  		return fmt.Errorf("failed to create event, because experiment startTime is empty")
   127  	}
   128  
   129  	event := &core.Event{
   130  		Experiment:   req.Name,
   131  		Namespace:    req.Namespace,
   132  		Kind:         kind,
   133  		StartTime:    &status.Experiment.StartTime.Time,
   134  		ExperimentID: UID,
   135  		// TODO: add state for each event
   136  		Message: status.FailedMessage,
   137  	}
   138  
   139  	if _, err := r.event.FindByExperimentAndStartTime(
   140  		context.Background(), event.Experiment, event.Namespace, event.StartTime); err == nil {
   141  		r.Log.Info("event has been created")
   142  		return nil
   143  	}
   144  
   145  	for _, pod := range status.Experiment.PodRecords {
   146  		podRecord := &core.PodRecord{
   147  			EventID:   event.ID,
   148  			PodIP:     pod.PodIP,
   149  			PodName:   pod.Name,
   150  			Namespace: pod.Namespace,
   151  			Message:   pod.Message,
   152  			Action:    pod.Action,
   153  		}
   154  		event.Pods = append(event.Pods, podRecord)
   155  	}
   156  	if err := r.event.Create(context.Background(), event); err != nil {
   157  		r.Log.Error(err, "failed to store event", "event", event)
   158  		return err
   159  	}
   160  
   161  	return nil
   162  }
   163  
   164  func (r *ChaosCollector) updateOrCreateEvent(req ctrl.Request, kind string, status *v1alpha1.ChaosStatus, UID string) error {
   165  	if status.Experiment.StartTime == nil || status.Experiment.EndTime == nil {
   166  		return fmt.Errorf("failed to get experiment time, startTime or endTime is empty")
   167  	}
   168  
   169  	event := &core.Event{
   170  		Experiment:   req.Name,
   171  		Namespace:    req.Namespace,
   172  		Kind:         kind,
   173  		StartTime:    &status.Experiment.StartTime.Time,
   174  		FinishTime:   &status.Experiment.EndTime.Time,
   175  		Duration:     status.Experiment.Duration,
   176  		ExperimentID: UID,
   177  	}
   178  
   179  	if _, err := r.event.FindByExperimentAndStartTime(
   180  		context.Background(), event.Experiment, event.Namespace, event.StartTime); err != nil && gorm.IsRecordNotFoundError(err) {
   181  		if err := r.createEvent(req, kind, status, UID); err != nil {
   182  			return err
   183  		}
   184  	}
   185  
   186  	if err := r.event.Update(context.Background(), event); err != nil {
   187  		r.Log.Error(err, "failed to update event", "event", event)
   188  		return err
   189  	}
   190  
   191  	return nil
   192  }
   193  
   194  func (r *ChaosCollector) setUnarchivedExperiment(req ctrl.Request, obj v1alpha1.InnerObject) error {
   195  	var (
   196  		chaosMeta metav1.Object
   197  		ok        bool
   198  	)
   199  
   200  	if chaosMeta, ok = obj.(metav1.Object); !ok {
   201  		r.Log.Error(nil, "failed to get chaos meta information")
   202  	}
   203  	UID := string(chaosMeta.GetUID())
   204  
   205  	archive := &core.Experiment{
   206  		ExperimentMeta: core.ExperimentMeta{
   207  			Namespace: req.Namespace,
   208  			Name:      req.Name,
   209  			Kind:      obj.GetObjectKind().GroupVersionKind().Kind,
   210  			UID:       UID,
   211  			Archived:  false,
   212  		},
   213  	}
   214  
   215  	switch chaos := obj.(type) {
   216  	case *v1alpha1.PodChaos:
   217  		archive.Action = string(chaos.Spec.Action)
   218  	case *v1alpha1.NetworkChaos:
   219  		archive.Action = string(chaos.Spec.Action)
   220  	case *v1alpha1.IoChaos:
   221  		archive.Action = string(chaos.Spec.Action)
   222  	case *v1alpha1.TimeChaos, *v1alpha1.KernelChaos, *v1alpha1.StressChaos:
   223  		archive.Action = ""
   224  	case *v1alpha1.DNSChaos:
   225  		archive.Action = string(chaos.Spec.Action)
   226  	default:
   227  		return errors.New("unsupported chaos type " + archive.Kind)
   228  	}
   229  
   230  	archive.StartTime = chaosMeta.GetCreationTimestamp().Time
   231  	if chaosMeta.GetDeletionTimestamp() != nil {
   232  		archive.FinishTime = chaosMeta.GetDeletionTimestamp().Time
   233  	}
   234  
   235  	data, err := json.Marshal(chaosMeta)
   236  	if err != nil {
   237  		r.Log.Error(err, "failed to marshal chaos", "kind", archive.Kind,
   238  			"namespace", archive.Namespace, "name", archive.Name)
   239  		return err
   240  	}
   241  
   242  	archive.Experiment = string(data)
   243  
   244  	find, err := r.archive.FindByUID(context.Background(), UID)
   245  	if err != nil && !gorm.IsRecordNotFoundError(err) {
   246  		r.Log.Error(err, "failed to find experiment", "UID", UID)
   247  		return err
   248  	}
   249  
   250  	if find != nil {
   251  		archive.ID = find.ID
   252  		archive.CreatedAt = find.CreatedAt
   253  		archive.UpdatedAt = find.UpdatedAt
   254  	}
   255  
   256  	if err := r.archive.Set(context.Background(), archive); err != nil {
   257  		r.Log.Error(err, "failed to update experiment", "archive", archive)
   258  		return err
   259  	}
   260  
   261  	return nil
   262  }
   263  
   264  func (r *ChaosCollector) archiveExperiment(ns, name string) error {
   265  	if err := r.event.UpdateIncompleteEvents(context.Background(), ns, name); err != nil {
   266  		r.Log.Error(err, "failed to update incomplete events", "namespace", ns, "name", name)
   267  		return err
   268  	}
   269  
   270  	if err := r.archive.Archive(context.Background(), ns, name); err != nil {
   271  		r.Log.Error(err, "failed to archive experiment", "namespace", ns, "name", name)
   272  		return err
   273  	}
   274  
   275  	return nil
   276  }
   277