package pipe import ( "sync" ) func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V { cout := make(chan V) var wg sync.WaitGroup for i := 0; i < count; i++ { wg.Add(1) go func(<-chan T, chan<- V) { for i := range cin { cout <- proc(i) } wg.Done() }(cin, cout) } go func() { wg.Wait() close(cout) }() return cout } func noop[T any](_ T) {} func Wait[T any](cin <-chan T) { for v := range cin { noop(v) } } func TailProc[T any](cin <-chan T, count int, proc func(T)) { var wg sync.WaitGroup for i := 0; i < 4; i++ { wg.Add(1) go func(<-chan T) { for i := range cin { proc(i) } wg.Done() }(cin) } wg.Wait() } func Yield[T any](in []T) <-chan T { cout := make(chan T) go func(chan<- T) { for _, a := range in { cout <- a } close(cout) }(cout) return cout }