package jikan import ( "context" "encoding/json" "errors" "fmt" "net" "net/http" "strconv" "strings" "sync" "time" "mal/internal/db" ) type Client struct { httpClient *http.Client baseURL string db db.Querier retrySignal chan struct{} // signals retry worker to process queued retries mu sync.Mutex lastReqTime time.Time // rate limiting: last request timestamp } func NewClient(queries *db.Queries) *Client { return &Client{ httpClient: &http.Client{ Timeout: 10 * time.Second, Transport: &http.Transport{ MaxIdleConns: 10, IdleConnTimeout: 30 * time.Second, DisableKeepAlives: false, TLSHandshakeTimeout: 5 * time.Second, }, }, baseURL: "https://api.jikan.moe/v4", db: queries, retrySignal: make(chan struct{}, 1), } } type APIError struct { StatusCode int URL string } func (e *APIError) Error() string { return fmt.Sprintf("jikan api returned status %d", e.StatusCode) } // IsRetryableError returns true if the error should trigger a retry. func IsRetryableError(err error) bool { if err == nil { return false } var apiErr *APIError if errors.As(err, &apiErr) { return isRetryableStatus(apiErr.StatusCode) } var netErr net.Error if errors.As(err, &netErr) { return true } if errors.Is(err, context.DeadlineExceeded) { return true } return false } func isRetryableStatus(statusCode int) bool { if statusCode == http.StatusTooManyRequests { return true } return statusCode >= 500 && statusCode <= 504 } // retryDelay returns exponential backoff delay: 500ms, 1s, 2s, 4s, 8s (capped). func retryDelay(attempt int) time.Duration { base := 500 * time.Millisecond delay := base * time.Duration(1< 8*time.Second { return 8 * time.Second } return delay } // parseRetryAfter parses Retry-After header value (seconds) into duration. func parseRetryAfter(value string) (time.Duration, bool) { trimmed := strings.TrimSpace(value) if trimmed == "" { return 0, false } seconds, err := strconv.Atoi(trimmed) if err != nil { return 0, false } if seconds <= 0 { return 0, false } return time.Duration(seconds) * time.Second, true } func waitForRetry(ctx context.Context, delay time.Duration) error { timer := time.NewTimer(delay) defer timer.Stop() select { case <-timer.C: return nil case <-ctx.Done(): return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err()) } } func truncateErrorMessage(message string) string { if len(message) <= 400 { return message } return message[:400] } // notifyRetryWorker signals the retry worker, non-blocking. func (c *Client) notifyRetryWorker() { select { case c.retrySignal <- struct{}{}: default: } } // RetrySignal returns channel that signals when retries are enqueued. func (c *Client) RetrySignal() <-chan struct{} { return c.retrySignal } // EnqueueAnimeFetchRetry queues a failed anime fetch for later retry if the error is retryable. func (c *Client) EnqueueAnimeFetchRetry(parentCtx context.Context, animeID int, cause error) { if animeID <= 0 || !IsRetryableError(cause) { return } ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second) defer cancel() err := c.db.EnqueueAnimeFetchRetry(ctx, db.EnqueueAnimeFetchRetryParams{ AnimeID: int64(animeID), LastError: truncateErrorMessage(cause.Error()), }) if err != nil { return } c.notifyRetryWorker() } // waitRateLimit enforces Jikan's 3 req/sec rate limit with 400ms spacing. func (c *Client) waitRateLimit(ctx context.Context) error { c.mu.Lock() defer c.mu.Unlock() now := time.Now() // Jikan has a 3 req/sec limit AND a 60 req/min limit. // 400ms base delay keeps us safely under the 3/sec limit. nextAllowed := c.lastReqTime.Add(400 * time.Millisecond) if now.Before(nextAllowed) { timer := time.NewTimer(nextAllowed.Sub(now)) defer timer.Stop() select { case <-timer.C: case <-ctx.Done(): return fmt.Errorf("request canceled while waiting for rate limit: %w", ctx.Err()) } c.lastReqTime = time.Now() } else { c.lastReqTime = now } return nil } // getCache retrieves cached data by key, returns true on cache hit. func (c *Client) getCache(parentCtx context.Context, key string, out any) bool { ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second) defer cancel() data, err := c.db.GetJikanCache(ctx, key) if err != nil { return false } err = json.Unmarshal([]byte(data), out) return err == nil } // getStaleCache retrieves expired-but-available cache by key. func (c *Client) getStaleCache(parentCtx context.Context, key string, out any) bool { ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second) defer cancel() data, err := c.db.GetJikanCacheStale(ctx, key) if err != nil { return false } err = json.Unmarshal([]byte(data), out) return err == nil } // setCache stores data in cache with specified TTL. func (c *Client) setCache(parentCtx context.Context, key string, data any, ttl time.Duration) { ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second) defer cancel() bytes, err := json.Marshal(data) if err != nil { return } _ = c.db.SetJikanCache(ctx, db.SetJikanCacheParams{ Key: key, Data: string(bytes), ExpiresAt: time.Now().Add(ttl), }) } // isEmptyResult detects if response contains no meaningful data. func isEmptyResult(out any) bool { switch v := out.(type) { case *TopAnimeResponse: return len(v.Data) == 0 case *SearchResponse: return len(v.Data) == 0 case *AnimeResponse: return v.Data.MalID == 0 case *EpisodesResponse: return len(v.Data) == 0 } return false } // getWithCache fetches URL with cache-aside pattern: checks cache first, falls back to stale on error. func (c *Client) getWithCache(ctx context.Context, cacheKey string, ttl time.Duration, url string, out any) error { if c.getCache(ctx, cacheKey, out) { if !isEmptyResult(out) { return nil } } var stale any hasStale := c.getStaleCache(ctx, cacheKey, &stale) if err := c.fetchWithRetry(ctx, url, out); err != nil { if hasStale { staleBytes, marshalErr := json.Marshal(stale) if marshalErr == nil { unmarshalErr := json.Unmarshal(staleBytes, out) if unmarshalErr == nil && !isEmptyResult(out) { return nil } } if !errors.Is(err, context.Canceled) { } } return err } // Don't cache empty results to avoid caching failures if isEmptyResult(out) { return fmt.Errorf("jikan: empty response for %s", cacheKey) } c.setCache(ctx, cacheKey, out, ttl) return nil } // fetchWithRetry makes HTTP request with exponential backoff retry on transient failures. func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) error { maxRetries := 5 for attempt := range maxRetries { select { case <-ctx.Done(): return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err()) default: } if err := c.waitRateLimit(ctx); err != nil { return err } req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil) if err != nil { return fmt.Errorf("failed to create jikan request: %w", err) } resp, err := c.httpClient.Do(req) if err != nil { if errors.Is(err, context.Canceled) { return fmt.Errorf("request canceled while retrying jikan request: %w", err) } if attempt < maxRetries-1 && IsRetryableError(err) { if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil { return retryErr } continue } return fmt.Errorf("jikan api error: %w", err) } if resp.StatusCode != http.StatusOK { apiErr := &APIError{StatusCode: resp.StatusCode, URL: urlStr} retryable := isRetryableStatus(resp.StatusCode) retryAfter := time.Duration(0) if parsed, ok := parseRetryAfter(resp.Header.Get("Retry-After")); ok { retryAfter = parsed } if retryable && attempt < maxRetries-1 { _ = resp.Body.Close() delay := max(retryAfter, retryDelay(attempt)) if retryErr := waitForRetry(ctx, delay); retryErr != nil { return retryErr } continue } err = json.NewDecoder(resp.Body).Decode(out) _ = resp.Body.Close() if err == nil { return nil } return apiErr } err = json.NewDecoder(resp.Body).Decode(out) _ = resp.Body.Close() if err == nil { return nil } if attempt < maxRetries-1 { if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil { return retryErr } continue } return fmt.Errorf("failed to decode jikan response: %w", err) } return fmt.Errorf("max retries exceeded for %s", urlStr) }