From eda055fea3b2ea54611cba290280433a4713c69e Mon Sep 17 00:00:00 2001
From: mkelvers
Date: Sun, 12 Apr 2026 14:53:32 +0200
Subject: [PATCH] core: add jikan stale retry pipeline

---
 internal/jikan/anime.go           |   7 +
 internal/jikan/client.go          | 237 +++++++++++++++++++++++++++---
 internal/jikan/recommendations.go |  10 ++
 internal/jikan/relations.go       |   1 +
 internal/jikan/search.go          |  14 ++
 internal/jikan/seasons.go         |  21 +++
 internal/worker/relations.go      |  91 ++++++++++++
 7 files changed, 361 insertions(+), 20 deletions(-)

diff --git a/internal/jikan/anime.go b/internal/jikan/anime.go
index 34c5f6a..f794496 100644
--- a/internal/jikan/anime.go
+++ b/internal/jikan/anime.go
@@ -13,9 +13,16 @@ func (c *Client) GetAnimeByID(ctx context.Context, id int) (Anime, error) {
 		return cached, nil
 	}
 
+	var stale Anime
+	hasStale := c.getStaleCache(ctx, cacheKey, &stale)
+
 	var result AnimeResponse
 	reqURL := fmt.Sprintf("%s/anime/%d/full", c.baseURL, id)
 	if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
+		if hasStale {
+			return stale, nil
+		}
+
 		return Anime{}, err
 	}
 
diff --git a/internal/jikan/client.go b/internal/jikan/client.go
index f638a0e..e1773fa 100644
--- a/internal/jikan/client.go
+++ b/internal/jikan/client.go
@@ -3,8 +3,12 @@ package jikan
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
+	"net"
 	"net/http"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -15,18 +19,167 @@ type Client struct {
 	httpClient  *http.Client
 	baseURL     string
 	db          database.Querier
+	retrySignal chan struct{}
 	mu          sync.Mutex
 	lastReqTime time.Time
 }
 
 func NewClient(db database.Querier) *Client {
 	return &Client{
-		httpClient: &http.Client{Timeout: 10 * time.Second},
-		baseURL:    "https://api.jikan.moe/v4",
-		db:         db,
+		httpClient:  &http.Client{Timeout: 10 * time.Second},
+		baseURL:     "https://api.jikan.moe/v4",
+		db:          db,
+		retrySignal: make(chan struct{}, 1),
 	}
 }
 
+// APIError is returned by fetchWithRetry when Jikan answers with a non-200
+// status that is not retryable or whose retries are exhausted.
+type APIError struct {
+	StatusCode int
+	URL        string
+}
+
+// Error implements the error interface.
+func (e *APIError) Error() string {
+	return fmt.Sprintf("jikan api returned status %d", e.StatusCode)
+}
+
+// IsNotFoundError reports whether err is, or wraps, an APIError with status 404.
+func IsNotFoundError(err error) bool {
+	var apiErr *APIError
+	if errors.As(err, &apiErr) {
+		return apiErr.StatusCode == http.StatusNotFound
+	}
+
+	return false
+}
+
+// IsRetryableError reports whether err looks transient: an APIError with a
+// retryable status, any net.Error, or an expired context deadline.
+func IsRetryableError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	var apiErr *APIError
+	if errors.As(err, &apiErr) {
+		return isRetryableStatus(apiErr.StatusCode)
+	}
+
+	var netErr net.Error
+	if errors.As(err, &netErr) {
+		return true
+	}
+
+	if errors.Is(err, context.DeadlineExceeded) {
+		return true
+	}
+
+	return false
+}
+
+// isRetryableStatus reports whether statusCode is 429 or in the 500-504 range.
+func isRetryableStatus(statusCode int) bool {
+	if statusCode == http.StatusTooManyRequests {
+		return true
+	}
+
+	return statusCode >= 500 && statusCode <= 504
+}
+
+// retryDelay returns the exponential backoff for a zero-based attempt:
+// 500ms doubled per attempt and capped at 8 seconds.
+func retryDelay(attempt int) time.Duration {
+	base := 500 * time.Millisecond
+	delay := base * time.Duration(1<<attempt)
+	if delay > 8*time.Second {
+		return 8 * time.Second
+	}
+
+	return delay
+}
+
+// parseRetryAfter parses a Retry-After header given as a positive number of
+// seconds. It reports false for empty, non-numeric, or non-positive values.
+func parseRetryAfter(value string) (time.Duration, bool) {
+	trimmed := strings.TrimSpace(value)
+	if trimmed == "" {
+		return 0, false
+	}
+
+	seconds, err := strconv.Atoi(trimmed)
+	if err != nil {
+		return 0, false
+	}
+
+	if seconds <= 0 {
+		return 0, false
+	}
+
+	return time.Duration(seconds) * time.Second, true
+}
+
+// waitForRetry blocks for delay or until ctx is done, whichever comes first.
+func waitForRetry(ctx context.Context, delay time.Duration) error {
+	timer := time.NewTimer(delay)
+	defer timer.Stop()
+
+	select {
+	case <-timer.C:
+		return nil
+	case <-ctx.Done():
+		return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())
+	}
+}
+
+// truncateErrorMessage caps message at 400 bytes. Cutting a byte slice can
+// split a multi-byte rune, so any invalid UTF-8 left behind is dropped.
+func truncateErrorMessage(message string) string {
+	if len(message) <= 400 {
+		return message
+	}
+
+	return strings.ToValidUTF8(message[:400], "")
+}
+
+// notifyRetryWorker signals the retry worker without blocking; the signal is
+// dropped when one is already pending.
+func (c *Client) notifyRetryWorker() {
+	select {
+	case c.retrySignal <- struct{}{}:
+	default:
+	}
+}
+
+// RetrySignal returns the channel that receives a value after a retry has
+// been enqueued.
+func (c *Client) RetrySignal() <-chan struct{} {
+	return c.retrySignal
+}
+
+// EnqueueAnimeFetchRetry records animeID for a later refetch when animeID is
+// positive and cause is retryable, then signals the retry worker. A failure
+// to enqueue is ignored (best effort).
+func (c *Client) EnqueueAnimeFetchRetry(parentCtx context.Context, animeID int, cause error) {
+	if animeID <= 0 || !IsRetryableError(cause) {
+		return
+	}
+
+	ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
+	defer cancel()
+
+	err := c.db.EnqueueAnimeFetchRetry(ctx, database.EnqueueAnimeFetchRetryParams{
+		AnimeID:   int64(animeID),
+		LastError: truncateErrorMessage(cause.Error()),
+	})
+	if err != nil {
+		return
+	}
+
+	c.notifyRetryWorker()
+}
+
 func (c *Client) waitRateLimit(ctx context.Context) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -65,6 +218,21 @@ func (c *Client) getCache(parentCtx context.Context, key string, out any) bool {
 	return err == nil
 }
 
+// getStaleCache loads the entry stored under key via GetJikanCacheStale and
+// decodes it into out. It reports false when the lookup or decoding fails.
+func (c *Client) getStaleCache(parentCtx context.Context, key string, out any) bool {
+	ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
+	defer cancel()
+
+	data, err := c.db.GetJikanCacheStale(ctx, key)
+	if err != nil {
+		return false
+	}
+
+	err = json.Unmarshal([]byte(data), out)
+	return err == nil
+}
+
 func (c *Client) setCache(parentCtx context.Context, key string, data any, ttl time.Duration) {
 	ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
 	defer cancel()
@@ -83,7 +251,7 @@ func (c *Client) setCache(parentCtx context.Context, key string, data any, ttl t
 
 func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) error {
 	maxRetries := 5
-	for range maxRetries {
+	for attempt := range maxRetries {
 		if err := c.waitRateLimit(ctx); err != nil {
 			return err
 		}
@@ -95,31 +263,60 @@ func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) err
 
 		resp, err := c.httpClient.Do(req)
 		if err != nil {
+			if attempt < maxRetries-1 && IsRetryableError(err) {
+				if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil {
+					return retryErr
+				}
+				continue
+			}
+
 			return fmt.Errorf("jikan api error: %w", err)
 		}
 
-		if resp.StatusCode == 429 {
-			resp.Body.Close()
-			// Jikan rate limit is hit (usually the 60 requests/minute limit)
-			// Wait for 2 seconds before retrying to let the bucket refill slightly
-			timer := time.NewTimer(2 * time.Second)
-			select {
-			case <-timer.C:
-			case <-ctx.Done():
-				timer.Stop()
-				return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())
-			}
-			continue
-		}
-
 		if resp.StatusCode != http.StatusOK {
+			apiErr := &APIError{StatusCode: resp.StatusCode, URL: urlStr}
+			retryable := isRetryableStatus(resp.StatusCode)
+
+			retryAfter := time.Duration(0)
+			if parsed, ok := parseRetryAfter(resp.Header.Get("Retry-After")); ok {
+				retryAfter = parsed
+			}
+
 			resp.Body.Close()
-			return fmt.Errorf("jikan api returned status %d", resp.StatusCode)
+
+			if retryable && attempt < maxRetries-1 {
+				delay := retryDelay(attempt)
+				// Honor the server's Retry-After when it asks for a longer wait.
+				if retryAfter > delay {
+					delay = retryAfter
+				}
+
+				if retryErr := waitForRetry(ctx, delay); retryErr != nil {
+					return retryErr
+				}
+
+				continue
+			}
+
+			return apiErr
 		}
 
 		err = json.NewDecoder(resp.Body).Decode(out)
 		resp.Body.Close()
-		return err
+		if err == nil {
+			return nil
+		}
+
+		// A body that fails to decode is retried like a transient failure.
+		if attempt < maxRetries-1 {
+			if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil {
+				return retryErr
+			}
+			continue
+		}
+
+		return fmt.Errorf("failed to decode jikan response: %w", err)
 	}
+
 	return fmt.Errorf("max retries exceeded for %s", urlStr)
 }
diff --git a/internal/jikan/recommendations.go b/internal/jikan/recommendations.go
index 26004ac..d17f709 100644
--- a/internal/jikan/recommendations.go
+++ b/internal/jikan/recommendations.go
@@ -34,9 +34,19 @@ func (c *Client) GetRecommendations(ctx context.Context, animeID int, limit int)
 		return cached, nil
 	}
 
+	var stale []Anime
+	hasStale := c.getStaleCache(ctx, cacheKey, &stale)
+
 	var result RecommendationsResponse
 	reqURL := fmt.Sprintf("%s/anime/%d/recommendations", c.baseURL, animeID)
 	if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
+		if hasStale {
+			if limit > 0 && len(stale) > limit {
+				return stale[:limit], nil
+			}
+			return stale, nil
+		}
+
 		return nil, err
 	}
 
diff --git a/internal/jikan/relations.go b/internal/jikan/relations.go
index 413075e..4af4706 100644
--- a/internal/jikan/relations.go
+++ b/internal/jikan/relations.go
@@ -114,6 +114,7 @@ func (c *Client) GetFullRelations(ctx context.Context, id int) ([]RelationEntry,
 
 		anime, err := c.GetAnimeByID(ctx, watchOrderEntry.ID)
 		if err != nil {
+			c.EnqueueAnimeFetchRetry(ctx, watchOrderEntry.ID, err)
 			log.Printf("relations: skipping related anime %d for root %d: %v", watchOrderEntry.ID, id, err)
 			continue
 		}
diff --git a/internal/jikan/search.go b/internal/jikan/search.go
index 74a43fe..667db8f 100644
--- a/internal/jikan/search.go
+++ b/internal/jikan/search.go
@@ -21,10 +21,17 @@ func (c *Client) Search(ctx context.Context, query string, page int) (SearchResu
 		return cached, nil
 	}
 
+	var stale SearchResult
+	hasStale := c.getStaleCache(ctx, cacheKey, &stale)
+
 	var result SearchResponse
 	reqURL := fmt.Sprintf("%s/anime?q=%s&limit=24&page=%d",
 		c.baseURL, url.QueryEscape(query), page)
 	if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
+		if hasStale {
+			return stale, nil
+		}
+
 		return SearchResult{}, err
 	}
 
@@ -47,10 +54,17 @@ func (c *Client) GetTopAnime(ctx context.Context, page int) (TopAnimeResult, err
 		return cached, nil
 	}
 
+	var stale TopAnimeResult
+	hasStale := c.getStaleCache(ctx, cacheKey, &stale)
+
 	var result TopAnimeResponse
 	reqURL := fmt.Sprintf("%s/top/anime?filter=bypopularity&limit=24&page=%d",
 		c.baseURL, page)
 	if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
+		if hasStale {
+			return stale, nil
+		}
+
 		return TopAnimeResult{}, err
 	}
 
diff --git a/internal/jikan/seasons.go b/internal/jikan/seasons.go
index dcaf75e..6f6003a 100644
--- a/internal/jikan/seasons.go
+++ b/internal/jikan/seasons.go
@@ -21,9 +21,16 @@ func (c *Client) GetSchedule(ctx context.Context, day string) (ScheduleResult, e
 		return cached, nil
 	}
 
+	var stale ScheduleResult
+	hasStale := c.getStaleCache(ctx, cacheKey, &stale)
+
 	var result TopAnimeResponse
 	reqURL := fmt.Sprintf("%s/schedules?filter=%s&sfw=true&limit=24", c.baseURL, day)
 	if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
+		if hasStale {
+			return stale, nil
+		}
+
 		return ScheduleResult{}, err
 	}
 
@@ -61,9 +68,16 @@ func (c *Client) GetSeasonsNow(ctx context.Context, page int) (TopAnimeResult, e
 		return cached, nil
 	}
 
+	var stale TopAnimeResult
+	hasStale := c.getStaleCache(ctx, cacheKey, &stale)
+
 	var result TopAnimeResponse
 	reqURL := fmt.Sprintf("%s/seasons/now?limit=24&page=%d", c.baseURL, page)
 	if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
+		if hasStale {
+			return stale, nil
+		}
+
 		return TopAnimeResult{}, err
 	}
 
@@ -86,9 +100,16 @@ func (c *Client) GetSeasonsUpcoming(ctx context.Context, page int) (TopAnimeResu
 		return cached, nil
 	}
 
+	var stale TopAnimeResult
+	hasStale := c.getStaleCache(ctx, cacheKey, &stale)
+
 	var result TopAnimeResponse
 	reqURL := fmt.Sprintf("%s/seasons/upcoming?limit=24&page=%d", c.baseURL, page)
 	if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
+		if hasStale {
+			return stale, nil
+		}
+
 		return TopAnimeResult{}, err
 	}
 
diff --git a/internal/worker/relations.go b/internal/worker/relations.go
index 5741da4..b620a5a 100644
--- a/internal/worker/relations.go
+++ b/internal/worker/relations.go
@@ -3,6 +3,7 @@ package worker
 import (
 	"context"
 	"database/sql"
+	"fmt"
 	"log"
 	"time"
 
@@ -25,10 +26,13 @@ func New(db *database.Queries, client *jikan.Client) *Worker {
 func (w *Worker) Start(ctx context.Context) {
 	log.Println("Starting relations sync worker...")
 	ticker := time.NewTicker(1 * time.Minute)
+	retryTicker := time.NewTicker(30 * time.Second)
 	defer ticker.Stop()
+	defer retryTicker.Stop()
 
 	// Run once immediately
 	w.syncRelations(ctx)
+	w.processAnimeFetchRetries(ctx)
 	w.cleanupCache(ctx)
 
 	cleanupCounter := 0
@@ -37,6 +41,10 @@ func (w *Worker) Start(ctx context.Context) {
 		select {
 		case <-ctx.Done():
 			return
+		case <-w.client.RetrySignal():
+			w.processAnimeFetchRetries(ctx)
+		case <-retryTicker.C:
+			w.processAnimeFetchRetries(ctx)
 		case <-ticker.C:
 			w.syncRelations(ctx)
 
@@ -50,6 +58,89 @@ func (w *Worker) Start(ctx context.Context) {
 	}
 }
 
+// retryBackoff returns the delay before the next attempt as a "+N seconds"
+// string: one minute doubled per earlier attempt, capped at 30 minutes.
+func retryBackoff(attempts int64) string {
+	if attempts < 1 {
+		attempts = 1
+	}
+
+	delay := time.Minute
+	if attempts > 1 {
+		shift := attempts - 1
+		if shift > 6 {
+			shift = 6
+		}
+		delay = time.Minute * time.Duration(1<<shift)
+	}
+
+	if delay > 30*time.Minute {
+		delay = 30 * time.Minute
+	}
+
+	seconds := int(delay / time.Second)
+	return fmt.Sprintf("+%d seconds", seconds)
+}
+
+// processAnimeFetchRetries refetches up to 20 due entries from the retry
+// queue. Successful and non-retryable entries are deleted; retryable
+// failures are rescheduled with retryBackoff.
+func (w *Worker) processAnimeFetchRetries(ctx context.Context) {
+	pending, err := w.db.CountPendingAnimeFetchRetries(ctx)
+	if err != nil {
+		log.Printf("worker: failed to count pending anime fetch retries: %v", err)
+		return
+	}
+
+	if pending == 0 {
+		return
+	}
+
+	retries, err := w.db.GetDueAnimeFetchRetries(ctx, 20)
+	if err != nil {
+		log.Printf("worker: failed to load due anime fetch retries: %v", err)
+		return
+	}
+
+	if len(retries) == 0 {
+		return
+	}
+
+	for _, retry := range retries {
+		_, err := w.client.GetAnimeByID(ctx, int(retry.AnimeID))
+		if err != nil {
+			if ctx.Err() != nil {
+				// The worker context ended mid-fetch; the failure says nothing
+				// about this anime, so leave its queue entry untouched.
+				return
+			}
+
+			if !jikan.IsRetryableError(err) {
+				deleteErr := w.db.DeleteAnimeFetchRetry(ctx, retry.AnimeID)
+				if deleteErr != nil {
+					log.Printf("worker: failed deleting non-retryable anime retry %d: %v", retry.AnimeID, deleteErr)
+				}
+				continue
+			}
+
+			updateErr := w.db.MarkAnimeFetchRetryFailed(ctx, database.MarkAnimeFetchRetryFailedParams{
+				Datetime:  retryBackoff(retry.Attempts + 1),
+				LastError: err.Error(),
+				AnimeID:   retry.AnimeID,
+			})
+			if updateErr != nil {
+				log.Printf("worker: failed updating anime fetch retry %d: %v", retry.AnimeID, updateErr)
+			}
+			continue
+		}
+
+		deleteErr := w.db.DeleteAnimeFetchRetry(ctx, retry.AnimeID)
+		if deleteErr != nil {
+			log.Printf("worker: failed deleting successful anime retry %d: %v", retry.AnimeID, deleteErr)
+		}
+	}
+}
+
 func (w *Worker) cleanupCache(ctx context.Context) {
 	err := w.db.DeleteExpiredJikanCache(ctx)
 	if err != nil {