...
1
2
3
4
5
6
7
8
9
10
11
12
13
14 package podiochaosmanager
15
16 import (
17 "context"
18 "errors"
19
20 "github.com/go-logr/logr"
21 "golang.org/x/sync/errgroup"
22 v1 "k8s.io/api/core/v1"
23 k8sError "k8s.io/apimachinery/pkg/api/errors"
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/apimachinery/pkg/types"
26 "k8s.io/client-go/util/retry"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28
29 "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
30 )
31
// Sentinel errors reported by Commit through CommitResponse.Err.
var (
	// ErrPodNotFound is returned when no PodIoChaos exists for a key and the
	// pod with the same namespaced name cannot be found either.
	ErrPodNotFound = errors.New("pod not found")

	// ErrPodNotRunning is returned when no PodIoChaos exists for a key and the
	// pod with the same namespaced name is not in the Running phase.
	ErrPodNotRunning = errors.New("pod not running")
)
40
41
42 type PodIoManager struct {
43 Source string
44 Log logr.Logger
45 client.Client
46
47 Modifications map[types.NamespacedName]*PodIoTransaction
48 }
49
50
51 func New(source string, logger logr.Logger, client client.Client) *PodIoManager {
52 return &PodIoManager{
53 Source: source,
54 Log: logger,
55 Client: client,
56 Modifications: make(map[types.NamespacedName]*PodIoTransaction),
57 }
58 }
59
60
61 func (m *PodIoManager) WithInit(key types.NamespacedName) *PodIoTransaction {
62 t, ok := m.Modifications[key]
63 if ok {
64 return t
65 }
66
67 t = &PodIoTransaction{}
68 t.Clear(m.Source)
69 m.Modifications[key] = t
70 return t
71 }
72
73
74 type CommitResponse struct {
75 Key types.NamespacedName
76 Err error
77 }
78
79
80 func (m *PodIoManager) Commit(ctx context.Context) []CommitResponse {
81 g := errgroup.Group{}
82 results := make([]CommitResponse, len(m.Modifications))
83 index := 0
84 for key, t := range m.Modifications {
85 i := index
86
87 key := key
88 t := t
89 g.Go(func() error {
90 m.Log.Info("running modification on pod", "key", key, "modification", t)
91 updateError := retry.RetryOnConflict(retry.DefaultRetry, func() error {
92 chaos := &v1alpha1.PodIoChaos{}
93
94 err := m.Client.Get(ctx, key, chaos)
95 if err != nil {
96 if !k8sError.IsNotFound(err) {
97 m.Log.Error(err, "error while getting podiochaos")
98 return err
99 }
100
101 pod := v1.Pod{}
102 err = m.Client.Get(ctx, key, &pod)
103 if err != nil {
104 if !k8sError.IsNotFound(err) {
105 m.Log.Error(err, "error while finding pod")
106 return err
107 }
108
109 m.Log.Info("pod not found", "key", key, "error", err.Error())
110 err = ErrPodNotFound
111 return err
112 }
113
114 if pod.Status.Phase != v1.PodRunning {
115 m.Log.Info("pod is not running", "key", key)
116 err = ErrPodNotRunning
117 return err
118 }
119
120 chaos.Name = key.Name
121 chaos.Namespace = key.Namespace
122 chaos.OwnerReferences = []metav1.OwnerReference{
123 {
124 APIVersion: pod.APIVersion,
125 Kind: pod.Kind,
126 Name: pod.Name,
127 UID: pod.UID,
128 },
129 }
130 err = m.Client.Create(ctx, chaos)
131
132 if err != nil {
133 m.Log.Error(err, "error while creating podiochaos")
134 return err
135 }
136 }
137
138 err = t.Apply(chaos)
139 if err != nil {
140 m.Log.Error(err, "error while applying transactions", "transaction", t)
141 return err
142 }
143
144 return m.Client.Update(ctx, chaos)
145 })
146
147 results[i] = CommitResponse{
148 Key: key,
149 Err: updateError,
150 }
151
152 return nil
153 })
154
155 index++
156 }
157
158 g.Wait()
159
160 return results
161 }
162