diff options
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | LICENSE | 19 | ||||
-rw-r--r-- | Makefile | 14 | ||||
-rw-r--r-- | README.md | 6 | ||||
-rw-r--r-- | examples/countlines.go | 135 | ||||
-rw-r--r-- | go.mod | 3 | ||||
-rw-r--r-- | pipe.go | 128 | ||||
-rw-r--r-- | pipe_test.go | 122 |
8 files changed, 428 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e660fd9 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +bin/ @@ -0,0 +1,19 @@ +Copyright © 2023 Gabriel A. Giovanini + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the “Software”), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e6c3c4c --- /dev/null +++ b/Makefile @@ -0,0 +1,14 @@ +test: + go test -tags=unit . + +doc: + godoc \ + -goroot=${PWD} \ + -templates="./docs" \ + -url="/pkg/git.sr.ht/~gabrielgio/pipe/" \ + > index.html + +examples: countlines + +countlines: + go build -o bin/coutlines ./examples/countlines.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..d706bfa --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# PIPE + +A simple collection of channel based function make simple function to be +processed in multiples goroutine and piped. + +Check `examples` folder for how to use it. diff --git a/examples/countlines.go b/examples/countlines.go new file mode 100644 index 0000000..000efa5 --- /dev/null +++ b/examples/countlines.go @@ -0,0 +1,135 @@ +package main + +import ( + "bufio" + "errors" + "flag" + "fmt" + "os" + "path/filepath" + "unicode/utf8" + + "git.sr.ht/~gabrielgio/pipe" +) + +var ( + InvalidInput = errors.New("It was given a file instead of a folder.") + ColorGreen = "\033[32m" + ColorRed = "\033[31m" + ColorReset = "\033[0m" +) + +type ( + NonTextError struct { + Filename string + } +) + +func NewNonTextError(filename string) *NonTextError { + return &NonTextError{Filename: filename} +} + +func (b *NonTextError) Error() string { + return fmt.Sprintf("Non text file %s", b.Filename) +} + +func main() { + err := run() + if err != nil { + fmt.Printf("Error on main process: %s", err.Error()) + } +} + +func run() error { + var ( + dir = flag.String("dir", "", "Folder to recursively search for files. Default: \"\"") + quiet = flag.Bool("quiet", false, "If true only outputs the result. Default: false") + nproc = flag.Int("nproc", 4, "Number of goroutines used to count. Default: 4") + ) + + flag.Parse() + + info, err := os.Stat(*dir) + if err != nil { + return err + } + + if !info.IsDir() { + return InvalidInput + } + + cerr := make(chan error) + go func(c <-chan error) { + for e := range c { + if !*quiet { + fmt.Printf("%sERROR%s: %s\n", ColorRed, ColorReset, e.Error()) + } + } + }(cerr) + + c := count(*dir, *nproc, cerr) + fmt.Printf("%sCOUNT%s: %d\n", ColorGreen, ColorReset, c) + + return nil +} + +func count(dir string, nproc int, cerr chan<- error) int { + cfiles := walkFolder(dir) + ccount := pipe.ProcWithError(cfiles, cerr, nproc, countLines) + return pipe.TailReduceWithError(ccount, cerr, sum, 0) +} + +func sum(acc, lines int) (int, error) { + return acc + lines, nil +} + +func countLines(filename string) (int, error) { + + file, err := os.Open(filename) + if err != nil { + return 0, err + } + + defer file.Close() + + fileScanner := bufio.NewScanner(file) + fileScanner.Split(bufio.ScanLines) + + var count int + for fileScanner.Scan() { + if !utf8.ValidString(string(fileScanner.Text())) { + return 0, NewNonTextError(filename) + } + count++ + } + + return count, nil +} + +func walkFolder(folder string) <-chan string { + c := make(chan string) + + go func(folder string, c chan string) { + filepath.Walk(folder, func(path string, info os.FileInfo, err error) error { + file, err := os.Open(path) + if err != nil { + return filepath.SkipDir + } + defer file.Close() + + fileInfo, err := file.Stat() + if err != nil { + return filepath.SkipDir + } + + if !fileInfo.IsDir() { + c <- path + } + return nil + }) + close(c) + + }(folder, c) + + return c +} @@ -0,0 +1,3 @@ +module git.sr.ht/~gabrielgio/pipe + +go 1.21.4 @@ -0,0 +1,128 @@ +package pipe + +import ( + "sync" +) + +var SinkChanError chan error + +func init() { + SinkChanError = make(chan error) + go func() { + for range SinkChanError { + } + }() +} + +// Proc maps cin into a channel by applying proc and outputs to its return. +func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V { + return ProcWithError(cin, SinkChanError, count, proc) +} + +// ProcWithError maps cin into a channel by applying proc and outputs to its return. +func ProcWithError[T any, V any](cin <-chan T, cerr chan<- error, count int, proc func(T) (V, error)) <-chan V { + cout := make(chan V) + var wg sync.WaitGroup + + for i := 0; i < count; i++ { + wg.Add(1) + go func(<-chan T, chan<- V) { + for i := range cin { + v, err := proc(i) + if err == nil { + cout <- v + } else { + cerr <- err + } + } + wg.Done() + }(cin, cout) + } + + go func() { + wg.Wait() + close(cout) + }() + + return cout +} + +func noop[T any](_ T) {} + +// Wait waits a channel to close, it ends a chain of procs. +func Wait[T any](cin <-chan T) { + for v := range cin { + noop(v) + } +} + +// TailProc waits a channel to close by applying proc to it. +// It does not return a channel since its meant to end a chain of goroutine+channel. +func TailProc[T any](cin <-chan T, count int, proc func(T) error) { + TailProcWithError(cin, SinkChanError, count, proc) +} + +// TailProcWithError waits a channel to close by applying proc to it. +// It does not return a channel since its meant to end a chain of goroutine+channel. +func TailProcWithError[T any](cin <-chan T, cerr chan<- error, count int, proc func(T) error) { + var wg sync.WaitGroup + + for i := 0; i < count; i++ { + wg.Add(1) + go func(<-chan T) { + for i := range cin { + if err := proc(i); err != nil { + cerr <- err + } + } + wg.Done() + + }(cin) + } + wg.Wait() +} + +func TailReduce[T any, V any](cin <-chan T, red func(V, T) (V, error), vinit V) V { + return TailReduceWithError(cin, SinkChanError, red, vinit) +} + +func TailReduceWithError[T any, V any](cin <-chan T, cerr chan<- error, red func(V, T) (V, error), vinit V) V { + acc := vinit + + for i := range cin { + if a, err := red(acc, i); err != nil { + cerr <- err + } else { + acc = a + } + } + + return acc +} + +// Yield returna a channel from a slice. +// It will pipe in into a chan returned by the function. +func Yield[T any](in []T) <-chan T { + cout := make(chan T) + + go func(chan<- T) { + for _, a := range in { + cout <- a + } + close(cout) + }(cout) + + return cout +} + +// Map is a classic map. +// It gets vs apply m to it and return its value n times. +func Map[T any, V any](m func(T) V, vs []T) []V { + result := make([]V, len(vs)) + + for i, v := range vs { + result[i] = m(v) + } + + return result +} diff --git a/pipe_test.go b/pipe_test.go new file mode 100644 index 0000000..804adcc --- /dev/null +++ b/pipe_test.go @@ -0,0 +1,122 @@ +//go:build unit + +package pipe + +import ( + "testing" +) + +const ( + Values = 999 + Multiplier = 1000 +) + +type buffer struct { + value int +} + +func TestProc(t *testing.T) { + c := createCountPipe() + am := Proc(c, 8, multiply) + ad := Proc(am, 8, devide) + a := Proc(ad, 8, multiply) + + out := make([]int, Values-1) + + count := 0 + for i := range a { + out[count] = i + count++ + } + + for _, v := range out { + if v < Multiplier { + t.Errorf("Invalid output %d", v) + } + } +} + +func TestYield(t *testing.T) { + lin := make([]int, Values) + + for i := 0; i < Values; i++ { + lin[i] = i + } + + cout := Yield(lin) + + count := 0 + for out := range cout { + if out != lin[count] { + t.Errorf("Invalid output %d", out) + } + count++ + } +} + +func TestTailProc(t *testing.T) { + buffers := createBuffers() + c := Yield(buffers) + am := Proc(c, 8, multiplyBuffer) + TailProc(am, 8, devideBuffer) + + for i, v := range buffers { + if v.value != i { + t.Errorf("Invalid output %d != %d", v.value, i) + } + } +} + +func TestWait(t *testing.T) { + buffers := createBuffers() + c := Yield(buffers) + am := Proc(c, 8, multiplyBuffer) + Wait(am) + + for i, v := range buffers { + if v.value != (i * Multiplier) { + t.Errorf("Invalid output %d != %d", v.value, i) + } + } +} + +func devideBuffer(in *buffer) error { + in.value = in.value / Multiplier + return nil +} + +func createBuffers() []*buffer { + buffers := make([]*buffer, Values) + + for i := 0; i < Values; i++ { + buffers[i] = &buffer{value: i} + } + + return buffers +} + +func multiplyBuffer(in *buffer) (*buffer, error) { + in.value = in.value * Multiplier + return in, nil +} + +func createCountPipe() <-chan int { + cout := make(chan int) + + go func(<-chan int) { + for i := 1; i < Values; i++ { + cout <- i + } + close(cout) + }(cout) + + return cout +} + +func multiply(in int) (int, error) { + return in * Multiplier, nil +} + +func devide(in int) (int, error) { + return in / Multiplier, nil +} |