aboutsummaryrefslogtreecommitdiff
path: root/pipe
diff options
context:
space:
mode:
Diffstat (limited to 'pipe')
-rw-r--r--pipe/pipe.go64
-rw-r--r--pipe/pipe_test.go117
2 files changed, 181 insertions, 0 deletions
diff --git a/pipe/pipe.go b/pipe/pipe.go
new file mode 100644
index 0000000..2ca1f8b
--- /dev/null
+++ b/pipe/pipe.go
@@ -0,0 +1,64 @@
+package pipe
+
+import (
+ "sync"
+)
+
+func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V {
+ cout := make(chan V)
+ var wg sync.WaitGroup
+
+ for i := 0; i < count; i++ {
+ wg.Add(1)
+ go func(<-chan T, chan<- V) {
+ for i := range cin {
+ cout <- proc(i)
+ }
+ wg.Done()
+ }(cin, cout)
+ }
+
+ go func() {
+ wg.Wait()
+ close(cout)
+ }()
+
+ return cout
+}
+
+func noop[T any](_ T) {}
+
+func Wait[T any](cin <-chan T) {
+ for v := range cin {
+ noop(v)
+ }
+}
+
+func TailProc[T any](cin <-chan T, count int, proc func(T)) {
+ var wg sync.WaitGroup
+
+ for i := 0; i < 4; i++ {
+ wg.Add(1)
+ go func(<-chan T) {
+ for i := range cin {
+ proc(i)
+ }
+ wg.Done()
+
+ }(cin)
+ }
+ wg.Wait()
+}
+
+func Yield[T any](in []T) <-chan T {
+ cout := make(chan T)
+
+ go func(chan<- T) {
+ for _, a := range in {
+ cout <- a
+ }
+ close(cout)
+ }(cout)
+
+ return cout
+}
diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go
new file mode 100644
index 0000000..e71b05b
--- /dev/null
+++ b/pipe/pipe_test.go
@@ -0,0 +1,117 @@
+package pipe
+
+import (
+ "testing"
+)
+
+const Values = 999
+const Multiplier = 1000
+
+func createCountPipe() <-chan int {
+ cout := make(chan int)
+
+ go func(<-chan int) {
+ for i := 1; i < Values; i++ {
+ cout <- i
+ }
+ close(cout)
+ }(cout)
+
+ return cout
+}
+
+func multiply(in int) int {
+ return in * Multiplier
+}
+
+func devide(in int) int {
+ return in / Multiplier
+}
+
+func TestProc(t *testing.T) {
+ c := createCountPipe()
+ am := Proc(c, 8, multiply)
+ ad := Proc(am, 8, devide)
+ a := Proc(ad, 8, multiply)
+
+ out := make([]int, Values-1)
+
+ count := 0
+ for i := range a {
+ out[count] = i
+ count++
+ }
+
+ for _, v := range out {
+ if v < Multiplier {
+ t.Errorf("Invalid output %d", v)
+ }
+ }
+}
+
+func TestYield(t *testing.T) {
+ lin := make([]int, Values)
+
+ for i := 0; i < Values; i++ {
+ lin[i] = i
+ }
+
+ cout := Yield(lin)
+
+ count := 0
+ for out := range cout {
+ if out != lin[count] {
+ t.Errorf("Invalid output %d", out)
+ }
+ count++
+ }
+}
+
+type buffer struct {
+ value int
+}
+
+func createBuffers() []*buffer {
+ buffers := make([]*buffer, Values)
+
+ for i := 0; i < Values; i++ {
+ buffers[i] = &buffer{value: i}
+ }
+
+ return buffers
+}
+
+func multiplyBuffer(in *buffer) *buffer {
+ in.value = in.value * Multiplier
+ return in
+}
+
+func devideBuffer(in *buffer) {
+ in.value = in.value / Multiplier
+}
+
+func TestTailProc(t *testing.T) {
+ buffers := createBuffers()
+ c := Yield(buffers)
+ am := Proc(c, 8, multiplyBuffer)
+ TailProc(am, 8, devideBuffer)
+
+ for i, v := range buffers {
+ if v.value != i {
+ t.Errorf("Invalid output %d != %d", v.value, i)
+ }
+ }
+}
+
+func TestWait(t *testing.T) {
+ buffers := createBuffers()
+ c := Yield(buffers)
+ am := Proc(c, 8, multiplyBuffer)
+ Wait(am)
+
+ for i, v := range buffers {
+ if v.value != (i * Multiplier) {
+ t.Errorf("Invalid output %d != %d", v.value, i)
+ }
+ }
+}