diff options
Diffstat (limited to 'pipe')
-rw-r--r-- | pipe/pipe.go | 64 | ||||
-rw-r--r-- | pipe/pipe_test.go | 117 |
2 files changed, 181 insertions, 0 deletions
diff --git a/pipe/pipe.go b/pipe/pipe.go new file mode 100644 index 0000000..2ca1f8b --- /dev/null +++ b/pipe/pipe.go @@ -0,0 +1,64 @@ +package pipe + +import ( + "sync" +) + +func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V { + cout := make(chan V) + var wg sync.WaitGroup + + for i := 0; i < count; i++ { + wg.Add(1) + go func(<-chan T, chan<- V) { + for i := range cin { + cout <- proc(i) + } + wg.Done() + }(cin, cout) + } + + go func() { + wg.Wait() + close(cout) + }() + + return cout +} + +func noop[T any](_ T) {} + +func Wait[T any](cin <-chan T) { + for v := range cin { + noop(v) + } +} + +func TailProc[T any](cin <-chan T, count int, proc func(T)) { + var wg sync.WaitGroup + + for i := 0; i < 4; i++ { + wg.Add(1) + go func(<-chan T) { + for i := range cin { + proc(i) + } + wg.Done() + + }(cin) + } + wg.Wait() +} + +func Yield[T any](in []T) <-chan T { + cout := make(chan T) + + go func(chan<- T) { + for _, a := range in { + cout <- a + } + close(cout) + }(cout) + + return cout +} diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go new file mode 100644 index 0000000..e71b05b --- /dev/null +++ b/pipe/pipe_test.go @@ -0,0 +1,117 @@ +package pipe + +import ( + "testing" +) + +const Values = 999 +const Multiplier = 1000 + +func createCountPipe() <-chan int { + cout := make(chan int) + + go func(<-chan int) { + for i := 1; i < Values; i++ { + cout <- i + } + close(cout) + }(cout) + + return cout +} + +func multiply(in int) int { + return in * Multiplier +} + +func devide(in int) int { + return in / Multiplier +} + +func TestProc(t *testing.T) { + c := createCountPipe() + am := Proc(c, 8, multiply) + ad := Proc(am, 8, devide) + a := Proc(ad, 8, multiply) + + out := make([]int, Values-1) + + count := 0 + for i := range a { + out[count] = i + count++ + } + + for _, v := range out { + if v < Multiplier { + t.Errorf("Invalid output %d", v) + } + } +} + +func TestYield(t *testing.T) { + lin := make([]int, Values) + + for i := 0; i < Values; i++ { + lin[i] = i + } + + cout := Yield(lin) + + count := 0 + for out := range cout { + if out != lin[count] { + t.Errorf("Invalid output %d", out) + } + count++ + } +} + +type buffer struct { + value int +} + +func createBuffers() []*buffer { + buffers := make([]*buffer, Values) + + for i := 0; i < Values; i++ { + buffers[i] = &buffer{value: i} + } + + return buffers +} + +func multiplyBuffer(in *buffer) *buffer { + in.value = in.value * Multiplier + return in +} + +func devideBuffer(in *buffer) { + in.value = in.value / Multiplier +} + +func TestTailProc(t *testing.T) { + buffers := createBuffers() + c := Yield(buffers) + am := Proc(c, 8, multiplyBuffer) + TailProc(am, 8, devideBuffer) + + for i, v := range buffers { + if v.value != i { + t.Errorf("Invalid output %d != %d", v.value, i) + } + } +} + +func TestWait(t *testing.T) { + buffers := createBuffers() + c := Yield(buffers) + am := Proc(c, 8, multiplyBuffer) + Wait(am) + + for i, v := range buffers { + if v.value != (i * Multiplier) { + t.Errorf("Invalid output %d != %d", v.value, i) + } + } +} |