...

Source file src/github.com/chaos-mesh/chaos-mesh/pkg/bpm/buffer_test.go

Documentation: github.com/chaos-mesh/chaos-mesh/pkg/bpm

     1  // Copyright 2020 Chaos Mesh Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package bpm
    15  
    16  import (
    17  	"fmt"
    18  	"io"
    19  	"strings"
    20  	"sync"
    21  	"time"
    22  
    23  	. "github.com/onsi/ginkgo"
    24  	. "github.com/onsi/gomega"
    25  	"go.uber.org/zap/buffer"
    26  )
    27  
    28  var _ = Describe("internal buffer", func() {
    29  	var testLines = map[string]bool{
    30  		"a":     true,
    31  		"ab":    true,
    32  		"abc":   true,
    33  		"abcd":  true,
    34  		"abcde": true,
    35  		"b":     true,
    36  		"bc":    true,
    37  		"bcd":   true,
    38  		"bcde":  true,
    39  	}
    40  
    41  	const seperator = "\r\n"
    42  	const repeated = 100
    43  	const workers = 10
    44  	const testTimes = 10
    45  
    46  	testSequentially := func() {
    47  		linesChan := makeLinesChain(testLines, repeated)
    48  		buffer := newInternalBuffer()
    49  		Expect(writeBuffer(linesChan, buffer)).To(BeNil())
    50  		result, err := readTimeout(buffer, time.Second)
    51  		Expect(err).To(BeNil())
    52  		check(testLines, result, seperator, repeated)
    53  	}
    54  
    55  	testConcurrently := func() {
    56  		linesChan := makeLinesChain(testLines, repeated)
    57  		buffer := newInternalBuffer()
    58  
    59  		wg := sync.WaitGroup{}
    60  		for i := 0; i < workers; i++ {
    61  			wg.Add(1)
    62  			go func() {
    63  				defer GinkgoRecover()
    64  				Expect(writeBuffer(linesChan, buffer)).To(BeNil())
    65  				wg.Done()
    66  			}()
    67  		}
    68  		wg.Wait()
    69  		result, err := readTimeout(buffer, time.Second)
    70  		Expect(err).To(BeNil())
    71  		check(testLines, result, seperator, repeated)
    72  	}
    73  
    74  	multiple := func(fn func()) func() {
    75  		return func() {
    76  			wg := sync.WaitGroup{}
    77  			for i := 0; i < testTimes; i++ {
    78  				wg.Add(1)
    79  				go func() {
    80  					defer GinkgoRecover()
    81  					fn()
    82  					wg.Done()
    83  				}()
    84  			}
    85  			wg.Wait()
    86  		}
    87  	}
    88  
    89  	Context("sequential write and read", func() {
    90  		It("normal", testSequentially)
    91  		It("multiple times", multiple(testSequentially))
    92  	})
    93  	Context("concurrent write and read", func() {
    94  		It("normal", testConcurrently)
    95  		It("multiple times", multiple(testConcurrently))
    96  	})
    97  })
    98  
    99  func makeLinesChain(lines map[string]bool, repeated int) <-chan string {
   100  	linesChan := make(chan string, len(lines)*repeated)
   101  	for i := 0; i < repeated; i++ {
   102  		for line := range lines {
   103  			l := line
   104  			linesChan <- l
   105  		}
   106  	}
   107  	close(linesChan)
   108  	return linesChan
   109  }
   110  
   111  func writeBuffer(lines <-chan string, buffer io.Writer) error {
   112  	line, ok := <-lines
   113  	for ok {
   114  		_, err := buffer.Write([]byte(fmt.Sprintf("%s\r\n", line)))
   115  		if err != nil {
   116  			return err
   117  		}
   118  		line, ok = <-lines
   119  	}
   120  	return nil
   121  }
   122  
   123  func check(lines map[string]bool, result []byte, seperator string, repeated int) {
   124  	resultMap := make(map[string]int)
   125  	for _, line := range strings.Split(strings.TrimRight(string(result), "\r\n"), seperator) {
   126  		resultMap[line]++
   127  	}
   128  
   129  	for line := range lines {
   130  		Expect(resultMap[line]).To(Equal(repeated))
   131  	}
   132  
   133  	for line := range resultMap {
   134  		Expect(lines[line]).To(BeTrue())
   135  	}
   136  }
   137  
   138  func readTimeout(reader io.Reader, timeout time.Duration) ([]byte, error) {
   139  	errChan := make(chan error)
   140  	buffer := buffer.Buffer{}
   141  	go func() {
   142  		var err error
   143  		var ln int
   144  		chunk := make([]byte, 2)
   145  		for {
   146  			ln, err = reader.Read(chunk)
   147  			if err != nil {
   148  				break
   149  			}
   150  			_, err = buffer.Write(chunk[:ln])
   151  			if err != nil {
   152  				break
   153  			}
   154  		}
   155  		if err == io.EOF {
   156  			return
   157  		}
   158  		errChan <- err
   159  	}()
   160  
   161  	select {
   162  	case err := <-errChan:
   163  		return nil, err
   164  	case <-time.Tick(timeout):
   165  		return buffer.Bytes(), nil
   166  	}
   167  }
   168