From eb1b7d7d9149114eb6b4287b7cb40c49dccfb26e Mon Sep 17 00:00:00 2001 From: Gabriel Arakaki Giovanini Date: Sun, 16 Oct 2022 19:13:41 +0200 Subject: feat: Add storage interface With this is easier to interact with storage layers. --- .gitignore | 1 + go.mod | 5 +- go.sum | 2 + main.go | 97 ++++++++++++++++++++------------------ pipe/pipe.go | 28 +++++++++-- pipe/pipe_test.go | 15 +++--- storage/storage.go | 6 +++ storage/storage_fs.go | 86 ++++++++++++++++++++++++++++++++-- storage/storage_fs_test.go | 3 +- storage/storage_wd.go | 114 +++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 294 insertions(+), 63 deletions(-) create mode 100644 .gitignore create mode 100644 storage/storage_wd.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1cbe7f2 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +porg diff --git a/go.mod b/go.mod index 5bdbcb7..3dcd697 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module porg go 1.18 -require github.com/barasher/go-exiftool v1.8.0 // indirect +require ( + github.com/barasher/go-exiftool v1.8.0 // indirect + github.com/studio-b12/gowebdav v0.0.0-20220128162035-c7b1ff8a5e62 // indirect +) diff --git a/go.sum b/go.sum index 316fde4..771a2ee 100644 --- a/go.sum +++ b/go.sum @@ -4,3 +4,5 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/studio-b12/gowebdav v0.0.0-20220128162035-c7b1ff8a5e62 h1:b2nJXyPCa9HY7giGM+kYcnQ71m14JnGdQabMPmyt++8= +github.com/studio-b12/gowebdav v0.0.0-20220128162035-c7b1ff8a5e62/go.mod h1:bHA7t77X/QFExdeAnDzK6vKM34kEZAcE1OX4MfiwjkE= diff --git a/main.go b/main.go index 1a3c554..7349781 100644 --- a/main.go +++ b/main.go @@ -3,52 +3,48 @@ package main import ( "fmt" "os" + "path" "path/filepath" "porg/fileop" "porg/pipe" + "porg/storage" + "time" ) -type SHAInfo struct { +type File struct { path string sha256 string } -type File struct { - Path string - SHA256 string - Time string -} - type MoveCommand struct { src string dest string } -func Calculate(file string) *SHAInfo { - f, err := os.Open(file) - if err != nil { - fmt.Println(err) - return nil - } - defer f.Close() +func Calculate(s storage.Storage) func(string) (*File, error) { + return func(file string) (*File, error) { + f, err := s.Get(file) + if err != nil { + return nil, err + } - sha, err := fileop.CalculateSHA256(f) - if err != nil { - fmt.Println(err) - return nil - } else { - return &SHAInfo{path: file, sha256: sha} + sha, err := fileop.CalculateSHA256(f) + if err != nil { + return nil, err + } else { + return &File{path: file, sha256: sha}, nil + } } } -func Move(base string) func(*SHAInfo) *MoveCommand { - return func(info *SHAInfo) *MoveCommand { +func Move(base string) func(*File) (*MoveCommand, error) { + return func(info *File) (*MoveCommand, error) { ext := filepath.Ext(info.path) head := info.sha256[0:2] tail := info.sha256[2:] newPath := fmt.Sprintf("%s/%s/%s%s", base, head, tail, ext) - return &MoveCommand{src: info.path, dest: newPath} + return &MoveCommand{src: info.path, dest: newPath}, nil } } @@ -69,32 +65,41 @@ func Delete(path string) { } } -func Apply(move *MoveCommand) { - dir := filepath.Dir(move.dest) - os.Mkdir(dir, 0755) - if err := os.Rename(move.src, move.dest); err != nil { - fmt.Println(err) +func Apply(s storage.Storage, d storage.Storage) func(move *MoveCommand) error { + return func(move *MoveCommand) error { + dir := filepath.Dir(move.dest) + d.Mkdir(dir) + start := time.Now() + exists, err := d.Exists(move.dest) + if err != nil { + return fmt.Errorf("%s %s", time.Since(start), err.Error()) + } else if !exists || err != nil { + fmt.Println(">>> ", time.Since(start), " ", move.dest) + r, _ := s.Get(move.src) + d.Put(move.dest, r) + } else { + fmt.Println("!!! ", time.Since(start), " ", move.src) + } + return nil } - } func main() { - for _, path := range os.Args[1:] { - info, err := os.Stat(path) - - if err != nil || !info.IsDir() { - continue - } - - fmt.Println("Processing folder") - fmt.Println(path) - files := fileop.WalkFolder(path, fileop.File) - shas := pipe.Proc(files, 4, Calculate) - cmds := pipe.Proc(shas, 1, Move(path)) - pipe.TailProc(cmds, 4, Apply) - - fmt.Println("Deleting empty folders") - files = fileop.WalkFolder(path, fileop.Folder) - pipe.TailProc(files, 2, Delete) + root := "" + username := "" + password := "" + nextcloud := storage.NewWebDavStorage(root, username, password) + + fs := storage.NewFileSystem() + basePath := "" + list, _ := fs.List(basePath) + + for _, f := range list { + base := path.Base(f) + fmt.Println("....", base) + files := fs.Walk(f, storage.File) + shas := pipe.Proc(files, 10, Calculate(fs)) + cmds := pipe.Proc(shas, 1, Move("OF/"+base)) + pipe.TailProc(cmds, 10, Apply(fs, nextcloud)) } } diff --git a/pipe/pipe.go b/pipe/pipe.go index 2ca1f8b..0cfa5fd 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -1,10 +1,11 @@ package pipe import ( + "fmt" "sync" ) -func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V { +func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V { cout := make(chan V) var wg sync.WaitGroup @@ -12,7 +13,12 @@ func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V { wg.Add(1) go func(<-chan T, chan<- V) { for i := range cin { - cout <- proc(i) + v, err := proc(i) + if err == nil { + cout <- v + } else { + fmt.Println("####", err.Error()) + } } wg.Done() }(cin, cout) @@ -34,14 +40,16 @@ func Wait[T any](cin <-chan T) { } } -func TailProc[T any](cin <-chan T, count int, proc func(T)) { +func TailProc[T any](cin <-chan T, count int, proc func(T) error) { var wg sync.WaitGroup - for i := 0; i < 4; i++ { + for i := 0; i < count; i++ { wg.Add(1) go func(<-chan T) { for i := range cin { - proc(i) + if err := proc(i); err != nil { + fmt.Println("####", err.Error()) + } } wg.Done() @@ -62,3 +70,13 @@ func Yield[T any](in []T) <-chan T { return cout } + +func Map[T any, V any](m func(T) V, vs []T) []V { + result := make([]V, len(vs)) + + for i, v := range vs { + result[i] = m(v) + } + + return result +} diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go index e71b05b..950d039 100644 --- a/pipe/pipe_test.go +++ b/pipe/pipe_test.go @@ -20,12 +20,12 @@ func createCountPipe() <-chan int { return cout } -func multiply(in int) int { - return in * Multiplier +func multiply(in int) (int, error) { + return in * Multiplier, nil } -func devide(in int) int { - return in / Multiplier +func devide(in int) (int, error) { + return in / Multiplier, nil } func TestProc(t *testing.T) { @@ -81,13 +81,14 @@ func createBuffers() []*buffer { return buffers } -func multiplyBuffer(in *buffer) *buffer { +func multiplyBuffer(in *buffer) (*buffer, error) { in.value = in.value * Multiplier - return in + return in, nil } -func devideBuffer(in *buffer) { +func devideBuffer(in *buffer) error { in.value = in.value / Multiplier + return nil } func TestTailProc(t *testing.T) { diff --git a/storage/storage.go b/storage/storage.go index b788efb..880396a 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -17,6 +17,12 @@ const ( type Storage interface { Walk(path string, walkMode WalkMode) <-chan string Get(path string) (io.Reader, error) + Put(path string, f io.Reader) error + List(path string) ([]string, error) + Move(src string, dest string) error + Copy(src string, dest string) error + Mkdir(path string) error + Exists(path string) (bool, error) } func CalculateSHA256(r io.Reader) (string, error) { diff --git a/storage/storage_fs.go b/storage/storage_fs.go index 35ce58b..16bf8e8 100644 --- a/storage/storage_fs.go +++ b/storage/storage_fs.go @@ -1,22 +1,38 @@ package storage import ( + "errors" + "fmt" + "io" + "io/ioutil" "os" "path/filepath" + "porg/pipe" ) type FileSystem struct { - root string } -func WalkFolder(folder string, walkMode WalkMode) <-chan string { +func NewFileSystem() *FileSystem { + return &FileSystem{} +} + +func (fs *FileSystem) Get(path string) (io.Reader, error) { + return os.Open(path) +} + +func (fs *FileSystem) Walk(folder string, walkMode WalkMode) <-chan string { c := make(chan string) go func(folder string, c chan string) { filepath.Walk(folder, func(path string, info os.FileInfo, err error) error { file, _ := os.Open(path) defer file.Close() - fileInfo, _ := file.Stat() + fileInfo, err := file.Stat() + if err != nil { + fmt.Println("@@@@ ", err.Error()) + return nil + } switch walkMode { case Folder: @@ -38,3 +54,67 @@ func WalkFolder(folder string, walkMode WalkMode) <-chan string { return c } + +func (fs *FileSystem) Put(path string, f io.Reader) error { + fo, err := os.Create(path) + if err != nil { + return err + } + + if _, err := io.Copy(fo, f); err != nil { + return err + } + + return nil +} + +func (fs *FileSystem) List(path string) ([]string, error) { + files, err := ioutil.ReadDir(path) + if err != err { + return nil, err + } + + findPath := func(f os.FileInfo) string { + return path + "/" + f.Name() + } + return pipe.Map(findPath, files), nil +} + +func (fs *FileSystem) Move(src string, dst string) error { + return os.Rename(src, dst) +} + +func (fs *FileSystem) Copy(src string, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return err + } + defer out.Close() + + _, err = io.Copy(out, in) + if err != nil { + return err + } + return out.Close() +} + +func (fs *FileSystem) Exists(path string) (bool, error) { + _, err := os.Stat(path) + if err == nil { + return true, nil + } + if errors.Is(err, os.ErrNotExist) { + return false, nil + } + return false, err +} + +func (fs *FileSystem) Mkdir(path string) error { + return os.MkdirAll(path, 0777) +} diff --git a/storage/storage_fs_test.go b/storage/storage_fs_test.go index b746c7e..429de58 100644 --- a/storage/storage_fs_test.go +++ b/storage/storage_fs_test.go @@ -10,12 +10,13 @@ func TestWalk(t *testing.T) { folder := testutil.CreateFolder() files := map[string]struct{}{} walkedFiles := map[string]struct{}{} + fs := NewFileSystem() for i := 0; i < fileCount; i++ { files[testutil.AppendEmptyFile(folder)] = struct{}{} } - c := WalkFolder(folder, File) + c := fs.Walk(folder, File) for file := range c { walkedFiles[file] = struct{}{} } diff --git a/storage/storage_wd.go b/storage/storage_wd.go new file mode 100644 index 0000000..b166992 --- /dev/null +++ b/storage/storage_wd.go @@ -0,0 +1,114 @@ +package storage + +import ( + "errors" + "io" + "os" + + "github.com/studio-b12/gowebdav" +) + +type WebDav struct { + client *gowebdav.Client +} + +func NewWebDavStorage(root string, username string, password string) *WebDav { + c := gowebdav.NewClient(root, username, password) + return &WebDav{client: c} +} + +func (wd *WebDav) Mkdir(path string) error { + if err := wd.client.MkdirAll(path, 0644); err != nil { + return err + } + return nil +} + +func (wd *WebDav) Put(path string, f io.Reader) error { + return wd.client.WriteStream(path, f, 0644) +} + +func (wd *WebDav) Move(src string, dest string) error { + if src != dest { + if err := wd.client.Rename(src, dest, true); err != nil { + return nil + } + } + return nil +} + +func (wd *WebDav) Exists(path string) (bool, error) { + _, err := wd.client.Stat(path) + if err == nil { + return true, nil + } + if errors.Is(err, os.ErrNotExist) { + return false, nil + } + return false, err +} + +func (wd *WebDav) Copy(src string, dest string) error { + if src != dest { + if err := wd.client.Copy(src, dest, true); err != nil { + return nil + } + } + return nil +} + +func (wd *WebDav) Get(path string) (io.Reader, error) { + return wd.client.ReadStream(path) +} + +func (wd *WebDav) List(path string) ([]string, error) { + files, err := wd.client.ReadDir(path) + if err != nil { + return nil, err + } + + r := make([]string, len(files)) + + for i, v := range files { + r[i] = v.Name() + } + + return r, nil +} + +func (wd *WebDav) Walk(folder string, walkMode WalkMode) <-chan string { + c := make(chan string) + + go func(string, chan string) { + var queue []string + queue = append(queue, folder) + + for len(queue) > 0 { + f := queue[0] + queue = queue[1:] + + files, _ := wd.client.ReadDir(f) + for _, fileInfo := range files { + if fileInfo.IsDir() { + queue = append(queue, f+"/"+fileInfo.Name()) + } + + switch walkMode { + case Folder: + if fileInfo.IsDir() { + c <- f + "/" + fileInfo.Name() + } + case File: + if !fileInfo.IsDir() { + c <- f + "/" + fileInfo.Name() + } + case FileFolder: + c <- f + "/" + fileInfo.Name() + } + } + } + close(c) + }(folder, c) + + return c +} -- cgit v1.2.3