...
1
2
3
4
5
6
7
8
9
10
11
12
13
14 package collector
15
16 import (
17 "context"
18
19 "github.com/go-logr/logr"
20 "github.com/jinzhu/gorm"
21 apierrors "k8s.io/apimachinery/pkg/api/errors"
22 "k8s.io/apimachinery/pkg/runtime"
23 ctrl "sigs.k8s.io/controller-runtime"
24 "sigs.k8s.io/controller-runtime/pkg/client"
25 "sigs.k8s.io/controller-runtime/pkg/reconcile"
26
27 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
28 "github.com/chaos-mesh/chaos-mesh/pkg/core"
29 )
30
31 type WorkflowCollector struct {
32 kubeClient client.Client
33 Log logr.Logger
34 apiType runtime.Object
35 store core.WorkflowStore
36 }
37
38 func (it *WorkflowCollector) Setup(mgr ctrl.Manager, apiType runtime.Object) error {
39 it.apiType = apiType
40
41 return ctrl.NewControllerManagedBy(mgr).
42 For(apiType).
43 Complete(it)
44 }
45
46 func (it *WorkflowCollector) Reconcile(request reconcile.Request) (reconcile.Result, error) {
47 if it.apiType == nil {
48 it.Log.Error(nil, "apiType has not been initialized")
49 return ctrl.Result{}, nil
50 }
51 ctx := context.Background()
52 workflow := v1alpha1.Workflow{}
53 err := it.kubeClient.Get(ctx, request.NamespacedName, &workflow)
54 if apierrors.IsNotFound(err) {
55
56 if err = it.markAsArchived(ctx, request.Namespace, request.Name); err != nil {
57 it.Log.Error(err, "failed to archive experiment")
58 }
59 return ctrl.Result{}, nil
60 }
61 if err != nil {
62 it.Log.Error(err, "failed to get workflow object", "request", request.NamespacedName)
63 return ctrl.Result{}, nil
64 }
65
66 if err := it.persistentWorkflow(&workflow); err != nil {
67 it.Log.Error(err, "failed to archive workflow")
68 }
69
70 return ctrl.Result{}, nil
71 }
72
73 func (it *WorkflowCollector) markAsArchived(ctx context.Context, namespace, name string) error {
74 return it.store.MarkAsArchived(ctx, namespace, name)
75 }
76
77 func (it *WorkflowCollector) persistentWorkflow(workflow *v1alpha1.Workflow) error {
78 newEntity, err := core.WorkflowCR2WorkflowEntity(workflow)
79 if err != nil {
80 return err
81 }
82
83 existedEntity, err := it.store.FindByUID(context.Background(), string(workflow.UID))
84 if err != nil && !gorm.IsRecordNotFoundError(err) {
85 it.Log.Error(err, "failed to find workflow", "UID", workflow.UID)
86 return err
87 }
88
89 if existedEntity != nil {
90 newEntity.ID = existedEntity.ID
91 }
92
93 err = it.store.Save(context.Background(), newEntity)
94 if err != nil {
95 it.Log.Error(err, "failed to update workflow", "archive", newEntity)
96 }
97 return err
98 }
99