aboutsummaryrefslogtreecommitdiff
path: root/pipe
diff options
context:
space:
mode:
authorGabriel Arakaki Giovanini <mail@gabrielgio.me>2022-10-16 19:13:41 +0200
committerGabriel Arakaki Giovanini <mail@gabrielgio.me>2022-10-16 19:16:35 +0200
commiteb1b7d7d9149114eb6b4287b7cb40c49dccfb26e (patch)
tree12f797651abeda9512d56d8922199ae9edb2a293 /pipe
parent98844247a424558939228b82e9b5f28d723c4fe0 (diff)
downloadporg-eb1b7d7d9149114eb6b4287b7cb40c49dccfb26e.tar.gz
porg-eb1b7d7d9149114eb6b4287b7cb40c49dccfb26e.tar.bz2
porg-eb1b7d7d9149114eb6b4287b7cb40c49dccfb26e.zip
feat: Add storage interfaceHEADmaster
With this is easier to interact with storage layers.
Diffstat (limited to 'pipe')
-rw-r--r--pipe/pipe.go28
-rw-r--r--pipe/pipe_test.go15
2 files changed, 31 insertions, 12 deletions
diff --git a/pipe/pipe.go b/pipe/pipe.go
index 2ca1f8b..0cfa5fd 100644
--- a/pipe/pipe.go
+++ b/pipe/pipe.go
@@ -1,10 +1,11 @@
package pipe
import (
+ "fmt"
"sync"
)
-func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V {
+func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V {
cout := make(chan V)
var wg sync.WaitGroup
@@ -12,7 +13,12 @@ func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V {
wg.Add(1)
go func(<-chan T, chan<- V) {
for i := range cin {
- cout <- proc(i)
+ v, err := proc(i)
+ if err == nil {
+ cout <- v
+ } else {
+ fmt.Println("####", err.Error())
+ }
}
wg.Done()
}(cin, cout)
@@ -34,14 +40,16 @@ func Wait[T any](cin <-chan T) {
}
}
-func TailProc[T any](cin <-chan T, count int, proc func(T)) {
+func TailProc[T any](cin <-chan T, count int, proc func(T) error) {
var wg sync.WaitGroup
- for i := 0; i < 4; i++ {
+ for i := 0; i < count; i++ {
wg.Add(1)
go func(<-chan T) {
for i := range cin {
- proc(i)
+ if err := proc(i); err != nil {
+ fmt.Println("####", err.Error())
+ }
}
wg.Done()
@@ -62,3 +70,13 @@ func Yield[T any](in []T) <-chan T {
return cout
}
+
+func Map[T any, V any](m func(T) V, vs []T) []V {
+ result := make([]V, len(vs))
+
+ for i, v := range vs {
+ result[i] = m(v)
+ }
+
+ return result
+}
diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go
index e71b05b..950d039 100644
--- a/pipe/pipe_test.go
+++ b/pipe/pipe_test.go
@@ -20,12 +20,12 @@ func createCountPipe() <-chan int {
return cout
}
-func multiply(in int) int {
- return in * Multiplier
+func multiply(in int) (int, error) {
+ return in * Multiplier, nil
}
-func devide(in int) int {
- return in / Multiplier
+func devide(in int) (int, error) {
+ return in / Multiplier, nil
}
func TestProc(t *testing.T) {
@@ -81,13 +81,14 @@ func createBuffers() []*buffer {
return buffers
}
-func multiplyBuffer(in *buffer) *buffer {
+func multiplyBuffer(in *buffer) (*buffer, error) {
in.value = in.value * Multiplier
- return in
+ return in, nil
}
-func devideBuffer(in *buffer) {
+func devideBuffer(in *buffer) error {
in.value = in.value / Multiplier
+ return nil
}
func TestTailProc(t *testing.T) {