aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--LICENSE19
-rw-r--r--Makefile14
-rw-r--r--README.md6
-rw-r--r--examples/countlines.go135
-rw-r--r--go.mod3
-rw-r--r--pipe.go128
-rw-r--r--pipe_test.go122
8 files changed, 428 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..e660fd9
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+bin/
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..65afc42
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,19 @@
+Copyright © 2023 Gabriel A. Giovanini
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the “Software”), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+of the Software, and to permit persons to whom the Software is furnished to do
+so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..e6c3c4c
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,14 @@
+test:
+ go test -tags=unit .
+
+doc:
+ godoc \
+ -goroot=${PWD} \
+ -templates="./docs" \
+ -url="/pkg/git.sr.ht/~gabrielgio/pipe/" \
+ > index.html
+
+examples: countlines
+
+countlines:
+ go build -o bin/coutlines ./examples/countlines.go
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d706bfa
--- /dev/null
+++ b/README.md
@@ -0,0 +1,6 @@
+# PIPE
+
+A simple collection of channel based function make simple function to be
+processed in multiples goroutine and piped.
+
+Check `examples` folder for how to use it.
diff --git a/examples/countlines.go b/examples/countlines.go
new file mode 100644
index 0000000..000efa5
--- /dev/null
+++ b/examples/countlines.go
@@ -0,0 +1,135 @@
+package main
+
+import (
+ "bufio"
+ "errors"
+ "flag"
+ "fmt"
+ "os"
+ "path/filepath"
+ "unicode/utf8"
+
+ "git.sr.ht/~gabrielgio/pipe"
+)
+
+var (
+ InvalidInput = errors.New("It was given a file instead of a folder.")
+ ColorGreen = "\033[32m"
+ ColorRed = "\033[31m"
+ ColorReset = "\033[0m"
+)
+
+type (
+ NonTextError struct {
+ Filename string
+ }
+)
+
+func NewNonTextError(filename string) *NonTextError {
+ return &NonTextError{Filename: filename}
+}
+
+func (b *NonTextError) Error() string {
+ return fmt.Sprintf("Non text file %s", b.Filename)
+}
+
+func main() {
+ err := run()
+ if err != nil {
+ fmt.Printf("Error on main process: %s", err.Error())
+ }
+}
+
+func run() error {
+ var (
+ dir = flag.String("dir", "", "Folder to recursively search for files. Default: \"\"")
+ quiet = flag.Bool("quiet", false, "If true only outputs the result. Default: false")
+ nproc = flag.Int("nproc", 4, "Number of goroutines used to count. Default: 4")
+ )
+
+ flag.Parse()
+
+ info, err := os.Stat(*dir)
+ if err != nil {
+ return err
+ }
+
+ if !info.IsDir() {
+ return InvalidInput
+ }
+
+ cerr := make(chan error)
+ go func(c <-chan error) {
+ for e := range c {
+ if !*quiet {
+ fmt.Printf("%sERROR%s: %s\n", ColorRed, ColorReset, e.Error())
+ }
+ }
+ }(cerr)
+
+ c := count(*dir, *nproc, cerr)
+ fmt.Printf("%sCOUNT%s: %d\n", ColorGreen, ColorReset, c)
+
+ return nil
+}
+
+func count(dir string, nproc int, cerr chan<- error) int {
+ cfiles := walkFolder(dir)
+ ccount := pipe.ProcWithError(cfiles, cerr, nproc, countLines)
+ return pipe.TailReduceWithError(ccount, cerr, sum, 0)
+}
+
+func sum(acc, lines int) (int, error) {
+ return acc + lines, nil
+}
+
+func countLines(filename string) (int, error) {
+
+ file, err := os.Open(filename)
+ if err != nil {
+ return 0, err
+ }
+
+ defer file.Close()
+
+ fileScanner := bufio.NewScanner(file)
+ fileScanner.Split(bufio.ScanLines)
+
+ var count int
+ for fileScanner.Scan() {
+ if !utf8.ValidString(string(fileScanner.Text())) {
+ return 0, NewNonTextError(filename)
+ }
+ count++
+ }
+
+ return count, nil
+}
+
+func walkFolder(folder string) <-chan string {
+ c := make(chan string)
+
+ go func(folder string, c chan string) {
+ filepath.Walk(folder, func(path string, info os.FileInfo, err error) error {
+ file, err := os.Open(path)
+ if err != nil {
+ return filepath.SkipDir
+ }
+ defer file.Close()
+
+ fileInfo, err := file.Stat()
+ if err != nil {
+ return filepath.SkipDir
+ }
+
+ if !fileInfo.IsDir() {
+ c <- path
+ }
+ return nil
+ })
+ close(c)
+
+ }(folder, c)
+
+ return c
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..bad46aa
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,3 @@
+module git.sr.ht/~gabrielgio/pipe
+
+go 1.21.4
diff --git a/pipe.go b/pipe.go
new file mode 100644
index 0000000..08eaa23
--- /dev/null
+++ b/pipe.go
@@ -0,0 +1,128 @@
+package pipe
+
+import (
+ "sync"
+)
+
+var SinkChanError chan error
+
+func init() {
+ SinkChanError = make(chan error)
+ go func() {
+ for range SinkChanError {
+ }
+ }()
+}
+
+// Proc maps cin into a channel by applying proc and outputs to its return.
+func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V {
+ return ProcWithError(cin, SinkChanError, count, proc)
+}
+
+// ProcWithError maps cin into a channel by applying proc and outputs to its return.
+func ProcWithError[T any, V any](cin <-chan T, cerr chan<- error, count int, proc func(T) (V, error)) <-chan V {
+ cout := make(chan V)
+ var wg sync.WaitGroup
+
+ for i := 0; i < count; i++ {
+ wg.Add(1)
+ go func(<-chan T, chan<- V) {
+ for i := range cin {
+ v, err := proc(i)
+ if err == nil {
+ cout <- v
+ } else {
+ cerr <- err
+ }
+ }
+ wg.Done()
+ }(cin, cout)
+ }
+
+ go func() {
+ wg.Wait()
+ close(cout)
+ }()
+
+ return cout
+}
+
+func noop[T any](_ T) {}
+
+// Wait waits a channel to close, it ends a chain of procs.
+func Wait[T any](cin <-chan T) {
+ for v := range cin {
+ noop(v)
+ }
+}
+
+// TailProc waits a channel to close by applying proc to it.
+// It does not return a channel since its meant to end a chain of goroutine+channel.
+func TailProc[T any](cin <-chan T, count int, proc func(T) error) {
+ TailProcWithError(cin, SinkChanError, count, proc)
+}
+
+// TailProcWithError waits a channel to close by applying proc to it.
+// It does not return a channel since its meant to end a chain of goroutine+channel.
+func TailProcWithError[T any](cin <-chan T, cerr chan<- error, count int, proc func(T) error) {
+ var wg sync.WaitGroup
+
+ for i := 0; i < count; i++ {
+ wg.Add(1)
+ go func(<-chan T) {
+ for i := range cin {
+ if err := proc(i); err != nil {
+ cerr <- err
+ }
+ }
+ wg.Done()
+
+ }(cin)
+ }
+ wg.Wait()
+}
+
+func TailReduce[T any, V any](cin <-chan T, red func(V, T) (V, error), vinit V) V {
+ return TailReduceWithError(cin, SinkChanError, red, vinit)
+}
+
+func TailReduceWithError[T any, V any](cin <-chan T, cerr chan<- error, red func(V, T) (V, error), vinit V) V {
+ acc := vinit
+
+ for i := range cin {
+ if a, err := red(acc, i); err != nil {
+ cerr <- err
+ } else {
+ acc = a
+ }
+ }
+
+ return acc
+}
+
+// Yield returna a channel from a slice.
+// It will pipe in into a chan returned by the function.
+func Yield[T any](in []T) <-chan T {
+ cout := make(chan T)
+
+ go func(chan<- T) {
+ for _, a := range in {
+ cout <- a
+ }
+ close(cout)
+ }(cout)
+
+ return cout
+}
+
+// Map is a classic map.
+// It gets vs apply m to it and return its value n times.
+func Map[T any, V any](m func(T) V, vs []T) []V {
+ result := make([]V, len(vs))
+
+ for i, v := range vs {
+ result[i] = m(v)
+ }
+
+ return result
+}
diff --git a/pipe_test.go b/pipe_test.go
new file mode 100644
index 0000000..804adcc
--- /dev/null
+++ b/pipe_test.go
@@ -0,0 +1,122 @@
+//go:build unit
+
+package pipe
+
+import (
+ "testing"
+)
+
+const (
+ Values = 999
+ Multiplier = 1000
+)
+
+type buffer struct {
+ value int
+}
+
+func TestProc(t *testing.T) {
+ c := createCountPipe()
+ am := Proc(c, 8, multiply)
+ ad := Proc(am, 8, devide)
+ a := Proc(ad, 8, multiply)
+
+ out := make([]int, Values-1)
+
+ count := 0
+ for i := range a {
+ out[count] = i
+ count++
+ }
+
+ for _, v := range out {
+ if v < Multiplier {
+ t.Errorf("Invalid output %d", v)
+ }
+ }
+}
+
+func TestYield(t *testing.T) {
+ lin := make([]int, Values)
+
+ for i := 0; i < Values; i++ {
+ lin[i] = i
+ }
+
+ cout := Yield(lin)
+
+ count := 0
+ for out := range cout {
+ if out != lin[count] {
+ t.Errorf("Invalid output %d", out)
+ }
+ count++
+ }
+}
+
+func TestTailProc(t *testing.T) {
+ buffers := createBuffers()
+ c := Yield(buffers)
+ am := Proc(c, 8, multiplyBuffer)
+ TailProc(am, 8, devideBuffer)
+
+ for i, v := range buffers {
+ if v.value != i {
+ t.Errorf("Invalid output %d != %d", v.value, i)
+ }
+ }
+}
+
+func TestWait(t *testing.T) {
+ buffers := createBuffers()
+ c := Yield(buffers)
+ am := Proc(c, 8, multiplyBuffer)
+ Wait(am)
+
+ for i, v := range buffers {
+ if v.value != (i * Multiplier) {
+ t.Errorf("Invalid output %d != %d", v.value, i)
+ }
+ }
+}
+
+func devideBuffer(in *buffer) error {
+ in.value = in.value / Multiplier
+ return nil
+}
+
+func createBuffers() []*buffer {
+ buffers := make([]*buffer, Values)
+
+ for i := 0; i < Values; i++ {
+ buffers[i] = &buffer{value: i}
+ }
+
+ return buffers
+}
+
+func multiplyBuffer(in *buffer) (*buffer, error) {
+ in.value = in.value * Multiplier
+ return in, nil
+}
+
+func createCountPipe() <-chan int {
+ cout := make(chan int)
+
+ go func(<-chan int) {
+ for i := 1; i < Values; i++ {
+ cout <- i
+ }
+ close(cout)
+ }(cout)
+
+ return cout
+}
+
+func multiply(in int) (int, error) {
+ return in * Multiplier, nil
+}
+
+func devide(in int) (int, error) {
+ return in / Multiplier, nil
+}