From 544bbeeaf836436305cbed87ae1019511de62535 Mon Sep 17 00:00:00 2001 From: Gabriel Arakaki Giovanini Date: Sun, 28 Aug 2022 17:03:38 +0200 Subject: feat: Add init source code For now only `pipe` is feature complete. The order module needs to be changed a lot. --- pipe/pipe.go | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 pipe/pipe.go (limited to 'pipe/pipe.go') diff --git a/pipe/pipe.go b/pipe/pipe.go new file mode 100644 index 0000000..2ca1f8b --- /dev/null +++ b/pipe/pipe.go @@ -0,0 +1,64 @@ +package pipe + +import ( + "sync" +) + +func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V { + cout := make(chan V) + var wg sync.WaitGroup + + for i := 0; i < count; i++ { + wg.Add(1) + go func(<-chan T, chan<- V) { + for i := range cin { + cout <- proc(i) + } + wg.Done() + }(cin, cout) + } + + go func() { + wg.Wait() + close(cout) + }() + + return cout +} + +func noop[T any](_ T) {} + +func Wait[T any](cin <-chan T) { + for v := range cin { + noop(v) + } +} + +func TailProc[T any](cin <-chan T, count int, proc func(T)) { + var wg sync.WaitGroup + + for i := 0; i < 4; i++ { + wg.Add(1) + go func(<-chan T) { + for i := range cin { + proc(i) + } + wg.Done() + + }(cin) + } + wg.Wait() +} + +func Yield[T any](in []T) <-chan T { + cout := make(chan T) + + go func(chan<- T) { + for _, a := range in { + cout <- a + } + close(cout) + }(cout) + + return cout +} -- cgit v1.2.3