aboutsummaryrefslogtreecommitdiff
path: root/pipe/pipe.go
diff options
context:
space:
mode:
Diffstat (limited to 'pipe/pipe.go')
-rw-r--r--pipe/pipe.go64
1 files changed, 64 insertions, 0 deletions
diff --git a/pipe/pipe.go b/pipe/pipe.go
new file mode 100644
index 0000000..2ca1f8b
--- /dev/null
+++ b/pipe/pipe.go
@@ -0,0 +1,64 @@
+package pipe
+
+import (
+ "sync"
+)
+
+func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V {
+ cout := make(chan V)
+ var wg sync.WaitGroup
+
+ for i := 0; i < count; i++ {
+ wg.Add(1)
+ go func(<-chan T, chan<- V) {
+ for i := range cin {
+ cout <- proc(i)
+ }
+ wg.Done()
+ }(cin, cout)
+ }
+
+ go func() {
+ wg.Wait()
+ close(cout)
+ }()
+
+ return cout
+}
+
+func noop[T any](_ T) {}
+
+func Wait[T any](cin <-chan T) {
+ for v := range cin {
+ noop(v)
+ }
+}
+
+func TailProc[T any](cin <-chan T, count int, proc func(T)) {
+ var wg sync.WaitGroup
+
+ for i := 0; i < 4; i++ {
+ wg.Add(1)
+ go func(<-chan T) {
+ for i := range cin {
+ proc(i)
+ }
+ wg.Done()
+
+ }(cin)
+ }
+ wg.Wait()
+}
+
+func Yield[T any](in []T) <-chan T {
+ cout := make(chan T)
+
+ go func(chan<- T) {
+ for _, a := range in {
+ cout <- a
+ }
+ close(cout)
+ }(cout)
+
+ return cout
+}