package service import ( "context" "database/sql" "encoding/json" "errors" "fmt" "log" "mal/integrations/jikan" "mal/internal/db" "mal/internal/domain" "mal/internal/observability" "sort" "strings" "time" ) const ( retryInterval = 15 * time.Minute retryWindow = 3 * time.Hour ) type Clock interface { Now() time.Time } type realClock struct{} func (realClock) Now() time.Time { return time.Now() } type EpisodeService struct { queries *db.Queries jikan *jikan.Client providers []domain.EpisodeAvailabilityProvider clock Clock enabled bool metrics *observability.Metrics } func NewEpisodeService(queries *db.Queries, jikanClient *jikan.Client, providers []domain.EpisodeAvailabilityProvider, enabled bool, metrics *observability.Metrics) domain.EpisodeService { return NewEpisodeServiceWithClock(queries, jikanClient, providers, enabled, realClock{}, metrics) } func NewEpisodeServiceWithClock(queries *db.Queries, jikanClient *jikan.Client, providers []domain.EpisodeAvailabilityProvider, enabled bool, clock Clock, metrics *observability.Metrics) *EpisodeService { return &EpisodeService{ queries: queries, jikan: jikanClient, providers: providers, clock: clock, enabled: enabled, metrics: metrics, } } func (s *EpisodeService) GetCanonicalEpisodes(ctx context.Context, anime domain.Anime, forceRefresh bool) (domain.CanonicalEpisodeList, error) { if !s.enabled { return s.jikanOnly(ctx, anime, "legacy_disabled") } if !forceRefresh { if cached, ok := s.getFreshCached(ctx, anime.MalID); ok { return cached, nil } } return s.refresh(ctx, anime) } func (s *EpisodeService) RefreshTrackedDue(ctx context.Context, limit int) error { if !s.enabled { return nil } if limit <= 0 { limit = 25 } ids, err := s.queries.GetTrackedAiringAnimeIDsDueForEpisodeRefresh(ctx, int64(limit)) if err != nil { return fmt.Errorf("get due tracked anime: %w", err) } for _, id := range ids { anime, err := s.jikan.GetAnimeByID(ctx, int(id)) if err != nil { log.Printf("episodes: failed to fetch anime for refresh anime_id=%d error=%v", id, err) continue } if _, err := s.refresh(ctx, anime); err != nil { log.Printf("episodes: refresh failed anime_id=%d error=%v", id, err) } } return nil } func (s *EpisodeService) refresh(ctx context.Context, anime domain.Anime) (domain.CanonicalEpisodeList, error) { now := s.clock.Now() log.Printf("episodes: refresh start anime_id=%d title=%q airing=%t", anime.MalID, anime.DisplayTitle(), anime.Airing) jikanEpisodes, jikanErr := s.jikan.GetAllEpisodes(ctx, anime.MalID) if jikanErr != nil { log.Printf("episodes: jikan episode metadata failed anime_id=%d error=%v", anime.MalID, jikanErr) } providerAvailability, source, providerErr := s.fetchProviderAvailability(ctx, anime) if providerErr != nil { s.markFailure(ctx, anime, providerErr) if cached, ok := s.getCached(ctx, anime.MalID); ok { log.Printf("episodes: serving stale cache after provider failure anime_id=%d error=%v", anime.MalID, providerErr) return cached, nil } if jikanErr == nil { return s.store(ctx, anime, jikanEpisodes, domain.EpisodeAvailability{}, "jikan_fallback", now, false) } return domain.CanonicalEpisodeList{}, providerErr } return s.store(ctx, anime, jikanEpisodes, providerAvailability, source, now, true) } func (s *EpisodeService) fetchProviderAvailability(ctx context.Context, anime domain.Anime) (domain.EpisodeAvailability, string, error) { titles := titleCandidates(anime) for _, provider := range s.providers { providerID, err := s.providerID(ctx, anime, provider, titles) if err != nil { log.Printf("episodes: provider id miss anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err) continue } available, err := provider.GetEpisodeAvailabilityByProviderID(ctx, providerID) if err != nil { log.Printf("episodes: provider availability miss anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err) continue } log.Printf("episodes: provider availability hit anime_id=%d provider=%s sub=%d dub=%d", anime.MalID, provider.Name(), len(available.Sub), len(available.Dub)) return available, provider.Name(), nil } return domain.EpisodeAvailability{}, "", fmt.Errorf("no episode availability provider matched anime_id=%d", anime.MalID) } func (s *EpisodeService) providerID(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider, titles []string) (string, error) { row, err := s.queries.GetEpisodeProviderMapping(ctx, db.GetEpisodeProviderMappingParams{ AnimeID: int64(anime.MalID), Provider: provider.Name(), }) if err == nil { if row.FailedUntil.Valid && row.FailedUntil.Time.After(s.clock.Now()) { s.metrics.ObserveCache("episode_provider_mapping", "hit") return "", fmt.Errorf("cached provider mapping failure active until %s: %s", row.FailedUntil.Time.Format(time.RFC3339), row.LastError) } if strings.TrimSpace(row.ProviderShowID) != "" { s.metrics.ObserveCache("episode_provider_mapping", "hit") log.Printf("episodes: provider id cache hit anime_id=%d provider=%s provider_id=%s", anime.MalID, provider.Name(), row.ProviderShowID) return row.ProviderShowID, nil } s.metrics.ObserveCache("episode_provider_mapping", "miss") } else if !errors.Is(err, sql.ErrNoRows) { s.metrics.ObserveCache("episode_provider_mapping", "miss") log.Printf("episodes: provider id cache read failed anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err) } else { s.metrics.ObserveCache("episode_provider_mapping", "miss") } providerID, err := provider.ResolveEpisodeProviderID(ctx, anime.MalID, titles) if err != nil { _ = s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{ AnimeID: int64(anime.MalID), Provider: provider.Name(), ProviderShowID: "", FailedUntil: sql.NullTime{Time: s.clock.Now().Add(time.Hour), Valid: true}, LastError: truncate(err.Error(), 400), }) return "", err } err = s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{ AnimeID: int64(anime.MalID), Provider: provider.Name(), ProviderShowID: providerID, FailedUntil: sql.NullTime{}, LastError: "", }) if err != nil { log.Printf("episodes: provider id cache write failed anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err) } log.Printf("episodes: provider id resolved anime_id=%d provider=%s provider_id=%s", anime.MalID, provider.Name(), providerID) return providerID, nil } func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability, source string, now time.Time, providerSuccess bool) (domain.CanonicalEpisodeList, error) { nextRefresh := nextBroadcastAfter(anime, now) var nextRefreshSQL sql.NullTime if anime.Airing && !nextRefresh.IsZero() { nextRefreshSQL = sql.NullTime{Time: nextRefresh, Valid: true} } episodes := mergeEpisodes(jikanEpisodes, availability) payload := domain.CanonicalEpisodeList{ AnimeID: anime.MalID, Episodes: episodes, Source: source, } if nextRefreshSQL.Valid { payload.NextRefreshAt = nextRefreshSQL.Time.Format(time.RFC3339) } body, err := json.Marshal(payload) if err != nil { return domain.CanonicalEpisodeList{}, err } var retryUntil sql.NullTime if anime.Airing && providerSuccess { retryUntil = sql.NullTime{Time: nextRefreshSQL.Time.Add(retryWindow), Valid: nextRefreshSQL.Valid} } err = s.queries.UpsertEpisodeAvailabilityCache(ctx, db.UpsertEpisodeAvailabilityCacheParams{ AnimeID: int64(anime.MalID), Data: string(body), NextRefreshAt: nextRefreshSQL, RetryUntilAt: retryUntil, LastAttemptAt: sql.NullTime{Time: now, Valid: true}, LastSuccessAt: sql.NullTime{Time: now, Valid: providerSuccess}, FailureCount: 0, LastError: "", }) if err != nil { log.Printf("episodes: cache write failed anime_id=%d source=%s error=%v", anime.MalID, source, err) return payload, nil } log.Printf("episodes: refresh success anime_id=%d source=%s episodes=%d next_refresh=%s", anime.MalID, source, len(episodes), payload.NextRefreshAt) return payload, nil } func (s *EpisodeService) markFailure(ctx context.Context, anime domain.Anime, cause error) { now := s.clock.Now() next := nextRetryTime(anime, now) var retryUntil sql.NullTime nextBroadcast := nextBroadcastBeforeOrAt(anime, now) if !nextBroadcast.IsZero() { retryUntil = sql.NullTime{Time: nextBroadcast.Add(retryWindow), Valid: true} } var nextSQL sql.NullTime if !next.IsZero() { nextSQL = sql.NullTime{Time: next, Valid: true} } err := s.queries.MarkEpisodeAvailabilityRefreshFailed(ctx, db.MarkEpisodeAvailabilityRefreshFailedParams{ LastAttemptAt: sql.NullTime{Time: now, Valid: true}, LastError: truncate(cause.Error(), 400), NextRefreshAt: nextSQL, RetryUntilAt: retryUntil, AnimeID: int64(anime.MalID), }) if err != nil { log.Printf("episodes: failed to mark refresh failure anime_id=%d error=%v", anime.MalID, err) return } log.Printf("episodes: refresh failure recorded anime_id=%d next_retry=%s error=%v", anime.MalID, next.Format(time.RFC3339), cause) } func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) { row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID)) if err != nil { s.metrics.ObserveCache("episode_availability", "miss") return domain.CanonicalEpisodeList{}, false } var payload domain.CanonicalEpisodeList if err := json.Unmarshal([]byte(row.Data), &payload); err != nil { s.metrics.ObserveCache("episode_availability", "miss") log.Printf("episodes: invalid cached payload anime_id=%d error=%v", animeID, err) return domain.CanonicalEpisodeList{}, false } s.metrics.ObserveCache("episode_availability", "hit") return payload, true } func (s *EpisodeService) getFreshCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) { row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID)) if err != nil { s.metrics.ObserveCache("episode_availability_fresh", "miss") return domain.CanonicalEpisodeList{}, false } if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(s.clock.Now()) { s.metrics.ObserveCache("episode_availability_fresh", "miss") log.Printf("episodes: cached availability due for refresh anime_id=%d next_refresh=%s", animeID, row.NextRefreshAt.Time.Format(time.RFC3339)) return domain.CanonicalEpisodeList{}, false } var payload domain.CanonicalEpisodeList if err := json.Unmarshal([]byte(row.Data), &payload); err != nil { s.metrics.ObserveCache("episode_availability_fresh", "miss") log.Printf("episodes: invalid cached payload anime_id=%d error=%v", animeID, err) return domain.CanonicalEpisodeList{}, false } s.metrics.ObserveCache("episode_availability_fresh", "hit") log.Printf("episodes: served cached availability anime_id=%d episodes=%d next_refresh=%s", animeID, len(payload.Episodes), payload.NextRefreshAt) return payload, true } func (s *EpisodeService) jikanOnly(ctx context.Context, anime domain.Anime, source string) (domain.CanonicalEpisodeList, error) { episodes, err := s.jikan.GetAllEpisodes(ctx, anime.MalID) if err != nil { return domain.CanonicalEpisodeList{}, err } return domain.CanonicalEpisodeList{ AnimeID: anime.MalID, Episodes: mergeEpisodes(episodes, domain.EpisodeAvailability{}), Source: source, }, nil } func titleCandidates(anime domain.Anime) []string { out := []string{anime.Title} if anime.TitleEnglish != "" && anime.TitleEnglish != anime.Title { out = append(out, anime.TitleEnglish) } if anime.TitleJapanese != "" { out = append(out, anime.TitleJapanese) } for _, syn := range anime.TitleSynonyms { if syn != "" && syn != anime.Title && syn != anime.TitleEnglish && syn != anime.TitleJapanese { out = append(out, syn) } } return out } func mergeEpisodes(jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability) []domain.CanonicalEpisode { type partial struct { title string filler bool recap bool sub bool dub bool } byNumber := map[int]partial{} for _, ep := range jikanEpisodes { if ep.MalID <= 0 { continue } item := byNumber[ep.MalID] item.title = strings.TrimSpace(ep.Title) item.filler = ep.Filler item.recap = ep.Recap byNumber[ep.MalID] = item } for _, n := range availability.Sub { if n <= 0 { continue } item := byNumber[n] item.sub = true byNumber[n] = item } for _, n := range availability.Dub { if n <= 0 { continue } item := byNumber[n] item.dub = true byNumber[n] = item } numbers := make([]int, 0, len(byNumber)) for number := range byNumber { numbers = append(numbers, number) } sort.Ints(numbers) episodes := make([]domain.CanonicalEpisode, 0, len(numbers)) for _, number := range numbers { item := byNumber[number] title := item.title if title == "" { title = fmt.Sprintf("Episode %d", number) } episodes = append(episodes, domain.CanonicalEpisode{ Number: number, Title: title, HasSub: item.sub, HasDub: item.dub, SubOnly: item.sub && !item.dub, Filler: item.filler, Recap: item.recap, }) } return episodes } func nextRetryTime(anime domain.Anime, now time.Time) time.Time { broadcast := nextBroadcastBeforeOrAt(anime, now) if broadcast.IsZero() || now.After(broadcast.Add(retryWindow)) { return nextBroadcastAfter(anime, now) } return now.Add(retryInterval) } func nextBroadcastBeforeOrAt(anime domain.Anime, now time.Time) time.Time { next := nextBroadcastAfter(anime, now.AddDate(0, 0, -7)) if next.IsZero() || next.After(now) { return time.Time{} } return next } func nextBroadcastAfter(anime domain.Anime, after time.Time) time.Time { day := weekdayFromJikan(anime.Broadcast.Day) if day < 0 || strings.TrimSpace(anime.Broadcast.Time) == "" { return time.Time{} } loc := time.UTC if tz := strings.TrimSpace(anime.Broadcast.Timezone); tz != "" { if loaded, err := time.LoadLocation(tz); err == nil { loc = loaded } else { log.Printf("episodes: failed to parse broadcast timezone anime_id=%d timezone=%q error=%v", anime.MalID, tz, err) } } hour, minute, ok := parseBroadcastTime(anime.Broadcast.Time) if !ok { log.Printf("episodes: failed to parse broadcast time anime_id=%d time=%q", anime.MalID, anime.Broadcast.Time) return time.Time{} } localAfter := after.In(loc) daysAhead := (int(day) - int(localAfter.Weekday()) + 7) % 7 candidate := time.Date(localAfter.Year(), localAfter.Month(), localAfter.Day()+daysAhead, hour, minute, 0, 0, loc) if !candidate.After(localAfter) { candidate = candidate.AddDate(0, 0, 7) } return candidate.UTC() } func weekdayFromJikan(day string) time.Weekday { switch strings.ToLower(strings.TrimSpace(day)) { case "sundays": return time.Sunday case "mondays": return time.Monday case "tuesdays": return time.Tuesday case "wednesdays": return time.Wednesday case "thursdays": return time.Thursday case "fridays": return time.Friday case "saturdays": return time.Saturday default: return -1 } } func parseBroadcastTime(value string) (int, int, bool) { t, err := time.Parse("15:04", strings.TrimSpace(value)) if err != nil { return 0, 0, false } return t.Hour(), t.Minute(), true } func truncate(value string, maxLen int) string { if len(value) <= maxLen { return value } return value[:maxLen] }