refactor: split jikan client into transport/cache/rate subpackages
This commit is contained in:
86
integrations/jikan/cache/store.go
vendored
Normal file
86
integrations/jikan/cache/store.go
vendored
Normal file
@@ -0,0 +1,86 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"mal/internal/db"
|
||||
"mal/internal/observability"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
db db.Querier
|
||||
metrics *observability.Metrics
|
||||
}
|
||||
|
||||
func NewStore(queries db.Querier, metrics *observability.Metrics) *Store {
|
||||
return &Store{db: queries, metrics: metrics}
|
||||
}
|
||||
|
||||
// Get retrieves a fresh cached value by key.
|
||||
func (s *Store) Get(parentCtx context.Context, key string, out any) bool {
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
data, err := s.db.GetJikanCache(ctx, key)
|
||||
if err != nil {
|
||||
s.metrics.ObserveCache("jikan", "miss")
|
||||
return false
|
||||
}
|
||||
|
||||
if err := json.Unmarshal([]byte(data), out); err != nil {
|
||||
s.metrics.ObserveCache("jikan", "miss")
|
||||
return false
|
||||
}
|
||||
|
||||
s.metrics.ObserveCache("jikan", "hit")
|
||||
return true
|
||||
}
|
||||
|
||||
// GetStale retrieves an expired-but-available cached value by key.
|
||||
func (s *Store) GetStale(parentCtx context.Context, key string, out any) bool {
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
data, err := s.db.GetJikanCacheStale(ctx, key)
|
||||
if err != nil {
|
||||
s.metrics.ObserveCache("jikan_stale", "miss")
|
||||
return false
|
||||
}
|
||||
|
||||
if err := json.Unmarshal([]byte(data), out); err != nil {
|
||||
s.metrics.ObserveCache("jikan_stale", "miss")
|
||||
return false
|
||||
}
|
||||
|
||||
s.metrics.ObserveCache("jikan_stale", "hit")
|
||||
return true
|
||||
}
|
||||
|
||||
// Set stores data in cache with the specified TTL.
|
||||
func (s *Store) Set(parentCtx context.Context, key string, data any, ttl time.Duration) {
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
bytes, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = s.db.SetJikanCache(ctx, db.SetJikanCacheParams{
|
||||
Key: key,
|
||||
Data: string(bytes),
|
||||
ExpiresAt: time.Now().Add(ttl),
|
||||
})
|
||||
if err != nil {
|
||||
observability.LogJSON(
|
||||
observability.LogLevelError,
|
||||
"jikan_cache_set",
|
||||
"jikan",
|
||||
"",
|
||||
map[string]any{"cache_key": key},
|
||||
err,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -3,20 +3,17 @@ package jikan
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
jcache "mal/integrations/jikan/cache"
|
||||
"mal/integrations/jikan/rate"
|
||||
jtransport "mal/integrations/jikan/transport"
|
||||
"mal/internal/config"
|
||||
"mal/internal/db"
|
||||
"mal/internal/observability"
|
||||
netutil "mal/pkg/net"
|
||||
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
@@ -24,15 +21,13 @@ import (
|
||||
var traceEnabled bool
|
||||
|
||||
type Client struct {
|
||||
httpClient *http.Client
|
||||
baseURL string
|
||||
db db.Querier
|
||||
retrySignal chan struct{} // signals retry worker to process queued retries
|
||||
mu sync.Mutex
|
||||
lastReqTime time.Time // rate limiting: last request timestamp
|
||||
sf singleflight.Group
|
||||
refreshSem chan struct{}
|
||||
metrics *observability.Metrics
|
||||
cache *jcache.Store
|
||||
fetcher *jtransport.Client
|
||||
|
||||
// Random anime pool for DDoS-proof truly random "Surprise Me"
|
||||
randomPool []Anime
|
||||
@@ -42,107 +37,30 @@ type Client struct {
|
||||
|
||||
const jikanSlowLogThreshold = 750 * time.Millisecond
|
||||
|
||||
type APIError = jtransport.APIError
|
||||
|
||||
func NewClient(cfg config.Config, queries *db.Queries, metrics *observability.Metrics) *Client {
|
||||
traceEnabled = cfg.JikanTrace
|
||||
limiter := rate.NewLimiter(400 * time.Millisecond)
|
||||
return &Client{
|
||||
httpClient: &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
Transport: &http.Transport{
|
||||
MaxIdleConns: 10,
|
||||
IdleConnTimeout: 30 * time.Second,
|
||||
DisableKeepAlives: false,
|
||||
TLSHandshakeTimeout: 5 * time.Second,
|
||||
},
|
||||
},
|
||||
baseURL: "https://api.jikan.moe/v4",
|
||||
db: queries,
|
||||
metrics: metrics,
|
||||
retrySignal: make(chan struct{}, 1),
|
||||
refreshSem: make(chan struct{}, 4),
|
||||
randomPool: make([]Anime, 0),
|
||||
cache: jcache.NewStore(queries, metrics),
|
||||
fetcher: jtransport.NewClient(jtransport.Config{
|
||||
HTTPClient: jtransport.NewHTTPClient(),
|
||||
Limiter: limiter,
|
||||
Metrics: metrics,
|
||||
TraceEnabled: jikanTraceEnabled,
|
||||
}),
|
||||
randomPool: make([]Anime, 0),
|
||||
}
|
||||
}
|
||||
|
||||
type APIError struct {
|
||||
StatusCode int
|
||||
URL string
|
||||
}
|
||||
|
||||
func (e *APIError) Error() string {
|
||||
return fmt.Sprintf("jikan api returned status %d", e.StatusCode)
|
||||
}
|
||||
|
||||
// IsRetryableError returns true if the error should trigger a retry.
|
||||
func IsRetryableError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
var apiErr *APIError
|
||||
if errors.As(err, &apiErr) {
|
||||
return isRetryableStatus(apiErr.StatusCode)
|
||||
}
|
||||
|
||||
var netErr net.Error
|
||||
if errors.As(err, &netErr) {
|
||||
return true
|
||||
}
|
||||
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func isRetryableStatus(statusCode int) bool {
|
||||
if statusCode == http.StatusTooManyRequests {
|
||||
return true
|
||||
}
|
||||
|
||||
return statusCode >= 500 && statusCode <= 504
|
||||
}
|
||||
|
||||
// retryDelay returns exponential backoff delay: 500ms, 1s, 2s, 4s, 8s (capped).
|
||||
func retryDelay(attempt int) time.Duration {
|
||||
base := 500 * time.Millisecond
|
||||
delay := base * time.Duration(1<<attempt)
|
||||
if delay > 8*time.Second {
|
||||
return 8 * time.Second
|
||||
}
|
||||
|
||||
return delay
|
||||
}
|
||||
|
||||
// parseRetryAfter parses Retry-After header value (seconds) into duration.
|
||||
func parseRetryAfter(value string) (time.Duration, bool) {
|
||||
trimmed := strings.TrimSpace(value)
|
||||
if trimmed == "" {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
seconds, err := strconv.Atoi(trimmed)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
if seconds <= 0 {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
return time.Duration(seconds) * time.Second, true
|
||||
}
|
||||
|
||||
func waitForRetry(ctx context.Context, delay time.Duration) error {
|
||||
timer := time.NewTimer(delay)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())
|
||||
}
|
||||
return jtransport.IsRetryableError(err)
|
||||
}
|
||||
|
||||
func jikanTraceEnabled() bool {
|
||||
@@ -198,35 +116,6 @@ func logJikanCache(cacheKey string, source string, startedAt time.Time, err erro
|
||||
)
|
||||
}
|
||||
|
||||
func logJikanUpstream(urlStr string, statusCode int, attempts int, startedAt time.Time, err error) {
|
||||
duration := time.Since(startedAt)
|
||||
if !jikanTraceEnabled() && err == nil && statusCode < http.StatusBadRequest && duration < jikanSlowLogThreshold {
|
||||
return
|
||||
}
|
||||
|
||||
level := observability.LogLevelInfo
|
||||
if err != nil || statusCode >= http.StatusInternalServerError {
|
||||
level = observability.LogLevelError
|
||||
} else if statusCode == http.StatusTooManyRequests || statusCode >= http.StatusBadRequest {
|
||||
level = observability.LogLevelWarn
|
||||
}
|
||||
|
||||
observability.LogJSON(
|
||||
level,
|
||||
"jikan_upstream",
|
||||
"jikan",
|
||||
"",
|
||||
map[string]any{
|
||||
"url": urlStr,
|
||||
"endpoint": metricsEndpoint(urlStr),
|
||||
"status": statusCode,
|
||||
"attempts": attempts,
|
||||
"duration_ms": float64(duration.Microseconds()) / 1000,
|
||||
},
|
||||
err,
|
||||
)
|
||||
}
|
||||
|
||||
func truncateErrorMessage(message string) string {
|
||||
if len(message) <= 400 {
|
||||
return message
|
||||
@@ -268,99 +157,20 @@ func (c *Client) EnqueueAnimeFetchRetry(parentCtx context.Context, animeID int,
|
||||
c.notifyRetryWorker()
|
||||
}
|
||||
|
||||
// waitRateLimit enforces Jikan's 3 req/sec rate limit with 400ms spacing.
|
||||
func (c *Client) waitRateLimit(ctx context.Context) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
// Jikan has a 3 req/sec limit AND a 60 req/min limit.
|
||||
// 400ms base delay keeps us safely under the 3/sec limit.
|
||||
nextAllowed := c.lastReqTime.Add(400 * time.Millisecond)
|
||||
if now.Before(nextAllowed) {
|
||||
timer := time.NewTimer(nextAllowed.Sub(now))
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("request canceled while waiting for rate limit: %w", ctx.Err())
|
||||
}
|
||||
c.lastReqTime = time.Now()
|
||||
} else {
|
||||
c.lastReqTime = now
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getCache retrieves cached data by key, returns true on cache hit.
|
||||
func (c *Client) getCache(parentCtx context.Context, key string, out any) bool {
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
data, err := c.db.GetJikanCache(ctx, key)
|
||||
if err != nil {
|
||||
c.metrics.ObserveCache("jikan", "miss")
|
||||
return false
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(data), out)
|
||||
if err != nil {
|
||||
c.metrics.ObserveCache("jikan", "miss")
|
||||
return false
|
||||
}
|
||||
|
||||
c.metrics.ObserveCache("jikan", "hit")
|
||||
return true
|
||||
return c.cache.Get(parentCtx, key, out)
|
||||
}
|
||||
|
||||
// getStaleCache retrieves expired-but-available cache by key.
|
||||
func (c *Client) getStaleCache(parentCtx context.Context, key string, out any) bool {
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
data, err := c.db.GetJikanCacheStale(ctx, key)
|
||||
if err != nil {
|
||||
c.metrics.ObserveCache("jikan_stale", "miss")
|
||||
return false
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(data), out)
|
||||
if err != nil {
|
||||
c.metrics.ObserveCache("jikan_stale", "miss")
|
||||
return false
|
||||
}
|
||||
|
||||
c.metrics.ObserveCache("jikan_stale", "hit")
|
||||
return true
|
||||
return c.cache.GetStale(parentCtx, key, out)
|
||||
}
|
||||
|
||||
// setCache stores data in cache with specified TTL.
|
||||
func (c *Client) setCache(parentCtx context.Context, key string, data any, ttl time.Duration) {
|
||||
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
|
||||
defer cancel()
|
||||
c.cache.Set(parentCtx, key, data, ttl)
|
||||
}
|
||||
|
||||
bytes, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = c.db.SetJikanCache(ctx, db.SetJikanCacheParams{
|
||||
Key: key,
|
||||
Data: string(bytes),
|
||||
ExpiresAt: time.Now().Add(ttl),
|
||||
})
|
||||
if err != nil {
|
||||
observability.LogJSON(
|
||||
observability.LogLevelError,
|
||||
"jikan_cache_set",
|
||||
"jikan",
|
||||
"",
|
||||
map[string]any{"cache_key": key},
|
||||
err,
|
||||
)
|
||||
}
|
||||
func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) error {
|
||||
return c.fetcher.FetchWithRetry(ctx, urlStr, out)
|
||||
}
|
||||
|
||||
// isEmptyResult detects if response contains no meaningful data.
|
||||
@@ -411,7 +221,7 @@ func (c *Client) refreshWithCache(ctx context.Context, cacheKey string, ttl time
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Don't cache empty results to avoid caching failures
|
||||
// Don't cache empty results to avoid caching failures.
|
||||
if isEmptyResult(out) {
|
||||
return nil, fmt.Errorf("jikan: empty response for %s", cacheKey)
|
||||
}
|
||||
@@ -488,160 +298,3 @@ func (c *Client) getWithCache(ctx context.Context, cacheKey string, ttl time.Dur
|
||||
logJikanCache(cacheKey, "refresh", startedAt, nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
// fetchWithRetry makes HTTP request with exponential backoff retry on transient failures.
|
||||
func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) error {
|
||||
maxRetries := 5
|
||||
startedAt := time.Now()
|
||||
attempts := 0
|
||||
endpoint := metricsEndpoint(urlStr)
|
||||
logAndReturn := func(statusCode int, err error) error {
|
||||
c.metrics.ObserveJikanRequest(endpoint, statusCode, time.Since(startedAt), err)
|
||||
logJikanUpstream(urlStr, statusCode, attempts, startedAt, err)
|
||||
return err
|
||||
}
|
||||
|
||||
for attempt := range maxRetries {
|
||||
attempts = attempt + 1
|
||||
if err := c.prepareRetryAttempt(ctx); err != nil {
|
||||
return logAndReturn(0, err)
|
||||
}
|
||||
|
||||
resp, err := c.doRequest(ctx, urlStr)
|
||||
if err != nil {
|
||||
retry, requestErr := handleRequestRetry(ctx, err, attempt, maxRetries)
|
||||
if retry {
|
||||
continue
|
||||
}
|
||||
|
||||
return logAndReturn(0, requestErr)
|
||||
}
|
||||
|
||||
statusCode, retry, err := handleResponseRetry(ctx, resp, urlStr, out, attempt, maxRetries)
|
||||
if retry {
|
||||
continue
|
||||
}
|
||||
|
||||
return logAndReturn(statusCode, err)
|
||||
}
|
||||
|
||||
return logAndReturn(0, fmt.Errorf("max retries exceeded for %s", urlStr))
|
||||
}
|
||||
|
||||
func (c *Client) prepareRetryAttempt(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())
|
||||
default:
|
||||
}
|
||||
|
||||
return c.waitRateLimit(ctx)
|
||||
}
|
||||
|
||||
func (c *Client) doRequest(ctx context.Context, urlStr string) (*http.Response, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create jikan request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("User-Agent", netutil.Generic)
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func handleRequestRetry(ctx context.Context, err error, attempt int, maxRetries int) (bool, error) {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return false, fmt.Errorf("request canceled while retrying jikan request: %w", err)
|
||||
}
|
||||
|
||||
if attempt >= maxRetries-1 || !IsRetryableError(err) {
|
||||
return false, fmt.Errorf("jikan api error: %w", err)
|
||||
}
|
||||
|
||||
if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil {
|
||||
return false, retryErr
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func handleResponseRetry(ctx context.Context, resp *http.Response, urlStr string, out any, attempt int, maxRetries int) (int, bool, error) {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return handleStatusRetry(ctx, resp, urlStr, out, attempt, maxRetries)
|
||||
}
|
||||
|
||||
err := json.NewDecoder(resp.Body).Decode(out)
|
||||
_ = resp.Body.Close()
|
||||
if err == nil {
|
||||
return resp.StatusCode, false, nil
|
||||
}
|
||||
|
||||
if attempt < maxRetries-1 {
|
||||
if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil {
|
||||
return resp.StatusCode, false, retryErr
|
||||
}
|
||||
return resp.StatusCode, true, nil
|
||||
}
|
||||
|
||||
return resp.StatusCode, false, fmt.Errorf("failed to decode jikan response: %w", err)
|
||||
}
|
||||
|
||||
func handleStatusRetry(ctx context.Context, resp *http.Response, urlStr string, out any, attempt int, maxRetries int) (int, bool, error) {
|
||||
statusCode := resp.StatusCode
|
||||
apiErr := &APIError{StatusCode: statusCode, URL: urlStr}
|
||||
|
||||
retryAfter := time.Duration(0)
|
||||
if parsed, ok := parseRetryAfter(resp.Header.Get("Retry-After")); ok {
|
||||
retryAfter = parsed
|
||||
}
|
||||
|
||||
if isRetryableStatus(statusCode) && attempt < maxRetries-1 {
|
||||
_ = resp.Body.Close()
|
||||
if retryErr := waitForRetry(ctx, max(retryAfter, retryDelay(attempt))); retryErr != nil {
|
||||
return statusCode, false, retryErr
|
||||
}
|
||||
return statusCode, true, nil
|
||||
}
|
||||
|
||||
// Best-effort decode (often useful for debugging), but still treat non-200 as error.
|
||||
_ = json.NewDecoder(resp.Body).Decode(out)
|
||||
_ = resp.Body.Close()
|
||||
return statusCode, false, apiErr
|
||||
}
|
||||
|
||||
func metricsEndpoint(urlStr string) string {
|
||||
trimmed := strings.TrimSpace(urlStr)
|
||||
if trimmed == "" {
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
prefix := "https://api.jikan.moe/v4"
|
||||
trimmed = strings.TrimPrefix(trimmed, prefix)
|
||||
|
||||
if idx := strings.Index(trimmed, "?"); idx >= 0 {
|
||||
trimmed = trimmed[:idx]
|
||||
}
|
||||
|
||||
parts := strings.Split(trimmed, "/")
|
||||
out := make([]string, 0, len(parts))
|
||||
for _, part := range parts {
|
||||
if part == "" {
|
||||
continue
|
||||
}
|
||||
if _, err := strconv.Atoi(part); err == nil {
|
||||
out = append(out, "{id}")
|
||||
continue
|
||||
}
|
||||
out = append(out, part)
|
||||
}
|
||||
|
||||
if len(out) == 0 {
|
||||
return "/"
|
||||
}
|
||||
|
||||
return "/" + strings.Join(out, "/")
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ func TestGetWithCacheReturnsStaleAndRefreshesAsync(t *testing.T) {
|
||||
stale := TopAnimeResponse{Data: []Anime{{MalID: 1, Title: "stale"}}}
|
||||
insertCachedResponse(t, sqlDB, "top:1", stale, time.Now().Add(-time.Hour))
|
||||
|
||||
client.httpClient = &http.Client{
|
||||
client.fetcher.HTTPClient = &http.Client{
|
||||
Transport: roundTripFunc(func(*http.Request) (*http.Response, error) {
|
||||
body := `{"data":[{"mal_id":2,"title":"fresh"}]}`
|
||||
return &http.Response{
|
||||
|
||||
42
integrations/jikan/rate/limiter.go
Normal file
42
integrations/jikan/rate/limiter.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package rate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Limiter struct {
|
||||
mu sync.Mutex
|
||||
lastReqTime time.Time
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
func NewLimiter(interval time.Duration) *Limiter {
|
||||
return &Limiter{interval: interval}
|
||||
}
|
||||
|
||||
// Wait enforces minimum spacing between upstream Jikan requests.
|
||||
func (l *Limiter) Wait(ctx context.Context) error {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
nextAllowed := l.lastReqTime.Add(l.interval)
|
||||
if now.Before(nextAllowed) {
|
||||
timer := time.NewTimer(nextAllowed.Sub(now))
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("request canceled while waiting for rate limit: %w", ctx.Err())
|
||||
}
|
||||
l.lastReqTime = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
l.lastReqTime = now
|
||||
return nil
|
||||
}
|
||||
@@ -84,7 +84,7 @@ func (c *Client) refreshWatchOrder(ctx context.Context, id int) (watchorder.Watc
|
||||
requestCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
result, err := watchorder.FetchWatchOrder(requestCtx, c.httpClient, watchOrderURL)
|
||||
result, err := watchorder.FetchWatchOrder(requestCtx, c.fetcher.HTTPClient, watchOrderURL)
|
||||
if err != nil {
|
||||
var statusError *watchorder.HTTPStatusError
|
||||
if errors.As(err, &statusError) && statusError.StatusCode == http.StatusNotFound {
|
||||
|
||||
323
integrations/jikan/transport/client.go
Normal file
323
integrations/jikan/transport/client.go
Normal file
@@ -0,0 +1,323 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"mal/integrations/jikan/rate"
|
||||
"mal/internal/observability"
|
||||
netutil "mal/pkg/net"
|
||||
)
|
||||
|
||||
const slowLogThreshold = 750 * time.Millisecond
|
||||
|
||||
type Client struct {
|
||||
HTTPClient *http.Client
|
||||
Limiter *rate.Limiter
|
||||
Metrics *observability.Metrics
|
||||
TraceEnabled func() bool
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
HTTPClient *http.Client
|
||||
Limiter *rate.Limiter
|
||||
Metrics *observability.Metrics
|
||||
TraceEnabled func() bool
|
||||
}
|
||||
|
||||
type APIError struct {
|
||||
StatusCode int
|
||||
URL string
|
||||
}
|
||||
|
||||
func (e *APIError) Error() string {
|
||||
return fmt.Sprintf("jikan api returned status %d", e.StatusCode)
|
||||
}
|
||||
|
||||
func NewHTTPClient() *http.Client {
|
||||
return &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
Transport: &http.Transport{
|
||||
MaxIdleConns: 10,
|
||||
IdleConnTimeout: 30 * time.Second,
|
||||
DisableKeepAlives: false,
|
||||
TLSHandshakeTimeout: 5 * time.Second,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func NewClient(cfg Config) *Client {
|
||||
return &Client{
|
||||
HTTPClient: cfg.HTTPClient,
|
||||
Limiter: cfg.Limiter,
|
||||
Metrics: cfg.Metrics,
|
||||
TraceEnabled: cfg.TraceEnabled,
|
||||
}
|
||||
}
|
||||
|
||||
// IsRetryableError returns true if the error should trigger a retry.
|
||||
func IsRetryableError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
var apiErr *APIError
|
||||
if errors.As(err, &apiErr) {
|
||||
return isRetryableStatus(apiErr.StatusCode)
|
||||
}
|
||||
|
||||
var netErr net.Error
|
||||
if errors.As(err, &netErr) {
|
||||
return true
|
||||
}
|
||||
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// FetchWithRetry makes an HTTP request with exponential backoff on transient failures.
|
||||
func (c *Client) FetchWithRetry(ctx context.Context, urlStr string, out any) error {
|
||||
maxRetries := 5
|
||||
startedAt := time.Now()
|
||||
attempts := 0
|
||||
endpoint := metricsEndpoint(urlStr)
|
||||
logAndReturn := func(statusCode int, err error) error {
|
||||
c.Metrics.ObserveJikanRequest(endpoint, statusCode, time.Since(startedAt), err)
|
||||
c.logUpstream(urlStr, statusCode, attempts, startedAt, err)
|
||||
return err
|
||||
}
|
||||
|
||||
for attempt := range maxRetries {
|
||||
attempts = attempt + 1
|
||||
if err := c.prepareRetryAttempt(ctx); err != nil {
|
||||
return logAndReturn(0, err)
|
||||
}
|
||||
|
||||
resp, err := c.doRequest(ctx, urlStr)
|
||||
if err != nil {
|
||||
retry, requestErr := handleRequestRetry(ctx, err, attempt, maxRetries)
|
||||
if retry {
|
||||
continue
|
||||
}
|
||||
|
||||
return logAndReturn(0, requestErr)
|
||||
}
|
||||
|
||||
statusCode, retry, err := handleResponseRetry(ctx, resp, urlStr, out, attempt, maxRetries)
|
||||
if retry {
|
||||
continue
|
||||
}
|
||||
|
||||
return logAndReturn(statusCode, err)
|
||||
}
|
||||
|
||||
return logAndReturn(0, fmt.Errorf("max retries exceeded for %s", urlStr))
|
||||
}
|
||||
|
||||
func (c *Client) prepareRetryAttempt(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())
|
||||
default:
|
||||
}
|
||||
|
||||
return c.Limiter.Wait(ctx)
|
||||
}
|
||||
|
||||
func (c *Client) doRequest(ctx context.Context, urlStr string) (*http.Response, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create jikan request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("User-Agent", netutil.Generic)
|
||||
resp, err := c.HTTPClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func handleRequestRetry(ctx context.Context, err error, attempt int, maxRetries int) (bool, error) {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return false, fmt.Errorf("request canceled while retrying jikan request: %w", err)
|
||||
}
|
||||
|
||||
if attempt >= maxRetries-1 || !IsRetryableError(err) {
|
||||
return false, fmt.Errorf("jikan api error: %w", err)
|
||||
}
|
||||
|
||||
if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil {
|
||||
return false, retryErr
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func handleResponseRetry(ctx context.Context, resp *http.Response, urlStr string, out any, attempt int, maxRetries int) (int, bool, error) {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return handleStatusRetry(ctx, resp, urlStr, out, attempt, maxRetries)
|
||||
}
|
||||
|
||||
err := json.NewDecoder(resp.Body).Decode(out)
|
||||
_ = resp.Body.Close()
|
||||
if err == nil {
|
||||
return resp.StatusCode, false, nil
|
||||
}
|
||||
|
||||
if attempt < maxRetries-1 {
|
||||
if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil {
|
||||
return resp.StatusCode, false, retryErr
|
||||
}
|
||||
return resp.StatusCode, true, nil
|
||||
}
|
||||
|
||||
return resp.StatusCode, false, fmt.Errorf("failed to decode jikan response: %w", err)
|
||||
}
|
||||
|
||||
func handleStatusRetry(ctx context.Context, resp *http.Response, urlStr string, out any, attempt int, maxRetries int) (int, bool, error) {
|
||||
statusCode := resp.StatusCode
|
||||
apiErr := &APIError{StatusCode: statusCode, URL: urlStr}
|
||||
|
||||
retryAfter := time.Duration(0)
|
||||
if parsed, ok := parseRetryAfter(resp.Header.Get("Retry-After")); ok {
|
||||
retryAfter = parsed
|
||||
}
|
||||
|
||||
if isRetryableStatus(statusCode) && attempt < maxRetries-1 {
|
||||
_ = resp.Body.Close()
|
||||
if retryErr := waitForRetry(ctx, max(retryAfter, retryDelay(attempt))); retryErr != nil {
|
||||
return statusCode, false, retryErr
|
||||
}
|
||||
return statusCode, true, nil
|
||||
}
|
||||
|
||||
// Best-effort decode (often useful for debugging), but still treat non-200 as error.
|
||||
_ = json.NewDecoder(resp.Body).Decode(out)
|
||||
_ = resp.Body.Close()
|
||||
return statusCode, false, apiErr
|
||||
}
|
||||
|
||||
func isRetryableStatus(statusCode int) bool {
|
||||
if statusCode == http.StatusTooManyRequests {
|
||||
return true
|
||||
}
|
||||
|
||||
return statusCode >= 500 && statusCode <= 504
|
||||
}
|
||||
|
||||
// retryDelay returns exponential backoff delay: 500ms, 1s, 2s, 4s, 8s (capped).
|
||||
func retryDelay(attempt int) time.Duration {
|
||||
base := 500 * time.Millisecond
|
||||
delay := base * time.Duration(1<<attempt)
|
||||
if delay > 8*time.Second {
|
||||
return 8 * time.Second
|
||||
}
|
||||
|
||||
return delay
|
||||
}
|
||||
|
||||
// parseRetryAfter parses Retry-After header value (seconds) into duration.
|
||||
func parseRetryAfter(value string) (time.Duration, bool) {
|
||||
trimmed := strings.TrimSpace(value)
|
||||
if trimmed == "" {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
seconds, err := strconv.Atoi(trimmed)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
if seconds <= 0 {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
return time.Duration(seconds) * time.Second, true
|
||||
}
|
||||
|
||||
func waitForRetry(ctx context.Context, delay time.Duration) error {
|
||||
timer := time.NewTimer(delay)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) logUpstream(urlStr string, statusCode int, attempts int, startedAt time.Time, err error) {
|
||||
duration := time.Since(startedAt)
|
||||
traceEnabled := c.TraceEnabled != nil && c.TraceEnabled()
|
||||
if !traceEnabled && err == nil && statusCode < http.StatusBadRequest && duration < slowLogThreshold {
|
||||
return
|
||||
}
|
||||
|
||||
level := observability.LogLevelInfo
|
||||
if err != nil || statusCode >= http.StatusInternalServerError {
|
||||
level = observability.LogLevelError
|
||||
} else if statusCode == http.StatusTooManyRequests || statusCode >= http.StatusBadRequest {
|
||||
level = observability.LogLevelWarn
|
||||
}
|
||||
|
||||
observability.LogJSON(
|
||||
level,
|
||||
"jikan_upstream",
|
||||
"jikan",
|
||||
"",
|
||||
map[string]any{
|
||||
"url": urlStr,
|
||||
"endpoint": metricsEndpoint(urlStr),
|
||||
"status": statusCode,
|
||||
"attempts": attempts,
|
||||
"duration_ms": float64(duration.Microseconds()) / 1000,
|
||||
},
|
||||
err,
|
||||
)
|
||||
}
|
||||
|
||||
func metricsEndpoint(urlStr string) string {
|
||||
trimmed := strings.TrimSpace(urlStr)
|
||||
if trimmed == "" {
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
prefix := "https://api.jikan.moe/v4"
|
||||
trimmed = strings.TrimPrefix(trimmed, prefix)
|
||||
|
||||
if idx := strings.Index(trimmed, "?"); idx >= 0 {
|
||||
trimmed = trimmed[:idx]
|
||||
}
|
||||
|
||||
parts := strings.Split(trimmed, "/")
|
||||
out := make([]string, 0, len(parts))
|
||||
for _, part := range parts {
|
||||
if part == "" {
|
||||
continue
|
||||
}
|
||||
if _, err := strconv.Atoi(part); err == nil {
|
||||
out = append(out, "{id}")
|
||||
continue
|
||||
}
|
||||
out = append(out, part)
|
||||
}
|
||||
|
||||
if len(out) == 0 {
|
||||
return "/"
|
||||
}
|
||||
|
||||
return "/" + strings.Join(out, "/")
|
||||
}
|
||||
Reference in New Issue
Block a user