aboutsummaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker')
-rw-r--r--pkg/worker/list_processor.go39
-rw-r--r--pkg/worker/list_processor_test.go11
2 files changed, 30 insertions, 20 deletions
diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go
index ea6b453..c4c3781 100644
--- a/pkg/worker/list_processor.go
+++ b/pkg/worker/list_processor.go
@@ -3,9 +3,8 @@ package worker
import (
"context"
"errors"
+ "log/slog"
"sync"
-
- "github.com/sirupsen/logrus"
)
type (
@@ -27,19 +26,19 @@ type (
chanProcessorTask[T any] struct {
chanProcessor ChanProcessor[T]
- logrus *logrus.Entry
+ logger *slog.Logger
scheduler *Scheduler
}
batchProcessorTask[T any] struct {
batchProcessor ListProcessor[T]
- logrus *logrus.Entry
+ logger *slog.Logger
scheduler *Scheduler
}
serialProcessorTask[T any] struct {
batchProcessor ListProcessor[T]
- logrus *logrus.Entry
+ logger *slog.Logger
scheduler *Scheduler
}
)
@@ -47,36 +46,36 @@ type (
func NewTaskFromBatchProcessor[T any](
batchProcessor ListProcessor[T],
scheduler *Scheduler,
- logrus *logrus.Entry,
+ logger *slog.Logger,
) Task {
return &batchProcessorTask[T]{
batchProcessor: batchProcessor,
scheduler: scheduler,
- logrus: logrus,
+ logger: logger,
}
}
func NewTaskFromSerialProcessor[T any](
batchProcessor ListProcessor[T],
scheduler *Scheduler,
- logrus *logrus.Entry,
+ logger *slog.Logger,
) Task {
return &serialProcessorTask[T]{
batchProcessor: batchProcessor,
scheduler: scheduler,
- logrus: logrus,
+ logger: logger,
}
}
func NewTaskFromChanProcessor[T any](
chanProcessor ChanProcessor[T],
scheduler *Scheduler,
- logrus *logrus.Entry,
+ logger *slog.Logger,
) Task {
return &chanProcessorTask[T]{
chanProcessor: chanProcessor,
scheduler: scheduler,
- logrus: logrus,
+ logger: logger,
}
}
@@ -111,7 +110,10 @@ func (l *batchProcessorTask[T]) Start(ctx context.Context) error {
defer l.scheduler.Return()
defer wg.Done()
if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
- l.logrus.WithError(err).Error("Error processing batch")
+ l.logger.Error(
+ "Error processing batch",
+ slog.String("error", err.Error()),
+ )
if failure, ok := l.batchProcessor.(OnFail[T]); ok {
failure.OnFail(ctx, v, err)
}
@@ -148,7 +150,10 @@ func (l *serialProcessorTask[T]) Start(ctx context.Context) error {
l.scheduler.Take()
if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) {
- l.logrus.WithError(err).Error("Error processing batch")
+ l.logger.Error(
+ "Error processing batch",
+ slog.String("error", err.Error()),
+ )
if failure, ok := l.batchProcessor.(OnFail[T]); ok {
failure.OnFail(ctx, v, err)
}
@@ -177,7 +182,13 @@ func (l *chanProcessorTask[T]) Start(ctx context.Context) error {
go func(v T) {
defer l.scheduler.Return()
if err := l.chanProcessor.Process(ctx, v); err != nil {
- l.logrus.WithError(err).Error("Error processing chan")
+ l.logger.Error(
+ "Error processing batch",
+ slog.String("error", err.Error()),
+ )
+ if failure, ok := l.chanProcessor.(OnFail[T]); ok {
+ failure.OnFail(ctx, v, err)
+ }
}
}(v)
}
diff --git a/pkg/worker/list_processor_test.go b/pkg/worker/list_processor_test.go
index abdb907..21489e8 100644
--- a/pkg/worker/list_processor_test.go
+++ b/pkg/worker/list_processor_test.go
@@ -5,12 +5,11 @@ package worker
import (
"context"
"errors"
+ "log/slog"
"math/rand"
"sync"
"testing"
- "github.com/sirupsen/logrus"
-
"git.sr.ht/~gabrielgio/img/pkg/testkit"
)
@@ -27,12 +26,12 @@ type (
func TestListProcessorLimit(t *testing.T) {
var (
- log = logrus.New()
+ log = slog.Default()
scheduler = NewScheduler(1)
mock = &mockCounterListProcessor{countTo: 10000}
)
- worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
+ worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.With("context", "testing"))
err := worker.Start(context.Background())
testkit.TestFatalError(t, "Start", err)
@@ -42,12 +41,12 @@ func TestListProcessorLimit(t *testing.T) {
func TestListProcessorContextCancelQuery(t *testing.T) {
var (
- log = logrus.New()
+ log = slog.Default()
scheduler = NewScheduler(1)
mock = &mockContextListProcessor{}
)
- worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.WithField("context", "testing"))
+ worker := NewTaskFromBatchProcessor[int](mock, scheduler, log.With("context", "testing"))
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup