diff options
| author | Gabriel Arakaki Giovanini <mail@gabrielgio.me> | 2023-06-24 00:34:57 +0200 | 
|---|---|---|
| committer | Gabriel Arakaki Giovanini <mail@gabrielgio.me> | 2023-06-25 15:18:12 +0200 | 
| commit | 57b41ad766b3c4505672c12f058f10c7a132dd5b (patch) | |
| tree | 80aea95babc8f740e86dadc7469236bdffc78c26 /pkg | |
| parent | d5261d7f121985f13f9d19e9efd5c2ae3d4b5609 (diff) | |
| download | lens-57b41ad766b3c4505672c12f058f10c7a132dd5b.tar.gz lens-57b41ad766b3c4505672c12f058f10c7a132dd5b.tar.bz2 lens-57b41ad766b3c4505672c12f058f10c7a132dd5b.zip | |
feat: Remove unnecessary function
Diffstat (limited to 'pkg')
| -rw-r--r-- | pkg/coroutine/coroutine.go | 33 | ||||
| -rw-r--r-- | pkg/coroutine/coroutine_test.go | 63 | ||||
| -rw-r--r-- | pkg/coroutines/coroutines.go | 1 | ||||
| -rw-r--r-- | pkg/worker/exif_scanner.go | 30 | ||||
| -rw-r--r-- | pkg/worker/list_processor_test.go | 21 | ||||
| -rw-r--r-- | pkg/worker/worker.go | 12 | 
6 files changed, 120 insertions, 40 deletions
| diff --git a/pkg/coroutine/coroutine.go b/pkg/coroutine/coroutine.go new file mode 100644 index 0000000..96d149e --- /dev/null +++ b/pkg/coroutine/coroutine.go @@ -0,0 +1,33 @@ +package coroutine + +import ( +	"context" +) + +// WrapProcess wraps process into a go routine and make it cancelable through context +func WrapProcess[V any](ctx context.Context, fun func() (V, error)) (V, error) { +	c := make(chan V) +	e := make(chan error) +	go func() { +		defer close(c) +		defer close(e) + +		v, err := fun() +		if err != nil { +			e <- err +		} else { +			c <- v +		} +	}() + +	select { +	case <-ctx.Done(): +		var zero V +		return zero, ctx.Err() +	case m := <-c: +		return m, nil +	case err := <-e: +		var zero V +		return zero, err +	} +} diff --git a/pkg/coroutine/coroutine_test.go b/pkg/coroutine/coroutine_test.go new file mode 100644 index 0000000..e876ec3 --- /dev/null +++ b/pkg/coroutine/coroutine_test.go @@ -0,0 +1,63 @@ +//go:build unit + +package coroutine + +import ( +	"context" +	"errors" +	"sync" +	"testing" +	"time" + +	"git.sr.ht/~gabrielgio/img/pkg/testkit" +) + +var rError = errors.New("This is a error") + +func imediatReturn() (string, error) { +	return "A string", nil +} + +func imediatErrorReturn() (string, error) { +	return "", rError +} + +func haltedReturn() (string, error) { +	time.Sleep(time.Hour) +	return "", nil +} + +func TestImediatReturn(t *testing.T) { +	ctx := context.Background() +	v, err := WrapProcess(ctx, imediatReturn) +	testkit.TestError(t, "WrapProcess", nil, err) +	testkit.TestValue(t, "WrapProcess", "A string", v) +} + +func TestImediatErrorReturn(t *testing.T) { +	ctx := context.Background() +	v, err := WrapProcess(ctx, imediatErrorReturn) +	testkit.TestError(t, "WrapProcess", rError, err) +	testkit.TestValue(t, "WrapProcess", "", v) +} + +func TestHaltedReturn(t *testing.T) { +	ctx := context.Background() +	ctx, cancel := context.WithCancel(ctx) + +	var ( +		err error +		wg  sync.WaitGroup +	) + +	wg.Add(1) +	go func(err *error) { +		defer wg.Done() +		_, *err = WrapProcess(ctx, haltedReturn) +	}(&err) + +	cancel() +	wg.Wait() + +	testkit.TestError(t, "WrapProcess", context.Canceled, err) +} diff --git a/pkg/coroutines/coroutines.go b/pkg/coroutines/coroutines.go deleted file mode 100644 index c0f7247..0000000 --- a/pkg/coroutines/coroutines.go +++ /dev/null @@ -1 +0,0 @@ -package coroutines diff --git a/pkg/worker/exif_scanner.go b/pkg/worker/exif_scanner.go index 4aa247d..91eed12 100644 --- a/pkg/worker/exif_scanner.go +++ b/pkg/worker/exif_scanner.go @@ -4,6 +4,7 @@ import (  	"context"  	"git.sr.ht/~gabrielgio/img/pkg/components/media" +	"git.sr.ht/~gabrielgio/img/pkg/coroutine"  	"git.sr.ht/~gabrielgio/img/pkg/fileop"  ) @@ -33,36 +34,11 @@ func (e *EXIFScanner) Query(ctx context.Context) ([]*media.Media, error) {  	return medias, nil  } -func wrapReadExif(ctx context.Context, path string) (*media.MediaEXIF, error) { -	c := make(chan *media.MediaEXIF) -	e := make(chan error) -	go func() { -		defer close(c) -		defer close(e) - -		newExif, err := fileop.ReadExif(path) -		if err != nil { -			e <- err -		} else { -			c <- newExif -		} -	}() - -	select { -	case <-ctx.Done(): -		return nil, ctx.Err() -	case m := <-c: -		return m, nil -	case err := <-e: -		return nil, err -	} -} -  func (e *EXIFScanner) Process(ctx context.Context, m *media.Media) error { -	newExif, err := wrapReadExif(ctx, m.Path) +	exif, err := coroutine.WrapProcess(ctx, func() (*media.MediaEXIF, error) { return fileop.ReadExif(m.Path) })  	if err != nil {  		return err  	} -	return e.repository.CreateEXIF(ctx, m.ID, newExif) +	return e.repository.CreateEXIF(ctx, m.ID, exif)  } diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go index 1e4ed2d..35672f3 100644 --- a/pkg/worker/list_processor_test.go +++ b/pkg/worker/list_processor_test.go @@ -10,6 +10,7 @@ import (  	"testing"  	"git.sr.ht/~gabrielgio/img/pkg/testkit" +	"github.com/sirupsen/logrus"  )  type ( @@ -24,10 +25,13 @@ type (  )  func TestListProcessorLimit(t *testing.T) { -	mock := &mockCounterListProcessor{ -		countTo: 10000, -	} -	worker := NewWorkerFromListProcessor[int](mock, nil) +	var ( +		log       = logrus.New() +		scheduler = NewScheduler(1) +		mock      = &mockCounterListProcessor{countTo: 10000} +	) + +	worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))  	err := worker.Start(context.Background())  	testkit.TestFatalError(t, "Start", err) @@ -36,8 +40,13 @@ func TestListProcessorLimit(t *testing.T) {  }  func TestListProcessorContextCancelQuery(t *testing.T) { -	mock := &mockContextListProcessor{} -	worker := NewWorkerFromListProcessor[int](mock, nil) +	var ( +		log       = logrus.New() +		scheduler = NewScheduler(1) +		mock      = &mockContextListProcessor{} +	) + +	worker := NewWorkerFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))  	ctx, cancel := context.WithCancel(context.Background())  	var wg sync.WaitGroup diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 18cc0e2..359384a 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -20,7 +20,6 @@ type (  	WorkerPool struct {  		workers []*Work -		wg      sync.WaitGroup  	}  ) @@ -36,10 +35,13 @@ func (self *WorkerPool) AddWorker(name string, worker Worker) {  }  func (self *WorkerPool) Start(ctx context.Context) { -	self.wg.Add(len(self.workers)) +	var wg sync.WaitGroup + +	wg.Add(len(self.workers)) +  	for _, w := range self.workers {  		go func(w *Work) { -			defer self.wg.Done() +			defer wg.Done()  			if err := w.Worker.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {  				fmt.Println("Processes finished, error", w.Name, err.Error())  			} else { @@ -47,8 +49,6 @@ func (self *WorkerPool) Start(ctx context.Context) {  			}  		}(w)  	} -} -func (self *WorkerPool) Wait() { -	self.wg.Wait() +	wg.Wait()  } | 
