diff options
Diffstat (limited to 'pipe')
| -rw-r--r-- | pipe/pipe.go | 64 | ||||
| -rw-r--r-- | pipe/pipe_test.go | 117 | 
2 files changed, 181 insertions, 0 deletions
| diff --git a/pipe/pipe.go b/pipe/pipe.go new file mode 100644 index 0000000..2ca1f8b --- /dev/null +++ b/pipe/pipe.go @@ -0,0 +1,64 @@ +package pipe + +import ( +	"sync" +) + +func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V { +	cout := make(chan V) +	var wg sync.WaitGroup + +	for i := 0; i < count; i++ { +		wg.Add(1) +		go func(<-chan T, chan<- V) { +			for i := range cin { +				cout <- proc(i) +			} +			wg.Done() +		}(cin, cout) +	} + +	go func() { +		wg.Wait() +		close(cout) +	}() + +	return cout +} + +func noop[T any](_ T) {} + +func Wait[T any](cin <-chan T) { +	for v := range cin { +		noop(v) +	} +} + +func TailProc[T any](cin <-chan T, count int, proc func(T)) { +	var wg sync.WaitGroup + +	for i := 0; i < 4; i++ { +		wg.Add(1) +		go func(<-chan T) { +			for i := range cin { +				proc(i) +			} +			wg.Done() + +		}(cin) +	} +	wg.Wait() +} + +func Yield[T any](in []T) <-chan T { +	cout := make(chan T) + +	go func(chan<- T) { +		for _, a := range in { +			cout <- a +		} +		close(cout) +	}(cout) + +	return cout +} diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go new file mode 100644 index 0000000..e71b05b --- /dev/null +++ b/pipe/pipe_test.go @@ -0,0 +1,117 @@ +package pipe + +import ( +	"testing" +) + +const Values = 999 +const Multiplier = 1000 + +func createCountPipe() <-chan int { +	cout := make(chan int) + +	go func(<-chan int) { +		for i := 1; i < Values; i++ { +			cout <- i +		} +		close(cout) +	}(cout) + +	return cout +} + +func multiply(in int) int { +	return in * Multiplier +} + +func devide(in int) int { +	return in / Multiplier +} + +func TestProc(t *testing.T) { +	c := createCountPipe() +	am := Proc(c, 8, multiply) +	ad := Proc(am, 8, devide) +	a := Proc(ad, 8, multiply) + +	out := make([]int, Values-1) + +	count := 0 +	for i := range a { +		out[count] = i +		count++ +	} + +	for _, v := range out { +		if v < Multiplier { +			t.Errorf("Invalid output %d", v) +		} +	} +} + +func TestYield(t *testing.T) { +	lin := make([]int, Values) + +	for i := 0; i < Values; i++ { +		lin[i] = i +	} + +	cout := Yield(lin) + +	count := 0 +	for out := range cout { +		if out != lin[count] { +			t.Errorf("Invalid output %d", out) +		} +		count++ +	} +} + +type buffer struct { +	value int +} + +func createBuffers() []*buffer { +	buffers := make([]*buffer, Values) + +	for i := 0; i < Values; i++ { +		buffers[i] = &buffer{value: i} +	} + +	return buffers +} + +func multiplyBuffer(in *buffer) *buffer { +	in.value = in.value * Multiplier +	return in +} + +func devideBuffer(in *buffer) { +	in.value = in.value / Multiplier +} + +func TestTailProc(t *testing.T) { +	buffers := createBuffers() +	c := Yield(buffers) +	am := Proc(c, 8, multiplyBuffer) +	TailProc(am, 8, devideBuffer) + +	for i, v := range buffers { +		if v.value != i { +			t.Errorf("Invalid output %d != %d", v.value, i) +		} +	} +} + +func TestWait(t *testing.T) { +	buffers := createBuffers() +	c := Yield(buffers) +	am := Proc(c, 8, multiplyBuffer) +	Wait(am) + +	for i, v := range buffers { +		if v.value != (i * Multiplier) { +			t.Errorf("Invalid output %d != %d", v.value, i) +		} +	} +} | 
