diff options
Diffstat (limited to 'pkg')
| -rw-r--r-- | pkg/worker/http.go | 26 | ||||
| -rw-r--r-- | pkg/worker/worker.go | 94 | 
2 files changed, 120 insertions, 0 deletions
| diff --git a/pkg/worker/http.go b/pkg/worker/http.go new file mode 100644 index 0000000..1d56f86 --- /dev/null +++ b/pkg/worker/http.go @@ -0,0 +1,26 @@ +package worker + +import ( +	"context" +	"net/http" +) + +type ServerTask struct { +	server *http.Server +} + +func NewServerTask(server *http.Server) *ServerTask { +	return &ServerTask{ +		server: server, +	} +} + +func (self *ServerTask) Start(ctx context.Context) error { +	go func() { +		// nolint: errcheck +		self.server.ListenAndServe() +	}() + +	<-ctx.Done() +	return self.server.Shutdown(ctx) +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go new file mode 100644 index 0000000..6b5c21c --- /dev/null +++ b/pkg/worker/worker.go @@ -0,0 +1,94 @@ +package worker + +import ( +	"context" +	"errors" +	"log/slog" +	"time" + +	"golang.org/x/sync/errgroup" +) + +type ( +	Task interface { +		Start(context.Context) error +	} + +	Work struct { +		Name string +		Task Task +		wait time.Duration +	} + +	TaskPool struct { +		tasks []*Work +	} +) + +const ( +	format = "2006.01.02 15:04:05" +) + +func NewTaskPool() *TaskPool { +	return &TaskPool{} +} + +func (w *Work) run(ctx context.Context) error { +	// first time fire from the get go +	timer := time.NewTimer(time.Nanosecond) + +	for { +		select { +		case <-ctx.Done(): +			return ctx.Err() +		case <-timer.C: +			if err := w.Task.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { +				return err +			} +		} +		timer.Reset(w.wait) +	} +} + +func (self *TaskPool) AddTask(name string, wait time.Duration, task Task) { +	self.tasks = append(self.tasks, &Work{ +		Name: name, +		Task: task, +		wait: wait, +	}) +} + +func (self *TaskPool) Start(ctx context.Context) error { +	var g errgroup.Group + +	for _, w := range self.tasks { +		g.Go(func(w *Work) func() error { +			return func() error { +				slog.Info("Process starting", "time", time.Now().Format(format), "name", w.Name) +				now := time.Now() +				if err := w.run(ctx); err != nil && !errors.Is(context.Canceled, err) { +					since := time.Since(now) +					slog.Error( +						"Process erred", +						"time", time.Now().Format(format), +						"name", w.Name, +						"error", err, +						"duration", since, +					) +					return err +				} else { +					since := time.Since(now) +					slog.Info( +						"Process ended", +						"time", time.Now().Format(format), +						"name", w.Name, +						"duration", since, +					) +				} +				return nil +			} +		}(w)) +	} + +	return g.Wait() +} | 
