...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/bpm/buffer_test.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/bpm

     1  // Copyright 2021 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  // http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package bpm
    17  
    18  import (
    19  	"fmt"
    20  	"io"
    21  	"strings"
    22  	"sync"
    23  	"time"
    24  
    25  	. "github.com/onsi/ginkgo"
    26  	. "github.com/onsi/gomega"
    27  	"go.uber.org/zap/buffer"
    28  )
    29  
    30  var _ = Describe("internal buffer", func() {
    31  	var testLines = map[string]bool{
    32  		"a":     true,
    33  		"ab":    true,
    34  		"abc":   true,
    35  		"abcd":  true,
    36  		"abcde": true,
    37  		"b":     true,
    38  		"bc":    true,
    39  		"bcd":   true,
    40  		"bcde":  true,
    41  	}
    42  
    43  	const seperator = "\r\n"
    44  	const repeated = 100
    45  	const workers = 10
    46  	const testTimes = 10
    47  
    48  	testSequentially := func() {
    49  		linesChan := makeLinesChain(testLines, repeated)
    50  		buffer := newInternalBuffer()
    51  		Expect(writeBuffer(linesChan, buffer)).To(BeNil())
    52  		result, err := readTimeout(buffer, time.Second)
    53  		Expect(err).To(BeNil())
    54  		check(testLines, result, seperator, repeated)
    55  	}
    56  
    57  	testConcurrently := func() {
    58  		linesChan := makeLinesChain(testLines, repeated)
    59  		buffer := newInternalBuffer()
    60  
    61  		wg := sync.WaitGroup{}
    62  		for i := 0; i < workers; i++ {
    63  			wg.Add(1)
    64  			go func() {
    65  				defer GinkgoRecover()
    66  				Expect(writeBuffer(linesChan, buffer)).To(BeNil())
    67  				wg.Done()
    68  			}()
    69  		}
    70  		wg.Wait()
    71  		result, err := readTimeout(buffer, time.Second)
    72  		Expect(err).To(BeNil())
    73  		check(testLines, result, seperator, repeated)
    74  	}
    75  
    76  	multiple := func(fn func()) func() {
    77  		return func() {
    78  			wg := sync.WaitGroup{}
    79  			for i := 0; i < testTimes; i++ {
    80  				wg.Add(1)
    81  				go func() {
    82  					defer GinkgoRecover()
    83  					fn()
    84  					wg.Done()
    85  				}()
    86  			}
    87  			wg.Wait()
    88  		}
    89  	}
    90  
    91  	Context("sequential write and read", func() {
    92  		It("normal", testSequentially)
    93  		It("multiple times", multiple(testSequentially))
    94  	})
    95  	Context("concurrent write and read", func() {
    96  		It("normal", testConcurrently)
    97  		It("multiple times", multiple(testConcurrently))
    98  	})
    99  })
   100  
   101  func makeLinesChain(lines map[string]bool, repeated int) <-chan string {
   102  	linesChan := make(chan string, len(lines)*repeated)
   103  	for i := 0; i < repeated; i++ {
   104  		for line := range lines {
   105  			l := line
   106  			linesChan <- l
   107  		}
   108  	}
   109  	close(linesChan)
   110  	return linesChan
   111  }
   112  
   113  func writeBuffer(lines <-chan string, buffer io.Writer) error {
   114  	line, ok := <-lines
   115  	for ok {
   116  		_, err := buffer.Write([]byte(fmt.Sprintf("%s\r\n", line)))
   117  		if err != nil {
   118  			return err
   119  		}
   120  		line, ok = <-lines
   121  	}
   122  	return nil
   123  }
   124  
   125  func check(lines map[string]bool, result []byte, seperator string, repeated int) {
   126  	resultMap := make(map[string]int)
   127  	for _, line := range strings.Split(strings.TrimRight(string(result), "\r\n"), seperator) {
   128  		resultMap[line]++
   129  	}
   130  
   131  	for line := range lines {
   132  		Expect(resultMap[line]).To(Equal(repeated))
   133  	}
   134  
   135  	for line := range resultMap {
   136  		Expect(lines[line]).To(BeTrue())
   137  	}
   138  }
   139  
   140  func readTimeout(reader io.Reader, timeout time.Duration) ([]byte, error) {
   141  	errChan := make(chan error)
   142  	buffer := buffer.Buffer{}
   143  	go func() {
   144  		var err error
   145  		var ln int
   146  		chunk := make([]byte, 2)
   147  		for {
   148  			ln, err = reader.Read(chunk)
   149  			if err != nil {
   150  				break
   151  			}
   152  			_, err = buffer.Write(chunk[:ln])
   153  			if err != nil {
   154  				break
   155  			}
   156  		}
   157  		if err == io.EOF {
   158  			return
   159  		}
   160  		errChan <- err
   161  	}()
   162  
   163  	select {
   164  	case err := <-errChan:
   165  		return nil, err
   166  	case <-time.Tick(timeout):
   167  		return buffer.Bytes(), nil
   168  	}
   169  }
   170