core: add jikan stale retry pipeline
This commit is contained in:
@@ -13,9 +13,16 @@ func (c *Client) GetAnimeByID(ctx context.Context, id int) (Anime, error) {
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
var stale Anime
|
||||
hasStale := c.getStaleCache(ctx, cacheKey, &stale)
|
||||
|
||||
var result AnimeResponse
|
||||
reqURL := fmt.Sprintf("%s/anime/%d/full", c.baseURL, id)
|
||||
if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
|
||||
if hasStale {
|
||||
return stale, nil
|
||||
}
|
||||
|
||||
return Anime{}, err
|
||||
}
|
||||
|
||||
|
||||
@@ -3,8 +3,12 @@ package jikan
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -15,18 +19,146 @@ type Client struct {
|
||||
httpClient *http.Client
|
||||
baseURL string
|
||||
db database.Querier
|
||||
retrySignal chan struct{}
|
||||
mu sync.Mutex
|
||||
lastReqTime time.Time
|
||||
}
|
||||
|
||||
func NewClient(db database.Querier) *Client {
|
||||
return &Client{
|
||||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||
baseURL: "https://api.jikan.moe/v4",
|
||||
db: db,
|
||||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||
baseURL: "https://api.jikan.moe/v4",
|
||||
db: db,
|
||||
retrySignal: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
type APIError struct {
|
||||
StatusCode int
|
||||
URL string
|
||||
}
|
||||
|
||||
func (e *APIError) Error() string {
|
||||
return fmt.Sprintf("jikan api returned status %d", e.StatusCode)
|
||||
}
|
||||
|
||||
func IsNotFoundError(err error) bool {
|
||||
var apiErr *APIError
|
||||
if errors.As(err, &apiErr) {
|
||||
return apiErr.StatusCode == http.StatusNotFound
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func IsRetryableError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
var apiErr *APIError
|
||||
if errors.As(err, &apiErr) {
|
||||
return isRetryableStatus(apiErr.StatusCode)
|
||||
}
|
||||
|
||||
var netErr net.Error
|
||||
if errors.As(err, &netErr) {
|
||||
return true
|
||||
}
|
||||
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func isRetryableStatus(statusCode int) bool {
|
||||
if statusCode == http.StatusTooManyRequests {
|
||||
return true
|
||||
}
|
||||
|
||||
return statusCode >= 500 && statusCode <= 504
|
||||
}
|
||||
|
||||
func retryDelay(attempt int) time.Duration {
|
||||
base := 500 * time.Millisecond
|
||||
delay := base * time.Duration(1<<attempt)
|
||||
if delay > 8*time.Second {
|
||||
return 8 * time.Second
|
||||
}
|
||||
|
||||
return delay
|
||||
}
|
||||
|
||||
func parseRetryAfter(value string) (time.Duration, bool) {
|
||||
trimmed := strings.TrimSpace(value)
|
||||
if trimmed == "" {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
seconds, err := strconv.Atoi(trimmed)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
if seconds <= 0 {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
return time.Duration(seconds) * time.Second, true
|
||||
}
|
||||
|
||||
func waitForRetry(ctx context.Context, delay time.Duration) error {
|
||||
timer := time.NewTimer(delay)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())
|
||||
}
|
||||
}
|
||||
|
||||
func truncateErrorMessage(message string) string {
|
||||
if len(message) <= 400 {
|
||||
return message
|
||||
}
|
||||
|
||||
return message[:400]
|
||||
}
|
||||
|
||||
func (c *Client) notifyRetryWorker() {
|
||||
select {
|
||||
case c.retrySignal <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) RetrySignal() <-chan struct{} {
|
||||
return c.retrySignal
|
||||
}
|
||||
|
||||
func (c *Client) EnqueueAnimeFetchRetry(parentCtx context.Context, animeID int, cause error) {
|
||||
if animeID <= 0 || !IsRetryableError(cause) {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err := c.db.EnqueueAnimeFetchRetry(ctx, database.EnqueueAnimeFetchRetryParams{
|
||||
AnimeID: int64(animeID),
|
||||
LastError: truncateErrorMessage(cause.Error()),
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.notifyRetryWorker()
|
||||
}
|
||||
|
||||
func (c *Client) waitRateLimit(ctx context.Context) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
@@ -65,6 +197,19 @@ func (c *Client) getCache(parentCtx context.Context, key string, out any) bool {
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (c *Client) getStaleCache(parentCtx context.Context, key string, out any) bool {
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
data, err := c.db.GetJikanCacheStale(ctx, key)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(data), out)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (c *Client) setCache(parentCtx context.Context, key string, data any, ttl time.Duration) {
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
|
||||
defer cancel()
|
||||
@@ -83,7 +228,7 @@ func (c *Client) setCache(parentCtx context.Context, key string, data any, ttl t
|
||||
|
||||
func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) error {
|
||||
maxRetries := 5
|
||||
for range maxRetries {
|
||||
for attempt := range maxRetries {
|
||||
if err := c.waitRateLimit(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -95,31 +240,58 @@ func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) err
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
if attempt < maxRetries-1 && IsRetryableError(err) {
|
||||
if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf("jikan api error: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode == 429 {
|
||||
resp.Body.Close()
|
||||
// Jikan rate limit is hit (usually the 60 requests/minute limit)
|
||||
// Wait for 2 seconds before retrying to let the bucket refill slightly
|
||||
timer := time.NewTimer(2 * time.Second)
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
apiErr := &APIError{StatusCode: resp.StatusCode, URL: urlStr}
|
||||
retryable := isRetryableStatus(resp.StatusCode)
|
||||
|
||||
retryAfter := time.Duration(0)
|
||||
if parsed, ok := parseRetryAfter(resp.Header.Get("Retry-After")); ok {
|
||||
retryAfter = parsed
|
||||
}
|
||||
|
||||
resp.Body.Close()
|
||||
return fmt.Errorf("jikan api returned status %d", resp.StatusCode)
|
||||
|
||||
if retryable && attempt < maxRetries-1 {
|
||||
delay := retryDelay(attempt)
|
||||
if retryAfter > delay {
|
||||
delay = retryAfter
|
||||
}
|
||||
|
||||
if retryErr := waitForRetry(ctx, delay); retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
return apiErr
|
||||
}
|
||||
|
||||
err = json.NewDecoder(resp.Body).Decode(out)
|
||||
resp.Body.Close()
|
||||
return err
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if attempt < maxRetries-1 {
|
||||
if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil {
|
||||
return retryErr
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to decode jikan response: %w", err)
|
||||
}
|
||||
|
||||
return fmt.Errorf("max retries exceeded for %s", urlStr)
|
||||
}
|
||||
|
||||
@@ -34,9 +34,19 @@ func (c *Client) GetRecommendations(ctx context.Context, animeID int, limit int)
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
var stale []Anime
|
||||
hasStale := c.getStaleCache(ctx, cacheKey, &stale)
|
||||
|
||||
var result RecommendationsResponse
|
||||
reqURL := fmt.Sprintf("%s/anime/%d/recommendations", c.baseURL, animeID)
|
||||
if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
|
||||
if hasStale {
|
||||
if limit > 0 && len(stale) > limit {
|
||||
return stale[:limit], nil
|
||||
}
|
||||
return stale, nil
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -114,6 +114,7 @@ func (c *Client) GetFullRelations(ctx context.Context, id int) ([]RelationEntry,
|
||||
|
||||
anime, err := c.GetAnimeByID(ctx, watchOrderEntry.ID)
|
||||
if err != nil {
|
||||
c.EnqueueAnimeFetchRetry(ctx, watchOrderEntry.ID, err)
|
||||
log.Printf("relations: skipping related anime %d for root %d: %v", watchOrderEntry.ID, id, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -21,10 +21,17 @@ func (c *Client) Search(ctx context.Context, query string, page int) (SearchResu
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
var stale SearchResult
|
||||
hasStale := c.getStaleCache(ctx, cacheKey, &stale)
|
||||
|
||||
var result SearchResponse
|
||||
reqURL := fmt.Sprintf("%s/anime?q=%s&limit=24&page=%d", c.baseURL, url.QueryEscape(query), page)
|
||||
|
||||
if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
|
||||
if hasStale {
|
||||
return stale, nil
|
||||
}
|
||||
|
||||
return SearchResult{}, err
|
||||
}
|
||||
|
||||
@@ -47,10 +54,17 @@ func (c *Client) GetTopAnime(ctx context.Context, page int) (TopAnimeResult, err
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
var stale TopAnimeResult
|
||||
hasStale := c.getStaleCache(ctx, cacheKey, &stale)
|
||||
|
||||
var result TopAnimeResponse
|
||||
reqURL := fmt.Sprintf("%s/top/anime?filter=bypopularity&limit=24&page=%d", c.baseURL, page)
|
||||
|
||||
if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
|
||||
if hasStale {
|
||||
return stale, nil
|
||||
}
|
||||
|
||||
return TopAnimeResult{}, err
|
||||
}
|
||||
|
||||
|
||||
@@ -21,9 +21,16 @@ func (c *Client) GetSchedule(ctx context.Context, day string) (ScheduleResult, e
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
var stale ScheduleResult
|
||||
hasStale := c.getStaleCache(ctx, cacheKey, &stale)
|
||||
|
||||
var result TopAnimeResponse
|
||||
reqURL := fmt.Sprintf("%s/schedules?filter=%s&sfw=true&limit=24", c.baseURL, day)
|
||||
if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
|
||||
if hasStale {
|
||||
return stale, nil
|
||||
}
|
||||
|
||||
return ScheduleResult{}, err
|
||||
}
|
||||
|
||||
@@ -61,9 +68,16 @@ func (c *Client) GetSeasonsNow(ctx context.Context, page int) (TopAnimeResult, e
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
var stale TopAnimeResult
|
||||
hasStale := c.getStaleCache(ctx, cacheKey, &stale)
|
||||
|
||||
var result TopAnimeResponse
|
||||
reqURL := fmt.Sprintf("%s/seasons/now?limit=24&page=%d", c.baseURL, page)
|
||||
if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
|
||||
if hasStale {
|
||||
return stale, nil
|
||||
}
|
||||
|
||||
return TopAnimeResult{}, err
|
||||
}
|
||||
|
||||
@@ -86,9 +100,16 @@ func (c *Client) GetSeasonsUpcoming(ctx context.Context, page int) (TopAnimeResu
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
var stale TopAnimeResult
|
||||
hasStale := c.getStaleCache(ctx, cacheKey, &stale)
|
||||
|
||||
var result TopAnimeResponse
|
||||
reqURL := fmt.Sprintf("%s/seasons/upcoming?limit=24&page=%d", c.baseURL, page)
|
||||
if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
|
||||
if hasStale {
|
||||
return stale, nil
|
||||
}
|
||||
|
||||
return TopAnimeResult{}, err
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package worker
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
@@ -25,10 +26,13 @@ func New(db *database.Queries, client *jikan.Client) *Worker {
|
||||
func (w *Worker) Start(ctx context.Context) {
|
||||
log.Println("Starting relations sync worker...")
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
retryTicker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
defer retryTicker.Stop()
|
||||
|
||||
// Run once immediately
|
||||
w.syncRelations(ctx)
|
||||
w.processAnimeFetchRetries(ctx)
|
||||
w.cleanupCache(ctx)
|
||||
|
||||
cleanupCounter := 0
|
||||
@@ -37,6 +41,10 @@ func (w *Worker) Start(ctx context.Context) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-w.client.RetrySignal():
|
||||
w.processAnimeFetchRetries(ctx)
|
||||
case <-retryTicker.C:
|
||||
w.processAnimeFetchRetries(ctx)
|
||||
case <-ticker.C:
|
||||
w.syncRelations(ctx)
|
||||
|
||||
@@ -50,6 +58,78 @@ func (w *Worker) Start(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func retryBackoff(attempts int64) string {
|
||||
if attempts < 1 {
|
||||
attempts = 1
|
||||
}
|
||||
|
||||
delay := time.Minute
|
||||
if attempts > 1 {
|
||||
shift := attempts - 1
|
||||
if shift > 6 {
|
||||
shift = 6
|
||||
}
|
||||
delay = time.Minute * time.Duration(1<<shift)
|
||||
}
|
||||
|
||||
if delay > 30*time.Minute {
|
||||
delay = 30 * time.Minute
|
||||
}
|
||||
|
||||
seconds := int(delay / time.Second)
|
||||
return fmt.Sprintf("+%d seconds", seconds)
|
||||
}
|
||||
|
||||
func (w *Worker) processAnimeFetchRetries(ctx context.Context) {
|
||||
pending, err := w.db.CountPendingAnimeFetchRetries(ctx)
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to count pending anime fetch retries: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if pending == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
retries, err := w.db.GetDueAnimeFetchRetries(ctx, 20)
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to load due anime fetch retries: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(retries) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, retry := range retries {
|
||||
_, err := w.client.GetAnimeByID(ctx, int(retry.AnimeID))
|
||||
if err != nil {
|
||||
if !jikan.IsRetryableError(err) {
|
||||
deleteErr := w.db.DeleteAnimeFetchRetry(ctx, retry.AnimeID)
|
||||
if deleteErr != nil {
|
||||
log.Printf("worker: failed deleting non-retryable anime retry %d: %v", retry.AnimeID, deleteErr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
updateErr := w.db.MarkAnimeFetchRetryFailed(ctx, database.MarkAnimeFetchRetryFailedParams{
|
||||
Datetime: retryBackoff(retry.Attempts + 1),
|
||||
LastError: err.Error(),
|
||||
AnimeID: retry.AnimeID,
|
||||
})
|
||||
if updateErr != nil {
|
||||
log.Printf("worker: failed updating anime fetch retry %d: %v", retry.AnimeID, updateErr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
deleteErr := w.db.DeleteAnimeFetchRetry(ctx, retry.AnimeID)
|
||||
if deleteErr != nil {
|
||||
log.Printf("worker: failed deleting successful anime retry %d: %v", retry.AnimeID, deleteErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) cleanupCache(ctx context.Context) {
|
||||
err := w.db.DeleteExpiredJikanCache(ctx)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user