1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package bpm
17
18 import (
19 "fmt"
20 "io"
21 "strings"
22 "sync"
23 "time"
24
25 . "github.com/onsi/ginkgo"
26 . "github.com/onsi/gomega"
27 "go.uber.org/zap/buffer"
28 )
29
30 var _ = Describe("internal buffer", func() {
31 var testLines = map[string]bool{
32 "a": true,
33 "ab": true,
34 "abc": true,
35 "abcd": true,
36 "abcde": true,
37 "b": true,
38 "bc": true,
39 "bcd": true,
40 "bcde": true,
41 }
42
43 const seperator = "\r\n"
44 const repeated = 100
45 const workers = 10
46 const testTimes = 10
47
48 testSequentially := func() {
49 linesChan := makeLinesChain(testLines, repeated)
50 buffer := newInternalBuffer()
51 Expect(writeBuffer(linesChan, buffer)).To(BeNil())
52 result, err := readTimeout(buffer, time.Second)
53 Expect(err).To(BeNil())
54 check(testLines, result, seperator, repeated)
55 }
56
57 testConcurrently := func() {
58 linesChan := makeLinesChain(testLines, repeated)
59 buffer := newInternalBuffer()
60
61 wg := sync.WaitGroup{}
62 for i := 0; i < workers; i++ {
63 wg.Add(1)
64 go func() {
65 defer GinkgoRecover()
66 Expect(writeBuffer(linesChan, buffer)).To(BeNil())
67 wg.Done()
68 }()
69 }
70 wg.Wait()
71 result, err := readTimeout(buffer, time.Second)
72 Expect(err).To(BeNil())
73 check(testLines, result, seperator, repeated)
74 }
75
76 multiple := func(fn func()) func() {
77 return func() {
78 wg := sync.WaitGroup{}
79 for i := 0; i < testTimes; i++ {
80 wg.Add(1)
81 go func() {
82 defer GinkgoRecover()
83 fn()
84 wg.Done()
85 }()
86 }
87 wg.Wait()
88 }
89 }
90
91 Context("sequential write and read", func() {
92 It("normal", testSequentially)
93 It("multiple times", multiple(testSequentially))
94 })
95 Context("concurrent write and read", func() {
96 It("normal", testConcurrently)
97 It("multiple times", multiple(testConcurrently))
98 })
99 })
100
// makeLinesChain returns a closed, pre-filled channel that yields every key of
// lines exactly `repeated` times. The channel is buffered to hold all entries,
// so filling it never blocks. Order within each round follows Go's map
// iteration order and is therefore unspecified.
func makeLinesChain(lines map[string]bool, repeated int) <-chan string {
	queue := make(chan string, len(lines)*repeated)
	// Closed on return so that receivers ranging over it terminate once drained.
	defer close(queue)

	for round := repeated; round > 0; round-- {
		for text := range lines {
			queue <- text
		}
	}
	return queue
}
112
// writeBuffer drains lines until the channel is closed, writing each entry to
// buffer followed by "\r\n" in a single Write call. It stops at the first
// write error and returns it; it returns nil once the channel is exhausted.
func writeBuffer(lines <-chan string, buffer io.Writer) error {
	for text := range lines {
		// Fprintf formats the whole record first and hands it to the writer
		// in one Write, so each line and its terminator stay together.
		if _, err := fmt.Fprintf(buffer, "%s\r\n", text); err != nil {
			return err
		}
	}
	return nil
}
124
125 func check(lines map[string]bool, result []byte, seperator string, repeated int) {
126 resultMap := make(map[string]int)
127 for _, line := range strings.Split(strings.TrimRight(string(result), "\r\n"), seperator) {
128 resultMap[line]++
129 }
130
131 for line := range lines {
132 Expect(resultMap[line]).To(Equal(repeated))
133 }
134
135 for line := range resultMap {
136 Expect(lines[line]).To(BeTrue())
137 }
138 }
139
140 func readTimeout(reader io.Reader, timeout time.Duration) ([]byte, error) {
141 errChan := make(chan error)
142 buffer := buffer.Buffer{}
143 go func() {
144 var err error
145 var ln int
146 chunk := make([]byte, 2)
147 for {
148 ln, err = reader.Read(chunk)
149 if err != nil {
150 break
151 }
152 _, err = buffer.Write(chunk[:ln])
153 if err != nil {
154 break
155 }
156 }
157 if err == io.EOF {
158 return
159 }
160 errChan <- err
161 }()
162
163 select {
164 case err := <-errChan:
165 return nil, err
166 case <-time.Tick(timeout):
167 return buffer.Bytes(), nil
168 }
169 }
170