diff options
Diffstat (limited to 'pipe.go')
-rw-r--r-- | pipe.go | 128 |
1 files changed, 128 insertions, 0 deletions
@@ -0,0 +1,128 @@ +package pipe + +import ( + "sync" +) + +var SinkChanError chan error + +func init() { + SinkChanError = make(chan error) + go func() { + for range SinkChanError { + } + }() +} + +// Proc maps cin into a channel by applying proc and outputs to its return. +func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V { + return ProcWithError(cin, SinkChanError, count, proc) +} + +// ProcWithError maps cin into a channel by applying proc and outputs to its return. +func ProcWithError[T any, V any](cin <-chan T, cerr chan<- error, count int, proc func(T) (V, error)) <-chan V { + cout := make(chan V) + var wg sync.WaitGroup + + for i := 0; i < count; i++ { + wg.Add(1) + go func(<-chan T, chan<- V) { + for i := range cin { + v, err := proc(i) + if err == nil { + cout <- v + } else { + cerr <- err + } + } + wg.Done() + }(cin, cout) + } + + go func() { + wg.Wait() + close(cout) + }() + + return cout +} + +func noop[T any](_ T) {} + +// Wait waits a channel to close, it ends a chain of procs. +func Wait[T any](cin <-chan T) { + for v := range cin { + noop(v) + } +} + +// TailProc waits a channel to close by applying proc to it. +// It does not return a channel since its meant to end a chain of goroutine+channel. +func TailProc[T any](cin <-chan T, count int, proc func(T) error) { + TailProcWithError(cin, SinkChanError, count, proc) +} + +// TailProcWithError waits a channel to close by applying proc to it. +// It does not return a channel since its meant to end a chain of goroutine+channel. +func TailProcWithError[T any](cin <-chan T, cerr chan<- error, count int, proc func(T) error) { + var wg sync.WaitGroup + + for i := 0; i < count; i++ { + wg.Add(1) + go func(<-chan T) { + for i := range cin { + if err := proc(i); err != nil { + cerr <- err + } + } + wg.Done() + + }(cin) + } + wg.Wait() +} + +func TailReduce[T any, V any](cin <-chan T, red func(V, T) (V, error), vinit V) V { + return TailReduceWithError(cin, SinkChanError, red, vinit) +} + +func TailReduceWithError[T any, V any](cin <-chan T, cerr chan<- error, red func(V, T) (V, error), vinit V) V { + acc := vinit + + for i := range cin { + if a, err := red(acc, i); err != nil { + cerr <- err + } else { + acc = a + } + } + + return acc +} + +// Yield returna a channel from a slice. +// It will pipe in into a chan returned by the function. +func Yield[T any](in []T) <-chan T { + cout := make(chan T) + + go func(chan<- T) { + for _, a := range in { + cout <- a + } + close(cout) + }(cout) + + return cout +} + +// Map is a classic map. +// It gets vs apply m to it and return its value n times. +func Map[T any, V any](m func(T) V, vs []T) []V { + result := make([]V, len(vs)) + + for i, v := range vs { + result[i] = m(v) + } + + return result +} |