diff --git a/internal/episodes/service/cache_store.go b/internal/episodes/service/cache_store.go new file mode 100644 index 0000000..816e68a --- /dev/null +++ b/internal/episodes/service/cache_store.go @@ -0,0 +1,250 @@ +package service + +import ( + "context" + "database/sql" + "encoding/json" + "time" + + "mal/integrations/jikan" + "mal/internal/db" + "mal/internal/domain" + "mal/internal/observability" +) + +func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability, source string, now time.Time, providerSuccess bool) (domain.CanonicalEpisodeList, error) { + nextRefreshSQL := nextRefreshAt(anime, now) + episodes := mergeEpisodes(jikanEpisodes, availability, anime.Episodes) + payload := domain.CanonicalEpisodeList{ + AnimeID: anime.MalID, + Episodes: episodes, + Source: source, + } + if nextRefreshSQL.Valid { + payload.NextRefreshAt = nextRefreshSQL.Time.Format(time.RFC3339) + } + + body, err := json.Marshal(payload) + if err != nil { + return domain.CanonicalEpisodeList{}, err + } + + if !s.writeEpisodeAvailabilityCache(ctx, anime, source, body, now, providerSuccess, nextRefreshSQL) { + return payload, nil + } + + observability.Info( + "episodes_refresh_success", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "source": source, + "episodes": len(episodes), + "next_refresh": payload.NextRefreshAt, + }, + ) + return payload, nil +} + +func (s *EpisodeService) writeEpisodeAvailabilityCache(ctx context.Context, anime domain.Anime, source string, body []byte, now time.Time, providerSuccess bool, nextRefreshSQL sql.NullTime) bool { + var retryUntil sql.NullTime + if anime.Airing && providerSuccess { + retryUntil = sql.NullTime{Time: nextRefreshSQL.Time.Add(retryWindow), Valid: nextRefreshSQL.Valid} + } + + err := s.queries.UpsertEpisodeAvailabilityCache(ctx, db.UpsertEpisodeAvailabilityCacheParams{ + AnimeID: int64(anime.MalID), + Data: string(body), + NextRefreshAt: nextRefreshSQL, + RetryUntilAt: retryUntil, + LastAttemptAt: sql.NullTime{Time: now, Valid: true}, + LastSuccessAt: sql.NullTime{Time: now, Valid: providerSuccess}, + FailureCount: 0, + LastError: "", + }) + if err == nil { + return true + } + + observability.Warn( + "episodes_cache_write_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "source": source, + }, + err, + ) + return false +} + +func (s *EpisodeService) markFailure(ctx context.Context, anime domain.Anime, cause error) { + now := s.clock.Now() + next := nextRetryTime(anime, now) + var retryUntil sql.NullTime + nextBroadcast := nextBroadcastBeforeOrAt(anime, now) + if !nextBroadcast.IsZero() { + retryUntil = sql.NullTime{Time: nextBroadcast.Add(retryWindow), Valid: true} + } + + var nextSQL sql.NullTime + if !next.IsZero() { + nextSQL = sql.NullTime{Time: next, Valid: true} + } + + writeCtx := ctx + if ctx.Err() != nil { + var cancel context.CancelFunc + writeCtx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + } + err := s.queries.MarkEpisodeAvailabilityRefreshFailed(writeCtx, db.MarkEpisodeAvailabilityRefreshFailedParams{ + LastAttemptAt: sql.NullTime{Time: now, Valid: true}, + LastError: truncate(cause.Error(), 400), + NextRefreshAt: nextSQL, + RetryUntilAt: retryUntil, + AnimeID: int64(anime.MalID), + }) + if err != nil { + observability.Warn( + "episodes_mark_failure_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + }, + err, + ) + return + } + observability.Warn( + "episodes_refresh_failure_recorded", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "next_retry": next.Format(time.RFC3339), + }, + cause, + ) +} + +func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) { + row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID)) + if err != nil { + s.metrics.ObserveCache("episode_availability", "miss") + return domain.CanonicalEpisodeList{}, false + } + var payload domain.CanonicalEpisodeList + if err := json.Unmarshal([]byte(row.Data), &payload); err != nil { + s.metrics.ObserveCache("episode_availability", "miss") + observability.Warn( + "episodes_cached_payload_invalid", + "episodes", + "", + map[string]any{ + "anime_id": animeID, + }, + err, + ) + return domain.CanonicalEpisodeList{}, false + } + s.metrics.ObserveCache("episode_availability", "hit") + return payload, true +} + +func (s *EpisodeService) getFreshCached(ctx context.Context, anime domain.Anime) (domain.CanonicalEpisodeList, bool) { + row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(anime.MalID)) + if err != nil { + s.metrics.ObserveCache("episode_availability_fresh", "miss") + return domain.CanonicalEpisodeList{}, false + } + + now := s.clock.Now() + if !s.isFreshEpisodeCache(anime, row, now) { + return domain.CanonicalEpisodeList{}, false + } + + payload, ok := s.decodeFreshCachedPayload(anime, row.Data) + if !ok { + return domain.CanonicalEpisodeList{}, false + } + if !isCanonicalEpisodePayloadValid(payload, anime.Episodes) { + s.metrics.ObserveCache("episode_availability_fresh", "miss") + observability.Info( + "episodes_cached_payload_rejected", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "expected_count": anime.Episodes, + "cached_episodes": len(payload.Episodes), + }, + ) + return domain.CanonicalEpisodeList{}, false + } + s.metrics.ObserveCache("episode_availability_fresh", "hit") + observability.Info( + "episodes_cache_served", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "episodes": len(payload.Episodes), + "next_refresh": payload.NextRefreshAt, + }, + ) + return payload, true +} + +func (s *EpisodeService) isFreshEpisodeCache(anime domain.Anime, row db.EpisodeAvailabilityCache, now time.Time) bool { + if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(now) { + s.metrics.ObserveCache("episode_availability_fresh", "miss") + observability.Info( + "episodes_cache_due_for_refresh", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "next_refresh": row.NextRefreshAt.Time.Format(time.RFC3339), + }, + ) + return false + } + if anime.Airing && row.UpdatedAt.Before(now.Add(-airingFallbackRefreshInterval)) { + s.metrics.ObserveCache("episode_availability_fresh", "miss") + observability.Info( + "episodes_cache_too_old_for_airing", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "updated_at": row.UpdatedAt.Format(time.RFC3339), + }, + ) + return false + } + return true +} + +func (s *EpisodeService) decodeFreshCachedPayload(anime domain.Anime, raw string) (domain.CanonicalEpisodeList, bool) { + var payload domain.CanonicalEpisodeList + err := json.Unmarshal([]byte(raw), &payload) + if err == nil { + return payload, true + } + + s.metrics.ObserveCache("episode_availability_fresh", "miss") + observability.Warn( + "episodes_cached_payload_invalid", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + }, + err, + ) + return domain.CanonicalEpisodeList{}, false +} diff --git a/internal/episodes/service/service.go b/internal/episodes/service/service.go index 0e49620..ac2e9fc 100644 --- a/internal/episodes/service/service.go +++ b/internal/episodes/service/service.go @@ -3,24 +3,13 @@ package service import ( "context" - "database/sql" - "encoding/json" - "errors" "fmt" + "time" + "mal/integrations/jikan" "mal/internal/db" "mal/internal/domain" "mal/internal/observability" - "sort" - "strconv" - "strings" - "time" -) - -const ( - retryInterval = 15 * time.Minute - retryWindow = 3 * time.Hour - airingFallbackRefreshInterval = 6 * time.Hour ) type Clock interface { @@ -229,386 +218,6 @@ func (s *EpisodeService) fetchProviderAvailability(ctx context.Context, anime do return domain.EpisodeAvailability{}, "", fmt.Errorf("no episode availability provider matched anime_id=%d", anime.MalID) } -func (s *EpisodeService) providerID(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider, titles []string) (string, error) { - providerID, found, err := s.cachedProviderID(ctx, anime, provider) - if found || err != nil { - return providerID, err - } - - providerID, err = provider.ResolveEpisodeProviderID(ctx, anime.MalID, titles) - if err != nil { - s.cacheProviderIDFailure(ctx, anime, provider, err) - return "", err - } - - s.cacheProviderIDSuccess(ctx, anime, provider, providerID) - observability.Info( - "episodes_provider_id_resolved", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "provider": provider.Name(), - "provider_id": providerID, - }, - ) - return providerID, nil -} - -func (s *EpisodeService) cachedProviderID(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider) (string, bool, error) { - row, err := s.queries.GetEpisodeProviderMapping(ctx, db.GetEpisodeProviderMappingParams{ - AnimeID: int64(anime.MalID), - Provider: provider.Name(), - }) - if err != nil { - s.metrics.ObserveCache("episode_provider_mapping", "miss") - if errors.Is(err, sql.ErrNoRows) { - return "", false, nil - } - observability.Warn( - "episodes_provider_id_cache_read_failed", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "provider": provider.Name(), - }, - err, - ) - return "", false, nil - } - - if row.FailedUntil.Valid && row.FailedUntil.Time.After(s.clock.Now()) { - s.metrics.ObserveCache("episode_provider_mapping", "hit") - return "", true, fmt.Errorf("cached provider mapping failure active until %s: %s", row.FailedUntil.Time.Format(time.RFC3339), row.LastError) - } - if strings.TrimSpace(row.ProviderShowID) == "" { - s.metrics.ObserveCache("episode_provider_mapping", "miss") - return "", false, nil - } - - s.metrics.ObserveCache("episode_provider_mapping", "hit") - observability.Info( - "episodes_provider_id_cache_hit", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "provider": provider.Name(), - "provider_id": row.ProviderShowID, - }, - ) - return row.ProviderShowID, true, nil -} - -func (s *EpisodeService) cacheProviderIDFailure(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider, resolveErr error) { - _ = s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{ - AnimeID: int64(anime.MalID), - Provider: provider.Name(), - ProviderShowID: "", - FailedUntil: sql.NullTime{Time: s.clock.Now().Add(time.Hour), Valid: true}, - LastError: truncate(resolveErr.Error(), 400), - }) -} - -func (s *EpisodeService) cacheProviderIDSuccess(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider, providerID string) { - err := s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{ - AnimeID: int64(anime.MalID), - Provider: provider.Name(), - ProviderShowID: providerID, - FailedUntil: sql.NullTime{}, - LastError: "", - }) - if err == nil { - return - } - - observability.Warn( - "episodes_provider_id_cache_write_failed", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "provider": provider.Name(), - }, - err, - ) -} - -func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability, source string, now time.Time, providerSuccess bool) (domain.CanonicalEpisodeList, error) { - nextRefreshSQL := nextRefreshAt(anime, now) - episodes := mergeEpisodes(jikanEpisodes, availability, anime.Episodes) - payload := domain.CanonicalEpisodeList{ - AnimeID: anime.MalID, - Episodes: episodes, - Source: source, - } - if nextRefreshSQL.Valid { - payload.NextRefreshAt = nextRefreshSQL.Time.Format(time.RFC3339) - } - - body, err := json.Marshal(payload) - if err != nil { - return domain.CanonicalEpisodeList{}, err - } - - if !s.writeEpisodeAvailabilityCache(ctx, anime, source, body, now, providerSuccess, nextRefreshSQL) { - return payload, nil - } - - observability.Info( - "episodes_refresh_success", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "source": source, - "episodes": len(episodes), - "next_refresh": payload.NextRefreshAt, - }, - ) - return payload, nil -} - -func nextRefreshAt(anime domain.Anime, now time.Time) sql.NullTime { - if !anime.Airing { - return sql.NullTime{} - } - - // During the hours immediately following a broadcast time, providers can lag. - // Keep retrying for a short window, even if the provider request succeeded. - lastBroadcast := nextBroadcastBeforeOrAt(anime, now) - if !lastBroadcast.IsZero() && now.Before(lastBroadcast.Add(retryWindow)) { - return sql.NullTime{Time: now.Add(retryInterval).UTC(), Valid: true} - } - - next := nextBroadcastAfter(anime, now) - if !next.IsZero() { - return sql.NullTime{Time: next, Valid: true} - } - - // Broadcast metadata is often missing or wrong for currently airing shows. - // Avoid "never refresh again" caches by falling back to a fixed interval. - return sql.NullTime{Time: now.Add(airingFallbackRefreshInterval).UTC(), Valid: true} -} - -func (s *EpisodeService) writeEpisodeAvailabilityCache(ctx context.Context, anime domain.Anime, source string, body []byte, now time.Time, providerSuccess bool, nextRefreshSQL sql.NullTime) bool { - var retryUntil sql.NullTime - if anime.Airing && providerSuccess { - retryUntil = sql.NullTime{Time: nextRefreshSQL.Time.Add(retryWindow), Valid: nextRefreshSQL.Valid} - } - - err := s.queries.UpsertEpisodeAvailabilityCache(ctx, db.UpsertEpisodeAvailabilityCacheParams{ - AnimeID: int64(anime.MalID), - Data: string(body), - NextRefreshAt: nextRefreshSQL, - RetryUntilAt: retryUntil, - LastAttemptAt: sql.NullTime{Time: now, Valid: true}, - LastSuccessAt: sql.NullTime{Time: now, Valid: providerSuccess}, - FailureCount: 0, - LastError: "", - }) - if err == nil { - return true - } - - observability.Warn( - "episodes_cache_write_failed", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "source": source, - }, - err, - ) - return false -} - -func (s *EpisodeService) markFailure(ctx context.Context, anime domain.Anime, cause error) { - now := s.clock.Now() - next := nextRetryTime(anime, now) - var retryUntil sql.NullTime - nextBroadcast := nextBroadcastBeforeOrAt(anime, now) - if !nextBroadcast.IsZero() { - retryUntil = sql.NullTime{Time: nextBroadcast.Add(retryWindow), Valid: true} - } - - var nextSQL sql.NullTime - if !next.IsZero() { - nextSQL = sql.NullTime{Time: next, Valid: true} - } - - writeCtx := ctx - if ctx.Err() != nil { - var cancel context.CancelFunc - writeCtx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - } - err := s.queries.MarkEpisodeAvailabilityRefreshFailed(writeCtx, db.MarkEpisodeAvailabilityRefreshFailedParams{ - LastAttemptAt: sql.NullTime{Time: now, Valid: true}, - LastError: truncate(cause.Error(), 400), - NextRefreshAt: nextSQL, - RetryUntilAt: retryUntil, - AnimeID: int64(anime.MalID), - }) - if err != nil { - observability.Warn( - "episodes_mark_failure_failed", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - }, - err, - ) - return - } - observability.Warn( - "episodes_refresh_failure_recorded", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "next_retry": next.Format(time.RFC3339), - }, - cause, - ) -} - -func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) { - row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID)) - if err != nil { - s.metrics.ObserveCache("episode_availability", "miss") - return domain.CanonicalEpisodeList{}, false - } - var payload domain.CanonicalEpisodeList - if err := json.Unmarshal([]byte(row.Data), &payload); err != nil { - s.metrics.ObserveCache("episode_availability", "miss") - observability.Warn( - "episodes_cached_payload_invalid", - "episodes", - "", - map[string]any{ - "anime_id": animeID, - }, - err, - ) - return domain.CanonicalEpisodeList{}, false - } - s.metrics.ObserveCache("episode_availability", "hit") - return payload, true -} - -func (s *EpisodeService) getFreshCached(ctx context.Context, anime domain.Anime) (domain.CanonicalEpisodeList, bool) { - row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(anime.MalID)) - if err != nil { - s.metrics.ObserveCache("episode_availability_fresh", "miss") - return domain.CanonicalEpisodeList{}, false - } - - now := s.clock.Now() - if !s.isFreshEpisodeCache(anime, row, now) { - return domain.CanonicalEpisodeList{}, false - } - - payload, ok := s.decodeFreshCachedPayload(anime, row.Data) - if !ok { - return domain.CanonicalEpisodeList{}, false - } - if !isCanonicalEpisodePayloadValid(payload, anime.Episodes) { - s.metrics.ObserveCache("episode_availability_fresh", "miss") - observability.Info( - "episodes_cached_payload_rejected", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "expected_count": anime.Episodes, - "cached_episodes": len(payload.Episodes), - }, - ) - return domain.CanonicalEpisodeList{}, false - } - s.metrics.ObserveCache("episode_availability_fresh", "hit") - observability.Info( - "episodes_cache_served", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "episodes": len(payload.Episodes), - "next_refresh": payload.NextRefreshAt, - }, - ) - return payload, true -} - -func (s *EpisodeService) isFreshEpisodeCache(anime domain.Anime, row db.EpisodeAvailabilityCache, now time.Time) bool { - if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(now) { - s.metrics.ObserveCache("episode_availability_fresh", "miss") - observability.Info( - "episodes_cache_due_for_refresh", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "next_refresh": row.NextRefreshAt.Time.Format(time.RFC3339), - }, - ) - return false - } - if anime.Airing && row.UpdatedAt.Before(now.Add(-airingFallbackRefreshInterval)) { - s.metrics.ObserveCache("episode_availability_fresh", "miss") - observability.Info( - "episodes_cache_too_old_for_airing", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "updated_at": row.UpdatedAt.Format(time.RFC3339), - }, - ) - return false - } - return true -} - -func (s *EpisodeService) decodeFreshCachedPayload(anime domain.Anime, raw string) (domain.CanonicalEpisodeList, bool) { - var payload domain.CanonicalEpisodeList - err := json.Unmarshal([]byte(raw), &payload) - if err == nil { - return payload, true - } - - s.metrics.ObserveCache("episode_availability_fresh", "miss") - observability.Warn( - "episodes_cached_payload_invalid", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - }, - err, - ) - return domain.CanonicalEpisodeList{}, false -} - -func isCanonicalEpisodePayloadValid(payload domain.CanonicalEpisodeList, expectedCount int) bool { - if expectedCount <= 0 { - return true - } - if len(payload.Episodes) > expectedCount { - return false - } - for _, episode := range payload.Episodes { - if episode.Number <= 0 || episode.Number > expectedCount { - return false - } - } - return true -} - func (s *EpisodeService) jikanOnly(ctx context.Context, anime domain.Anime, source string) (domain.CanonicalEpisodeList, error) { episodes, err := s.jikan.GetAllEpisodes(ctx, anime.MalID) if err != nil { @@ -620,203 +229,3 @@ func (s *EpisodeService) jikanOnly(ctx context.Context, anime domain.Anime, sour Source: source, }, nil } - -func titleCandidates(anime domain.Anime) []string { - out := []string{anime.Title} - if anime.TitleEnglish != "" && anime.TitleEnglish != anime.Title { - out = append(out, anime.TitleEnglish) - } - if anime.TitleJapanese != "" { - out = append(out, anime.TitleJapanese) - } - for _, syn := range anime.TitleSynonyms { - if syn != "" && syn != anime.Title && syn != anime.TitleEnglish && syn != anime.TitleJapanese { - out = append(out, syn) - } - } - return out -} - -type episodePartial struct { - title string - filler bool - recap bool - sub bool - dub bool -} - -func mergeEpisodes(jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability, expectedCount int) []domain.CanonicalEpisode { - byNumber := map[int]episodePartial{} - - for i, ep := range jikanEpisodes { - if exceedsExpectedCount(i+1, expectedCount) { - break - } - number, ok := jikanEpisodeNumber(ep, i) - if !ok || exceedsExpectedCount(number, expectedCount) { - continue - } - mergeEpisode(&byNumber, number, func(item *episodePartial) { - item.title = strings.TrimSpace(ep.Title) - item.filler = ep.Filler - item.recap = ep.Recap - }) - } - mergeAvailability(&byNumber, availability.Sub, expectedCount, func(item *episodePartial) { item.sub = true }) - mergeAvailability(&byNumber, availability.Dub, expectedCount, func(item *episodePartial) { item.dub = true }) - - numbers := make([]int, 0, len(byNumber)) - for number := range byNumber { - numbers = append(numbers, number) - } - sort.Ints(numbers) - - episodes := make([]domain.CanonicalEpisode, 0, len(numbers)) - for _, number := range numbers { - item := byNumber[number] - title := item.title - if title == "" { - title = fmt.Sprintf("Episode %d", number) - } - episodes = append(episodes, domain.CanonicalEpisode{ - Number: number, - Title: title, - HasSub: item.sub, - HasDub: item.dub, - SubOnly: item.sub && !item.dub, - Filler: item.filler, - Recap: item.recap, - }) - } - return episodes -} - -func mergeEpisode(byNumber *map[int]episodePartial, number int, update func(*episodePartial)) { - item := (*byNumber)[number] - update(&item) - (*byNumber)[number] = item -} - -func mergeAvailability(byNumber *map[int]episodePartial, numbers []int, expectedCount int, update func(*episodePartial)) { - for _, number := range numbers { - if number <= 0 || exceedsExpectedCount(number, expectedCount) { - continue - } - mergeEpisode(byNumber, number, update) - } -} - -func jikanEpisodeNumber(ep jikan.Episode, index int) (int, bool) { - number, err := strconv.Atoi(strings.TrimSpace(ep.Episode)) - if err == nil && number > 0 { - return number, true - } - if index < 0 { - return 0, false - } - return index + 1, true -} - -func exceedsExpectedCount(number int, expectedCount int) bool { - return expectedCount > 0 && number > expectedCount -} - -func nextRetryTime(anime domain.Anime, now time.Time) time.Time { - broadcast := nextBroadcastBeforeOrAt(anime, now) - if broadcast.IsZero() || now.After(broadcast.Add(retryWindow)) { - return nextBroadcastAfter(anime, now) - } - return now.Add(retryInterval) -} - -func nextBroadcastBeforeOrAt(anime domain.Anime, now time.Time) time.Time { - next := nextBroadcastAfter(anime, now.AddDate(0, 0, -7)) - if next.IsZero() || next.After(now) { - return time.Time{} - } - return next -} - -func nextBroadcastAfter(anime domain.Anime, after time.Time) time.Time { - day := weekdayFromJikan(anime.Broadcast.Day) - if day < 0 || strings.TrimSpace(anime.Broadcast.Time) == "" { - return time.Time{} - } - - loc := time.UTC - if tz := strings.TrimSpace(anime.Broadcast.Timezone); tz != "" { - if loaded, err := time.LoadLocation(tz); err == nil { - loc = loaded - } else { - observability.Warn( - "episodes_broadcast_timezone_parse_failed", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "timezone": tz, - }, - err, - ) - } - } - - hour, minute, ok := parseBroadcastTime(anime.Broadcast.Time) - if !ok { - observability.Warn( - "episodes_broadcast_time_parse_failed", - "episodes", - "", - map[string]any{ - "anime_id": anime.MalID, - "time": anime.Broadcast.Time, - }, - nil, - ) - return time.Time{} - } - - localAfter := after.In(loc) - daysAhead := (int(day) - int(localAfter.Weekday()) + 7) % 7 - candidate := time.Date(localAfter.Year(), localAfter.Month(), localAfter.Day()+daysAhead, hour, minute, 0, 0, loc) - if !candidate.After(localAfter) { - candidate = candidate.AddDate(0, 0, 7) - } - return candidate.UTC() -} - -func weekdayFromJikan(day string) time.Weekday { - switch strings.ToLower(strings.TrimSpace(day)) { - case "sundays": - return time.Sunday - case "mondays": - return time.Monday - case "tuesdays": - return time.Tuesday - case "wednesdays": - return time.Wednesday - case "thursdays": - return time.Thursday - case "fridays": - return time.Friday - case "saturdays": - return time.Saturday - default: - return -1 - } -} - -func parseBroadcastTime(value string) (int, int, bool) { - t, err := time.Parse("15:04", strings.TrimSpace(value)) - if err != nil { - return 0, 0, false - } - return t.Hour(), t.Minute(), true -} - -func truncate(value string, maxLen int) string { - if len(value) <= maxLen { - return value - } - return value[:maxLen] -}