extract: add cache store, trim to orchestration
This commit is contained in:
250
internal/episodes/service/cache_store.go
Normal file
250
internal/episodes/service/cache_store.go
Normal file
@@ -0,0 +1,250 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"mal/integrations/jikan"
|
||||
"mal/internal/db"
|
||||
"mal/internal/domain"
|
||||
"mal/internal/observability"
|
||||
)
|
||||
|
||||
func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability, source string, now time.Time, providerSuccess bool) (domain.CanonicalEpisodeList, error) {
|
||||
nextRefreshSQL := nextRefreshAt(anime, now)
|
||||
episodes := mergeEpisodes(jikanEpisodes, availability, anime.Episodes)
|
||||
payload := domain.CanonicalEpisodeList{
|
||||
AnimeID: anime.MalID,
|
||||
Episodes: episodes,
|
||||
Source: source,
|
||||
}
|
||||
if nextRefreshSQL.Valid {
|
||||
payload.NextRefreshAt = nextRefreshSQL.Time.Format(time.RFC3339)
|
||||
}
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return domain.CanonicalEpisodeList{}, err
|
||||
}
|
||||
|
||||
if !s.writeEpisodeAvailabilityCache(ctx, anime, source, body, now, providerSuccess, nextRefreshSQL) {
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
observability.Info(
|
||||
"episodes_refresh_success",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"source": source,
|
||||
"episodes": len(episodes),
|
||||
"next_refresh": payload.NextRefreshAt,
|
||||
},
|
||||
)
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
func (s *EpisodeService) writeEpisodeAvailabilityCache(ctx context.Context, anime domain.Anime, source string, body []byte, now time.Time, providerSuccess bool, nextRefreshSQL sql.NullTime) bool {
|
||||
var retryUntil sql.NullTime
|
||||
if anime.Airing && providerSuccess {
|
||||
retryUntil = sql.NullTime{Time: nextRefreshSQL.Time.Add(retryWindow), Valid: nextRefreshSQL.Valid}
|
||||
}
|
||||
|
||||
err := s.queries.UpsertEpisodeAvailabilityCache(ctx, db.UpsertEpisodeAvailabilityCacheParams{
|
||||
AnimeID: int64(anime.MalID),
|
||||
Data: string(body),
|
||||
NextRefreshAt: nextRefreshSQL,
|
||||
RetryUntilAt: retryUntil,
|
||||
LastAttemptAt: sql.NullTime{Time: now, Valid: true},
|
||||
LastSuccessAt: sql.NullTime{Time: now, Valid: providerSuccess},
|
||||
FailureCount: 0,
|
||||
LastError: "",
|
||||
})
|
||||
if err == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
observability.Warn(
|
||||
"episodes_cache_write_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"source": source,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *EpisodeService) markFailure(ctx context.Context, anime domain.Anime, cause error) {
|
||||
now := s.clock.Now()
|
||||
next := nextRetryTime(anime, now)
|
||||
var retryUntil sql.NullTime
|
||||
nextBroadcast := nextBroadcastBeforeOrAt(anime, now)
|
||||
if !nextBroadcast.IsZero() {
|
||||
retryUntil = sql.NullTime{Time: nextBroadcast.Add(retryWindow), Valid: true}
|
||||
}
|
||||
|
||||
var nextSQL sql.NullTime
|
||||
if !next.IsZero() {
|
||||
nextSQL = sql.NullTime{Time: next, Valid: true}
|
||||
}
|
||||
|
||||
writeCtx := ctx
|
||||
if ctx.Err() != nil {
|
||||
var cancel context.CancelFunc
|
||||
writeCtx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
}
|
||||
err := s.queries.MarkEpisodeAvailabilityRefreshFailed(writeCtx, db.MarkEpisodeAvailabilityRefreshFailedParams{
|
||||
LastAttemptAt: sql.NullTime{Time: now, Valid: true},
|
||||
LastError: truncate(cause.Error(), 400),
|
||||
NextRefreshAt: nextSQL,
|
||||
RetryUntilAt: retryUntil,
|
||||
AnimeID: int64(anime.MalID),
|
||||
})
|
||||
if err != nil {
|
||||
observability.Warn(
|
||||
"episodes_mark_failure_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return
|
||||
}
|
||||
observability.Warn(
|
||||
"episodes_refresh_failure_recorded",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"next_retry": next.Format(time.RFC3339),
|
||||
},
|
||||
cause,
|
||||
)
|
||||
}
|
||||
|
||||
func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) {
|
||||
row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID))
|
||||
if err != nil {
|
||||
s.metrics.ObserveCache("episode_availability", "miss")
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
var payload domain.CanonicalEpisodeList
|
||||
if err := json.Unmarshal([]byte(row.Data), &payload); err != nil {
|
||||
s.metrics.ObserveCache("episode_availability", "miss")
|
||||
observability.Warn(
|
||||
"episodes_cached_payload_invalid",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": animeID,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
s.metrics.ObserveCache("episode_availability", "hit")
|
||||
return payload, true
|
||||
}
|
||||
|
||||
func (s *EpisodeService) getFreshCached(ctx context.Context, anime domain.Anime) (domain.CanonicalEpisodeList, bool) {
|
||||
row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(anime.MalID))
|
||||
if err != nil {
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
|
||||
now := s.clock.Now()
|
||||
if !s.isFreshEpisodeCache(anime, row, now) {
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
|
||||
payload, ok := s.decodeFreshCachedPayload(anime, row.Data)
|
||||
if !ok {
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
if !isCanonicalEpisodePayloadValid(payload, anime.Episodes) {
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
observability.Info(
|
||||
"episodes_cached_payload_rejected",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"expected_count": anime.Episodes,
|
||||
"cached_episodes": len(payload.Episodes),
|
||||
},
|
||||
)
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "hit")
|
||||
observability.Info(
|
||||
"episodes_cache_served",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"episodes": len(payload.Episodes),
|
||||
"next_refresh": payload.NextRefreshAt,
|
||||
},
|
||||
)
|
||||
return payload, true
|
||||
}
|
||||
|
||||
func (s *EpisodeService) isFreshEpisodeCache(anime domain.Anime, row db.EpisodeAvailabilityCache, now time.Time) bool {
|
||||
if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(now) {
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
observability.Info(
|
||||
"episodes_cache_due_for_refresh",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"next_refresh": row.NextRefreshAt.Time.Format(time.RFC3339),
|
||||
},
|
||||
)
|
||||
return false
|
||||
}
|
||||
if anime.Airing && row.UpdatedAt.Before(now.Add(-airingFallbackRefreshInterval)) {
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
observability.Info(
|
||||
"episodes_cache_too_old_for_airing",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"updated_at": row.UpdatedAt.Format(time.RFC3339),
|
||||
},
|
||||
)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *EpisodeService) decodeFreshCachedPayload(anime domain.Anime, raw string) (domain.CanonicalEpisodeList, bool) {
|
||||
var payload domain.CanonicalEpisodeList
|
||||
err := json.Unmarshal([]byte(raw), &payload)
|
||||
if err == nil {
|
||||
return payload, true
|
||||
}
|
||||
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
observability.Warn(
|
||||
"episodes_cached_payload_invalid",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
@@ -3,24 +3,13 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"mal/integrations/jikan"
|
||||
"mal/internal/db"
|
||||
"mal/internal/domain"
|
||||
"mal/internal/observability"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
retryInterval = 15 * time.Minute
|
||||
retryWindow = 3 * time.Hour
|
||||
airingFallbackRefreshInterval = 6 * time.Hour
|
||||
)
|
||||
|
||||
type Clock interface {
|
||||
@@ -229,386 +218,6 @@ func (s *EpisodeService) fetchProviderAvailability(ctx context.Context, anime do
|
||||
return domain.EpisodeAvailability{}, "", fmt.Errorf("no episode availability provider matched anime_id=%d", anime.MalID)
|
||||
}
|
||||
|
||||
func (s *EpisodeService) providerID(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider, titles []string) (string, error) {
|
||||
providerID, found, err := s.cachedProviderID(ctx, anime, provider)
|
||||
if found || err != nil {
|
||||
return providerID, err
|
||||
}
|
||||
|
||||
providerID, err = provider.ResolveEpisodeProviderID(ctx, anime.MalID, titles)
|
||||
if err != nil {
|
||||
s.cacheProviderIDFailure(ctx, anime, provider, err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
s.cacheProviderIDSuccess(ctx, anime, provider, providerID)
|
||||
observability.Info(
|
||||
"episodes_provider_id_resolved",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"provider": provider.Name(),
|
||||
"provider_id": providerID,
|
||||
},
|
||||
)
|
||||
return providerID, nil
|
||||
}
|
||||
|
||||
func (s *EpisodeService) cachedProviderID(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider) (string, bool, error) {
|
||||
row, err := s.queries.GetEpisodeProviderMapping(ctx, db.GetEpisodeProviderMappingParams{
|
||||
AnimeID: int64(anime.MalID),
|
||||
Provider: provider.Name(),
|
||||
})
|
||||
if err != nil {
|
||||
s.metrics.ObserveCache("episode_provider_mapping", "miss")
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return "", false, nil
|
||||
}
|
||||
observability.Warn(
|
||||
"episodes_provider_id_cache_read_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"provider": provider.Name(),
|
||||
},
|
||||
err,
|
||||
)
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
if row.FailedUntil.Valid && row.FailedUntil.Time.After(s.clock.Now()) {
|
||||
s.metrics.ObserveCache("episode_provider_mapping", "hit")
|
||||
return "", true, fmt.Errorf("cached provider mapping failure active until %s: %s", row.FailedUntil.Time.Format(time.RFC3339), row.LastError)
|
||||
}
|
||||
if strings.TrimSpace(row.ProviderShowID) == "" {
|
||||
s.metrics.ObserveCache("episode_provider_mapping", "miss")
|
||||
return "", false, nil
|
||||
}
|
||||
|
||||
s.metrics.ObserveCache("episode_provider_mapping", "hit")
|
||||
observability.Info(
|
||||
"episodes_provider_id_cache_hit",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"provider": provider.Name(),
|
||||
"provider_id": row.ProviderShowID,
|
||||
},
|
||||
)
|
||||
return row.ProviderShowID, true, nil
|
||||
}
|
||||
|
||||
func (s *EpisodeService) cacheProviderIDFailure(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider, resolveErr error) {
|
||||
_ = s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{
|
||||
AnimeID: int64(anime.MalID),
|
||||
Provider: provider.Name(),
|
||||
ProviderShowID: "",
|
||||
FailedUntil: sql.NullTime{Time: s.clock.Now().Add(time.Hour), Valid: true},
|
||||
LastError: truncate(resolveErr.Error(), 400),
|
||||
})
|
||||
}
|
||||
|
||||
func (s *EpisodeService) cacheProviderIDSuccess(ctx context.Context, anime domain.Anime, provider domain.EpisodeAvailabilityProvider, providerID string) {
|
||||
err := s.queries.UpsertEpisodeProviderMapping(ctx, db.UpsertEpisodeProviderMappingParams{
|
||||
AnimeID: int64(anime.MalID),
|
||||
Provider: provider.Name(),
|
||||
ProviderShowID: providerID,
|
||||
FailedUntil: sql.NullTime{},
|
||||
LastError: "",
|
||||
})
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
observability.Warn(
|
||||
"episodes_provider_id_cache_write_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"provider": provider.Name(),
|
||||
},
|
||||
err,
|
||||
)
|
||||
}
|
||||
|
||||
func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability, source string, now time.Time, providerSuccess bool) (domain.CanonicalEpisodeList, error) {
|
||||
nextRefreshSQL := nextRefreshAt(anime, now)
|
||||
episodes := mergeEpisodes(jikanEpisodes, availability, anime.Episodes)
|
||||
payload := domain.CanonicalEpisodeList{
|
||||
AnimeID: anime.MalID,
|
||||
Episodes: episodes,
|
||||
Source: source,
|
||||
}
|
||||
if nextRefreshSQL.Valid {
|
||||
payload.NextRefreshAt = nextRefreshSQL.Time.Format(time.RFC3339)
|
||||
}
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return domain.CanonicalEpisodeList{}, err
|
||||
}
|
||||
|
||||
if !s.writeEpisodeAvailabilityCache(ctx, anime, source, body, now, providerSuccess, nextRefreshSQL) {
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
observability.Info(
|
||||
"episodes_refresh_success",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"source": source,
|
||||
"episodes": len(episodes),
|
||||
"next_refresh": payload.NextRefreshAt,
|
||||
},
|
||||
)
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
func nextRefreshAt(anime domain.Anime, now time.Time) sql.NullTime {
|
||||
if !anime.Airing {
|
||||
return sql.NullTime{}
|
||||
}
|
||||
|
||||
// During the hours immediately following a broadcast time, providers can lag.
|
||||
// Keep retrying for a short window, even if the provider request succeeded.
|
||||
lastBroadcast := nextBroadcastBeforeOrAt(anime, now)
|
||||
if !lastBroadcast.IsZero() && now.Before(lastBroadcast.Add(retryWindow)) {
|
||||
return sql.NullTime{Time: now.Add(retryInterval).UTC(), Valid: true}
|
||||
}
|
||||
|
||||
next := nextBroadcastAfter(anime, now)
|
||||
if !next.IsZero() {
|
||||
return sql.NullTime{Time: next, Valid: true}
|
||||
}
|
||||
|
||||
// Broadcast metadata is often missing or wrong for currently airing shows.
|
||||
// Avoid "never refresh again" caches by falling back to a fixed interval.
|
||||
return sql.NullTime{Time: now.Add(airingFallbackRefreshInterval).UTC(), Valid: true}
|
||||
}
|
||||
|
||||
func (s *EpisodeService) writeEpisodeAvailabilityCache(ctx context.Context, anime domain.Anime, source string, body []byte, now time.Time, providerSuccess bool, nextRefreshSQL sql.NullTime) bool {
|
||||
var retryUntil sql.NullTime
|
||||
if anime.Airing && providerSuccess {
|
||||
retryUntil = sql.NullTime{Time: nextRefreshSQL.Time.Add(retryWindow), Valid: nextRefreshSQL.Valid}
|
||||
}
|
||||
|
||||
err := s.queries.UpsertEpisodeAvailabilityCache(ctx, db.UpsertEpisodeAvailabilityCacheParams{
|
||||
AnimeID: int64(anime.MalID),
|
||||
Data: string(body),
|
||||
NextRefreshAt: nextRefreshSQL,
|
||||
RetryUntilAt: retryUntil,
|
||||
LastAttemptAt: sql.NullTime{Time: now, Valid: true},
|
||||
LastSuccessAt: sql.NullTime{Time: now, Valid: providerSuccess},
|
||||
FailureCount: 0,
|
||||
LastError: "",
|
||||
})
|
||||
if err == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
observability.Warn(
|
||||
"episodes_cache_write_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"source": source,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *EpisodeService) markFailure(ctx context.Context, anime domain.Anime, cause error) {
|
||||
now := s.clock.Now()
|
||||
next := nextRetryTime(anime, now)
|
||||
var retryUntil sql.NullTime
|
||||
nextBroadcast := nextBroadcastBeforeOrAt(anime, now)
|
||||
if !nextBroadcast.IsZero() {
|
||||
retryUntil = sql.NullTime{Time: nextBroadcast.Add(retryWindow), Valid: true}
|
||||
}
|
||||
|
||||
var nextSQL sql.NullTime
|
||||
if !next.IsZero() {
|
||||
nextSQL = sql.NullTime{Time: next, Valid: true}
|
||||
}
|
||||
|
||||
writeCtx := ctx
|
||||
if ctx.Err() != nil {
|
||||
var cancel context.CancelFunc
|
||||
writeCtx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
}
|
||||
err := s.queries.MarkEpisodeAvailabilityRefreshFailed(writeCtx, db.MarkEpisodeAvailabilityRefreshFailedParams{
|
||||
LastAttemptAt: sql.NullTime{Time: now, Valid: true},
|
||||
LastError: truncate(cause.Error(), 400),
|
||||
NextRefreshAt: nextSQL,
|
||||
RetryUntilAt: retryUntil,
|
||||
AnimeID: int64(anime.MalID),
|
||||
})
|
||||
if err != nil {
|
||||
observability.Warn(
|
||||
"episodes_mark_failure_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return
|
||||
}
|
||||
observability.Warn(
|
||||
"episodes_refresh_failure_recorded",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"next_retry": next.Format(time.RFC3339),
|
||||
},
|
||||
cause,
|
||||
)
|
||||
}
|
||||
|
||||
func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) {
|
||||
row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID))
|
||||
if err != nil {
|
||||
s.metrics.ObserveCache("episode_availability", "miss")
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
var payload domain.CanonicalEpisodeList
|
||||
if err := json.Unmarshal([]byte(row.Data), &payload); err != nil {
|
||||
s.metrics.ObserveCache("episode_availability", "miss")
|
||||
observability.Warn(
|
||||
"episodes_cached_payload_invalid",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": animeID,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
s.metrics.ObserveCache("episode_availability", "hit")
|
||||
return payload, true
|
||||
}
|
||||
|
||||
func (s *EpisodeService) getFreshCached(ctx context.Context, anime domain.Anime) (domain.CanonicalEpisodeList, bool) {
|
||||
row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(anime.MalID))
|
||||
if err != nil {
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
|
||||
now := s.clock.Now()
|
||||
if !s.isFreshEpisodeCache(anime, row, now) {
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
|
||||
payload, ok := s.decodeFreshCachedPayload(anime, row.Data)
|
||||
if !ok {
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
if !isCanonicalEpisodePayloadValid(payload, anime.Episodes) {
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
observability.Info(
|
||||
"episodes_cached_payload_rejected",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"expected_count": anime.Episodes,
|
||||
"cached_episodes": len(payload.Episodes),
|
||||
},
|
||||
)
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "hit")
|
||||
observability.Info(
|
||||
"episodes_cache_served",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"episodes": len(payload.Episodes),
|
||||
"next_refresh": payload.NextRefreshAt,
|
||||
},
|
||||
)
|
||||
return payload, true
|
||||
}
|
||||
|
||||
func (s *EpisodeService) isFreshEpisodeCache(anime domain.Anime, row db.EpisodeAvailabilityCache, now time.Time) bool {
|
||||
if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(now) {
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
observability.Info(
|
||||
"episodes_cache_due_for_refresh",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"next_refresh": row.NextRefreshAt.Time.Format(time.RFC3339),
|
||||
},
|
||||
)
|
||||
return false
|
||||
}
|
||||
if anime.Airing && row.UpdatedAt.Before(now.Add(-airingFallbackRefreshInterval)) {
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
observability.Info(
|
||||
"episodes_cache_too_old_for_airing",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"updated_at": row.UpdatedAt.Format(time.RFC3339),
|
||||
},
|
||||
)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *EpisodeService) decodeFreshCachedPayload(anime domain.Anime, raw string) (domain.CanonicalEpisodeList, bool) {
|
||||
var payload domain.CanonicalEpisodeList
|
||||
err := json.Unmarshal([]byte(raw), &payload)
|
||||
if err == nil {
|
||||
return payload, true
|
||||
}
|
||||
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
observability.Warn(
|
||||
"episodes_cached_payload_invalid",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
|
||||
func isCanonicalEpisodePayloadValid(payload domain.CanonicalEpisodeList, expectedCount int) bool {
|
||||
if expectedCount <= 0 {
|
||||
return true
|
||||
}
|
||||
if len(payload.Episodes) > expectedCount {
|
||||
return false
|
||||
}
|
||||
for _, episode := range payload.Episodes {
|
||||
if episode.Number <= 0 || episode.Number > expectedCount {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *EpisodeService) jikanOnly(ctx context.Context, anime domain.Anime, source string) (domain.CanonicalEpisodeList, error) {
|
||||
episodes, err := s.jikan.GetAllEpisodes(ctx, anime.MalID)
|
||||
if err != nil {
|
||||
@@ -620,203 +229,3 @@ func (s *EpisodeService) jikanOnly(ctx context.Context, anime domain.Anime, sour
|
||||
Source: source,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func titleCandidates(anime domain.Anime) []string {
|
||||
out := []string{anime.Title}
|
||||
if anime.TitleEnglish != "" && anime.TitleEnglish != anime.Title {
|
||||
out = append(out, anime.TitleEnglish)
|
||||
}
|
||||
if anime.TitleJapanese != "" {
|
||||
out = append(out, anime.TitleJapanese)
|
||||
}
|
||||
for _, syn := range anime.TitleSynonyms {
|
||||
if syn != "" && syn != anime.Title && syn != anime.TitleEnglish && syn != anime.TitleJapanese {
|
||||
out = append(out, syn)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
type episodePartial struct {
|
||||
title string
|
||||
filler bool
|
||||
recap bool
|
||||
sub bool
|
||||
dub bool
|
||||
}
|
||||
|
||||
func mergeEpisodes(jikanEpisodes []jikan.Episode, availability domain.EpisodeAvailability, expectedCount int) []domain.CanonicalEpisode {
|
||||
byNumber := map[int]episodePartial{}
|
||||
|
||||
for i, ep := range jikanEpisodes {
|
||||
if exceedsExpectedCount(i+1, expectedCount) {
|
||||
break
|
||||
}
|
||||
number, ok := jikanEpisodeNumber(ep, i)
|
||||
if !ok || exceedsExpectedCount(number, expectedCount) {
|
||||
continue
|
||||
}
|
||||
mergeEpisode(&byNumber, number, func(item *episodePartial) {
|
||||
item.title = strings.TrimSpace(ep.Title)
|
||||
item.filler = ep.Filler
|
||||
item.recap = ep.Recap
|
||||
})
|
||||
}
|
||||
mergeAvailability(&byNumber, availability.Sub, expectedCount, func(item *episodePartial) { item.sub = true })
|
||||
mergeAvailability(&byNumber, availability.Dub, expectedCount, func(item *episodePartial) { item.dub = true })
|
||||
|
||||
numbers := make([]int, 0, len(byNumber))
|
||||
for number := range byNumber {
|
||||
numbers = append(numbers, number)
|
||||
}
|
||||
sort.Ints(numbers)
|
||||
|
||||
episodes := make([]domain.CanonicalEpisode, 0, len(numbers))
|
||||
for _, number := range numbers {
|
||||
item := byNumber[number]
|
||||
title := item.title
|
||||
if title == "" {
|
||||
title = fmt.Sprintf("Episode %d", number)
|
||||
}
|
||||
episodes = append(episodes, domain.CanonicalEpisode{
|
||||
Number: number,
|
||||
Title: title,
|
||||
HasSub: item.sub,
|
||||
HasDub: item.dub,
|
||||
SubOnly: item.sub && !item.dub,
|
||||
Filler: item.filler,
|
||||
Recap: item.recap,
|
||||
})
|
||||
}
|
||||
return episodes
|
||||
}
|
||||
|
||||
func mergeEpisode(byNumber *map[int]episodePartial, number int, update func(*episodePartial)) {
|
||||
item := (*byNumber)[number]
|
||||
update(&item)
|
||||
(*byNumber)[number] = item
|
||||
}
|
||||
|
||||
func mergeAvailability(byNumber *map[int]episodePartial, numbers []int, expectedCount int, update func(*episodePartial)) {
|
||||
for _, number := range numbers {
|
||||
if number <= 0 || exceedsExpectedCount(number, expectedCount) {
|
||||
continue
|
||||
}
|
||||
mergeEpisode(byNumber, number, update)
|
||||
}
|
||||
}
|
||||
|
||||
func jikanEpisodeNumber(ep jikan.Episode, index int) (int, bool) {
|
||||
number, err := strconv.Atoi(strings.TrimSpace(ep.Episode))
|
||||
if err == nil && number > 0 {
|
||||
return number, true
|
||||
}
|
||||
if index < 0 {
|
||||
return 0, false
|
||||
}
|
||||
return index + 1, true
|
||||
}
|
||||
|
||||
func exceedsExpectedCount(number int, expectedCount int) bool {
|
||||
return expectedCount > 0 && number > expectedCount
|
||||
}
|
||||
|
||||
func nextRetryTime(anime domain.Anime, now time.Time) time.Time {
|
||||
broadcast := nextBroadcastBeforeOrAt(anime, now)
|
||||
if broadcast.IsZero() || now.After(broadcast.Add(retryWindow)) {
|
||||
return nextBroadcastAfter(anime, now)
|
||||
}
|
||||
return now.Add(retryInterval)
|
||||
}
|
||||
|
||||
func nextBroadcastBeforeOrAt(anime domain.Anime, now time.Time) time.Time {
|
||||
next := nextBroadcastAfter(anime, now.AddDate(0, 0, -7))
|
||||
if next.IsZero() || next.After(now) {
|
||||
return time.Time{}
|
||||
}
|
||||
return next
|
||||
}
|
||||
|
||||
func nextBroadcastAfter(anime domain.Anime, after time.Time) time.Time {
|
||||
day := weekdayFromJikan(anime.Broadcast.Day)
|
||||
if day < 0 || strings.TrimSpace(anime.Broadcast.Time) == "" {
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
loc := time.UTC
|
||||
if tz := strings.TrimSpace(anime.Broadcast.Timezone); tz != "" {
|
||||
if loaded, err := time.LoadLocation(tz); err == nil {
|
||||
loc = loaded
|
||||
} else {
|
||||
observability.Warn(
|
||||
"episodes_broadcast_timezone_parse_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"timezone": tz,
|
||||
},
|
||||
err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
hour, minute, ok := parseBroadcastTime(anime.Broadcast.Time)
|
||||
if !ok {
|
||||
observability.Warn(
|
||||
"episodes_broadcast_time_parse_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"time": anime.Broadcast.Time,
|
||||
},
|
||||
nil,
|
||||
)
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
localAfter := after.In(loc)
|
||||
daysAhead := (int(day) - int(localAfter.Weekday()) + 7) % 7
|
||||
candidate := time.Date(localAfter.Year(), localAfter.Month(), localAfter.Day()+daysAhead, hour, minute, 0, 0, loc)
|
||||
if !candidate.After(localAfter) {
|
||||
candidate = candidate.AddDate(0, 0, 7)
|
||||
}
|
||||
return candidate.UTC()
|
||||
}
|
||||
|
||||
func weekdayFromJikan(day string) time.Weekday {
|
||||
switch strings.ToLower(strings.TrimSpace(day)) {
|
||||
case "sundays":
|
||||
return time.Sunday
|
||||
case "mondays":
|
||||
return time.Monday
|
||||
case "tuesdays":
|
||||
return time.Tuesday
|
||||
case "wednesdays":
|
||||
return time.Wednesday
|
||||
case "thursdays":
|
||||
return time.Thursday
|
||||
case "fridays":
|
||||
return time.Friday
|
||||
case "saturdays":
|
||||
return time.Saturday
|
||||
default:
|
||||
return -1
|
||||
}
|
||||
}
|
||||
|
||||
func parseBroadcastTime(value string) (int, int, bool) {
|
||||
t, err := time.Parse("15:04", strings.TrimSpace(value))
|
||||
if err != nil {
|
||||
return 0, 0, false
|
||||
}
|
||||
return t.Hour(), t.Minute(), true
|
||||
}
|
||||
|
||||
func truncate(value string, maxLen int) string {
|
||||
if len(value) <= maxLen {
|
||||
return value
|
||||
}
|
||||
return value[:maxLen]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user