package pipe import ( "fmt" "sync" ) func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V { cout := make(chan V) var wg sync.WaitGroup for i := 0; i < count; i++ { wg.Add(1) go func(<-chan T, chan<- V) { for i := range cin { v, err := proc(i) if err == nil { cout <- v } else { fmt.Println("####", err.Error()) } } wg.Done() }(cin, cout) } go func() { wg.Wait() close(cout) }() return cout } func noop[T any](_ T) {} func Wait[T any](cin <-chan T) { for v := range cin { noop(v) } } func TailProc[T any](cin <-chan T, count int, proc func(T) error) { var wg sync.WaitGroup for i := 0; i < count; i++ { wg.Add(1) go func(<-chan T) { for i := range cin { if err := proc(i); err != nil { fmt.Println("####", err.Error()) } } wg.Done() }(cin) } wg.Wait() } func Yield[T any](in []T) <-chan T { cout := make(chan T) go func(chan<- T) { for _, a := range in { cout <- a } close(cout) }(cout) return cout } func Map[T any, V any](m func(T) V, vs []T) []V { result := make([]V, len(vs)) for i, v := range vs { result[i] = m(v) } return result }