From a5b72c599c7dd22e743276f33a041a7f79503be9 Mon Sep 17 00:00:00 2001 From: mkelvers Date: Wed, 8 Apr 2026 15:00:19 +0200 Subject: [PATCH] fix: ensure worker queue advances on error --- internal/database/querier.go | 1 + internal/database/queries.sql | 5 ++- internal/database/queries.sql.go | 11 ++++- internal/worker/relations.go | 76 ++++++++++++++++++-------------- 4 files changed, 57 insertions(+), 36 deletions(-) diff --git a/internal/database/querier.go b/internal/database/querier.go index b0c57d8..e335c76 100644 --- a/internal/database/querier.go +++ b/internal/database/querier.go @@ -23,6 +23,7 @@ type Querier interface { GetUserWatchList(ctx context.Context, userID string) ([]GetUserWatchListRow, error) GetWatchListEntry(ctx context.Context, arg GetWatchListEntryParams) (WatchListEntry, error) GetWatchingAnime(ctx context.Context, userID string) ([]GetWatchingAnimeRow, error) + MarkRelationsSynced(ctx context.Context, id int64) error UpdateAnimeStatus(ctx context.Context, arg UpdateAnimeStatusParams) error UpsertAnime(ctx context.Context, arg UpsertAnimeParams) (Anime, error) UpsertAnimeRelation(ctx context.Context, arg UpsertAnimeRelationParams) error diff --git a/internal/database/queries.sql b/internal/database/queries.sql index 6655f25..095c6d5 100644 --- a/internal/database/queries.sql +++ b/internal/database/queries.sql @@ -86,7 +86,10 @@ ON CONFLICT (anime_id, related_anime_id) DO UPDATE SET relation_type = excluded.relation_type; -- name: UpdateAnimeStatus :exec -UPDATE anime SET status = ?, relations_synced_at = CURRENT_TIMESTAMP WHERE id = ?; +UPDATE anime SET status = ? WHERE id = ?; + +-- name: MarkRelationsSynced :exec +UPDATE anime SET relations_synced_at = CURRENT_TIMESTAMP WHERE id = ?; -- name: GetAnimeNeedingRelationSync :many WITH RECURSIVE sequel_chain AS ( diff --git a/internal/database/queries.sql.go b/internal/database/queries.sql.go index e666199..60f7f7c 100644 --- a/internal/database/queries.sql.go +++ b/internal/database/queries.sql.go @@ -459,8 +459,17 @@ func (q *Queries) GetWatchingAnime(ctx context.Context, userID string) ([]GetWat return items, nil } +const markRelationsSynced = `-- name: MarkRelationsSynced :exec +UPDATE anime SET relations_synced_at = CURRENT_TIMESTAMP WHERE id = ? +` + +func (q *Queries) MarkRelationsSynced(ctx context.Context, id int64) error { + _, err := q.db.ExecContext(ctx, markRelationsSynced, id) + return err +} + const updateAnimeStatus = `-- name: UpdateAnimeStatus :exec -UPDATE anime SET status = ?, relations_synced_at = CURRENT_TIMESTAMP WHERE id = ? +UPDATE anime SET status = ? WHERE id = ? ` type UpdateAnimeStatusParams struct { diff --git a/internal/worker/relations.go b/internal/worker/relations.go index acc3b72..c83cb9a 100644 --- a/internal/worker/relations.go +++ b/internal/worker/relations.go @@ -51,47 +51,55 @@ func (w *Worker) syncRelations(ctx context.Context) { for _, a := range animes { log.Printf("worker: syncing relations for anime %d (%s)", a.ID, a.TitleOriginal) - relations, err := w.client.GetRelationsData(int(a.ID)) - if err != nil { - log.Printf("worker: failed to fetch relations for %d: %v", a.ID, err) - continue - } + func() { + // Always mark as synced and sleep so the queue advances even on error. + defer func() { + err := w.db.MarkRelationsSynced(ctx, a.ID) + if err != nil { + log.Printf("worker: failed to mark relations synced for %d: %v", a.ID, err) + } + time.Sleep(400 * time.Millisecond) + }() - for _, rel := range relations.Data { - for _, entry := range rel.Entry { - if entry.Type == "anime" { - // We just insert the relation. - err := w.db.UpsertAnimeRelation(ctx, database.UpsertAnimeRelationParams{ - AnimeID: a.ID, - RelatedAnimeID: int64(entry.MalID), - RelationType: rel.Relation, - }) - if err != nil { - log.Printf("worker: failed to insert relation %d -> %d: %v", a.ID, entry.MalID, err) - } + relations, err := w.client.GetRelationsData(int(a.ID)) + if err != nil { + log.Printf("worker: failed to fetch relations for %d: %v", a.ID, err) + return + } - // If it's a Sequel, we should also make sure the related anime is tracked - if rel.Relation == "Sequel" { - w.ensureAnimeExistsAndStatusUpdated(ctx, entry.MalID) + for _, rel := range relations.Data { + for _, entry := range rel.Entry { + if entry.Type == "anime" { + // We just insert the relation. + err := w.db.UpsertAnimeRelation(ctx, database.UpsertAnimeRelationParams{ + AnimeID: a.ID, + RelatedAnimeID: int64(entry.MalID), + RelationType: rel.Relation, + }) + if err != nil { + log.Printf("worker: failed to insert relation %d -> %d: %v", a.ID, entry.MalID, err) + } + + // If it's a Sequel, we should also make sure the related anime is tracked + if rel.Relation == "Sequel" { + w.ensureAnimeExistsAndStatusUpdated(ctx, entry.MalID) + } } } } - } - // Also update the status of the anime itself so we know if it's Not yet aired, etc. - animeDetails, err := w.client.GetAnimeByID(int(a.ID)) - if err == nil { - err = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{ - Status: sql.NullString{String: animeDetails.Status, Valid: true}, - ID: a.ID, - }) - if err != nil { - log.Printf("worker: failed to update status for %d: %v", a.ID, err) + // Also update the status of the anime itself so we know if it's Not yet aired, etc. + animeDetails, err := w.client.GetAnimeByID(int(a.ID)) + if err == nil { + err = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{ + Status: sql.NullString{String: animeDetails.Status, Valid: true}, + ID: a.ID, + }) + if err != nil { + log.Printf("worker: failed to update status for %d: %v", a.ID, err) + } } - } - - // Sleep briefly to respect Jikan's 3 req/sec rate limit - time.Sleep(400 * time.Millisecond) + }() } }