diff --git a/internal/episodes/service/service.go b/internal/episodes/service/service.go index 91b0729..0e49620 100644 --- a/internal/episodes/service/service.go +++ b/internal/episodes/service/service.go @@ -230,77 +230,18 @@ func (s *EpisodeService) fetchProviderAvailability(ctx context.Context, anime do } func (s *EpisodeService) providerID(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider, titles []string) (string, error) { - row, err := s.queries.GetEpisodeProviderMapping(ctx, db.GetEpisodeProviderMappingParams{ - AnimeID: int64(anime.MalID), - Provider: provider.Name(), - }) - if err == nil { - if row.FailedUntil.Valid && row.FailedUntil.Time.After(s.clock.Now()) { - s.metrics.ObserveCache("episode_provider_mapping", "hit") - return "", fmt.Errorf("cached provider mapping failure active until %s: %s", row.FailedUntil.Time.Format(time.RFC3339), row.LastError) - } - if strings.TrimSpace(row.ProviderShowID) != "" { - s.metrics.ObserveCache("episode_provider_mapping", "hit") - observability.Info( - "episodes_provider_id_cache_hit", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "provider": provider.Name(), - "provider_id": row.ProviderShowID, - }, - ) - return row.ProviderShowID, nil - } - s.metrics.ObserveCache("episode_provider_mapping", "miss") - } else if !errors.Is(err, sql.ErrNoRows) { - s.metrics.ObserveCache("episode_provider_mapping", "miss") - observability.Warn( - "episodes_provider_id_cache_read_failed", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "provider": provider.Name(), - }, - err, - ) - } else { - s.metrics.ObserveCache("episode_provider_mapping", "miss") + providerID, found, err := s.cachedProviderID(ctx, anime, provider) + if found || err != nil { + return providerID, err } - providerID, err := provider.ResolveEpisodeProviderID(ctx, anime.MalID, titles) + providerID, err = provider.ResolveEpisodeProviderID(ctx, anime.MalID, titles) if err != nil { - _ = s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{ - AnimeID: int64(anime.MalID), - Provider: provider.Name(), - ProviderShowID: "", - FailedUntil: sql.NullTime{Time: s.clock.Now().Add(time.Hour), Valid: true}, - LastError: truncate(err.Error(), 400), - }) + s.cacheProviderIDFailure(ctx, anime, provider, err) return "", err } - err = s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{ - AnimeID: int64(anime.MalID), - Provider: provider.Name(), - ProviderShowID: providerID, - FailedUntil: sql.NullTime{}, - LastError: "", - }) - if err != nil { - observability.Warn( - "episodes_provider_id_cache_write_failed", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "provider": provider.Name(), - }, - err, - ) - } + s.cacheProviderIDSuccess(ctx, anime, provider, providerID) observability.Info( "episodes_provider_id_resolved", "episodes", @@ -314,26 +255,88 @@ func (s *EpisodeService) providerID(ctx context.Context, anime domain.Anime, pro return providerID, nil } -func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability, source string, now time.Time, providerSuccess bool) (domain.CanonicalEpisodeList, error) { - var nextRefreshSQL sql.NullTime - if anime.Airing { - // During the hours immediately following a broadcast time, providers can lag. - // Keep retrying for a short window, even if the provider request succeeded. - lastBroadcast := nextBroadcastBeforeOrAt(anime, now) - if !lastBroadcast.IsZero() && now.Before(lastBroadcast.Add(retryWindow)) { - nextRefreshSQL = sql.NullTime{Time: now.Add(retryInterval).UTC(), Valid: true} - } else { - next := nextBroadcastAfter(anime, now) - if !next.IsZero() { - nextRefreshSQL = sql.NullTime{Time: next, Valid: true} - } else { - // Broadcast metadata is often missing or wrong for currently airing shows. - // Avoid "never refresh again" caches by falling back to a fixed interval. - nextRefreshSQL = sql.NullTime{Time: now.Add(airingFallbackRefreshInterval).UTC(), Valid: true} - } +func (s *EpisodeService) cachedProviderID(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider) (string, bool, error) { + row, err := s.queries.GetEpisodeProviderMapping(ctx, db.GetEpisodeProviderMappingParams{ + AnimeID: int64(anime.MalID), + Provider: provider.Name(), + }) + if err != nil { + s.metrics.ObserveCache("episode_provider_mapping", "miss") + if errors.Is(err, sql.ErrNoRows) { + return "", false, nil } + observability.Warn( + "episodes_provider_id_cache_read_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "provider": provider.Name(), + }, + err, + ) + return "", false, nil } + if row.FailedUntil.Valid && row.FailedUntil.Time.After(s.clock.Now()) { + s.metrics.ObserveCache("episode_provider_mapping", "hit") + return "", true, fmt.Errorf("cached provider mapping failure active until %s: %s", row.FailedUntil.Time.Format(time.RFC3339), row.LastError) + } + if strings.TrimSpace(row.ProviderShowID) == "" { + s.metrics.ObserveCache("episode_provider_mapping", "miss") + return "", false, nil + } + + s.metrics.ObserveCache("episode_provider_mapping", "hit") + observability.Info( + "episodes_provider_id_cache_hit", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "provider": provider.Name(), + "provider_id": row.ProviderShowID, + }, + ) + return row.ProviderShowID, true, nil +} + +func (s *EpisodeService) cacheProviderIDFailure(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider, resolveErr error) { + _ = s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{ + AnimeID: int64(anime.MalID), + Provider: provider.Name(), + ProviderShowID: "", + FailedUntil: sql.NullTime{Time: s.clock.Now().Add(time.Hour), Valid: true}, + LastError: truncate(resolveErr.Error(), 400), + }) +} + +func (s *EpisodeService) cacheProviderIDSuccess(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider, providerID string) { + err := s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{ + AnimeID: int64(anime.MalID), + Provider: provider.Name(), + ProviderShowID: providerID, + FailedUntil: sql.NullTime{}, + LastError: "", + }) + if err == nil { + return + } + + observability.Warn( + "episodes_provider_id_cache_write_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "provider": provider.Name(), + }, + err, + ) +} + +func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability, source string, now time.Time, providerSuccess bool) (domain.CanonicalEpisodeList, error) { + nextRefreshSQL := nextRefreshAt(anime, now) episodes := mergeEpisodes(jikanEpisodes, availability, anime.Episodes) payload := domain.CanonicalEpisodeList{ AnimeID: anime.MalID, @@ -349,32 +352,7 @@ func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpi return domain.CanonicalEpisodeList{}, err } - var retryUntil sql.NullTime - if anime.Airing && providerSuccess { - retryUntil = sql.NullTime{Time: nextRefreshSQL.Time.Add(retryWindow), Valid: nextRefreshSQL.Valid} - } - - err = s.queries.UpsertEpisodeAvailabilityCache(ctx, db.UpsertEpisodeAvailabilityCacheParams{ - AnimeID: int64(anime.MalID), - Data: string(body), - NextRefreshAt: nextRefreshSQL, - RetryUntilAt: retryUntil, - LastAttemptAt: sql.NullTime{Time: now, Valid: true}, - LastSuccessAt: sql.NullTime{Time: now, Valid: providerSuccess}, - FailureCount: 0, - LastError: "", - }) - if err != nil { - observability.Warn( - "episodes_cache_write_failed", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "source": source, - }, - err, - ) + if !s.writeEpisodeAvailabilityCache(ctx, anime, source, body, now, providerSuccess, nextRefreshSQL) { return payload, nil } @@ -392,6 +370,61 @@ func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpi return payload, nil } +func nextRefreshAt(anime domain.Anime, now time.Time) sql.NullTime { + if !anime.Airing { + return sql.NullTime{} + } + + // During the hours immediately following a broadcast time, providers can lag. + // Keep retrying for a short window, even if the provider request succeeded. + lastBroadcast := nextBroadcastBeforeOrAt(anime, now) + if !lastBroadcast.IsZero() && now.Before(lastBroadcast.Add(retryWindow)) { + return sql.NullTime{Time: now.Add(retryInterval).UTC(), Valid: true} + } + + next := nextBroadcastAfter(anime, now) + if !next.IsZero() { + return sql.NullTime{Time: next, Valid: true} + } + + // Broadcast metadata is often missing or wrong for currently airing shows. + // Avoid "never refresh again" caches by falling back to a fixed interval. + return sql.NullTime{Time: now.Add(airingFallbackRefreshInterval).UTC(), Valid: true} +} + +func (s *EpisodeService) writeEpisodeAvailabilityCache(ctx context.Context, anime domain.Anime, source string, body []byte, now time.Time, providerSuccess bool, nextRefreshSQL sql.NullTime) bool { + var retryUntil sql.NullTime + if anime.Airing && providerSuccess { + retryUntil = sql.NullTime{Time: nextRefreshSQL.Time.Add(retryWindow), Valid: nextRefreshSQL.Valid} + } + + err := s.queries.UpsertEpisodeAvailabilityCache(ctx, db.UpsertEpisodeAvailabilityCacheParams{ + AnimeID: int64(anime.MalID), + Data: string(body), + NextRefreshAt: nextRefreshSQL, + RetryUntilAt: retryUntil, + LastAttemptAt: sql.NullTime{Time: now, Valid: true}, + LastSuccessAt: sql.NullTime{Time: now, Valid: providerSuccess}, + FailureCount: 0, + LastError: "", + }) + if err == nil { + return true + } + + observability.Warn( + "episodes_cache_write_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "source": source, + }, + err, + ) + return false +} + func (s *EpisodeService) markFailure(ctx context.Context, anime domain.Anime, cause error) { now := s.clock.Now() next := nextRetryTime(anime, now) @@ -475,46 +508,12 @@ func (s *EpisodeService) getFreshCached(ctx context.Context, anime domain.Anime) } now := s.clock.Now() - if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(now) { - s.metrics.ObserveCache("episode_availability_fresh", "miss") - observability.Info( - "episodes_cache_due_for_refresh", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "next_refresh": row.NextRefreshAt.Time.Format(time.RFC3339), - }, - ) + if !s.isFreshEpisodeCache(anime, row, now) { return domain.CanonicalEpisodeList{}, false } - if anime.Airing && row.UpdatedAt.Before(now.Add(-airingFallbackRefreshInterval)) { - s.metrics.ObserveCache("episode_availability_fresh", "miss") - observability.Info( - "episodes_cache_too_old_for_airing", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "updated_at": row.UpdatedAt.Format(time.RFC3339), - }, - ) - return domain.CanonicalEpisodeList{}, false - } - - var payload domain.CanonicalEpisodeList - if err := json.Unmarshal([]byte(row.Data), &payload); err != nil { - s.metrics.ObserveCache("episode_availability_fresh", "miss") - observability.Warn( - "episodes_cached_payload_invalid", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - }, - err, - ) + payload, ok := s.decodeFreshCachedPayload(anime, row.Data) + if !ok { return domain.CanonicalEpisodeList{}, false } if !isCanonicalEpisodePayloadValid(payload, anime.Episodes) { @@ -545,6 +544,56 @@ func (s *EpisodeService) getFreshCached(ctx context.Context, anime domain.Anime) return payload, true } +func (s *EpisodeService) isFreshEpisodeCache(anime domain.Anime, row db.EpisodeAvailabilityCache, now time.Time) bool { + if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(now) { + s.metrics.ObserveCache("episode_availability_fresh", "miss") + observability.Info( + "episodes_cache_due_for_refresh", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "next_refresh": row.NextRefreshAt.Time.Format(time.RFC3339), + }, + ) + return false + } + if anime.Airing && row.UpdatedAt.Before(now.Add(-airingFallbackRefreshInterval)) { + s.metrics.ObserveCache("episode_availability_fresh", "miss") + observability.Info( + "episodes_cache_too_old_for_airing", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "updated_at": row.UpdatedAt.Format(time.RFC3339), + }, + ) + return false + } + return true +} + +func (s *EpisodeService) decodeFreshCachedPayload(anime domain.Anime, raw string) (domain.CanonicalEpisodeList, bool) { + var payload domain.CanonicalEpisodeList + err := json.Unmarshal([]byte(raw), &payload) + if err == nil { + return payload, true + } + + s.metrics.ObserveCache("episode_availability_fresh", "miss") + observability.Warn( + "episodes_cached_payload_invalid", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + }, + err, + ) + return domain.CanonicalEpisodeList{}, false +} + func isCanonicalEpisodePayloadValid(payload domain.CanonicalEpisodeList, expectedCount int) bool { if expectedCount <= 0 { return true