...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/bpm/buffer.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/bpm

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package bpm
    17  
    18  import (
    19  	"bytes"
    20  	"io"
    21  	"sync"
    22  
    23  	"go.uber.org/atomic"
    24  )
    25  
    26  type blockingBuffer struct {
    27  	buf io.ReadWriteCloser
    28  
    29  	cond   *sync.Cond
    30  	closed *atomic.Bool
    31  }
    32  
    33  func NewBlockingBuffer() io.ReadWriteCloser {
    34  	m := sync.Mutex{}
    35  	return &blockingBuffer{
    36  		cond:   sync.NewCond(&m),
    37  		buf:    newInternalBuffer(),
    38  		closed: atomic.NewBool(false),
    39  	}
    40  }
    41  
    42  func (br *blockingBuffer) Write(b []byte) (ln int, err error) {
    43  	if br.closed.Load() {
    44  		return 0, nil
    45  	}
    46  	ln, err = br.buf.Write(b)
    47  
    48  	br.cond.Broadcast()
    49  	return
    50  }
    51  
    52  func (br *blockingBuffer) Read(b []byte) (ln int, err error) {
    53  	if br.closed.Load() {
    54  		return 0, io.EOF
    55  	}
    56  	ln, err = br.buf.Read(b)
    57  
    58  	for err == io.EOF {
    59  		br.cond.L.Lock()
    60  		if br.closed.Load() {
    61  			return 0, io.EOF
    62  		}
    63  		br.cond.Wait()
    64  		br.cond.L.Unlock()
    65  
    66  		ln, err = br.buf.Read(b)
    67  	}
    68  	return
    69  }
    70  
    71  func (br *blockingBuffer) Close() error {
    72  	br.closed.Store(true)
    73  
    74  	br.cond.Broadcast()
    75  
    76  	br.buf.Close()
    77  	return nil
    78  }
    79  
    80  // internalBuffer is a buffer with a lock
    81  type internalBuffer struct {
    82  	buf   bytes.Buffer
    83  	mutex sync.Mutex
    84  }
    85  
    86  func newInternalBuffer() io.ReadWriteCloser {
    87  	buffer := &internalBuffer{
    88  		buf:   bytes.Buffer{},
    89  		mutex: sync.Mutex{},
    90  	}
    91  
    92  	return buffer
    93  }
    94  
    95  func (cb *internalBuffer) Write(buf []byte) (ln int, err error) {
    96  	cb.mutex.Lock()
    97  	defer cb.mutex.Unlock()
    98  
    99  	return cb.buf.Write(buf)
   100  }
   101  
   102  func (cb *internalBuffer) Read(buf []byte) (ln int, err error) {
   103  	cb.mutex.Lock()
   104  	defer cb.mutex.Unlock()
   105  
   106  	return cb.buf.Read(buf)
   107  }
   108  
   109  func (cb *internalBuffer) Close() error {
   110  	return nil
   111  }
   112