From b0bebec6568f108e6f1ee92fa368c1ef041b3184 Mon Sep 17 00:00:00 2001 From: mkelvers Date: Sun, 21 Jun 2026 16:49:30 +0200 Subject: [PATCH] fix: use provider availability for episodes --- internal/episodes/service/cache_store.go | 20 +++++++-- internal/episodes/service/merge.go | 37 +++++++++++++++- internal/episodes/service/service.go | 2 +- internal/episodes/service/service_test.go | 51 ++++++++++++++++++++--- 4 files changed, 99 insertions(+), 11 deletions(-) diff --git a/internal/episodes/service/cache_store.go b/internal/episodes/service/cache_store.go index 816e68a..1374803 100644 --- a/internal/episodes/service/cache_store.go +++ b/internal/episodes/service/cache_store.go @@ -131,8 +131,8 @@ func (s *EpisodeService) markFailure(ctx context.Context, anime domain.Anime, ca ) } -func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) { - row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID)) +func (s *EpisodeService) getCached(ctx context.Context, anime domain.Anime) (domain.CanonicalEpisodeList, bool) { + row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(anime.MalID)) if err != nil { s.metrics.ObserveCache("episode_availability", "miss") return domain.CanonicalEpisodeList{}, false @@ -145,12 +145,26 @@ func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.Can "episodes", "", map[string]any{ - "anime_id": animeID, + "anime_id": anime.MalID, }, err, ) return domain.CanonicalEpisodeList{}, false } + if !isCanonicalEpisodePayloadValid(payload, anime.Episodes) { + s.metrics.ObserveCache("episode_availability", "miss") + observability.Info( + "episodes_cached_payload_rejected", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "expected_count": anime.Episodes, + "cached_episodes": len(payload.Episodes), + }, + ) + return domain.CanonicalEpisodeList{}, false + } s.metrics.ObserveCache("episode_availability", "hit") return payload, true } diff --git a/internal/episodes/service/merge.go b/internal/episodes/service/merge.go index fdf56f0..98d5e02 100644 --- a/internal/episodes/service/merge.go +++ b/internal/episodes/service/merge.go @@ -36,7 +36,7 @@ func titleCandidates(anime domain.Anime) []string { func isCanonicalEpisodePayloadValid(payload domain.CanonicalEpisodeList, expectedCount int) bool { if expectedCount <= 0 { - return true + return providerBackedPayloadHasAvailability(payload) } if len(payload.Episodes) > expectedCount { return false @@ -46,18 +46,36 @@ func isCanonicalEpisodePayloadValid(payload domain.CanonicalEpisodeList, expecte return false } } + return providerBackedPayloadHasAvailability(payload) +} + +func providerBackedPayloadHasAvailability(payload domain.CanonicalEpisodeList) bool { + if payload.Source == "" || payload.Source == "jikan_fallback" || payload.Source == "legacy_disabled" { + return true + } + for _, episode := range payload.Episodes { + if !episode.HasSub && !episode.HasDub { + return false + } + } return true } func mergeEpisodes(jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability, expectedCount int) []domain.CanonicalEpisode { byNumber := map[int]episodePartial{} + providerNumbers := availableEpisodeNumbers(availability, expectedCount) + providerBacked := len(providerNumbers) > 0 + + for number := range providerNumbers { + mergeEpisode(&byNumber, number, func(item *episodePartial) {}) + } for i, ep := range jikanEpisodes { if exceedsExpectedCount(i+1, expectedCount) { break } number, ok := jikanEpisodeNumber(ep, i) - if !ok || exceedsExpectedCount(number, expectedCount) { + if !ok || exceedsExpectedCount(number, expectedCount) || (providerBacked && !providerNumbers[number]) { continue } mergeEpisode(&byNumber, number, func(item *episodePartial) { @@ -95,6 +113,21 @@ func mergeEpisodes(jikanEpisodes []jikan.Episode, availability domain.EpisodeAva return episodes } +func availableEpisodeNumbers(availability domain.EpisodeAvailability, expectedCount int) map[int]bool { + numbers := map[int]bool{} + for _, number := range availability.Sub { + if number > 0 && !exceedsExpectedCount(number, expectedCount) { + numbers[number] = true + } + } + for _, number := range availability.Dub { + if number > 0 && !exceedsExpectedCount(number, expectedCount) { + numbers[number] = true + } + } + return numbers +} + func mergeEpisode(byNumber *map[int]episodePartial, number int, update func(*episodePartial)) { item := (*byNumber)[number] update(&item) diff --git a/internal/episodes/service/service.go b/internal/episodes/service/service.go index ac2e9fc..d2891ca 100644 --- a/internal/episodes/service/service.go +++ b/internal/episodes/service/service.go @@ -143,7 +143,7 @@ func (s *EpisodeService) refresh(ctx context.Context, anime domain.Anime) (domai providerAvailability, source, providerErr := s.fetchProviderAvailability(ctx, anime) if providerErr != nil { s.markFailure(ctx, anime, providerErr) - if cached, ok := s.getCached(ctx, anime.MalID); ok { + if cached, ok := s.getCached(ctx, anime); ok { observability.Warn( "episodes_provider_failed_serving_stale_cache", "episodes", diff --git a/internal/episodes/service/service_test.go b/internal/episodes/service/service_test.go index e92405d..790f66b 100644 --- a/internal/episodes/service/service_test.go +++ b/internal/episodes/service/service_test.go @@ -7,7 +7,7 @@ import ( "time" ) -func TestMergeEpisodesUsesUnionAndSynthesizesProviderOnlyEntries(t *testing.T) { +func TestMergeEpisodesUsesProviderAvailabilityAsSourceOfTruth(t *testing.T) { episodes := mergeEpisodes([]jikan.Episode{ {MalID: 101, Episode: "1", Title: "Start"}, {MalID: 102, Episode: "2", Title: "Second", Filler: true}, @@ -17,15 +17,28 @@ func TestMergeEpisodesUsesUnionAndSynthesizesProviderOnlyEntries(t *testing.T) { Dub: []int{1, 2, 3}, }, 0) - if len(episodes) != 5 { - t.Fatalf("len(episodes) = %d, want 5", len(episodes)) + if len(episodes) != 4 { + t.Fatalf("len(episodes) = %d, want 4", len(episodes)) } assertEpisode(t, episodes[0], 1, "Start", true, true, false, false, false) assertEpisode(t, episodes[1], 2, "Second", true, true, false, true, false) assertEpisode(t, episodes[2], 3, "Episode 3", true, true, false, false, false) - assertEpisode(t, episodes[3], 5, "Future", false, false, false, false, true) - assertEpisode(t, episodes[4], 6, "Episode 6", true, false, true, false, false) + assertEpisode(t, episodes[3], 6, "Episode 6", true, false, true, false, false) +} + +func TestMergeEpisodesUsesJikanWhenProviderAvailabilityMissing(t *testing.T) { + episodes := mergeEpisodes([]jikan.Episode{ + {MalID: 101, Episode: "1", Title: "Start"}, + {MalID: 102, Episode: "2", Title: "Second"}, + }, domain.EpisodeAvailability{}, 0) + + if len(episodes) != 2 { + t.Fatalf("len(episodes) = %d, want 2", len(episodes)) + } + + assertEpisode(t, episodes[0], 1, "Start", false, false, false, false, false) + assertEpisode(t, episodes[1], 2, "Second", false, false, false, false, false) } func TestMergeEpisodesIgnoresInvalidJikanEpisodeNumbers(t *testing.T) { @@ -86,6 +99,34 @@ func TestIsCanonicalEpisodePayloadValidRejectsOverflowingCachedPayload(t *testin } } +func TestIsCanonicalEpisodePayloadValidRejectsProviderEpisodesWithoutAvailability(t *testing.T) { + payload := domain.CanonicalEpisodeList{ + Source: "AllAnime", + Episodes: []domain.CanonicalEpisode{ + {Number: 1, Title: "Episode 1", HasSub: true}, + {Number: 2, Title: "Episode 2"}, + }, + } + + if isCanonicalEpisodePayloadValid(payload, 13) { + t.Fatal("expected cached payload to be rejected") + } +} + +func TestIsCanonicalEpisodePayloadValidAllowsJikanFallbackWithoutAvailability(t *testing.T) { + payload := domain.CanonicalEpisodeList{ + Source: "jikan_fallback", + Episodes: []domain.CanonicalEpisode{ + {Number: 1, Title: "Episode 1"}, + {Number: 2, Title: "Episode 2"}, + }, + } + + if !isCanonicalEpisodePayloadValid(payload, 13) { + t.Fatal("expected cached payload to be valid") + } +} + func TestNextBroadcastAfterUsesJikanTimezone(t *testing.T) { anime := domain.Anime{Anime: jikan.Anime{MalID: 1}} anime.Broadcast.Day = "Saturdays"