From 501dcb7d3888278da94be8cb48702b9795154080 Mon Sep 17 00:00:00 2001
From: mkelvers
Date: Sun, 17 May 2026 21:16:34 +0200
Subject: [PATCH] feat: add episode service with background refresh worker

---
Review note: file contents were amended during review; the blob ids on the
"index" lines below still refer to the pre-review revision.

 internal/episodes/module.go          |  44 ++
 internal/episodes/service/service.go | 552 +++++++++++++++++++++++++++
 internal/episodes/worker.go          |  58 +++
 3 files changed, 654 insertions(+)
 create mode 100644 internal/episodes/module.go
 create mode 100644 internal/episodes/service/service.go
 create mode 100644 internal/episodes/worker.go

diff --git a/internal/episodes/module.go b/internal/episodes/module.go
new file mode 100644
index 0000000..951b70a
--- /dev/null
+++ b/internal/episodes/module.go
@@ -0,0 +1,44 @@
+// Package episodes wires the episode availability service and its background
+// refresh worker into the fx application graph.
+package episodes
+
+import (
+	"os"
+	"strings"
+
+	"mal/integrations/jikan"
+	"mal/integrations/playback/allanime"
+	"mal/internal/db"
+	"mal/internal/domain"
+	episodeService "mal/internal/episodes/service"
+
+	"go.uber.org/fx"
+)
+
+// episodeAvailabilityEnabled reports whether provider-backed availability is
+// on. It is off only when EPISODE_AVAILABILITY_MODE is "legacy" or "jikan"
+// (case-insensitive, surrounding whitespace ignored); any other value,
+// including unset, turns it on.
+func episodeAvailabilityEnabled() bool {
+	value := strings.ToLower(strings.TrimSpace(os.Getenv("EPISODE_AVAILABILITY_MODE")))
+	return value != "legacy" && value != "jikan"
+}
+
+// Module provides the domain.EpisodeService implementation and the list of
+// availability providers (currently only AllAnime), and registers the
+// refresh worker through RegisterWorker.
+var Module = fx.Options(
+	fx.Provide(
+		episodeAvailabilityEnabled,
+		fx.Annotate(
+			func(queries *db.Queries, jikanClient *jikan.Client, providers []domain.EpisodeAvailabilityProvider, enabled bool) domain.EpisodeService {
+				return episodeService.NewEpisodeService(queries, jikanClient, providers, enabled)
+			},
+			fx.ParamTags(``, ``, ``, ``),
+		),
+	),
+	fx.Provide(func(p *allanime.AllAnimeProvider) []domain.EpisodeAvailabilityProvider {
+		return []domain.EpisodeAvailabilityProvider{p}
+	}),
+	fx.Invoke(RegisterWorker),
+)
diff --git a/internal/episodes/service/service.go b/internal/episodes/service/service.go
new file mode 100644
index 0000000..ede335f
--- /dev/null
+++ b/internal/episodes/service/service.go
@@ -0,0 +1,552 @@
+// Package service implements the canonical episode list: Jikan episode
+// metadata merged with sub/dub availability reported by providers, cached
+// through db.Queries and refreshed around each anime's broadcast slot.
+package service
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"log"
+	"mal/integrations/jikan"
+	"mal/internal/db"
+	"mal/internal/domain"
+	"sort"
+	"strings"
+	"time"
+	"unicode/utf8"
+)
+
+// Retry schedule after a failed refresh: while within retryWindow of the most
+// recent broadcast the next attempt is scheduled retryInterval later;
+// otherwise it waits for the next broadcast (see nextRetryTime).
+const (
+	retryInterval = 15 * time.Minute
+	retryWindow   = 3 * time.Hour
+)
+
+// Clock supplies the current time. NewEpisodeServiceWithClock accepts any
+// implementation in place of the system clock.
+type Clock interface {
+	Now() time.Time
+}
+
+// realClock is the Clock backed by time.Now.
+type realClock struct{}
+
+// Now returns time.Now().
+func (realClock) Now() time.Time { return time.Now() }
+
+// EpisodeService builds canonical episode lists and caches them.
+type EpisodeService struct {
+	queries   *db.Queries
+	jikan     *jikan.Client
+	providers []domain.EpisodeAvailabilityProvider
+	clock     Clock
+	enabled   bool
+}
+
+// NewEpisodeService returns an EpisodeService that uses the system clock.
+// When enabled is false the service only serves uncached Jikan metadata.
+func NewEpisodeService(queries *db.Queries, jikanClient *jikan.Client, providers []domain.EpisodeAvailabilityProvider, enabled bool) domain.EpisodeService {
+	return NewEpisodeServiceWithClock(queries, jikanClient, providers, enabled, realClock{})
+}
+
+// NewEpisodeServiceWithClock is NewEpisodeService with a caller-supplied Clock.
+func NewEpisodeServiceWithClock(queries *db.Queries, jikanClient *jikan.Client, providers []domain.EpisodeAvailabilityProvider, enabled bool, clock Clock) *EpisodeService {
+	return &EpisodeService{
+		queries:   queries,
+		jikan:     jikanClient,
+		providers: providers,
+		clock:     clock,
+		enabled:   enabled,
+	}
+}
+
+// GetCanonicalEpisodes returns the episode list for anime. With the service
+// disabled it returns uncached Jikan data. Otherwise the cached list is served
+// unless it is due for refresh or forceRefresh is set, in which case the list
+// is rebuilt from Jikan and the providers.
+func (s *EpisodeService) GetCanonicalEpisodes(ctx context.Context, anime domain.Anime, forceRefresh bool) (domain.CanonicalEpisodeList, error) {
+	if !s.enabled {
+		return s.jikanOnly(ctx, anime, "legacy_disabled")
+	}
+
+	if !forceRefresh {
+		if cached, ok := s.getFreshCached(ctx, anime.MalID); ok {
+			return cached, nil
+		}
+	}
+
+	return s.refresh(ctx, anime)
+}
+
+// RefreshTrackedDue refreshes up to limit anime returned by
+// GetTrackedAiringAnimeIDsDueForEpisodeRefresh (limit <= 0 means 25). A
+// failure for one anime is logged and skipped. The returned error is non-nil
+// only when the due list cannot be loaded or ctx ends before the batch does.
+func (s *EpisodeService) RefreshTrackedDue(ctx context.Context, limit int) error {
+	if !s.enabled {
+		return nil
+	}
+	if limit <= 0 {
+		limit = 25
+	}
+
+	ids, err := s.queries.GetTrackedAiringAnimeIDsDueForEpisodeRefresh(ctx, int64(limit))
+	if err != nil {
+		return fmt.Errorf("get due tracked anime: %w", err)
+	}
+
+	for _, id := range ids {
+		// Stop once ctx is done instead of failing, and logging, every
+		// remaining anime while shutting down.
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+		anime, err := s.jikan.GetAnimeByID(ctx, int(id))
+		if err != nil {
+			log.Printf("episodes: failed to fetch anime for refresh anime_id=%d error=%v", id, err)
+			continue
+		}
+		if _, err := s.refresh(ctx, anime); err != nil {
+			log.Printf("episodes: refresh failed anime_id=%d error=%v", id, err)
+		}
+	}
+
+	return nil
+}
+
+// refresh rebuilds the list for anime. A Jikan failure is tolerated. A
+// provider failure is recorded and then answered, in order of preference,
+// with the stale cache, a Jikan-only list, or the provider error.
+func (s *EpisodeService) refresh(ctx context.Context, anime domain.Anime) (domain.CanonicalEpisodeList, error) {
+	now := s.clock.Now()
+	log.Printf("episodes: refresh start anime_id=%d title=%q airing=%t", anime.MalID, anime.DisplayTitle(), anime.Airing)
+
+	jikanEpisodes, jikanErr := s.jikan.GetAllEpisodes(ctx, anime.MalID)
+	if jikanErr != nil {
+		log.Printf("episodes: jikan episode metadata failed anime_id=%d error=%v", anime.MalID, jikanErr)
+	}
+
+	providerAvailability, source, providerErr := s.fetchProviderAvailability(ctx, anime)
+	if providerErr != nil {
+		s.markFailure(ctx, anime, providerErr)
+		if cached, ok := s.getCached(ctx, anime.MalID); ok {
+			log.Printf("episodes: serving stale cache after provider failure anime_id=%d error=%v", anime.MalID, providerErr)
+			return cached, nil
+		}
+		if jikanErr == nil {
+			return s.store(ctx, anime, jikanEpisodes, domain.EpisodeAvailability{}, "jikan_fallback", now, false)
+		}
+		return domain.CanonicalEpisodeList{}, providerErr
+	}
+
+	return s.store(ctx, anime, jikanEpisodes, providerAvailability, source, now, true)
+}
+
+// fetchProviderAvailability tries each provider in order and returns the first
+// successful availability together with that provider's name.
+func (s *EpisodeService) fetchProviderAvailability(ctx context.Context, anime domain.Anime) (domain.EpisodeAvailability, string, error) {
+	titles := titleCandidates(anime)
+	for _, provider := range s.providers {
+		providerID, err := s.providerID(ctx, anime, provider, titles)
+		if err != nil {
+			log.Printf("episodes: provider id miss anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err)
+			continue
+		}
+
+		available, err := provider.GetEpisodeAvailabilityByProviderID(ctx, providerID)
+		if err != nil {
+			log.Printf("episodes: provider availability miss anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err)
+			continue
+		}
+		log.Printf("episodes: provider availability hit anime_id=%d provider=%s sub=%d dub=%d", anime.MalID, provider.Name(), len(available.Sub), len(available.Dub))
+		return available, provider.Name(), nil
+	}
+	return domain.EpisodeAvailability{}, "", fmt.Errorf("no episode availability provider matched anime_id=%d", anime.MalID)
+}
+
+// providerID returns provider's show ID for anime, preferring the stored
+// mapping and otherwise resolving it from titles. A failed resolution is
+// stored with a one-hour FailedUntil; while that mark is active the stored
+// error is returned without contacting the provider.
+func (s *EpisodeService) providerID(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider, titles []string) (string, error) {
+	row, err := s.queries.GetEpisodeProviderMapping(ctx, db.GetEpisodeProviderMappingParams{
+		AnimeID:  int64(anime.MalID),
+		Provider: provider.Name(),
+	})
+	if err == nil {
+		if row.FailedUntil.Valid && row.FailedUntil.Time.After(s.clock.Now()) {
+			return "", fmt.Errorf("cached provider mapping failure active until %s: %s", row.FailedUntil.Time.Format(time.RFC3339), row.LastError)
+		}
+		if strings.TrimSpace(row.ProviderShowID) != "" {
+			log.Printf("episodes: provider id cache hit anime_id=%d provider=%s provider_id=%s", anime.MalID, provider.Name(), row.ProviderShowID)
+			return row.ProviderShowID, nil
+		}
+	} else if !errors.Is(err, sql.ErrNoRows) {
+		log.Printf("episodes: provider id cache read failed anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err)
+	}
+
+	providerID, err := provider.ResolveEpisodeProviderID(ctx, anime.MalID, titles)
+	if err != nil {
+		// A cancelled or expired ctx says nothing about the provider, so do
+		// not block this mapping for an hour because of it.
+		if ctx.Err() != nil {
+			return "", err
+		}
+		upsertErr := s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{
+			AnimeID:        int64(anime.MalID),
+			Provider:       provider.Name(),
+			ProviderShowID: "",
+			FailedUntil:    sql.NullTime{Time: s.clock.Now().Add(time.Hour), Valid: true},
+			LastError:      truncate(err.Error(), 400),
+		})
+		if upsertErr != nil {
+			log.Printf("episodes: provider id failure cache write failed anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), upsertErr)
+		}
+		return "", err
+	}
+
+	err = s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{
+		AnimeID:        int64(anime.MalID),
+		Provider:       provider.Name(),
+		ProviderShowID: providerID,
+		FailedUntil:    sql.NullTime{},
+		LastError:      "",
+	})
+	if err != nil {
+		log.Printf("episodes: provider id cache write failed anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err)
+	}
+	log.Printf("episodes: provider id resolved anime_id=%d provider=%s provider_id=%s", anime.MalID, provider.Name(), providerID)
+	return providerID, nil
+}
+
+// store merges Jikan metadata with availability, writes the result to the
+// cache and returns it. For airing anime with a parsable broadcast slot the
+// next refresh is set to the next broadcast. A failed cache write is only
+// logged: the freshly built list is still returned.
+func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability, source string, now time.Time, providerSuccess bool) (domain.CanonicalEpisodeList, error) {
+	nextRefresh := nextBroadcastAfter(anime, now)
+	var nextRefreshSQL sql.NullTime
+	if anime.Airing && !nextRefresh.IsZero() {
+		nextRefreshSQL = sql.NullTime{Time: nextRefresh, Valid: true}
+	}
+
+	episodes := mergeEpisodes(jikanEpisodes, availability)
+	payload := domain.CanonicalEpisodeList{
+		AnimeID:  anime.MalID,
+		Episodes: episodes,
+		Source:   source,
+	}
+	if nextRefreshSQL.Valid {
+		payload.NextRefreshAt = nextRefreshSQL.Time.Format(time.RFC3339)
+	}
+
+	body, err := json.Marshal(payload)
+	if err != nil {
+		return domain.CanonicalEpisodeList{}, err
+	}
+
+	var retryUntil sql.NullTime
+	if anime.Airing && providerSuccess {
+		retryUntil = sql.NullTime{Time: nextRefreshSQL.Time.Add(retryWindow), Valid: nextRefreshSQL.Valid}
+	}
+
+	err = s.queries.UpsertEpisodeAvailabilityCache(ctx, db.UpsertEpisodeAvailabilityCacheParams{
+		AnimeID:       int64(anime.MalID),
+		Data:          string(body),
+		NextRefreshAt: nextRefreshSQL,
+		RetryUntilAt:  retryUntil,
+		LastAttemptAt: sql.NullTime{Time: now, Valid: true},
+		LastSuccessAt: sql.NullTime{Time: now, Valid: providerSuccess},
+		FailureCount:  0,
+		LastError:     "",
+	})
+	if err != nil {
+		log.Printf("episodes: cache write failed anime_id=%d source=%s error=%v", anime.MalID, source, err)
+		return payload, nil
+	}
+
+	log.Printf("episodes: refresh success anime_id=%d source=%s episodes=%d next_refresh=%s", anime.MalID, source, len(episodes), payload.NextRefreshAt)
+	return payload, nil
+}
+
+// markFailure records a failed refresh and schedules the next attempt (see
+// nextRetryTime). An error from the write itself is logged, not returned.
+func (s *EpisodeService) markFailure(ctx context.Context, anime domain.Anime, cause error) {
+	now := s.clock.Now()
+	next := nextRetryTime(anime, now)
+	var retryUntil sql.NullTime
+	nextBroadcast := nextBroadcastBeforeOrAt(anime, now)
+	if !nextBroadcast.IsZero() {
+		retryUntil = sql.NullTime{Time: nextBroadcast.Add(retryWindow), Valid: true}
+	}
+
+	var nextSQL sql.NullTime
+	if !next.IsZero() {
+		nextSQL = sql.NullTime{Time: next, Valid: true}
+	}
+
+	err := s.queries.MarkEpisodeAvailabilityRefreshFailed(ctx, db.MarkEpisodeAvailabilityRefreshFailedParams{
+		LastAttemptAt: sql.NullTime{Time: now, Valid: true},
+		LastError:     truncate(cause.Error(), 400),
+		NextRefreshAt: nextSQL,
+		RetryUntilAt:  retryUntil,
+		AnimeID:       int64(anime.MalID),
+	})
+	if err != nil {
+		log.Printf("episodes: failed to mark refresh failure anime_id=%d error=%v", anime.MalID, err)
+		return
+	}
+	log.Printf("episodes: refresh failure recorded anime_id=%d next_retry=%s error=%v", anime.MalID, next.Format(time.RFC3339), cause)
+}
+
+// getCached returns the cached list regardless of age. ok is false when the
+// row cannot be read or holds invalid JSON.
+func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) {
+	row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID))
+	if err != nil {
+		return domain.CanonicalEpisodeList{}, false
+	}
+	var payload domain.CanonicalEpisodeList
+	if err := json.Unmarshal([]byte(row.Data), &payload); err != nil {
+		log.Printf("episodes: invalid cached payload anime_id=%d error=%v", animeID, err)
+		return domain.CanonicalEpisodeList{}, false
+	}
+	return payload, true
+}
+
+// getFreshCached is getCached limited to entries that are not yet due: a row
+// whose NextRefreshAt is set and not after now is reported as a miss.
+func (s *EpisodeService) getFreshCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) {
+	row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID))
+	if err != nil {
+		return domain.CanonicalEpisodeList{}, false
+	}
+	if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(s.clock.Now()) {
+		log.Printf("episodes: cached availability due for refresh anime_id=%d next_refresh=%s", animeID, row.NextRefreshAt.Time.Format(time.RFC3339))
+		return domain.CanonicalEpisodeList{}, false
+	}
+
+	var payload domain.CanonicalEpisodeList
+	if err := json.Unmarshal([]byte(row.Data), &payload); err != nil {
+		log.Printf("episodes: invalid cached payload anime_id=%d error=%v", animeID, err)
+		return domain.CanonicalEpisodeList{}, false
+	}
+	log.Printf("episodes: served cached availability anime_id=%d episodes=%d next_refresh=%s", animeID, len(payload.Episodes), payload.NextRefreshAt)
+	return payload, true
+}
+
+// jikanOnly builds an uncached list from Jikan metadata alone, labelled with
+// source. No sub/dub availability is set.
+func (s *EpisodeService) jikanOnly(ctx context.Context, anime domain.Anime, source string) (domain.CanonicalEpisodeList, error) {
+	episodes, err := s.jikan.GetAllEpisodes(ctx, anime.MalID)
+	if err != nil {
+		return domain.CanonicalEpisodeList{}, err
+	}
+	return domain.CanonicalEpisodeList{
+		AnimeID:  anime.MalID,
+		Episodes: mergeEpisodes(episodes, domain.EpisodeAvailability{}),
+		Source:   source,
+	}, nil
+}
+
+// titleCandidates lists the titles to try when resolving a provider ID: the
+// main title, then English, Japanese and the synonyms. The main title is
+// always first, even when empty, so the result is never empty. Every later
+// title is added once; empty titles and repeats are skipped.
+func titleCandidates(anime domain.Anime) []string {
+	out := make([]string, 0, 3+len(anime.TitleSynonyms))
+	out = append(out, anime.Title)
+	seen := map[string]struct{}{"": {}, anime.Title: {}}
+
+	add := func(title string) {
+		if _, skip := seen[title]; skip {
+			return
+		}
+		seen[title] = struct{}{}
+		out = append(out, title)
+	}
+	add(anime.TitleEnglish)
+	add(anime.TitleJapanese)
+	for _, syn := range anime.TitleSynonyms {
+		add(syn)
+	}
+	return out
+}
+
+// mergeEpisodes joins Jikan metadata and provider availability by episode
+// number and returns the episodes in ascending order. Numbers <= 0 are
+// ignored and episodes without a Jikan title are named "Episode N".
+func mergeEpisodes(jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability) []domain.CanonicalEpisode {
+	type partial struct {
+		title  string
+		filler bool
+		recap  bool
+		sub    bool
+		dub    bool
+	}
+	byNumber := map[int]partial{}
+
+	for _, ep := range jikanEpisodes {
+		if ep.MalID <= 0 {
+			continue
+		}
+		item := byNumber[ep.MalID]
+		item.title = strings.TrimSpace(ep.Title)
+		item.filler = ep.Filler
+		item.recap = ep.Recap
+		byNumber[ep.MalID] = item
+	}
+	for _, n := range availability.Sub {
+		if n <= 0 {
+			continue
+		}
+		item := byNumber[n]
+		item.sub = true
+		byNumber[n] = item
+	}
+	for _, n := range availability.Dub {
+		if n <= 0 {
+			continue
+		}
+		item := byNumber[n]
+		item.dub = true
+		byNumber[n] = item
+	}
+
+	numbers := make([]int, 0, len(byNumber))
+	for number := range byNumber {
+		numbers = append(numbers, number)
+	}
+	sort.Ints(numbers)
+
+	episodes := make([]domain.CanonicalEpisode, 0, len(numbers))
+	for _, number := range numbers {
+		item := byNumber[number]
+		title := item.title
+		if title == "" {
+			title = fmt.Sprintf("Episode %d", number)
+		}
+		episodes = append(episodes, domain.CanonicalEpisode{
+			Number:  number,
+			Title:   title,
+			HasSub:  item.sub,
+			HasDub:  item.dub,
+			SubOnly: item.sub && !item.dub,
+			Filler:  item.filler,
+			Recap:   item.recap,
+		})
+	}
+	return episodes
+}
+
+// nextRetryTime returns when a failed refresh should be retried: retryInterval
+// from now while within retryWindow of the latest broadcast, otherwise the
+// next broadcast (the zero time if the broadcast slot is unknown).
+func nextRetryTime(anime domain.Anime, now time.Time) time.Time {
+	broadcast := nextBroadcastBeforeOrAt(anime, now)
+	if broadcast.IsZero() || now.After(broadcast.Add(retryWindow)) {
+		return nextBroadcastAfter(anime, now)
+	}
+	return now.Add(retryInterval)
+}
+
+// nextBroadcastBeforeOrAt returns the latest broadcast at or before now. It is
+// computed as the first broadcast after now minus seven days; the zero time
+// is returned if that is unknown or falls after now.
+func nextBroadcastBeforeOrAt(anime domain.Anime, now time.Time) time.Time {
+	next := nextBroadcastAfter(anime, now.AddDate(0, 0, -7))
+	if next.IsZero() || next.After(now) {
+		return time.Time{}
+	}
+	return next
+}
+
+// nextBroadcastAfter returns, in UTC, the first weekly broadcast strictly
+// after the given instant. The slot comes from anime.Broadcast (day, "15:04"
+// time, timezone name; UTC when the timezone is empty or cannot be loaded).
+// The zero time is returned when the day or time is missing or unparsable.
+func nextBroadcastAfter(anime domain.Anime, after time.Time) time.Time {
+	day := weekdayFromJikan(anime.Broadcast.Day)
+	if day < 0 || strings.TrimSpace(anime.Broadcast.Time) == "" {
+		return time.Time{}
+	}
+
+	loc := time.UTC
+	if tz := strings.TrimSpace(anime.Broadcast.Timezone); tz != "" {
+		if loaded, err := time.LoadLocation(tz); err == nil {
+			loc = loaded
+		} else {
+			log.Printf("episodes: failed to parse broadcast timezone anime_id=%d timezone=%q error=%v", anime.MalID, tz, err)
+		}
+	}
+
+	hour, minute, ok := parseBroadcastTime(anime.Broadcast.Time)
+	if !ok {
+		log.Printf("episodes: failed to parse broadcast time anime_id=%d time=%q", anime.MalID, anime.Broadcast.Time)
+		return time.Time{}
+	}
+
+	localAfter := after.In(loc)
+	daysAhead := (int(day) - int(localAfter.Weekday()) + 7) % 7
+	candidate := time.Date(localAfter.Year(), localAfter.Month(), localAfter.Day()+daysAhead, hour, minute, 0, 0, loc)
+	if !candidate.After(localAfter) {
+		candidate = candidate.AddDate(0, 0, 7)
+	}
+	return candidate.UTC()
+}
+
+// weekdayFromJikan maps a plural English day name such as "Mondays"
+// (case-insensitive, trimmed) to a time.Weekday, or -1 if unrecognised.
+func weekdayFromJikan(day string) time.Weekday {
+	switch strings.ToLower(strings.TrimSpace(day)) {
+	case "sundays":
+		return time.Sunday
+	case "mondays":
+		return time.Monday
+	case "tuesdays":
+		return time.Tuesday
+	case "wednesdays":
+		return time.Wednesday
+	case "thursdays":
+		return time.Thursday
+	case "fridays":
+		return time.Friday
+	case "saturdays":
+		return time.Saturday
+	default:
+		return -1
+	}
+}
+
+// parseBroadcastTime parses a 24-hour "HH:MM" string into hour and minute.
+// The final result is false if value does not match that layout.
+func parseBroadcastTime(value string) (int, int, bool) {
+	t, err := time.Parse("15:04", strings.TrimSpace(value))
+	if err != nil {
+		return 0, 0, false
+	}
+	return t.Hour(), t.Minute(), true
+}
+
+// truncate returns value cut to at most maxLen bytes without splitting a
+// multi-byte UTF-8 sequence, so the stored or logged text stays valid UTF-8.
+// A non-positive maxLen yields the empty string.
+func truncate(value string, maxLen int) string {
+	if len(value) <= maxLen {
+		return value
+	}
+	if maxLen <= 0 {
+		return ""
+	}
+	// Back up to the start of the rune that straddles the cut.
+	end := maxLen
+	for end > 0 && !utf8.RuneStart(value[end]) {
+		end--
+	}
+	return value[:end]
+}
diff --git a/internal/episodes/worker.go b/internal/episodes/worker.go
new file mode 100644
index 0000000..a080556
--- /dev/null
+++ b/internal/episodes/worker.go
@@ -0,0 +1,58 @@
+package episodes
+
+import (
+	"context"
+	"log"
+	"mal/internal/domain"
+	"time"
+
+	"go.uber.org/fx"
+)
+
+// workerInterval is the pause between background refresh passes.
+const workerInterval = time.Minute
+
+// RegisterWorker hooks the availability refresh worker into the fx lifecycle.
+// OnStart launches a goroutine that calls svc.RefreshTrackedDue immediately
+// and then once per workerInterval. OnStop cancels the worker and waits for
+// the goroutine to exit, or for the stop context to expire, so an in-flight
+// refresh is not left running after this hook's OnStop has returned.
+func RegisterWorker(lc fx.Lifecycle, svc domain.EpisodeService) {
+	ctx, cancel := context.WithCancel(context.Background())
+	done := make(chan struct{})
+
+	lc.Append(fx.Hook{
+		OnStart: func(context.Context) error {
+			go func() {
+				defer close(done)
+				log.Println("episodes: availability worker started")
+				ticker := time.NewTicker(workerInterval)
+				defer ticker.Stop()
+
+				for {
+					// A pass interrupted by shutdown is not a failure worth logging.
+					if err := svc.RefreshTrackedDue(ctx, 25); err != nil && ctx.Err() == nil {
+						log.Printf("episodes: availability worker tick failed error=%v", err)
+					}
+
+					select {
+					case <-ticker.C:
+					case <-ctx.Done():
+						log.Println("episodes: availability worker stopped")
+						return
+					}
+				}
+			}()
+			return nil
+		},
+		OnStop: func(stopCtx context.Context) error {
+			cancel()
+			select {
+			case <-done:
+				return nil
+			case <-stopCtx.Done():
+				return stopCtx.Err()
+			}
+		},
+	})
+}