...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package collector
17
18 import (
19 "context"
20
21 "github.com/go-logr/logr"
22 "github.com/jinzhu/gorm"
23 apierrors "k8s.io/apimachinery/pkg/api/errors"
24 "k8s.io/apimachinery/pkg/runtime"
25 ctrl "sigs.k8s.io/controller-runtime"
26 "sigs.k8s.io/controller-runtime/pkg/client"
27 "sigs.k8s.io/controller-runtime/pkg/reconcile"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 "github.com/chaos-mesh/chaos-mesh/pkg/dashboard/core"
31 )
32
33 type WorkflowCollector struct {
34 kubeClient client.Client
35 Log logr.Logger
36 apiType runtime.Object
37 store core.WorkflowStore
38 }
39
40 func (it *WorkflowCollector) Setup(mgr ctrl.Manager, apiType client.Object) error {
41 it.apiType = apiType
42
43 return ctrl.NewControllerManagedBy(mgr).
44 For(apiType).
45 Complete(it)
46 }
47
48 func (it *WorkflowCollector) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
49 if it.apiType == nil {
50 it.Log.Error(nil, "apiType has not been initialized")
51 return ctrl.Result{}, nil
52 }
53 workflow := v1alpha1.Workflow{}
54 err := it.kubeClient.Get(ctx, request.NamespacedName, &workflow)
55 if apierrors.IsNotFound(err) {
56
57 if err = it.markAsArchived(ctx, request.Namespace, request.Name); err != nil {
58 it.Log.Error(err, "failed to archive experiment")
59 }
60 return ctrl.Result{}, nil
61 }
62 if err != nil {
63 it.Log.Error(err, "failed to get workflow object", "request", request.NamespacedName)
64 return ctrl.Result{}, nil
65 }
66
67 if err := it.persistentWorkflow(&workflow); err != nil {
68 it.Log.Error(err, "failed to archive workflow")
69 }
70
71 return ctrl.Result{}, nil
72 }
73
74 func (it *WorkflowCollector) markAsArchived(ctx context.Context, namespace, name string) error {
75 return it.store.MarkAsArchived(ctx, namespace, name)
76 }
77
78 func (it *WorkflowCollector) persistentWorkflow(workflow *v1alpha1.Workflow) error {
79 newEntity, err := core.WorkflowCR2WorkflowEntity(workflow)
80 if err != nil {
81 return err
82 }
83
84 existedEntity, err := it.store.FindByUID(context.Background(), string(workflow.UID))
85 if err != nil && !gorm.IsRecordNotFoundError(err) {
86 it.Log.Error(err, "failed to find workflow", "UID", workflow.UID)
87 return err
88 }
89
90 if existedEntity != nil {
91 newEntity.ID = existedEntity.ID
92 }
93
94 err = it.store.Save(context.Background(), newEntity)
95 if err != nil {
96 it.Log.Error(err, "failed to update workflow", "archive", newEntity)
97 }
98 return err
99 }
100