aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabriel Arakaki Giovanini <mail@gabrielgio.me>2022-10-16 19:13:41 +0200
committerGabriel Arakaki Giovanini <mail@gabrielgio.me>2022-10-16 19:16:35 +0200
commiteb1b7d7d9149114eb6b4287b7cb40c49dccfb26e (patch)
tree12f797651abeda9512d56d8922199ae9edb2a293
parent98844247a424558939228b82e9b5f28d723c4fe0 (diff)
downloadporg-eb1b7d7d9149114eb6b4287b7cb40c49dccfb26e.tar.gz
porg-eb1b7d7d9149114eb6b4287b7cb40c49dccfb26e.tar.bz2
porg-eb1b7d7d9149114eb6b4287b7cb40c49dccfb26e.zip
feat: Add storage interfaceHEADmaster
With this is easier to interact with storage layers.
-rw-r--r--.gitignore1
-rw-r--r--go.mod5
-rw-r--r--go.sum2
-rw-r--r--main.go97
-rw-r--r--pipe/pipe.go28
-rw-r--r--pipe/pipe_test.go15
-rw-r--r--storage/storage.go6
-rw-r--r--storage/storage_fs.go86
-rw-r--r--storage/storage_fs_test.go3
-rw-r--r--storage/storage_wd.go114
10 files changed, 294 insertions, 63 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..1cbe7f2
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+porg
diff --git a/go.mod b/go.mod
index 5bdbcb7..3dcd697 100644
--- a/go.mod
+++ b/go.mod
@@ -2,4 +2,7 @@ module porg
go 1.18
-require github.com/barasher/go-exiftool v1.8.0 // indirect
+require (
+ github.com/barasher/go-exiftool v1.8.0 // indirect
+ github.com/studio-b12/gowebdav v0.0.0-20220128162035-c7b1ff8a5e62 // indirect
+)
diff --git a/go.sum b/go.sum
index 316fde4..771a2ee 100644
--- a/go.sum
+++ b/go.sum
@@ -4,3 +4,5 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/studio-b12/gowebdav v0.0.0-20220128162035-c7b1ff8a5e62 h1:b2nJXyPCa9HY7giGM+kYcnQ71m14JnGdQabMPmyt++8=
+github.com/studio-b12/gowebdav v0.0.0-20220128162035-c7b1ff8a5e62/go.mod h1:bHA7t77X/QFExdeAnDzK6vKM34kEZAcE1OX4MfiwjkE=
diff --git a/main.go b/main.go
index 1a3c554..7349781 100644
--- a/main.go
+++ b/main.go
@@ -3,52 +3,48 @@ package main
import (
"fmt"
"os"
+ "path"
"path/filepath"
"porg/fileop"
"porg/pipe"
+ "porg/storage"
+ "time"
)
-type SHAInfo struct {
+type File struct {
path string
sha256 string
}
-type File struct {
- Path string
- SHA256 string
- Time string
-}
-
type MoveCommand struct {
src string
dest string
}
-func Calculate(file string) *SHAInfo {
- f, err := os.Open(file)
- if err != nil {
- fmt.Println(err)
- return nil
- }
- defer f.Close()
+func Calculate(s storage.Storage) func(string) (*File, error) {
+ return func(file string) (*File, error) {
+ f, err := s.Get(file)
+ if err != nil {
+ return nil, err
+ }
- sha, err := fileop.CalculateSHA256(f)
- if err != nil {
- fmt.Println(err)
- return nil
- } else {
- return &SHAInfo{path: file, sha256: sha}
+ sha, err := fileop.CalculateSHA256(f)
+ if err != nil {
+ return nil, err
+ } else {
+ return &File{path: file, sha256: sha}, nil
+ }
}
}
-func Move(base string) func(*SHAInfo) *MoveCommand {
- return func(info *SHAInfo) *MoveCommand {
+func Move(base string) func(*File) (*MoveCommand, error) {
+ return func(info *File) (*MoveCommand, error) {
ext := filepath.Ext(info.path)
head := info.sha256[0:2]
tail := info.sha256[2:]
newPath := fmt.Sprintf("%s/%s/%s%s", base, head, tail, ext)
- return &MoveCommand{src: info.path, dest: newPath}
+ return &MoveCommand{src: info.path, dest: newPath}, nil
}
}
@@ -69,32 +65,41 @@ func Delete(path string) {
}
}
-func Apply(move *MoveCommand) {
- dir := filepath.Dir(move.dest)
- os.Mkdir(dir, 0755)
- if err := os.Rename(move.src, move.dest); err != nil {
- fmt.Println(err)
+func Apply(s storage.Storage, d storage.Storage) func(move *MoveCommand) error {
+ return func(move *MoveCommand) error {
+ dir := filepath.Dir(move.dest)
+ d.Mkdir(dir)
+ start := time.Now()
+ exists, err := d.Exists(move.dest)
+ if err != nil {
+ return fmt.Errorf("%s %s", time.Since(start), err.Error())
+ } else if !exists || err != nil {
+ fmt.Println(">>> ", time.Since(start), " ", move.dest)
+ r, _ := s.Get(move.src)
+ d.Put(move.dest, r)
+ } else {
+ fmt.Println("!!! ", time.Since(start), " ", move.src)
+ }
+ return nil
}
-
}
func main() {
- for _, path := range os.Args[1:] {
- info, err := os.Stat(path)
-
- if err != nil || !info.IsDir() {
- continue
- }
-
- fmt.Println("Processing folder")
- fmt.Println(path)
- files := fileop.WalkFolder(path, fileop.File)
- shas := pipe.Proc(files, 4, Calculate)
- cmds := pipe.Proc(shas, 1, Move(path))
- pipe.TailProc(cmds, 4, Apply)
-
- fmt.Println("Deleting empty folders")
- files = fileop.WalkFolder(path, fileop.Folder)
- pipe.TailProc(files, 2, Delete)
+ root := ""
+ username := ""
+ password := ""
+ nextcloud := storage.NewWebDavStorage(root, username, password)
+
+ fs := storage.NewFileSystem()
+ basePath := ""
+ list, _ := fs.List(basePath)
+
+ for _, f := range list {
+ base := path.Base(f)
+ fmt.Println("....", base)
+ files := fs.Walk(f, storage.File)
+ shas := pipe.Proc(files, 10, Calculate(fs))
+ cmds := pipe.Proc(shas, 1, Move("OF/"+base))
+ pipe.TailProc(cmds, 10, Apply(fs, nextcloud))
}
}
diff --git a/pipe/pipe.go b/pipe/pipe.go
index 2ca1f8b..0cfa5fd 100644
--- a/pipe/pipe.go
+++ b/pipe/pipe.go
@@ -1,10 +1,11 @@
package pipe
import (
+ "fmt"
"sync"
)
-func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V {
+func Proc[T any, V any](cin <-chan T, count int, proc func(T) (V, error)) <-chan V {
cout := make(chan V)
var wg sync.WaitGroup
@@ -12,7 +13,12 @@ func Proc[T any, V any](cin <-chan T, count int, proc func(T) V) <-chan V {
wg.Add(1)
go func(<-chan T, chan<- V) {
for i := range cin {
- cout <- proc(i)
+ v, err := proc(i)
+ if err == nil {
+ cout <- v
+ } else {
+ fmt.Println("####", err.Error())
+ }
}
wg.Done()
}(cin, cout)
@@ -34,14 +40,16 @@ func Wait[T any](cin <-chan T) {
}
}
-func TailProc[T any](cin <-chan T, count int, proc func(T)) {
+func TailProc[T any](cin <-chan T, count int, proc func(T) error) {
var wg sync.WaitGroup
- for i := 0; i < 4; i++ {
+ for i := 0; i < count; i++ {
wg.Add(1)
go func(<-chan T) {
for i := range cin {
- proc(i)
+ if err := proc(i); err != nil {
+ fmt.Println("####", err.Error())
+ }
}
wg.Done()
@@ -62,3 +70,13 @@ func Yield[T any](in []T) <-chan T {
return cout
}
+
+func Map[T any, V any](m func(T) V, vs []T) []V {
+ result := make([]V, len(vs))
+
+ for i, v := range vs {
+ result[i] = m(v)
+ }
+
+ return result
+}
diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go
index e71b05b..950d039 100644
--- a/pipe/pipe_test.go
+++ b/pipe/pipe_test.go
@@ -20,12 +20,12 @@ func createCountPipe() <-chan int {
return cout
}
-func multiply(in int) int {
- return in * Multiplier
+func multiply(in int) (int, error) {
+ return in * Multiplier, nil
}
-func devide(in int) int {
- return in / Multiplier
+func devide(in int) (int, error) {
+ return in / Multiplier, nil
}
func TestProc(t *testing.T) {
@@ -81,13 +81,14 @@ func createBuffers() []*buffer {
return buffers
}
-func multiplyBuffer(in *buffer) *buffer {
+func multiplyBuffer(in *buffer) (*buffer, error) {
in.value = in.value * Multiplier
- return in
+ return in, nil
}
-func devideBuffer(in *buffer) {
+func devideBuffer(in *buffer) error {
in.value = in.value / Multiplier
+ return nil
}
func TestTailProc(t *testing.T) {
diff --git a/storage/storage.go b/storage/storage.go
index b788efb..880396a 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -17,6 +17,12 @@ const (
type Storage interface {
Walk(path string, walkMode WalkMode) <-chan string
Get(path string) (io.Reader, error)
+ Put(path string, f io.Reader) error
+ List(path string) ([]string, error)
+ Move(src string, dest string) error
+ Copy(src string, dest string) error
+ Mkdir(path string) error
+ Exists(path string) (bool, error)
}
func CalculateSHA256(r io.Reader) (string, error) {
diff --git a/storage/storage_fs.go b/storage/storage_fs.go
index 35ce58b..16bf8e8 100644
--- a/storage/storage_fs.go
+++ b/storage/storage_fs.go
@@ -1,22 +1,38 @@
package storage
import (
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
"os"
"path/filepath"
+ "porg/pipe"
)
type FileSystem struct {
- root string
}
-func WalkFolder(folder string, walkMode WalkMode) <-chan string {
+func NewFileSystem() *FileSystem {
+ return &FileSystem{}
+}
+
+func (fs *FileSystem) Get(path string) (io.Reader, error) {
+ return os.Open(path)
+}
+
+func (fs *FileSystem) Walk(folder string, walkMode WalkMode) <-chan string {
c := make(chan string)
go func(folder string, c chan string) {
filepath.Walk(folder, func(path string, info os.FileInfo, err error) error {
file, _ := os.Open(path)
defer file.Close()
- fileInfo, _ := file.Stat()
+ fileInfo, err := file.Stat()
+ if err != nil {
+ fmt.Println("@@@@ ", err.Error())
+ return nil
+ }
switch walkMode {
case Folder:
@@ -38,3 +54,67 @@ func WalkFolder(folder string, walkMode WalkMode) <-chan string {
return c
}
+
+func (fs *FileSystem) Put(path string, f io.Reader) error {
+ fo, err := os.Create(path)
+ if err != nil {
+ return err
+ }
+
+ if _, err := io.Copy(fo, f); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (fs *FileSystem) List(path string) ([]string, error) {
+ files, err := ioutil.ReadDir(path)
+ if err != err {
+ return nil, err
+ }
+
+ findPath := func(f os.FileInfo) string {
+ return path + "/" + f.Name()
+ }
+ return pipe.Map(findPath, files), nil
+}
+
+func (fs *FileSystem) Move(src string, dst string) error {
+ return os.Rename(src, dst)
+}
+
+func (fs *FileSystem) Copy(src string, dst string) error {
+ in, err := os.Open(src)
+ if err != nil {
+ return err
+ }
+ defer in.Close()
+
+ out, err := os.Create(dst)
+ if err != nil {
+ return err
+ }
+ defer out.Close()
+
+ _, err = io.Copy(out, in)
+ if err != nil {
+ return err
+ }
+ return out.Close()
+}
+
+func (fs *FileSystem) Exists(path string) (bool, error) {
+ _, err := os.Stat(path)
+ if err == nil {
+ return true, nil
+ }
+ if errors.Is(err, os.ErrNotExist) {
+ return false, nil
+ }
+ return false, err
+}
+
+func (fs *FileSystem) Mkdir(path string) error {
+ return os.MkdirAll(path, 0777)
+}
diff --git a/storage/storage_fs_test.go b/storage/storage_fs_test.go
index b746c7e..429de58 100644
--- a/storage/storage_fs_test.go
+++ b/storage/storage_fs_test.go
@@ -10,12 +10,13 @@ func TestWalk(t *testing.T) {
folder := testutil.CreateFolder()
files := map[string]struct{}{}
walkedFiles := map[string]struct{}{}
+ fs := NewFileSystem()
for i := 0; i < fileCount; i++ {
files[testutil.AppendEmptyFile(folder)] = struct{}{}
}
- c := WalkFolder(folder, File)
+ c := fs.Walk(folder, File)
for file := range c {
walkedFiles[file] = struct{}{}
}
diff --git a/storage/storage_wd.go b/storage/storage_wd.go
new file mode 100644
index 0000000..b166992
--- /dev/null
+++ b/storage/storage_wd.go
@@ -0,0 +1,114 @@
+package storage
+
+import (
+ "errors"
+ "io"
+ "os"
+
+ "github.com/studio-b12/gowebdav"
+)
+
+type WebDav struct {
+ client *gowebdav.Client
+}
+
+func NewWebDavStorage(root string, username string, password string) *WebDav {
+ c := gowebdav.NewClient(root, username, password)
+ return &WebDav{client: c}
+}
+
+func (wd *WebDav) Mkdir(path string) error {
+ if err := wd.client.MkdirAll(path, 0644); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (wd *WebDav) Put(path string, f io.Reader) error {
+ return wd.client.WriteStream(path, f, 0644)
+}
+
+func (wd *WebDav) Move(src string, dest string) error {
+ if src != dest {
+ if err := wd.client.Rename(src, dest, true); err != nil {
+ return nil
+ }
+ }
+ return nil
+}
+
+func (wd *WebDav) Exists(path string) (bool, error) {
+ _, err := wd.client.Stat(path)
+ if err == nil {
+ return true, nil
+ }
+ if errors.Is(err, os.ErrNotExist) {
+ return false, nil
+ }
+ return false, err
+}
+
+func (wd *WebDav) Copy(src string, dest string) error {
+ if src != dest {
+ if err := wd.client.Copy(src, dest, true); err != nil {
+ return nil
+ }
+ }
+ return nil
+}
+
+func (wd *WebDav) Get(path string) (io.Reader, error) {
+ return wd.client.ReadStream(path)
+}
+
+func (wd *WebDav) List(path string) ([]string, error) {
+ files, err := wd.client.ReadDir(path)
+ if err != nil {
+ return nil, err
+ }
+
+ r := make([]string, len(files))
+
+ for i, v := range files {
+ r[i] = v.Name()
+ }
+
+ return r, nil
+}
+
+func (wd *WebDav) Walk(folder string, walkMode WalkMode) <-chan string {
+ c := make(chan string)
+
+ go func(string, chan string) {
+ var queue []string
+ queue = append(queue, folder)
+
+ for len(queue) > 0 {
+ f := queue[0]
+ queue = queue[1:]
+
+ files, _ := wd.client.ReadDir(f)
+ for _, fileInfo := range files {
+ if fileInfo.IsDir() {
+ queue = append(queue, f+"/"+fileInfo.Name())
+ }
+
+ switch walkMode {
+ case Folder:
+ if fileInfo.IsDir() {
+ c <- f + "/" + fileInfo.Name()
+ }
+ case File:
+ if !fileInfo.IsDir() {
+ c <- f + "/" + fileInfo.Name()
+ }
+ case FileFolder:
+ c <- f + "/" + fileInfo.Name()
+ }
+ }
+ }
+ close(c)
+ }(folder, c)
+
+ return c
+}