fix: ensure worker queue advances on error

This commit is contained in:
2026-04-08 15:00:19 +02:00
parent 5f4ed88d1a
commit a5b72c599c
4 changed files with 57 additions and 36 deletions

View File

@@ -23,6 +23,7 @@ type Querier interface {
GetUserWatchList(ctx context.Context, userID string) ([]GetUserWatchListRow, error)
GetWatchListEntry(ctx context.Context, arg GetWatchListEntryParams) (WatchListEntry, error)
GetWatchingAnime(ctx context.Context, userID string) ([]GetWatchingAnimeRow, error)
MarkRelationsSynced(ctx context.Context, id int64) error
UpdateAnimeStatus(ctx context.Context, arg UpdateAnimeStatusParams) error
UpsertAnime(ctx context.Context, arg UpsertAnimeParams) (Anime, error)
UpsertAnimeRelation(ctx context.Context, arg UpsertAnimeRelationParams) error

View File

@@ -86,7 +86,10 @@ ON CONFLICT (anime_id, related_anime_id) DO UPDATE SET
relation_type = excluded.relation_type;
-- name: UpdateAnimeStatus :exec
UPDATE anime SET status = ?, relations_synced_at = CURRENT_TIMESTAMP WHERE id = ?;
UPDATE anime SET status = ? WHERE id = ?;
-- name: MarkRelationsSynced :exec
UPDATE anime SET relations_synced_at = CURRENT_TIMESTAMP WHERE id = ?;
-- name: GetAnimeNeedingRelationSync :many
WITH RECURSIVE sequel_chain AS (

View File

@@ -459,8 +459,17 @@ func (q *Queries) GetWatchingAnime(ctx context.Context, userID string) ([]GetWat
return items, nil
}
const markRelationsSynced = `-- name: MarkRelationsSynced :exec
UPDATE anime SET relations_synced_at = CURRENT_TIMESTAMP WHERE id = ?
`
func (q *Queries) MarkRelationsSynced(ctx context.Context, id int64) error {
_, err := q.db.ExecContext(ctx, markRelationsSynced, id)
return err
}
const updateAnimeStatus = `-- name: UpdateAnimeStatus :exec
UPDATE anime SET status = ?, relations_synced_at = CURRENT_TIMESTAMP WHERE id = ?
UPDATE anime SET status = ? WHERE id = ?
`
type UpdateAnimeStatusParams struct {

View File

@@ -51,47 +51,55 @@ func (w *Worker) syncRelations(ctx context.Context) {
for _, a := range animes {
log.Printf("worker: syncing relations for anime %d (%s)", a.ID, a.TitleOriginal)
relations, err := w.client.GetRelationsData(int(a.ID))
if err != nil {
log.Printf("worker: failed to fetch relations for %d: %v", a.ID, err)
continue
}
func() {
// Always mark as synced and sleep so the queue advances even on error.
defer func() {
err := w.db.MarkRelationsSynced(ctx, a.ID)
if err != nil {
log.Printf("worker: failed to mark relations synced for %d: %v", a.ID, err)
}
time.Sleep(400 * time.Millisecond)
}()
for _, rel := range relations.Data {
for _, entry := range rel.Entry {
if entry.Type == "anime" {
// We just insert the relation.
err := w.db.UpsertAnimeRelation(ctx, database.UpsertAnimeRelationParams{
AnimeID: a.ID,
RelatedAnimeID: int64(entry.MalID),
RelationType: rel.Relation,
})
if err != nil {
log.Printf("worker: failed to insert relation %d -> %d: %v", a.ID, entry.MalID, err)
}
relations, err := w.client.GetRelationsData(int(a.ID))
if err != nil {
log.Printf("worker: failed to fetch relations for %d: %v", a.ID, err)
return
}
// If it's a Sequel, we should also make sure the related anime is tracked
if rel.Relation == "Sequel" {
w.ensureAnimeExistsAndStatusUpdated(ctx, entry.MalID)
for _, rel := range relations.Data {
for _, entry := range rel.Entry {
if entry.Type == "anime" {
// We just insert the relation.
err := w.db.UpsertAnimeRelation(ctx, database.UpsertAnimeRelationParams{
AnimeID: a.ID,
RelatedAnimeID: int64(entry.MalID),
RelationType: rel.Relation,
})
if err != nil {
log.Printf("worker: failed to insert relation %d -> %d: %v", a.ID, entry.MalID, err)
}
// If it's a Sequel, we should also make sure the related anime is tracked
if rel.Relation == "Sequel" {
w.ensureAnimeExistsAndStatusUpdated(ctx, entry.MalID)
}
}
}
}
}
// Also update the status of the anime itself so we know if it's Not yet aired, etc.
animeDetails, err := w.client.GetAnimeByID(int(a.ID))
if err == nil {
err = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{
Status: sql.NullString{String: animeDetails.Status, Valid: true},
ID: a.ID,
})
if err != nil {
log.Printf("worker: failed to update status for %d: %v", a.ID, err)
// Also update the status of the anime itself so we know if it's Not yet aired, etc.
animeDetails, err := w.client.GetAnimeByID(int(a.ID))
if err == nil {
err = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{
Status: sql.NullString{String: animeDetails.Status, Valid: true},
ID: a.ID,
})
if err != nil {
log.Printf("worker: failed to update status for %d: %v", a.ID, err)
}
}
}
// Sleep briefly to respect Jikan's 3 req/sec rate limit
time.Sleep(400 * time.Millisecond)
}()
}
}