package pipe import ( "sync" ) var SinkChanError chan error func init() { SinkChanError = make(chan error) go func() { for range SinkChanError { } }() } // Proc maps cin into a channel by applying proc and outputs to its return. func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V { return ProcWithError(cin, SinkChanError, count, proc) } // ProcWithError maps cin into a channel by applying proc and outputs to its return. func ProcWithError[T any, V any](cin <-chan T, cerr chan<- error, count int, proc func(T) (V, error)) <-chan V { cout := make(chan V) var wg sync.WaitGroup for i := 0; i < count; i++ { wg.Add(1) go func(<-chan T, chan<- V) { for i := range cin { v, err := proc(i) if err == nil { cout <- v } else { cerr <- err } } wg.Done() }(cin, cout) } go func() { wg.Wait() close(cout) }() return cout } func noop[T any](_ T) {} // Wait waits a channel to close, it ends a chain of procs. func Wait[T any](cin <-chan T) { for v := range cin { noop(v) } } // TailProc waits a channel to close by applying proc to it. // It does not return a channel since its meant to end a chain of goroutine+channel. func TailProc[T any](cin <-chan T, count int, proc func(T) error) { TailProcWithError(cin, SinkChanError, count, proc) } // TailProcWithError waits a channel to close by applying proc to it. // It does not return a channel since its meant to end a chain of goroutine+channel. func TailProcWithError[T any](cin <-chan T, cerr chan<- error, count int, proc func(T) error) { var wg sync.WaitGroup for i := 0; i < count; i++ { wg.Add(1) go func(<-chan T) { for i := range cin { if err := proc(i); err != nil { cerr <- err } } wg.Done() }(cin) } wg.Wait() } func TailReduce[T any, V any](cin <-chan T, red func(V, T) (V, error), vinit V) V { return TailReduceWithError(cin, SinkChanError, red, vinit) } func TailReduceWithError[T any, V any](cin <-chan T, cerr chan<- error, red func(V, T) (V, error), vinit V) V { acc := vinit for i := range cin { if a, err := red(acc, i); err != nil { cerr <- err } else { acc = a } } return acc } // Yield returna a channel from a slice. // It will pipe in into a chan returned by the function. func Yield[T any](in []T) <-chan T { cout := make(chan T) go func(chan<- T) { for _, a := range in { cout <- a } close(cout) }(cout) return cout } // Map is a classic map. // It gets vs apply m to it and return its value n times. func Map[T any, V any](m func(T) V, vs []T) []V { result := make([]V, len(vs)) for i, v := range vs { result[i] = m(v) } return result }