diff options
author | Gabriel Arakaki Giovanini <mail@gabrielgio.me> | 2022-10-16 19:13:41 +0200 |
---|---|---|
committer | Gabriel Arakaki Giovanini <mail@gabrielgio.me> | 2022-10-16 19:16:35 +0200 |
commit | eb1b7d7d9149114eb6b4287b7cb40c49dccfb26e (patch) | |
tree | 12f797651abeda9512d56d8922199ae9edb2a293 /pipe | |
parent | 98844247a424558939228b82e9b5f28d723c4fe0 (diff) | |
download | porg-eb1b7d7d9149114eb6b4287b7cb40c49dccfb26e.tar.gz porg-eb1b7d7d9149114eb6b4287b7cb40c49dccfb26e.tar.bz2 porg-eb1b7d7d9149114eb6b4287b7cb40c49dccfb26e.zip |
With this is easier to interact with storage layers.
Diffstat (limited to 'pipe')
-rw-r--r-- | pipe/pipe.go | 28 | ||||
-rw-r--r-- | pipe/pipe_test.go | 15 |
2 files changed, 31 insertions, 12 deletions
diff --git a/pipe/pipe.go b/pipe/pipe.go index 2ca1f8b..0cfa5fd 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -1,10 +1,11 @@ package pipe import ( + "fmt" "sync" ) -func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V { +func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V { cout := make(chan V) var wg sync.WaitGroup @@ -12,7 +13,12 @@ func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V { wg.Add(1) go func(<-chan T, chan<- V) { for i := range cin { - cout <- proc(i) + v, err := proc(i) + if err == nil { + cout <- v + } else { + fmt.Println("####", err.Error()) + } } wg.Done() }(cin, cout) @@ -34,14 +40,16 @@ func Wait[T any](cin <-chan T) { } } -func TailProc[T any](cin <-chan T, count int, proc func(T)) { +func TailProc[T any](cin <-chan T, count int, proc func(T) error) { var wg sync.WaitGroup - for i := 0; i < 4; i++ { + for i := 0; i < count; i++ { wg.Add(1) go func(<-chan T) { for i := range cin { - proc(i) + if err := proc(i); err != nil { + fmt.Println("####", err.Error()) + } } wg.Done() @@ -62,3 +70,13 @@ func Yield[T any](in []T) <-chan T { return cout } + +func Map[T any, V any](m func(T) V, vs []T) []V { + result := make([]V, len(vs)) + + for i, v := range vs { + result[i] = m(v) + } + + return result +} diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go index e71b05b..950d039 100644 --- a/pipe/pipe_test.go +++ b/pipe/pipe_test.go @@ -20,12 +20,12 @@ func createCountPipe() <-chan int { return cout } -func multiply(in int) int { - return in * Multiplier +func multiply(in int) (int, error) { + return in * Multiplier, nil } -func devide(in int) int { - return in / Multiplier +func devide(in int) (int, error) { + return in / Multiplier, nil } func TestProc(t *testing.T) { @@ -81,13 +81,14 @@ func createBuffers() []*buffer { return buffers } -func multiplyBuffer(in *buffer) *buffer { +func multiplyBuffer(in *buffer) (*buffer, error) { in.value = in.value * Multiplier - return in + return in, nil } -func devideBuffer(in *buffer) { +func devideBuffer(in *buffer) error { in.value = in.value / Multiplier + return nil } func TestTailProc(t *testing.T) { |