package jikan import ( "context" "encoding/json" "errors" "fmt" "reflect" "sync" "time" jcache "mal/integrations/jikan/cache" "mal/integrations/jikan/rate" jtransport "mal/integrations/jikan/transport" "mal/internal/config" "mal/internal/db" "mal/internal/observability" "golang.org/x/sync/singleflight" ) type Client struct { baseURL string db db.Querier retrySignal chan struct{} // signals retry worker to process queued retries sf singleflight.Group refreshSem chan struct{} cache *jcache.Store fetcher *jtransport.Client traceEnabled bool // Random anime pool for DDoS-proof truly random "Surprise Me" randomPool []Anime poolMu sync.RWMutex poolInitialized bool } const jikanSlowLogThreshold = 750 * time.Millisecond type APIError = jtransport.APIError func NewClient(cfg config.Config, queries *db.Queries, metrics *observability.Metrics) *Client { limiter := rate.NewLimiter(400 * time.Millisecond) client := &Client{ baseURL: "https://api.jikan.moe/v4", db: queries, retrySignal: make(chan struct{}, 1), refreshSem: make(chan struct{}, 4), cache: jcache.NewStore(queries, metrics), traceEnabled: cfg.JikanTrace, randomPool: make([]Anime, 0), } client.fetcher = jtransport.NewClient(jtransport.Config{ HTTPClient: jtransport.NewHTTPClient(), Limiter: limiter, Metrics: metrics, TraceEnabled: client.jikanTraceEnabled, }) return client } // IsRetryableError returns true if the error should trigger a retry. func IsRetryableError(err error) bool { return jtransport.IsRetryableError(err) } func (c *Client) jikanTraceEnabled() bool { return c.traceEnabled } func (c *Client) shouldSkipJikanCacheLog(source string, duration time.Duration, err error) bool { if c.jikanTraceEnabled() || err != nil { return false } if source == "fresh" { return duration < 50*time.Millisecond } if source == "refresh" { return duration < jikanSlowLogThreshold } return false } func jikanCacheLogLevel(source string, err error) observability.LogLevel { if err != nil { return observability.LogLevelError } if source != "fresh" && source != "refresh" { // Stale reads are expected sometimes, but worth tracking in logs. return observability.LogLevelWarn } return observability.LogLevelInfo } func (c *Client) logJikanCache(cacheKey string, source string, startedAt time.Time, err error) { if isContextError(err) { return } duration := time.Since(startedAt) if c.shouldSkipJikanCacheLog(source, duration, err) { return } observability.LogJSON( jikanCacheLogLevel(source, err), "jikan_cache", "jikan", "", map[string]any{ "cache_key": cacheKey, "source": source, "duration_ms": float64(duration.Microseconds()) / 1000, }, err, ) } func truncateErrorMessage(message string) string { if len(message) <= 400 { return message } return message[:400] } func isContextError(err error) bool { return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) } // notifyRetryWorker signals the retry worker, non-blocking. func (c *Client) notifyRetryWorker() { select { case c.retrySignal <- struct{}{}: default: } } // RetrySignal returns channel that signals when retries are enqueued. func (c *Client) RetrySignal() <-chan struct{} { return c.retrySignal } // EnqueueAnimeFetchRetry queues a failed anime fetch for later retry if the error is retryable. func (c *Client) EnqueueAnimeFetchRetry(parentCtx context.Context, animeID int, cause error) { if animeID <= 0 || !IsRetryableError(cause) { return } ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second) defer cancel() err := c.db.EnqueueAnimeFetchRetry(ctx, db.EnqueueAnimeFetchRetryParams{ AnimeID: int64(animeID), LastError: truncateErrorMessage(cause.Error()), }) if err != nil { observability.Warn( "jikan_retry_enqueue_failed", "jikan", "", map[string]any{"anime_id": animeID}, err, ) return } c.notifyRetryWorker() } func (c *Client) getCache(parentCtx context.Context, key string, out any) bool { return c.cache.Get(parentCtx, key, out) } func (c *Client) getStaleCache(parentCtx context.Context, key string, out any) bool { return c.cache.GetStale(parentCtx, key, out) } func (c *Client) setCache(parentCtx context.Context, key string, data any, ttl time.Duration) { c.cache.Set(parentCtx, key, data, ttl) } func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) error { return c.fetcher.FetchWithRetry(ctx, urlStr, out) } // isEmptyResult detects if response contains no meaningful data. func isEmptyResult(out any) bool { switch v := out.(type) { case *TopAnimeResponse: return len(v.Data) == 0 case *SearchResponse: return false case *AnimeResponse: return v.Data.MalID == 0 case *EpisodesResponse: return len(v.Data) == 0 case *StaffResponse: return len(v.Data) == 0 case *StatisticsResponse: return v.Data.Total == 0 case *ThemesResponse: return len(v.Data.Openings) == 0 && len(v.Data.Endings) == 0 case *ReviewsResponse: return false // empty reviews is a valid state } return false } func cloneResponseTarget(out any) (any, bool) { if out == nil { return nil, false } outType := reflect.TypeOf(out) if outType.Kind() != reflect.Pointer || outType.Elem() == nil { return nil, false } return reflect.New(outType.Elem()).Interface(), true } func (c *Client) refreshWithCache(ctx context.Context, cacheKey string, ttl time.Duration, url string, out any) error { value, err, shared := c.sf.Do("refresh:"+cacheKey, func() (any, error) { if c.getCache(ctx, cacheKey, out) { if !isEmptyResult(out) { return json.Marshal(out) } } if err := c.fetchWithRetry(ctx, url, out); err != nil { return nil, err } // Don't cache empty results to avoid caching failures. if isEmptyResult(out) { return nil, fmt.Errorf("jikan: empty response for %s", cacheKey) } c.setCache(ctx, cacheKey, out, ttl) return json.Marshal(out) }) if err != nil { return err } if shared { observability.Info( "jikan_cache_refresh_shared", "jikan", "", map[string]any{"cache_key": cacheKey, "url": url}, ) } if bytes, ok := value.([]byte); ok { if err := json.Unmarshal(bytes, out); err == nil && !isEmptyResult(out) { return nil } } return fmt.Errorf("jikan: empty response for %s", cacheKey) } func (c *Client) refreshWithCacheAsync(cacheKey string, ttl time.Duration, url string, out any) { target, ok := cloneResponseTarget(out) if !ok { return } c.runAsyncRefresh(func(ctx context.Context) { if err := c.refreshWithCache(ctx, cacheKey, ttl, url, target); err != nil { observability.Warn( "jikan_async_cache_refresh_failed", "jikan", "", map[string]any{"cache_key": cacheKey, "url": url}, err, ) } }) } func (c *Client) runAsyncRefresh(refresh func(context.Context)) { select { case c.refreshSem <- struct{}{}: default: return } go func() { defer func() { <-c.refreshSem }() ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() refresh(ctx) }() } // getWithCache fetches URL with a stale-while-revalidate DB cache strategy. func (c *Client) getWithCache(ctx context.Context, cacheKey string, ttl time.Duration, url string, out any) error { startedAt := time.Now() if c.getCache(ctx, cacheKey, out) { if !isEmptyResult(out) { c.logJikanCache(cacheKey, "fresh", startedAt, nil) return nil } } if c.getStaleCache(ctx, cacheKey, out) && !isEmptyResult(out) { c.logJikanCache(cacheKey, "stale", startedAt, nil) c.refreshWithCacheAsync(cacheKey, ttl, url, out) return nil } if err := c.refreshWithCache(ctx, cacheKey, ttl, url, out); err != nil { if c.getStaleCache(ctx, cacheKey, out) && !isEmptyResult(out) { c.logJikanCache(cacheKey, "stale_after_error", startedAt, err) return nil } c.logJikanCache(cacheKey, "miss", startedAt, err) return err } c.logJikanCache(cacheKey, "refresh", startedAt, nil) return nil }