...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/collector/schedule_collector.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/collector

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package collector
    15  
    16  import (
    17  	"context"
    18  	"encoding/json"
    19  	"errors"
    20  
    21  	"github.com/go-logr/logr"
    22  	"github.com/jinzhu/gorm"
    23  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    24  	"k8s.io/apimachinery/pkg/runtime"
    25  	ctrl "sigs.k8s.io/controller-runtime"
    26  	"sigs.k8s.io/controller-runtime/pkg/client"
    27  
    28  	"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
    29  	"github.com/chaos-mesh/chaos-mesh/pkg/core"
    30  )
    31  
    32  // ScheduleCollector represents a collector for Schedule Object.
    33  type ScheduleCollector struct {
    34  	client.Client
    35  	Log     logr.Logger
    36  	apiType runtime.Object
    37  	archive core.ScheduleStore
    38  }
    39  
    40  // Reconcile reconciles a Schedule collector.
    41  func (r *ScheduleCollector) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    42  	if r.apiType == nil {
    43  		r.Log.Error(nil, "apiType has not been initialized")
    44  		return ctrl.Result{}, nil
    45  	}
    46  	ctx := context.Background()
    47  
    48  	schedule := &v1alpha1.Schedule{}
    49  	err := r.Get(ctx, req.NamespacedName, schedule)
    50  	if apierrors.IsNotFound(err) {
    51  		if err = r.archiveSchedule(req.Namespace, req.Name); err != nil {
    52  			r.Log.Error(err, "failed to archive schedule")
    53  		}
    54  		return ctrl.Result{}, nil
    55  	}
    56  	if err != nil {
    57  		r.Log.Error(err, "failed to get schedule object", "request", req.NamespacedName)
    58  		return ctrl.Result{}, nil
    59  	}
    60  
    61  	if !schedule.DeletionTimestamp.IsZero() {
    62  		if err = r.archiveSchedule(req.Namespace, req.Name); err != nil {
    63  			r.Log.Error(err, "failed to archive schedule")
    64  		}
    65  		return ctrl.Result{}, nil
    66  	}
    67  
    68  	if err := r.setUnarchivedSchedule(req, *schedule); err != nil {
    69  		r.Log.Error(err, "failed to archive schedule")
    70  		// ignore error here
    71  	}
    72  
    73  	return ctrl.Result{}, nil
    74  }
    75  
    76  // Setup setups collectors by Manager.
    77  func (r *ScheduleCollector) Setup(mgr ctrl.Manager, apiType runtime.Object) error {
    78  	r.apiType = apiType
    79  
    80  	return ctrl.NewControllerManagedBy(mgr).
    81  		For(apiType).
    82  		Complete(r)
    83  }
    84  
    85  func (r *ScheduleCollector) setUnarchivedSchedule(req ctrl.Request, schedule v1alpha1.Schedule) error {
    86  	archive := &core.Schedule{
    87  		ScheduleMeta: core.ScheduleMeta{
    88  			Namespace: req.Namespace,
    89  			Name:      req.Name,
    90  			Kind:      schedule.Kind,
    91  			UID:       string(schedule.UID),
    92  			Archived:  false,
    93  		},
    94  	}
    95  
    96  	switch schedule.Spec.Type {
    97  	case v1alpha1.ScheduleTypePodChaos:
    98  		archive.Action = string(schedule.Spec.ScheduleItem.PodChaos.Action)
    99  	case v1alpha1.ScheduleTypeNetworkChaos:
   100  		archive.Action = string(schedule.Spec.ScheduleItem.NetworkChaos.Action)
   101  	case v1alpha1.ScheduleTypeIOChaos:
   102  		archive.Action = string(schedule.Spec.ScheduleItem.IOChaos.Action)
   103  	case v1alpha1.ScheduleTypeTimeChaos, v1alpha1.ScheduleTypeKernelChaos, v1alpha1.ScheduleTypeStressChaos:
   104  		archive.Action = ""
   105  	case v1alpha1.ScheduleTypeDNSChaos:
   106  		archive.Action = string(schedule.Spec.ScheduleItem.DNSChaos.Action)
   107  	case v1alpha1.ScheduleTypeAWSChaos:
   108  		archive.Action = string(schedule.Spec.ScheduleItem.AWSChaos.Action)
   109  	case v1alpha1.ScheduleTypeGCPChaos:
   110  		archive.Action = string(schedule.Spec.ScheduleItem.GCPChaos.Action)
   111  	default:
   112  		return errors.New("unsupported chaos type " + string(schedule.Spec.Type))
   113  	}
   114  
   115  	archive.StartTime = schedule.GetCreationTimestamp().Time
   116  	if schedule.GetDeletionTimestamp() != nil {
   117  		archive.FinishTime = schedule.GetDeletionTimestamp().Time
   118  	}
   119  
   120  	data, err := json.Marshal(schedule)
   121  	if err != nil {
   122  		r.Log.Error(err, "failed to marshal schedule", "kind", archive.Kind,
   123  			"namespace", archive.Namespace, "name", archive.Name)
   124  		return err
   125  	}
   126  
   127  	archive.Schedule = string(data)
   128  
   129  	find, err := r.archive.FindByUID(context.Background(), string(schedule.UID))
   130  	if err != nil && !gorm.IsRecordNotFoundError(err) {
   131  		r.Log.Error(err, "failed to find schedule", "UID", schedule.UID)
   132  		return err
   133  	}
   134  
   135  	if find != nil {
   136  		archive.ID = find.ID
   137  		archive.CreatedAt = find.CreatedAt
   138  		archive.UpdatedAt = find.UpdatedAt
   139  	}
   140  
   141  	if err := r.archive.Set(context.Background(), archive); err != nil {
   142  		r.Log.Error(err, "failed to update schedule", "archive", archive)
   143  		return err
   144  	}
   145  
   146  	return nil
   147  }
   148  
   149  func (r *ScheduleCollector) archiveSchedule(ns, name string) error {
   150  	if err := r.archive.Archive(context.Background(), ns, name); err != nil {
   151  		r.Log.Error(err, "failed to archive schedule", "namespace", ns, "name", name)
   152  		return err
   153  	}
   154  
   155  	return nil
   156  }
   157