...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package bpm
17
18 import (
19 "bytes"
20 "io"
21 "sync"
22
23 "go.uber.org/atomic"
24 )
25
26 type blockingBuffer struct {
27 buf io.ReadWriteCloser
28
29 cond *sync.Cond
30 closed *atomic.Bool
31 }
32
33 func NewBlockingBuffer() io.ReadWriteCloser {
34 m := sync.Mutex{}
35 return &blockingBuffer{
36 cond: sync.NewCond(&m),
37 buf: newInternalBuffer(),
38 closed: atomic.NewBool(false),
39 }
40 }
41
42 func (br *blockingBuffer) Write(b []byte) (ln int, err error) {
43 if br.closed.Load() {
44 return 0, nil
45 }
46 ln, err = br.buf.Write(b)
47
48 br.cond.Broadcast()
49 return
50 }
51
52 func (br *blockingBuffer) Read(b []byte) (ln int, err error) {
53 if br.closed.Load() {
54 return 0, io.EOF
55 }
56 ln, err = br.buf.Read(b)
57
58 for err == io.EOF {
59 br.cond.L.Lock()
60 if br.closed.Load() {
61 return 0, io.EOF
62 }
63 br.cond.Wait()
64 br.cond.L.Unlock()
65
66 ln, err = br.buf.Read(b)
67 }
68 return
69 }
70
71 func (br *blockingBuffer) Close() error {
72 br.closed.Store(true)
73
74 br.cond.Broadcast()
75
76 br.buf.Close()
77 return nil
78 }
79
80
// internalBuffer is a bytes.Buffer whose Read and Write calls are serialized
// by a mutex.
type internalBuffer struct {
	mutex sync.Mutex // guards buf
	buf   bytes.Buffer
}
85
86 func newInternalBuffer() io.ReadWriteCloser {
87 buffer := &internalBuffer{
88 buf: bytes.Buffer{},
89 mutex: sync.Mutex{},
90 }
91
92 return buffer
93 }
94
95 func (cb *internalBuffer) Write(buf []byte) (ln int, err error) {
96 cb.mutex.Lock()
97 defer cb.mutex.Unlock()
98
99 return cb.buf.Write(buf)
100 }
101
102 func (cb *internalBuffer) Read(buf []byte) (ln int, err error) {
103 cb.mutex.Lock()
104 defer cb.mutex.Unlock()
105
106 return cb.buf.Read(buf)
107 }
108
109 func (cb *internalBuffer) Close() error {
110 return nil
111 }
112