db: add anime fetch retry queue
This commit is contained in:
@@ -29,6 +29,15 @@ type Anime struct {
|
||||
RelationsSyncedAt sql.NullTime `json:"relations_synced_at"`
|
||||
}
|
||||
|
||||
type AnimeFetchRetry struct {
|
||||
AnimeID int64 `json:"anime_id"`
|
||||
Attempts int64 `json:"attempts"`
|
||||
NextRetryAt time.Time `json:"next_retry_at"`
|
||||
LastError string `json:"last_error"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
|
||||
type AnimeRelation struct {
|
||||
AnimeID int64 `json:"anime_id"`
|
||||
RelatedAnimeID int64 `json:"related_anime_id"`
|
||||
|
||||
@@ -9,15 +9,20 @@ import (
|
||||
)
|
||||
|
||||
type Querier interface {
|
||||
CountPendingAnimeFetchRetries(ctx context.Context) (int64, error)
|
||||
CreateSession(ctx context.Context, arg CreateSessionParams) (Session, error)
|
||||
CreateUser(ctx context.Context, arg CreateUserParams) (User, error)
|
||||
DeleteAnimeFetchRetry(ctx context.Context, animeID int64) error
|
||||
DeleteExpiredJikanCache(ctx context.Context) error
|
||||
DeleteSession(ctx context.Context, id string) error
|
||||
DeleteUserSessions(ctx context.Context, userID string) error
|
||||
DeleteWatchListEntry(ctx context.Context, arg DeleteWatchListEntryParams) error
|
||||
EnqueueAnimeFetchRetry(ctx context.Context, arg EnqueueAnimeFetchRetryParams) error
|
||||
GetAnime(ctx context.Context, id int64) (Anime, error)
|
||||
GetAnimeNeedingRelationSync(ctx context.Context) ([]GetAnimeNeedingRelationSyncRow, error)
|
||||
GetDueAnimeFetchRetries(ctx context.Context, limit int64) ([]AnimeFetchRetry, error)
|
||||
GetJikanCache(ctx context.Context, key string) (string, error)
|
||||
GetJikanCacheStale(ctx context.Context, key string) (string, error)
|
||||
GetSession(ctx context.Context, id string) (Session, error)
|
||||
GetUpcomingSeasons(ctx context.Context, userID string) ([]GetUpcomingSeasonsRow, error)
|
||||
GetUser(ctx context.Context, id string) (User, error)
|
||||
@@ -26,6 +31,7 @@ type Querier interface {
|
||||
GetUserWatchList(ctx context.Context, userID string) ([]GetUserWatchListRow, error)
|
||||
GetWatchListEntry(ctx context.Context, arg GetWatchListEntryParams) (WatchListEntry, error)
|
||||
GetWatchingAnime(ctx context.Context, userID string) ([]GetWatchingAnimeRow, error)
|
||||
MarkAnimeFetchRetryFailed(ctx context.Context, arg MarkAnimeFetchRetryFailedParams) error
|
||||
MarkRelationsSynced(ctx context.Context, id int64) error
|
||||
SetJikanCache(ctx context.Context, arg SetJikanCacheParams) error
|
||||
UpdateAnimeStatus(ctx context.Context, arg UpdateAnimeStatusParams) error
|
||||
|
||||
@@ -164,6 +164,10 @@ ORDER BY related.id DESC;
|
||||
SELECT data FROM jikan_cache
|
||||
WHERE key = ? AND expires_at > CURRENT_TIMESTAMP LIMIT 1;
|
||||
|
||||
-- name: GetJikanCacheStale :one
|
||||
SELECT data FROM jikan_cache
|
||||
WHERE key = ? LIMIT 1;
|
||||
|
||||
-- name: SetJikanCache :exec
|
||||
INSERT INTO jikan_cache (key, data, expires_at)
|
||||
VALUES (?, ?, ?)
|
||||
@@ -174,3 +178,38 @@ ON CONFLICT (key) DO UPDATE SET
|
||||
|
||||
-- name: DeleteExpiredJikanCache :exec
|
||||
DELETE FROM jikan_cache WHERE expires_at <= CURRENT_TIMESTAMP;
|
||||
|
||||
-- name: EnqueueAnimeFetchRetry :exec
|
||||
INSERT INTO anime_fetch_retry (anime_id, attempts, next_retry_at, last_error, updated_at)
|
||||
VALUES (?, 0, CURRENT_TIMESTAMP, ?, CURRENT_TIMESTAMP)
|
||||
ON CONFLICT (anime_id) DO UPDATE SET
|
||||
next_retry_at = CASE
|
||||
WHEN anime_fetch_retry.next_retry_at > CURRENT_TIMESTAMP THEN anime_fetch_retry.next_retry_at
|
||||
ELSE CURRENT_TIMESTAMP
|
||||
END,
|
||||
last_error = excluded.last_error,
|
||||
updated_at = CURRENT_TIMESTAMP;
|
||||
|
||||
-- name: GetDueAnimeFetchRetries :many
|
||||
SELECT anime_id, attempts, next_retry_at, last_error, created_at, updated_at
|
||||
FROM anime_fetch_retry
|
||||
WHERE next_retry_at <= CURRENT_TIMESTAMP
|
||||
ORDER BY next_retry_at ASC
|
||||
LIMIT ?;
|
||||
|
||||
-- name: MarkAnimeFetchRetryFailed :exec
|
||||
UPDATE anime_fetch_retry
|
||||
SET attempts = attempts + 1,
|
||||
next_retry_at = datetime(CURRENT_TIMESTAMP, ?),
|
||||
last_error = ?,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
WHERE anime_id = ?;
|
||||
|
||||
-- name: DeleteAnimeFetchRetry :exec
|
||||
DELETE FROM anime_fetch_retry
|
||||
WHERE anime_id = ?;
|
||||
|
||||
-- name: CountPendingAnimeFetchRetries :one
|
||||
SELECT COUNT(*)
|
||||
FROM anime_fetch_retry
|
||||
WHERE next_retry_at <= CURRENT_TIMESTAMP;
|
||||
|
||||
@@ -11,6 +11,19 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const countPendingAnimeFetchRetries = `-- name: CountPendingAnimeFetchRetries :one
|
||||
SELECT COUNT(*)
|
||||
FROM anime_fetch_retry
|
||||
WHERE next_retry_at <= CURRENT_TIMESTAMP
|
||||
`
|
||||
|
||||
func (q *Queries) CountPendingAnimeFetchRetries(ctx context.Context) (int64, error) {
|
||||
row := q.db.QueryRowContext(ctx, countPendingAnimeFetchRetries)
|
||||
var count int64
|
||||
err := row.Scan(&count)
|
||||
return count, err
|
||||
}
|
||||
|
||||
const createSession = `-- name: CreateSession :one
|
||||
INSERT INTO session (id, user_id, expires_at)
|
||||
VALUES (?, ?, ?)
|
||||
@@ -66,6 +79,16 @@ func (q *Queries) CreateUser(ctx context.Context, arg CreateUserParams) (User, e
|
||||
return i, err
|
||||
}
|
||||
|
||||
const deleteAnimeFetchRetry = `-- name: DeleteAnimeFetchRetry :exec
|
||||
DELETE FROM anime_fetch_retry
|
||||
WHERE anime_id = ?
|
||||
`
|
||||
|
||||
func (q *Queries) DeleteAnimeFetchRetry(ctx context.Context, animeID int64) error {
|
||||
_, err := q.db.ExecContext(ctx, deleteAnimeFetchRetry, animeID)
|
||||
return err
|
||||
}
|
||||
|
||||
const deleteExpiredJikanCache = `-- name: DeleteExpiredJikanCache :exec
|
||||
DELETE FROM jikan_cache WHERE expires_at <= CURRENT_TIMESTAMP
|
||||
`
|
||||
@@ -108,6 +131,28 @@ func (q *Queries) DeleteWatchListEntry(ctx context.Context, arg DeleteWatchListE
|
||||
return err
|
||||
}
|
||||
|
||||
const enqueueAnimeFetchRetry = `-- name: EnqueueAnimeFetchRetry :exec
|
||||
INSERT INTO anime_fetch_retry (anime_id, attempts, next_retry_at, last_error, updated_at)
|
||||
VALUES (?, 0, CURRENT_TIMESTAMP, ?, CURRENT_TIMESTAMP)
|
||||
ON CONFLICT (anime_id) DO UPDATE SET
|
||||
next_retry_at = CASE
|
||||
WHEN anime_fetch_retry.next_retry_at > CURRENT_TIMESTAMP THEN anime_fetch_retry.next_retry_at
|
||||
ELSE CURRENT_TIMESTAMP
|
||||
END,
|
||||
last_error = excluded.last_error,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
`
|
||||
|
||||
type EnqueueAnimeFetchRetryParams struct {
|
||||
AnimeID int64 `json:"anime_id"`
|
||||
LastError string `json:"last_error"`
|
||||
}
|
||||
|
||||
func (q *Queries) EnqueueAnimeFetchRetry(ctx context.Context, arg EnqueueAnimeFetchRetryParams) error {
|
||||
_, err := q.db.ExecContext(ctx, enqueueAnimeFetchRetry, arg.AnimeID, arg.LastError)
|
||||
return err
|
||||
}
|
||||
|
||||
const getAnime = `-- name: GetAnime :one
|
||||
SELECT id, title_original, image_url, created_at, title_english, title_japanese, airing, status, relations_synced_at FROM anime WHERE id = ? LIMIT 1
|
||||
`
|
||||
@@ -180,6 +225,44 @@ func (q *Queries) GetAnimeNeedingRelationSync(ctx context.Context) ([]GetAnimeNe
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getDueAnimeFetchRetries = `-- name: GetDueAnimeFetchRetries :many
|
||||
SELECT anime_id, attempts, next_retry_at, last_error, created_at, updated_at
|
||||
FROM anime_fetch_retry
|
||||
WHERE next_retry_at <= CURRENT_TIMESTAMP
|
||||
ORDER BY next_retry_at ASC
|
||||
LIMIT ?
|
||||
`
|
||||
|
||||
func (q *Queries) GetDueAnimeFetchRetries(ctx context.Context, limit int64) ([]AnimeFetchRetry, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getDueAnimeFetchRetries, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []AnimeFetchRetry
|
||||
for rows.Next() {
|
||||
var i AnimeFetchRetry
|
||||
if err := rows.Scan(
|
||||
&i.AnimeID,
|
||||
&i.Attempts,
|
||||
&i.NextRetryAt,
|
||||
&i.LastError,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getJikanCache = `-- name: GetJikanCache :one
|
||||
SELECT data FROM jikan_cache
|
||||
WHERE key = ? AND expires_at > CURRENT_TIMESTAMP LIMIT 1
|
||||
@@ -192,6 +275,18 @@ func (q *Queries) GetJikanCache(ctx context.Context, key string) (string, error)
|
||||
return data, err
|
||||
}
|
||||
|
||||
const getJikanCacheStale = `-- name: GetJikanCacheStale :one
|
||||
SELECT data FROM jikan_cache
|
||||
WHERE key = ? LIMIT 1
|
||||
`
|
||||
|
||||
func (q *Queries) GetJikanCacheStale(ctx context.Context, key string) (string, error) {
|
||||
row := q.db.QueryRowContext(ctx, getJikanCacheStale, key)
|
||||
var data string
|
||||
err := row.Scan(&data)
|
||||
return data, err
|
||||
}
|
||||
|
||||
const getSession = `-- name: GetSession :one
|
||||
SELECT id, user_id, expires_at, created_at FROM session WHERE id = ? LIMIT 1
|
||||
`
|
||||
@@ -511,6 +606,26 @@ func (q *Queries) GetWatchingAnime(ctx context.Context, userID string) ([]GetWat
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const markAnimeFetchRetryFailed = `-- name: MarkAnimeFetchRetryFailed :exec
|
||||
UPDATE anime_fetch_retry
|
||||
SET attempts = attempts + 1,
|
||||
next_retry_at = datetime(CURRENT_TIMESTAMP, ?),
|
||||
last_error = ?,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
WHERE anime_id = ?
|
||||
`
|
||||
|
||||
type MarkAnimeFetchRetryFailedParams struct {
|
||||
Datetime interface{} `json:"datetime"`
|
||||
LastError string `json:"last_error"`
|
||||
AnimeID int64 `json:"anime_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) MarkAnimeFetchRetryFailed(ctx context.Context, arg MarkAnimeFetchRetryFailedParams) error {
|
||||
_, err := q.db.ExecContext(ctx, markAnimeFetchRetryFailed, arg.Datetime, arg.LastError, arg.AnimeID)
|
||||
return err
|
||||
}
|
||||
|
||||
const markRelationsSynced = `-- name: MarkRelationsSynced :exec
|
||||
UPDATE anime SET relations_synced_at = CURRENT_TIMESTAMP WHERE id = ?
|
||||
`
|
||||
|
||||
Reference in New Issue
Block a user