path: root/pipe.go
diff options
authorGabriel A. Giovanini <mail@gabrielgio.me>2023-12-09 18:14:51 +0100
committerGabriel A. Giovanini <mail@gabrielgio.me>2023-12-09 18:17:36 +0100
commit8049a4e0decd7b233cf2c2339ad0e57a0a029898 (patch)
treec28c8214a7f2482ad8d98c84ab68176029776fb8 /pipe.go
Initial commit
Diffstat (limited to 'pipe.go')
1 files changed, 128 insertions, 0 deletions
diff --git a/pipe.go b/pipe.go
new file mode 100644
index 0000000..08eaa23
--- /dev/null
+++ b/pipe.go
@@ -0,0 +1,128 @@
+package pipe
+import (
+ "sync"
+var SinkChanError chan error
+func init() {
+ SinkChanError = make(chan error)
+ go func() {
+ for range SinkChanError {
+ }
+ }()
+// Proc maps cin into a channel by applying proc and outputs to its return.
+func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V {
+ return ProcWithError(cin, SinkChanError, count, proc)
+// ProcWithError maps cin into a channel by applying proc and outputs to its return.
+func ProcWithError[T any, V any](cin <-chan T, cerr chan<- error, count int, proc func(T) (V, error)) <-chan V {
+ cout := make(chan V)
+ var wg sync.WaitGroup
+ for i := 0; i < count; i++ {
+ wg.Add(1)
+ go func(<-chan T, chan<- V) {
+ for i := range cin {
+ v, err := proc(i)
+ if err == nil {
+ cout <- v
+ } else {
+ cerr <- err
+ }
+ }
+ wg.Done()
+ }(cin, cout)
+ }
+ go func() {
+ wg.Wait()
+ close(cout)
+ }()
+ return cout
+func noop[T any](_ T) {}
+// Wait waits a channel to close, it ends a chain of procs.
+func Wait[T any](cin <-chan T) {
+ for v := range cin {
+ noop(v)
+ }
+// TailProc waits a channel to close by applying proc to it.
+// It does not return a channel since its meant to end a chain of goroutine+channel.
+func TailProc[T any](cin <-chan T, count int, proc func(T) error) {
+ TailProcWithError(cin, SinkChanError, count, proc)
+// TailProcWithError waits a channel to close by applying proc to it.
+// It does not return a channel since its meant to end a chain of goroutine+channel.
+func TailProcWithError[T any](cin <-chan T, cerr chan<- error, count int, proc func(T) error) {
+ var wg sync.WaitGroup
+ for i := 0; i < count; i++ {
+ wg.Add(1)
+ go func(<-chan T) {
+ for i := range cin {
+ if err := proc(i); err != nil {
+ cerr <- err
+ }
+ }
+ wg.Done()
+ }(cin)
+ }
+ wg.Wait()
+func TailReduce[T any, V any](cin <-chan T, red func(V, T) (V, error), vinit V) V {
+ return TailReduceWithError(cin, SinkChanError, red, vinit)
+func TailReduceWithError[T any, V any](cin <-chan T, cerr chan<- error, red func(V, T) (V, error), vinit V) V {
+ acc := vinit
+ for i := range cin {
+ if a, err := red(acc, i); err != nil {
+ cerr <- err
+ } else {
+ acc = a
+ }
+ }
+ return acc
+// Yield returna a channel from a slice.
+// It will pipe in into a chan returned by the function.
+func Yield[T any](in []T) <-chan T {
+ cout := make(chan T)
+ go func(chan<- T) {
+ for _, a := range in {
+ cout <- a
+ }
+ close(cout)
+ }(cout)
+ return cout
+// Map is a classic map.
+// It gets vs apply m to it and return its value n times.
+func Map[T any, V any](m func(T) V, vs []T) []V {
+ result := make([]V, len(vs))
+ for i, v := range vs {
+ result[i] = m(v)
+ }
+ return result