diff options
Diffstat (limited to 'pipe/pipe.go')
-rw-r--r-- | pipe/pipe.go | 28 |
1 files changed, 23 insertions, 5 deletions
diff --git a/pipe/pipe.go b/pipe/pipe.go index 2ca1f8b..0cfa5fd 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -1,10 +1,11 @@ package pipe import ( + "fmt" "sync" ) -func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V { +func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V { cout := make(chan V) var wg sync.WaitGroup @@ -12,7 +13,12 @@ func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V { wg.Add(1) go func(<-chan T, chan<- V) { for i := range cin { - cout <- proc(i) + v, err := proc(i) + if err == nil { + cout <- v + } else { + fmt.Println("####", err.Error()) + } } wg.Done() }(cin, cout) @@ -34,14 +40,16 @@ func Wait[T any](cin <-chan T) { } } -func TailProc[T any](cin <-chan T, count int, proc func(T)) { +func TailProc[T any](cin <-chan T, count int, proc func(T) error) { var wg sync.WaitGroup - for i := 0; i < 4; i++ { + for i := 0; i < count; i++ { wg.Add(1) go func(<-chan T) { for i := range cin { - proc(i) + if err := proc(i); err != nil { + fmt.Println("####", err.Error()) + } } wg.Done() @@ -62,3 +70,13 @@ func Yield[T any](in []T) <-chan T { return cout } + +func Map[T any, V any](m func(T) V, vs []T) []V { + result := make([]V, len(vs)) + + for i, v := range vs { + result[i] = m(v) + } + + return result +} |