aboutsummaryrefslogtreecommitdiff
path: root/pipe/pipe.go
diff options
context:
space:
mode:
Diffstat (limited to 'pipe/pipe.go')
-rw-r--r--pipe/pipe.go28
1 files changed, 23 insertions, 5 deletions
diff --git a/pipe/pipe.go b/pipe/pipe.go
index 2ca1f8b..0cfa5fd 100644
--- a/pipe/pipe.go
+++ b/pipe/pipe.go
@@ -1,10 +1,11 @@
package pipe
import (
+ "fmt"
"sync"
)
-func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V {
+func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V {
cout := make(chan V)
var wg sync.WaitGroup
@@ -12,7 +13,12 @@ func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V {
wg.Add(1)
go func(<-chan T, chan<- V) {
for i := range cin {
- cout <- proc(i)
+ v, err := proc(i)
+ if err == nil {
+ cout <- v
+ } else {
+ fmt.Println("####", err.Error())
+ }
}
wg.Done()
}(cin, cout)
@@ -34,14 +40,16 @@ func Wait[T any](cin <-chan T) {
}
}
-func TailProc[T any](cin <-chan T, count int, proc func(T)) {
+func TailProc[T any](cin <-chan T, count int, proc func(T) error) {
var wg sync.WaitGroup
- for i := 0; i < 4; i++ {
+ for i := 0; i < count; i++ {
wg.Add(1)
go func(<-chan T) {
for i := range cin {
- proc(i)
+ if err := proc(i); err != nil {
+ fmt.Println("####", err.Error())
+ }
}
wg.Done()
@@ -62,3 +70,13 @@ func Yield[T any](in []T) <-chan T {
return cout
}
+
+func Map[T any, V any](m func(T) V, vs []T) []V {
+ result := make([]V, len(vs))
+
+ for i, v := range vs {
+ result[i] = m(v)
+ }
+
+ return result
+}