1
2
3
4
5
6
7
8
9
10
11
12
13
14 package bpm
15
16 import (
17 "fmt"
18 "io"
19 "strings"
20 "sync"
21 "time"
22
23 . "github.com/onsi/ginkgo"
24 . "github.com/onsi/gomega"
25 "go.uber.org/zap/buffer"
26 )
27
28 var _ = Describe("internal buffer", func() {
29 var testLines = map[string]bool{
30 "a": true,
31 "ab": true,
32 "abc": true,
33 "abcd": true,
34 "abcde": true,
35 "b": true,
36 "bc": true,
37 "bcd": true,
38 "bcde": true,
39 }
40
41 const seperator = "\r\n"
42 const repeated = 100
43 const workers = 10
44 const testTimes = 10
45
46 testSequentially := func() {
47 linesChan := makeLinesChain(testLines, repeated)
48 buffer := newInternalBuffer()
49 Expect(writeBuffer(linesChan, buffer)).To(BeNil())
50 result, err := readTimeout(buffer, time.Second)
51 Expect(err).To(BeNil())
52 check(testLines, result, seperator, repeated)
53 }
54
55 testConcurrently := func() {
56 linesChan := makeLinesChain(testLines, repeated)
57 buffer := newInternalBuffer()
58
59 wg := sync.WaitGroup{}
60 for i := 0; i < workers; i++ {
61 wg.Add(1)
62 go func() {
63 defer GinkgoRecover()
64 Expect(writeBuffer(linesChan, buffer)).To(BeNil())
65 wg.Done()
66 }()
67 }
68 wg.Wait()
69 result, err := readTimeout(buffer, time.Second)
70 Expect(err).To(BeNil())
71 check(testLines, result, seperator, repeated)
72 }
73
74 multiple := func(fn func()) func() {
75 return func() {
76 wg := sync.WaitGroup{}
77 for i := 0; i < testTimes; i++ {
78 wg.Add(1)
79 go func() {
80 defer GinkgoRecover()
81 fn()
82 wg.Done()
83 }()
84 }
85 wg.Wait()
86 }
87 }
88
89 Context("sequential write and read", func() {
90 It("normal", testSequentially)
91 It("multiple times", multiple(testSequentially))
92 })
93 Context("concurrent write and read", func() {
94 It("normal", testConcurrently)
95 It("multiple times", multiple(testConcurrently))
96 })
97 })
98
99 func makeLinesChain(lines map[string]bool, repeated int) <-chan string {
100 linesChan := make(chan string, len(lines)*repeated)
101 for i := 0; i < repeated; i++ {
102 for line := range lines {
103 l := line
104 linesChan <- l
105 }
106 }
107 close(linesChan)
108 return linesChan
109 }
110
111 func writeBuffer(lines <-chan string, buffer io.Writer) error {
112 line, ok := <-lines
113 for ok {
114 _, err := buffer.Write([]byte(fmt.Sprintf("%s\r\n", line)))
115 if err != nil {
116 return err
117 }
118 line, ok = <-lines
119 }
120 return nil
121 }
122
123 func check(lines map[string]bool, result []byte, seperator string, repeated int) {
124 resultMap := make(map[string]int)
125 for _, line := range strings.Split(strings.TrimRight(string(result), "\r\n"), seperator) {
126 resultMap[line]++
127 }
128
129 for line := range lines {
130 Expect(resultMap[line]).To(Equal(repeated))
131 }
132
133 for line := range resultMap {
134 Expect(lines[line]).To(BeTrue())
135 }
136 }
137
138 func readTimeout(reader io.Reader, timeout time.Duration) ([]byte, error) {
139 errChan := make(chan error)
140 buffer := buffer.Buffer{}
141 go func() {
142 var err error
143 var ln int
144 chunk := make([]byte, 2)
145 for {
146 ln, err = reader.Read(chunk)
147 if err != nil {
148 break
149 }
150 _, err = buffer.Write(chunk[:ln])
151 if err != nil {
152 break
153 }
154 }
155 if err == io.EOF {
156 return
157 }
158 errChan <- err
159 }()
160
161 select {
162 case err := <-errChan:
163 return nil, err
164 case <-time.Tick(timeout):
165 return buffer.Bytes(), nil
166 }
167 }
168