From 9e25745804ecca365892471d800e080610402ff2 Mon Sep 17 00:00:00 2001 From: mkelvers Date: Tue, 16 Jun 2026 00:50:12 +0200 Subject: [PATCH] refactor: split jikan client into transport/cache/rate subpackages --- integrations/jikan/cache/store.go | 86 ++++++ integrations/jikan/client.go | 395 ++----------------------- integrations/jikan/client_test.go | 2 +- integrations/jikan/rate/limiter.go | 42 +++ integrations/jikan/relations.go | 2 +- integrations/jikan/transport/client.go | 323 ++++++++++++++++++++ 6 files changed, 477 insertions(+), 373 deletions(-) create mode 100644 integrations/jikan/cache/store.go create mode 100644 integrations/jikan/rate/limiter.go create mode 100644 integrations/jikan/transport/client.go diff --git a/integrations/jikan/cache/store.go b/integrations/jikan/cache/store.go new file mode 100644 index 0000000..a90058f --- /dev/null +++ b/integrations/jikan/cache/store.go @@ -0,0 +1,86 @@ +package cache + +import ( + "context" + "encoding/json" + "time" + + "mal/internal/db" + "mal/internal/observability" +) + +type Store struct { + db db.Querier + metrics *observability.Metrics +} + +func NewStore(queries db.Querier, metrics *observability.Metrics) *Store { + return &Store{db: queries, metrics: metrics} +} + +// Get retrieves a fresh cached value by key. +func (s *Store) Get(parentCtx context.Context, key string, out any) bool { + ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second) + defer cancel() + + data, err := s.db.GetJikanCache(ctx, key) + if err != nil { + s.metrics.ObserveCache("jikan", "miss") + return false + } + + if err := json.Unmarshal([]byte(data), out); err != nil { + s.metrics.ObserveCache("jikan", "miss") + return false + } + + s.metrics.ObserveCache("jikan", "hit") + return true +} + +// GetStale retrieves an expired-but-available cached value by key. +func (s *Store) GetStale(parentCtx context.Context, key string, out any) bool { + ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second) + defer cancel() + + data, err := s.db.GetJikanCacheStale(ctx, key) + if err != nil { + s.metrics.ObserveCache("jikan_stale", "miss") + return false + } + + if err := json.Unmarshal([]byte(data), out); err != nil { + s.metrics.ObserveCache("jikan_stale", "miss") + return false + } + + s.metrics.ObserveCache("jikan_stale", "hit") + return true +} + +// Set stores data in cache with the specified TTL. +func (s *Store) Set(parentCtx context.Context, key string, data any, ttl time.Duration) { + ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second) + defer cancel() + + bytes, err := json.Marshal(data) + if err != nil { + return + } + + err = s.db.SetJikanCache(ctx, db.SetJikanCacheParams{ + Key: key, + Data: string(bytes), + ExpiresAt: time.Now().Add(ttl), + }) + if err != nil { + observability.LogJSON( + observability.LogLevelError, + "jikan_cache_set", + "jikan", + "", + map[string]any{"cache_key": key}, + err, + ) + } +} diff --git a/integrations/jikan/client.go b/integrations/jikan/client.go index fd104f3..11db876 100644 --- a/integrations/jikan/client.go +++ b/integrations/jikan/client.go @@ -3,20 +3,17 @@ package jikan import ( "context" "encoding/json" - "errors" "fmt" - "net" - "net/http" "reflect" - "strconv" - "strings" "sync" "time" + jcache "mal/integrations/jikan/cache" + "mal/integrations/jikan/rate" + jtransport "mal/integrations/jikan/transport" "mal/internal/config" "mal/internal/db" "mal/internal/observability" - netutil "mal/pkg/net" "golang.org/x/sync/singleflight" ) @@ -24,15 +21,13 @@ import ( var traceEnabled bool type Client struct { - httpClient *http.Client baseURL string db db.Querier retrySignal chan struct{} // signals retry worker to process queued retries - mu sync.Mutex - lastReqTime time.Time // rate limiting: last request timestamp sf singleflight.Group refreshSem chan struct{} - metrics *observability.Metrics + cache *jcache.Store + fetcher *jtransport.Client // Random anime pool for DDoS-proof truly random "Surprise Me" randomPool []Anime @@ -42,107 +37,30 @@ type Client struct { const jikanSlowLogThreshold = 750 * time.Millisecond +type APIError = jtransport.APIError + func NewClient(cfg config.Config, queries *db.Queries, metrics *observability.Metrics) *Client { traceEnabled = cfg.JikanTrace + limiter := rate.NewLimiter(400 * time.Millisecond) return &Client{ - httpClient: &http.Client{ - Timeout: 10 * time.Second, - Transport: &http.Transport{ - MaxIdleConns: 10, - IdleConnTimeout: 30 * time.Second, - DisableKeepAlives: false, - TLSHandshakeTimeout: 5 * time.Second, - }, - }, baseURL: "https://api.jikan.moe/v4", db: queries, - metrics: metrics, retrySignal: make(chan struct{}, 1), refreshSem: make(chan struct{}, 4), - randomPool: make([]Anime, 0), + cache: jcache.NewStore(queries, metrics), + fetcher: jtransport.NewClient(jtransport.Config{ + HTTPClient: jtransport.NewHTTPClient(), + Limiter: limiter, + Metrics: metrics, + TraceEnabled: jikanTraceEnabled, + }), + randomPool: make([]Anime, 0), } } -type APIError struct { - StatusCode int - URL string -} - -func (e *APIError) Error() string { - return fmt.Sprintf("jikan api returned status %d", e.StatusCode) -} - // IsRetryableError returns true if the error should trigger a retry. func IsRetryableError(err error) bool { - if err == nil { - return false - } - - var apiErr *APIError - if errors.As(err, &apiErr) { - return isRetryableStatus(apiErr.StatusCode) - } - - var netErr net.Error - if errors.As(err, &netErr) { - return true - } - - if errors.Is(err, context.DeadlineExceeded) { - return true - } - - return false -} - -func isRetryableStatus(statusCode int) bool { - if statusCode == http.StatusTooManyRequests { - return true - } - - return statusCode >= 500 && statusCode <= 504 -} - -// retryDelay returns exponential backoff delay: 500ms, 1s, 2s, 4s, 8s (capped). -func retryDelay(attempt int) time.Duration { - base := 500 * time.Millisecond - delay := base * time.Duration(1< 8*time.Second { - return 8 * time.Second - } - - return delay -} - -// parseRetryAfter parses Retry-After header value (seconds) into duration. -func parseRetryAfter(value string) (time.Duration, bool) { - trimmed := strings.TrimSpace(value) - if trimmed == "" { - return 0, false - } - - seconds, err := strconv.Atoi(trimmed) - if err != nil { - return 0, false - } - - if seconds <= 0 { - return 0, false - } - - return time.Duration(seconds) * time.Second, true -} - -func waitForRetry(ctx context.Context, delay time.Duration) error { - timer := time.NewTimer(delay) - defer timer.Stop() - - select { - case <-timer.C: - return nil - case <-ctx.Done(): - return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err()) - } + return jtransport.IsRetryableError(err) } func jikanTraceEnabled() bool { @@ -198,35 +116,6 @@ func logJikanCache(cacheKey string, source string, startedAt time.Time, err erro ) } -func logJikanUpstream(urlStr string, statusCode int, attempts int, startedAt time.Time, err error) { - duration := time.Since(startedAt) - if !jikanTraceEnabled() && err == nil && statusCode < http.StatusBadRequest && duration < jikanSlowLogThreshold { - return - } - - level := observability.LogLevelInfo - if err != nil || statusCode >= http.StatusInternalServerError { - level = observability.LogLevelError - } else if statusCode == http.StatusTooManyRequests || statusCode >= http.StatusBadRequest { - level = observability.LogLevelWarn - } - - observability.LogJSON( - level, - "jikan_upstream", - "jikan", - "", - map[string]any{ - "url": urlStr, - "endpoint": metricsEndpoint(urlStr), - "status": statusCode, - "attempts": attempts, - "duration_ms": float64(duration.Microseconds()) / 1000, - }, - err, - ) -} - func truncateErrorMessage(message string) string { if len(message) <= 400 { return message @@ -268,99 +157,20 @@ func (c *Client) EnqueueAnimeFetchRetry(parentCtx context.Context, animeID int, c.notifyRetryWorker() } -// waitRateLimit enforces Jikan's 3 req/sec rate limit with 400ms spacing. -func (c *Client) waitRateLimit(ctx context.Context) error { - c.mu.Lock() - defer c.mu.Unlock() - - now := time.Now() - // Jikan has a 3 req/sec limit AND a 60 req/min limit. - // 400ms base delay keeps us safely under the 3/sec limit. - nextAllowed := c.lastReqTime.Add(400 * time.Millisecond) - if now.Before(nextAllowed) { - timer := time.NewTimer(nextAllowed.Sub(now)) - defer timer.Stop() - - select { - case <-timer.C: - case <-ctx.Done(): - return fmt.Errorf("request canceled while waiting for rate limit: %w", ctx.Err()) - } - c.lastReqTime = time.Now() - } else { - c.lastReqTime = now - } - - return nil -} - -// getCache retrieves cached data by key, returns true on cache hit. func (c *Client) getCache(parentCtx context.Context, key string, out any) bool { - ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second) - defer cancel() - - data, err := c.db.GetJikanCache(ctx, key) - if err != nil { - c.metrics.ObserveCache("jikan", "miss") - return false - } - - err = json.Unmarshal([]byte(data), out) - if err != nil { - c.metrics.ObserveCache("jikan", "miss") - return false - } - - c.metrics.ObserveCache("jikan", "hit") - return true + return c.cache.Get(parentCtx, key, out) } -// getStaleCache retrieves expired-but-available cache by key. func (c *Client) getStaleCache(parentCtx context.Context, key string, out any) bool { - ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second) - defer cancel() - - data, err := c.db.GetJikanCacheStale(ctx, key) - if err != nil { - c.metrics.ObserveCache("jikan_stale", "miss") - return false - } - - err = json.Unmarshal([]byte(data), out) - if err != nil { - c.metrics.ObserveCache("jikan_stale", "miss") - return false - } - - c.metrics.ObserveCache("jikan_stale", "hit") - return true + return c.cache.GetStale(parentCtx, key, out) } -// setCache stores data in cache with specified TTL. func (c *Client) setCache(parentCtx context.Context, key string, data any, ttl time.Duration) { - ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second) - defer cancel() + c.cache.Set(parentCtx, key, data, ttl) +} - bytes, err := json.Marshal(data) - if err != nil { - return - } - - err = c.db.SetJikanCache(ctx, db.SetJikanCacheParams{ - Key: key, - Data: string(bytes), - ExpiresAt: time.Now().Add(ttl), - }) - if err != nil { - observability.LogJSON( - observability.LogLevelError, - "jikan_cache_set", - "jikan", - "", - map[string]any{"cache_key": key}, - err, - ) - } +func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) error { + return c.fetcher.FetchWithRetry(ctx, urlStr, out) } // isEmptyResult detects if response contains no meaningful data. @@ -411,7 +221,7 @@ func (c *Client) refreshWithCache(ctx context.Context, cacheKey string, ttl time return nil, err } - // Don't cache empty results to avoid caching failures + // Don't cache empty results to avoid caching failures. if isEmptyResult(out) { return nil, fmt.Errorf("jikan: empty response for %s", cacheKey) } @@ -488,160 +298,3 @@ func (c *Client) getWithCache(ctx context.Context, cacheKey string, ttl time.Dur logJikanCache(cacheKey, "refresh", startedAt, nil) return nil } - -// fetchWithRetry makes HTTP request with exponential backoff retry on transient failures. -func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) error { - maxRetries := 5 - startedAt := time.Now() - attempts := 0 - endpoint := metricsEndpoint(urlStr) - logAndReturn := func(statusCode int, err error) error { - c.metrics.ObserveJikanRequest(endpoint, statusCode, time.Since(startedAt), err) - logJikanUpstream(urlStr, statusCode, attempts, startedAt, err) - return err - } - - for attempt := range maxRetries { - attempts = attempt + 1 - if err := c.prepareRetryAttempt(ctx); err != nil { - return logAndReturn(0, err) - } - - resp, err := c.doRequest(ctx, urlStr) - if err != nil { - retry, requestErr := handleRequestRetry(ctx, err, attempt, maxRetries) - if retry { - continue - } - - return logAndReturn(0, requestErr) - } - - statusCode, retry, err := handleResponseRetry(ctx, resp, urlStr, out, attempt, maxRetries) - if retry { - continue - } - - return logAndReturn(statusCode, err) - } - - return logAndReturn(0, fmt.Errorf("max retries exceeded for %s", urlStr)) -} - -func (c *Client) prepareRetryAttempt(ctx context.Context) error { - select { - case <-ctx.Done(): - return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err()) - default: - } - - return c.waitRateLimit(ctx) -} - -func (c *Client) doRequest(ctx context.Context, urlStr string) (*http.Response, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil) - if err != nil { - return nil, fmt.Errorf("failed to create jikan request: %w", err) - } - - req.Header.Set("User-Agent", netutil.Generic) - resp, err := c.httpClient.Do(req) - if err != nil { - return nil, err - } - - return resp, nil -} - -func handleRequestRetry(ctx context.Context, err error, attempt int, maxRetries int) (bool, error) { - if errors.Is(err, context.Canceled) { - return false, fmt.Errorf("request canceled while retrying jikan request: %w", err) - } - - if attempt >= maxRetries-1 || !IsRetryableError(err) { - return false, fmt.Errorf("jikan api error: %w", err) - } - - if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil { - return false, retryErr - } - - return true, nil -} - -func handleResponseRetry(ctx context.Context, resp *http.Response, urlStr string, out any, attempt int, maxRetries int) (int, bool, error) { - if resp.StatusCode != http.StatusOK { - return handleStatusRetry(ctx, resp, urlStr, out, attempt, maxRetries) - } - - err := json.NewDecoder(resp.Body).Decode(out) - _ = resp.Body.Close() - if err == nil { - return resp.StatusCode, false, nil - } - - if attempt < maxRetries-1 { - if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil { - return resp.StatusCode, false, retryErr - } - return resp.StatusCode, true, nil - } - - return resp.StatusCode, false, fmt.Errorf("failed to decode jikan response: %w", err) -} - -func handleStatusRetry(ctx context.Context, resp *http.Response, urlStr string, out any, attempt int, maxRetries int) (int, bool, error) { - statusCode := resp.StatusCode - apiErr := &APIError{StatusCode: statusCode, URL: urlStr} - - retryAfter := time.Duration(0) - if parsed, ok := parseRetryAfter(resp.Header.Get("Retry-After")); ok { - retryAfter = parsed - } - - if isRetryableStatus(statusCode) && attempt < maxRetries-1 { - _ = resp.Body.Close() - if retryErr := waitForRetry(ctx, max(retryAfter, retryDelay(attempt))); retryErr != nil { - return statusCode, false, retryErr - } - return statusCode, true, nil - } - - // Best-effort decode (often useful for debugging), but still treat non-200 as error. - _ = json.NewDecoder(resp.Body).Decode(out) - _ = resp.Body.Close() - return statusCode, false, apiErr -} - -func metricsEndpoint(urlStr string) string { - trimmed := strings.TrimSpace(urlStr) - if trimmed == "" { - return "unknown" - } - - prefix := "https://api.jikan.moe/v4" - trimmed = strings.TrimPrefix(trimmed, prefix) - - if idx := strings.Index(trimmed, "?"); idx >= 0 { - trimmed = trimmed[:idx] - } - - parts := strings.Split(trimmed, "/") - out := make([]string, 0, len(parts)) - for _, part := range parts { - if part == "" { - continue - } - if _, err := strconv.Atoi(part); err == nil { - out = append(out, "{id}") - continue - } - out = append(out, part) - } - - if len(out) == 0 { - return "/" - } - - return "/" + strings.Join(out, "/") -} diff --git a/integrations/jikan/client_test.go b/integrations/jikan/client_test.go index 823a489..e25e72d 100644 --- a/integrations/jikan/client_test.go +++ b/integrations/jikan/client_test.go @@ -31,7 +31,7 @@ func TestGetWithCacheReturnsStaleAndRefreshesAsync(t *testing.T) { stale := TopAnimeResponse{Data: []Anime{{MalID: 1, Title: "stale"}}} insertCachedResponse(t, sqlDB, "top:1", stale, time.Now().Add(-time.Hour)) - client.httpClient = &http.Client{ + client.fetcher.HTTPClient = &http.Client{ Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { body := `{"data":[{"mal_id":2,"title":"fresh"}]}` return &http.Response{ diff --git a/integrations/jikan/rate/limiter.go b/integrations/jikan/rate/limiter.go new file mode 100644 index 0000000..148ba69 --- /dev/null +++ b/integrations/jikan/rate/limiter.go @@ -0,0 +1,42 @@ +package rate + +import ( + "context" + "fmt" + "sync" + "time" +) + +type Limiter struct { + mu sync.Mutex + lastReqTime time.Time + interval time.Duration +} + +func NewLimiter(interval time.Duration) *Limiter { + return &Limiter{interval: interval} +} + +// Wait enforces minimum spacing between upstream Jikan requests. +func (l *Limiter) Wait(ctx context.Context) error { + l.mu.Lock() + defer l.mu.Unlock() + + now := time.Now() + nextAllowed := l.lastReqTime.Add(l.interval) + if now.Before(nextAllowed) { + timer := time.NewTimer(nextAllowed.Sub(now)) + defer timer.Stop() + + select { + case <-timer.C: + case <-ctx.Done(): + return fmt.Errorf("request canceled while waiting for rate limit: %w", ctx.Err()) + } + l.lastReqTime = time.Now() + return nil + } + + l.lastReqTime = now + return nil +} diff --git a/integrations/jikan/relations.go b/integrations/jikan/relations.go index f5caed2..c5f60ff 100644 --- a/integrations/jikan/relations.go +++ b/integrations/jikan/relations.go @@ -84,7 +84,7 @@ func (c *Client) refreshWatchOrder(ctx context.Context, id int) (watchorder.Watc requestCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - result, err := watchorder.FetchWatchOrder(requestCtx, c.httpClient, watchOrderURL) + result, err := watchorder.FetchWatchOrder(requestCtx, c.fetcher.HTTPClient, watchOrderURL) if err != nil { var statusError *watchorder.HTTPStatusError if errors.As(err, &statusError) && statusError.StatusCode == http.StatusNotFound { diff --git a/integrations/jikan/transport/client.go b/integrations/jikan/transport/client.go new file mode 100644 index 0000000..e9c77b8 --- /dev/null +++ b/integrations/jikan/transport/client.go @@ -0,0 +1,323 @@ +package transport + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net" + "net/http" + "strconv" + "strings" + "time" + + "mal/integrations/jikan/rate" + "mal/internal/observability" + netutil "mal/pkg/net" +) + +const slowLogThreshold = 750 * time.Millisecond + +type Client struct { + HTTPClient *http.Client + Limiter *rate.Limiter + Metrics *observability.Metrics + TraceEnabled func() bool +} + +type Config struct { + HTTPClient *http.Client + Limiter *rate.Limiter + Metrics *observability.Metrics + TraceEnabled func() bool +} + +type APIError struct { + StatusCode int + URL string +} + +func (e *APIError) Error() string { + return fmt.Sprintf("jikan api returned status %d", e.StatusCode) +} + +func NewHTTPClient() *http.Client { + return &http.Client{ + Timeout: 10 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Second, + DisableKeepAlives: false, + TLSHandshakeTimeout: 5 * time.Second, + }, + } +} + +func NewClient(cfg Config) *Client { + return &Client{ + HTTPClient: cfg.HTTPClient, + Limiter: cfg.Limiter, + Metrics: cfg.Metrics, + TraceEnabled: cfg.TraceEnabled, + } +} + +// IsRetryableError returns true if the error should trigger a retry. +func IsRetryableError(err error) bool { + if err == nil { + return false + } + + var apiErr *APIError + if errors.As(err, &apiErr) { + return isRetryableStatus(apiErr.StatusCode) + } + + var netErr net.Error + if errors.As(err, &netErr) { + return true + } + + if errors.Is(err, context.DeadlineExceeded) { + return true + } + + return false +} + +// FetchWithRetry makes an HTTP request with exponential backoff on transient failures. +func (c *Client) FetchWithRetry(ctx context.Context, urlStr string, out any) error { + maxRetries := 5 + startedAt := time.Now() + attempts := 0 + endpoint := metricsEndpoint(urlStr) + logAndReturn := func(statusCode int, err error) error { + c.Metrics.ObserveJikanRequest(endpoint, statusCode, time.Since(startedAt), err) + c.logUpstream(urlStr, statusCode, attempts, startedAt, err) + return err + } + + for attempt := range maxRetries { + attempts = attempt + 1 + if err := c.prepareRetryAttempt(ctx); err != nil { + return logAndReturn(0, err) + } + + resp, err := c.doRequest(ctx, urlStr) + if err != nil { + retry, requestErr := handleRequestRetry(ctx, err, attempt, maxRetries) + if retry { + continue + } + + return logAndReturn(0, requestErr) + } + + statusCode, retry, err := handleResponseRetry(ctx, resp, urlStr, out, attempt, maxRetries) + if retry { + continue + } + + return logAndReturn(statusCode, err) + } + + return logAndReturn(0, fmt.Errorf("max retries exceeded for %s", urlStr)) +} + +func (c *Client) prepareRetryAttempt(ctx context.Context) error { + select { + case <-ctx.Done(): + return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err()) + default: + } + + return c.Limiter.Wait(ctx) +} + +func (c *Client) doRequest(ctx context.Context, urlStr string) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil) + if err != nil { + return nil, fmt.Errorf("failed to create jikan request: %w", err) + } + + req.Header.Set("User-Agent", netutil.Generic) + resp, err := c.HTTPClient.Do(req) + if err != nil { + return nil, err + } + + return resp, nil +} + +func handleRequestRetry(ctx context.Context, err error, attempt int, maxRetries int) (bool, error) { + if errors.Is(err, context.Canceled) { + return false, fmt.Errorf("request canceled while retrying jikan request: %w", err) + } + + if attempt >= maxRetries-1 || !IsRetryableError(err) { + return false, fmt.Errorf("jikan api error: %w", err) + } + + if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil { + return false, retryErr + } + + return true, nil +} + +func handleResponseRetry(ctx context.Context, resp *http.Response, urlStr string, out any, attempt int, maxRetries int) (int, bool, error) { + if resp.StatusCode != http.StatusOK { + return handleStatusRetry(ctx, resp, urlStr, out, attempt, maxRetries) + } + + err := json.NewDecoder(resp.Body).Decode(out) + _ = resp.Body.Close() + if err == nil { + return resp.StatusCode, false, nil + } + + if attempt < maxRetries-1 { + if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil { + return resp.StatusCode, false, retryErr + } + return resp.StatusCode, true, nil + } + + return resp.StatusCode, false, fmt.Errorf("failed to decode jikan response: %w", err) +} + +func handleStatusRetry(ctx context.Context, resp *http.Response, urlStr string, out any, attempt int, maxRetries int) (int, bool, error) { + statusCode := resp.StatusCode + apiErr := &APIError{StatusCode: statusCode, URL: urlStr} + + retryAfter := time.Duration(0) + if parsed, ok := parseRetryAfter(resp.Header.Get("Retry-After")); ok { + retryAfter = parsed + } + + if isRetryableStatus(statusCode) && attempt < maxRetries-1 { + _ = resp.Body.Close() + if retryErr := waitForRetry(ctx, max(retryAfter, retryDelay(attempt))); retryErr != nil { + return statusCode, false, retryErr + } + return statusCode, true, nil + } + + // Best-effort decode (often useful for debugging), but still treat non-200 as error. + _ = json.NewDecoder(resp.Body).Decode(out) + _ = resp.Body.Close() + return statusCode, false, apiErr +} + +func isRetryableStatus(statusCode int) bool { + if statusCode == http.StatusTooManyRequests { + return true + } + + return statusCode >= 500 && statusCode <= 504 +} + +// retryDelay returns exponential backoff delay: 500ms, 1s, 2s, 4s, 8s (capped). +func retryDelay(attempt int) time.Duration { + base := 500 * time.Millisecond + delay := base * time.Duration(1< 8*time.Second { + return 8 * time.Second + } + + return delay +} + +// parseRetryAfter parses Retry-After header value (seconds) into duration. +func parseRetryAfter(value string) (time.Duration, bool) { + trimmed := strings.TrimSpace(value) + if trimmed == "" { + return 0, false + } + + seconds, err := strconv.Atoi(trimmed) + if err != nil { + return 0, false + } + + if seconds <= 0 { + return 0, false + } + + return time.Duration(seconds) * time.Second, true +} + +func waitForRetry(ctx context.Context, delay time.Duration) error { + timer := time.NewTimer(delay) + defer timer.Stop() + + select { + case <-timer.C: + return nil + case <-ctx.Done(): + return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err()) + } +} + +func (c *Client) logUpstream(urlStr string, statusCode int, attempts int, startedAt time.Time, err error) { + duration := time.Since(startedAt) + traceEnabled := c.TraceEnabled != nil && c.TraceEnabled() + if !traceEnabled && err == nil && statusCode < http.StatusBadRequest && duration < slowLogThreshold { + return + } + + level := observability.LogLevelInfo + if err != nil || statusCode >= http.StatusInternalServerError { + level = observability.LogLevelError + } else if statusCode == http.StatusTooManyRequests || statusCode >= http.StatusBadRequest { + level = observability.LogLevelWarn + } + + observability.LogJSON( + level, + "jikan_upstream", + "jikan", + "", + map[string]any{ + "url": urlStr, + "endpoint": metricsEndpoint(urlStr), + "status": statusCode, + "attempts": attempts, + "duration_ms": float64(duration.Microseconds()) / 1000, + }, + err, + ) +} + +func metricsEndpoint(urlStr string) string { + trimmed := strings.TrimSpace(urlStr) + if trimmed == "" { + return "unknown" + } + + prefix := "https://api.jikan.moe/v4" + trimmed = strings.TrimPrefix(trimmed, prefix) + + if idx := strings.Index(trimmed, "?"); idx >= 0 { + trimmed = trimmed[:idx] + } + + parts := strings.Split(trimmed, "/") + out := make([]string, 0, len(parts)) + for _, part := range parts { + if part == "" { + continue + } + if _, err := strconv.Atoi(part); err == nil { + out = append(out, "{id}") + continue + } + out = append(out, part) + } + + if len(out) == 0 { + return "/" + } + + return "/" + strings.Join(out, "/") +}