diff options
-rw-r--r-- | cmd/server/main.go | 7 | ||||
-rw-r--r-- | pkg/database/repository/media.go | 21 | ||||
-rw-r--r-- | pkg/database/sql/media.go | 122 | ||||
-rw-r--r-- | pkg/database/sql/migration.go | 2 | ||||
-rw-r--r-- | pkg/list/list.go | 25 | ||||
-rw-r--r-- | pkg/worker/list_processor.go | 59 | ||||
-rw-r--r-- | pkg/worker/scanner/album_scanner.go | 98 | ||||
-rw-r--r-- | pkg/worker/scanner/exif_scanner.go | 2 | ||||
-rw-r--r-- | pkg/worker/scanner/thumbnail_scanner.go | 2 |
9 files changed, 327 insertions, 11 deletions
diff --git a/cmd/server/main.go b/cmd/server/main.go index 39987e5..a3d5124 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -118,6 +118,7 @@ func main() { fileScanner = scanner.NewFileScanner(mediaRepository, userRepository) exifScanner = scanner.NewEXIFScanner(mediaRepository) thumbnailScanner = scanner.NewThumbnailScanner(*cachePath, mediaRepository) + albumScanner = scanner.NewAlbumScanner(mediaRepository) ) // worker @@ -138,6 +139,11 @@ func main() { scheduler, logrus.WithField("context", "thumbnail scanner"), ) + albumWorker = worker.NewWorkerFromSerialProcessor[*repository.Media]( + albumScanner, + scheduler, + logrus.WithField("context", "thumbnail scanner"), + ) ) pool := worker.NewWorkerPool() @@ -145,6 +151,7 @@ func main() { pool.AddWorker("exif scanner", exifWorker) pool.AddWorker("file scanner", fileWorker) pool.AddWorker("thumbnail scanner", thumbnailWorker) + pool.AddWorker("album scanner", albumWorker) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() diff --git a/pkg/database/repository/media.go b/pkg/database/repository/media.go index 6f5b39b..d6addbf 100644 --- a/pkg/database/repository/media.go +++ b/pkg/database/repository/media.go @@ -34,6 +34,10 @@ type ( GPSLongitude *float64 } + Album struct { + ID uint + } + MediaThumbnail struct { Path string } @@ -51,6 +55,17 @@ type ( MIMEType string } + CreateAlbum struct { + ParentID *uint + Name string + Path string + } + + CreateAlbumFile struct { + MediaID uint + AlbumID uint + } + MediaRepository interface { Create(context.Context, *CreateMedia) error Exists(context.Context, string) (bool, error) @@ -66,6 +81,12 @@ type ( ListEmptyThumbnail(context.Context, *Pagination) ([]*Media, error) GetThumbnail(context.Context, uint) (*MediaThumbnail, error) CreateThumbnail(context.Context, uint, *MediaThumbnail) error + + ListEmptyAlbums(context.Context, *Pagination) ([]*Media, error) + ExistsAlbumByAbsolutePath(context.Context, string) (bool, error) + GetAlbumByAbsolutePath(context.Context, string) (*Album, error) + CreateAlbum(context.Context, *CreateAlbum) (*Album, error) + CreateAlbumFile(context.Context, *CreateAlbumFile) error } ) diff --git a/pkg/database/sql/media.go b/pkg/database/sql/media.go index e5ba517..59e39ee 100644 --- a/pkg/database/sql/media.go +++ b/pkg/database/sql/media.go @@ -48,6 +48,22 @@ type ( Media Media } + MediaAlbum struct { + gorm.Model + ParentID *uint + Parent *MediaAlbum + Name string + Path string + } + + MediaAlbumFile struct { + gorm.Model + MediaID uint + Media Media + AlbumID uint + Album MediaAlbum + } + MediaRepository struct { db *gorm.DB } @@ -55,13 +71,13 @@ type ( var _ repository.MediaRepository = &MediaRepository{} -func (self *Media) ToModel() *repository.Media { +func (m *Media) ToModel() *repository.Media { return &repository.Media{ - ID: self.ID, - Path: self.Path, - PathHash: self.PathHash, - Name: self.Name, - MIMEType: self.MIMEType, + ID: m.ID, + Path: m.Path, + PathHash: m.PathHash, + Name: m.Name, + MIMEType: m.MIMEType, } } @@ -86,6 +102,12 @@ func (m *MediaEXIF) ToModel() *repository.MediaEXIF { } } +func (a *MediaAlbum) ToModel() *repository.Album { + return &repository.Album{ + ID: a.ID, + } +} + func (m *MediaThumbnail) ToModel() *repository.MediaThumbnail { return &repository.MediaThumbnail{ Path: m.Path, @@ -329,3 +351,91 @@ func (m *MediaRepository) CreateThumbnail(ctx context.Context, mediaID uint, thu return nil } + +func (r *MediaRepository) ListEmptyAlbums(ctx context.Context, pagination *repository.Pagination) ([]*repository.Media, error) { + medias := make([]*Media, 0) + result := r.db. + WithContext(ctx). + Model(&Media{}). + Joins("left join media_album_files on media.id = media_album_files.media_id"). + Where("media_album_files.media_id IS NULL"). + Offset(pagination.Page * pagination.Size). + Limit(pagination.Size). + Order("media.created_at DESC"). + Find(&medias) + + if result.Error != nil { + return nil, result.Error + } + + m := list.Map(medias, func(s *Media) *repository.Media { + return s.ToModel() + }) + + return m, nil +} + +func (m *MediaRepository) ExistsAlbumByAbsolutePath(ctx context.Context, path string) (bool, error) { + var exists bool + result := m.db. + WithContext(ctx). + Model(&MediaAlbum{}). + Select("count(id) > 0"). + Where("path = ?", path). + Find(&exists) + + if result.Error != nil { + return false, result.Error + } + + return exists, nil +} + +func (r *MediaRepository) GetAlbumByAbsolutePath(ctx context.Context, path string) (*repository.Album, error) { + m := &MediaAlbum{} + result := r.db. + WithContext(ctx). + Model(&MediaAlbum{}). + Where("path = ?", path). + Limit(1). + Take(m) + + if result.Error != nil { + return nil, result.Error + } + + return m.ToModel(), nil +} + +func (m *MediaRepository) CreateAlbum(ctx context.Context, createAlbum *repository.CreateAlbum) (*repository.Album, error) { + album := &MediaAlbum{ + ParentID: createAlbum.ParentID, + Name: createAlbum.Name, + Path: createAlbum.Path, + } + + result := m.db. + WithContext(ctx). + Create(album) + if result.Error != nil { + return nil, result.Error + } + + return album.ToModel(), nil +} + +func (m *MediaRepository) CreateAlbumFile(ctx context.Context, createAlbumFile *repository.CreateAlbumFile) error { + albumFile := &MediaAlbumFile{ + MediaID: createAlbumFile.MediaID, + AlbumID: createAlbumFile.AlbumID, + } + + result := m.db. + WithContext(ctx). + Create(albumFile) + if result.Error != nil { + return result.Error + } + + return nil +} diff --git a/pkg/database/sql/migration.go b/pkg/database/sql/migration.go index 076bf69..73e4297 100644 --- a/pkg/database/sql/migration.go +++ b/pkg/database/sql/migration.go @@ -9,6 +9,8 @@ func Migrate(db *gorm.DB) error { &Media{}, &MediaEXIF{}, &MediaThumbnail{}, + &MediaAlbum{}, + &MediaAlbumFile{}, } { if err := db.AutoMigrate(m); err != nil { return err diff --git a/pkg/list/list.go b/pkg/list/list.go index ff259f7..dfc3fb7 100644 --- a/pkg/list/list.go +++ b/pkg/list/list.go @@ -7,3 +7,28 @@ func Map[V any, T any](source []V, fun func(V) T) []T { } return result } + +type Pair[T, U any] struct { + Left T + Right U +} + +func Zip[T, U any](left []T, right []U) []Pair[T, U] { + // pick the array with the smaller length + l := len(left) + if len(left) > len(right) { + l = len(right) + } + + pairs := make([]Pair[T, U], len(left)) + for i := 0; i < l; i++ { + pairs[i] = Pair[T, U]{left[i], right[i]} + } + return pairs +} + +func Revert[T any](s []T) { + for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { + s[i], s[j] = s[j], s[i] + } +} diff --git a/pkg/worker/list_processor.go b/pkg/worker/list_processor.go index c060583..02817c9 100644 --- a/pkg/worker/list_processor.go +++ b/pkg/worker/list_processor.go @@ -20,7 +20,7 @@ type ( OnFail(context.Context, T, error) } - BatchProcessor[T any] interface { + ListProcessor[T any] interface { Query(context.Context) ([]T, error) Process(context.Context, T) error } @@ -32,14 +32,20 @@ type ( } batchProcessorWorker[T any] struct { - batchProcessor BatchProcessor[T] + batchProcessor ListProcessor[T] + logrus *logrus.Entry + scheduler *Scheduler + } + + serialProcessorWorker[T any] struct { + batchProcessor ListProcessor[T] logrus *logrus.Entry scheduler *Scheduler } ) func NewWorkerFromBatchProcessor[T any]( - batchProcessor BatchProcessor[T], + batchProcessor ListProcessor[T], scheduler *Scheduler, logrus *logrus.Entry, ) Worker { @@ -50,6 +56,18 @@ func NewWorkerFromBatchProcessor[T any]( } } +func NewWorkerFromSerialProcessor[T any]( + batchProcessor ListProcessor[T], + scheduler *Scheduler, + logrus *logrus.Entry, +) Worker { + return &serialProcessorWorker[T]{ + batchProcessor: batchProcessor, + scheduler: scheduler, + logrus: logrus, + } +} + func NewWorkerFromChanProcessor[T any]( chanProcessor ChanProcessor[T], scheduler *Scheduler, @@ -105,6 +123,41 @@ func (l *batchProcessorWorker[T]) Start(ctx context.Context) error { } } +func (l *serialProcessorWorker[T]) Start(ctx context.Context) error { + for { + values, err := l.batchProcessor.Query(ctx) + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if len(values) == 0 { + return nil + } + for _, v := range values { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + l.scheduler.Take() + if err := l.batchProcessor.Process(ctx, v); err != nil && !errors.Is(err, context.Canceled) { + l.logrus.WithError(err).Error("Error processing batch") + if failure, ok := l.batchProcessor.(OnFail[T]); ok { + failure.OnFail(ctx, v, err) + } + } + l.scheduler.Return() + } + } +} + func (l *chanProcessorWorker[T]) Start(ctx context.Context) error { c, err := l.chanProcessor.Query(ctx) if err != nil { diff --git a/pkg/worker/scanner/album_scanner.go b/pkg/worker/scanner/album_scanner.go new file mode 100644 index 0000000..618a184 --- /dev/null +++ b/pkg/worker/scanner/album_scanner.go @@ -0,0 +1,98 @@ +package scanner + +import ( + "context" + "os" + "path" + "path/filepath" + "strings" + + "git.sr.ht/~gabrielgio/img/pkg/database/repository" + "git.sr.ht/~gabrielgio/img/pkg/worker" +) + +type ( + AlbumScanner struct { + repository repository.MediaRepository + } +) + +var _ worker.ListProcessor[*repository.Media] = &AlbumScanner{} + +func NewAlbumScanner(repository repository.MediaRepository) *AlbumScanner { + return &AlbumScanner{ + repository: repository, + } +} + +func (e *AlbumScanner) Query(ctx context.Context) ([]*repository.Media, error) { + return e.repository.ListEmptyAlbums(ctx, &repository.Pagination{ + Page: 0, + Size: 100, + }) +} + +// This process will optmize for file over folder, which means it will assume that there will be +// more file then folder in the overall library. +// So will try to make as cheap as possible to look for fetching many files in a folder +// meaning it will start from checking from left to right in the path since it will assume +// that the path to that point has been registered already, resulting in a single lookup for the media +func (e *AlbumScanner) Process(ctx context.Context, m *repository.Media) error { + // we don't need the name of the file, only its path + filePath, _ := path.Split(m.Path) + + parts := strings.Split(filePath, string(os.PathSeparator)) + + subPaths := FanInwards(parts) + album, err := e.GetAndCreateNestedAlbuns(ctx, subPaths) + if err != nil { + return err + } + + return e.repository.CreateAlbumFile(ctx, &repository.CreateAlbumFile{ + MediaID: m.ID, + AlbumID: album.ID, + }) +} + +func (e *AlbumScanner) GetAndCreateNestedAlbuns(ctx context.Context, paths []string) (*repository.Album, error) { + if len(paths) == 1 { + // end of trail, we should create a album without parent + return e.repository.CreateAlbum(ctx, &repository.CreateAlbum{ + ParentID: nil, + Name: filepath.Base(paths[0]), + Path: paths[0], + }) + } + + exists, err := e.repository.ExistsAlbumByAbsolutePath(ctx, paths[0]) + if err != nil { + return nil, err + } + + if exists { + return e.repository.GetAlbumByAbsolutePath(ctx, paths[0]) + } + + //album does not exist, create it and get its parent id + a, err := e.GetAndCreateNestedAlbuns(ctx, paths[1:]) + if err != nil { + return nil, err + } + + return e.repository.CreateAlbum(ctx, &repository.CreateAlbum{ + ParentID: &a.ID, + Name: filepath.Base(paths[0]), + Path: paths[0], + }) + +} + +func FanInwards(paths []string) []string { + result := make([]string, 0, len(paths)) + for i := (len(paths) - 1); i >= 0; i-- { + subPaths := paths[0:i] + result = append(result, path.Join(subPaths...)) + } + return result +} diff --git a/pkg/worker/scanner/exif_scanner.go b/pkg/worker/scanner/exif_scanner.go index 47d717f..da63c0b 100644 --- a/pkg/worker/scanner/exif_scanner.go +++ b/pkg/worker/scanner/exif_scanner.go @@ -15,7 +15,7 @@ type ( } ) -var _ worker.BatchProcessor[*repository.Media] = &EXIFScanner{} +var _ worker.ListProcessor[*repository.Media] = &EXIFScanner{} func NewEXIFScanner(repository repository.MediaRepository) *EXIFScanner { return &EXIFScanner{ diff --git a/pkg/worker/scanner/thumbnail_scanner.go b/pkg/worker/scanner/thumbnail_scanner.go index 9f75464..6446c53 100644 --- a/pkg/worker/scanner/thumbnail_scanner.go +++ b/pkg/worker/scanner/thumbnail_scanner.go @@ -19,7 +19,7 @@ type ( } ) -var _ worker.BatchProcessor[*repository.Media] = &EXIFScanner{} +var _ worker.ListProcessor[*repository.Media] = &EXIFScanner{} func NewThumbnailScanner(cachePath string, repository repository.MediaRepository) *ThumbnailScanner { return &ThumbnailScanner{ |