From f5d13165f450871fa0595727b72e0e61689ab75f Mon Sep 17 00:00:00 2001 From: mkelvers Date: Sun, 12 Apr 2026 14:53:24 +0200 Subject: [PATCH] db: add anime fetch retry queue --- internal/database/models.go | 9 ++ internal/database/querier.go | 6 ++ internal/database/queries.sql | 39 ++++++++ internal/database/queries.sql.go | 115 +++++++++++++++++++++++ migrations/009_add_anime_fetch_retry.sql | 11 +++ 5 files changed, 180 insertions(+) create mode 100644 migrations/009_add_anime_fetch_retry.sql diff --git a/internal/database/models.go b/internal/database/models.go index f1990d9..b1da0cc 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -29,6 +29,15 @@ type Anime struct { RelationsSyncedAt sql.NullTime `json:"relations_synced_at"` } +type AnimeFetchRetry struct { + AnimeID int64 `json:"anime_id"` + Attempts int64 `json:"attempts"` + NextRetryAt time.Time `json:"next_retry_at"` + LastError string `json:"last_error"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + type AnimeRelation struct { AnimeID int64 `json:"anime_id"` RelatedAnimeID int64 `json:"related_anime_id"` diff --git a/internal/database/querier.go b/internal/database/querier.go index 4e2b487..547e2ea 100644 --- a/internal/database/querier.go +++ b/internal/database/querier.go @@ -9,15 +9,20 @@ import ( ) type Querier interface { + CountPendingAnimeFetchRetries(ctx context.Context) (int64, error) CreateSession(ctx context.Context, arg CreateSessionParams) (Session, error) CreateUser(ctx context.Context, arg CreateUserParams) (User, error) + DeleteAnimeFetchRetry(ctx context.Context, animeID int64) error DeleteExpiredJikanCache(ctx context.Context) error DeleteSession(ctx context.Context, id string) error DeleteUserSessions(ctx context.Context, userID string) error DeleteWatchListEntry(ctx context.Context, arg DeleteWatchListEntryParams) error + EnqueueAnimeFetchRetry(ctx context.Context, arg EnqueueAnimeFetchRetryParams) error GetAnime(ctx context.Context, id int64) (Anime, error) GetAnimeNeedingRelationSync(ctx context.Context) ([]GetAnimeNeedingRelationSyncRow, error) + GetDueAnimeFetchRetries(ctx context.Context, limit int64) ([]AnimeFetchRetry, error) GetJikanCache(ctx context.Context, key string) (string, error) + GetJikanCacheStale(ctx context.Context, key string) (string, error) GetSession(ctx context.Context, id string) (Session, error) GetUpcomingSeasons(ctx context.Context, userID string) ([]GetUpcomingSeasonsRow, error) GetUser(ctx context.Context, id string) (User, error) @@ -26,6 +31,7 @@ type Querier interface { GetUserWatchList(ctx context.Context, userID string) ([]GetUserWatchListRow, error) GetWatchListEntry(ctx context.Context, arg GetWatchListEntryParams) (WatchListEntry, error) GetWatchingAnime(ctx context.Context, userID string) ([]GetWatchingAnimeRow, error) + MarkAnimeFetchRetryFailed(ctx context.Context, arg MarkAnimeFetchRetryFailedParams) error MarkRelationsSynced(ctx context.Context, id int64) error SetJikanCache(ctx context.Context, arg SetJikanCacheParams) error UpdateAnimeStatus(ctx context.Context, arg UpdateAnimeStatusParams) error diff --git a/internal/database/queries.sql b/internal/database/queries.sql index 3447a41..5d5568e 100644 --- a/internal/database/queries.sql +++ b/internal/database/queries.sql @@ -164,6 +164,10 @@ ORDER BY related.id DESC; SELECT data FROM jikan_cache WHERE key = ? AND expires_at > CURRENT_TIMESTAMP LIMIT 1; +-- name: GetJikanCacheStale :one +SELECT data FROM jikan_cache +WHERE key = ? LIMIT 1; + -- name: SetJikanCache :exec INSERT INTO jikan_cache (key, data, expires_at) VALUES (?, ?, ?) @@ -174,3 +178,38 @@ ON CONFLICT (key) DO UPDATE SET -- name: DeleteExpiredJikanCache :exec DELETE FROM jikan_cache WHERE expires_at <= CURRENT_TIMESTAMP; + +-- name: EnqueueAnimeFetchRetry :exec +INSERT INTO anime_fetch_retry (anime_id, attempts, next_retry_at, last_error, updated_at) +VALUES (?, 0, CURRENT_TIMESTAMP, ?, CURRENT_TIMESTAMP) +ON CONFLICT (anime_id) DO UPDATE SET + next_retry_at = CASE + WHEN anime_fetch_retry.next_retry_at > CURRENT_TIMESTAMP THEN anime_fetch_retry.next_retry_at + ELSE CURRENT_TIMESTAMP + END, + last_error = excluded.last_error, + updated_at = CURRENT_TIMESTAMP; + +-- name: GetDueAnimeFetchRetries :many +SELECT anime_id, attempts, next_retry_at, last_error, created_at, updated_at +FROM anime_fetch_retry +WHERE next_retry_at <= CURRENT_TIMESTAMP +ORDER BY next_retry_at ASC +LIMIT ?; + +-- name: MarkAnimeFetchRetryFailed :exec +UPDATE anime_fetch_retry +SET attempts = attempts + 1, + next_retry_at = datetime(CURRENT_TIMESTAMP, ?), + last_error = ?, + updated_at = CURRENT_TIMESTAMP +WHERE anime_id = ?; + +-- name: DeleteAnimeFetchRetry :exec +DELETE FROM anime_fetch_retry +WHERE anime_id = ?; + +-- name: CountPendingAnimeFetchRetries :one +SELECT COUNT(*) +FROM anime_fetch_retry +WHERE next_retry_at <= CURRENT_TIMESTAMP; diff --git a/internal/database/queries.sql.go b/internal/database/queries.sql.go index 031e336..2c40cdc 100644 --- a/internal/database/queries.sql.go +++ b/internal/database/queries.sql.go @@ -11,6 +11,19 @@ import ( "time" ) +const countPendingAnimeFetchRetries = `-- name: CountPendingAnimeFetchRetries :one +SELECT COUNT(*) +FROM anime_fetch_retry +WHERE next_retry_at <= CURRENT_TIMESTAMP +` + +func (q *Queries) CountPendingAnimeFetchRetries(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, countPendingAnimeFetchRetries) + var count int64 + err := row.Scan(&count) + return count, err +} + const createSession = `-- name: CreateSession :one INSERT INTO session (id, user_id, expires_at) VALUES (?, ?, ?) @@ -66,6 +79,16 @@ func (q *Queries) CreateUser(ctx context.Context, arg CreateUserParams) (User, e return i, err } +const deleteAnimeFetchRetry = `-- name: DeleteAnimeFetchRetry :exec +DELETE FROM anime_fetch_retry +WHERE anime_id = ? +` + +func (q *Queries) DeleteAnimeFetchRetry(ctx context.Context, animeID int64) error { + _, err := q.db.ExecContext(ctx, deleteAnimeFetchRetry, animeID) + return err +} + const deleteExpiredJikanCache = `-- name: DeleteExpiredJikanCache :exec DELETE FROM jikan_cache WHERE expires_at <= CURRENT_TIMESTAMP ` @@ -108,6 +131,28 @@ func (q *Queries) DeleteWatchListEntry(ctx context.Context, arg DeleteWatchListE return err } +const enqueueAnimeFetchRetry = `-- name: EnqueueAnimeFetchRetry :exec +INSERT INTO anime_fetch_retry (anime_id, attempts, next_retry_at, last_error, updated_at) +VALUES (?, 0, CURRENT_TIMESTAMP, ?, CURRENT_TIMESTAMP) +ON CONFLICT (anime_id) DO UPDATE SET + next_retry_at = CASE + WHEN anime_fetch_retry.next_retry_at > CURRENT_TIMESTAMP THEN anime_fetch_retry.next_retry_at + ELSE CURRENT_TIMESTAMP + END, + last_error = excluded.last_error, + updated_at = CURRENT_TIMESTAMP +` + +type EnqueueAnimeFetchRetryParams struct { + AnimeID int64 `json:"anime_id"` + LastError string `json:"last_error"` +} + +func (q *Queries) EnqueueAnimeFetchRetry(ctx context.Context, arg EnqueueAnimeFetchRetryParams) error { + _, err := q.db.ExecContext(ctx, enqueueAnimeFetchRetry, arg.AnimeID, arg.LastError) + return err +} + const getAnime = `-- name: GetAnime :one SELECT id, title_original, image_url, created_at, title_english, title_japanese, airing, status, relations_synced_at FROM anime WHERE id = ? LIMIT 1 ` @@ -180,6 +225,44 @@ func (q *Queries) GetAnimeNeedingRelationSync(ctx context.Context) ([]GetAnimeNe return items, nil } +const getDueAnimeFetchRetries = `-- name: GetDueAnimeFetchRetries :many +SELECT anime_id, attempts, next_retry_at, last_error, created_at, updated_at +FROM anime_fetch_retry +WHERE next_retry_at <= CURRENT_TIMESTAMP +ORDER BY next_retry_at ASC +LIMIT ? +` + +func (q *Queries) GetDueAnimeFetchRetries(ctx context.Context, limit int64) ([]AnimeFetchRetry, error) { + rows, err := q.db.QueryContext(ctx, getDueAnimeFetchRetries, limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []AnimeFetchRetry + for rows.Next() { + var i AnimeFetchRetry + if err := rows.Scan( + &i.AnimeID, + &i.Attempts, + &i.NextRetryAt, + &i.LastError, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getJikanCache = `-- name: GetJikanCache :one SELECT data FROM jikan_cache WHERE key = ? AND expires_at > CURRENT_TIMESTAMP LIMIT 1 @@ -192,6 +275,18 @@ func (q *Queries) GetJikanCache(ctx context.Context, key string) (string, error) return data, err } +const getJikanCacheStale = `-- name: GetJikanCacheStale :one +SELECT data FROM jikan_cache +WHERE key = ? LIMIT 1 +` + +func (q *Queries) GetJikanCacheStale(ctx context.Context, key string) (string, error) { + row := q.db.QueryRowContext(ctx, getJikanCacheStale, key) + var data string + err := row.Scan(&data) + return data, err +} + const getSession = `-- name: GetSession :one SELECT id, user_id, expires_at, created_at FROM session WHERE id = ? LIMIT 1 ` @@ -511,6 +606,26 @@ func (q *Queries) GetWatchingAnime(ctx context.Context, userID string) ([]GetWat return items, nil } +const markAnimeFetchRetryFailed = `-- name: MarkAnimeFetchRetryFailed :exec +UPDATE anime_fetch_retry +SET attempts = attempts + 1, + next_retry_at = datetime(CURRENT_TIMESTAMP, ?), + last_error = ?, + updated_at = CURRENT_TIMESTAMP +WHERE anime_id = ? +` + +type MarkAnimeFetchRetryFailedParams struct { + Datetime interface{} `json:"datetime"` + LastError string `json:"last_error"` + AnimeID int64 `json:"anime_id"` +} + +func (q *Queries) MarkAnimeFetchRetryFailed(ctx context.Context, arg MarkAnimeFetchRetryFailedParams) error { + _, err := q.db.ExecContext(ctx, markAnimeFetchRetryFailed, arg.Datetime, arg.LastError, arg.AnimeID) + return err +} + const markRelationsSynced = `-- name: MarkRelationsSynced :exec UPDATE anime SET relations_synced_at = CURRENT_TIMESTAMP WHERE id = ? ` diff --git a/migrations/009_add_anime_fetch_retry.sql b/migrations/009_add_anime_fetch_retry.sql new file mode 100644 index 0000000..ffbbe40 --- /dev/null +++ b/migrations/009_add_anime_fetch_retry.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS anime_fetch_retry ( + anime_id INTEGER PRIMARY KEY, + attempts INTEGER NOT NULL DEFAULT 0, + next_retry_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_error TEXT NOT NULL DEFAULT '', + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_anime_fetch_retry_next_retry_at +ON anime_fetch_retry(next_retry_at);