From e08a0e1f71f108e6fb6a5a2ad2f1608eca938e49 Mon Sep 17 00:00:00 2001 From: mkelvers Date: Tue, 26 May 2026 15:56:33 +0200 Subject: [PATCH] refactor: migrate episodes logs to observability --- internal/episodes/service/service.go | 247 ++++++++++++++++++++++++--- internal/episodes/worker.go | 15 +- 2 files changed, 234 insertions(+), 28 deletions(-) diff --git a/internal/episodes/service/service.go b/internal/episodes/service/service.go index 1a2f052..0b2694b 100644 --- a/internal/episodes/service/service.go +++ b/internal/episodes/service/service.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "mal/integrations/jikan" "mal/internal/db" "mal/internal/domain" @@ -84,11 +83,27 @@ func (s *EpisodeService) RefreshTrackedDue(ctx context.Context, limit int) error for _, id := range ids { anime, err := s.jikan.GetAnimeByID(ctx, int(id)) if err != nil { - log.Printf("episodes: failed to fetch anime for refresh anime_id=%d error=%v", id, err) + observability.Warn( + "episodes_refresh_fetch_anime_failed", + "episodes", + "", + map[string]any{ + "anime_id": int64(id), + }, + err, + ) continue } if _, err := s.refresh(ctx, anime); err != nil { - log.Printf("episodes: refresh failed anime_id=%d error=%v", id, err) + observability.Warn( + "episodes_refresh_failed", + "episodes", + "", + map[string]any{ + "anime_id": int64(id), + }, + err, + ) } } @@ -97,18 +112,43 @@ func (s *EpisodeService) RefreshTrackedDue(ctx context.Context, limit int) error func (s *EpisodeService) refresh(ctx context.Context, anime domain.Anime) (domain.CanonicalEpisodeList, error) { now := s.clock.Now() - log.Printf("episodes: refresh start anime_id=%d title=%q airing=%t", anime.MalID, anime.DisplayTitle(), anime.Airing) + observability.Info( + "episodes_refresh_start", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "title": anime.DisplayTitle(), + "airing": anime.Airing, + }, + ) jikanEpisodes, jikanErr := s.jikan.GetAllEpisodes(ctx, anime.MalID) if jikanErr != nil { - log.Printf("episodes: jikan episode metadata failed anime_id=%d error=%v", anime.MalID, jikanErr) + observability.Warn( + "episodes_jikan_metadata_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + }, + jikanErr, + ) } providerAvailability, source, providerErr := s.fetchProviderAvailability(ctx, anime) if providerErr != nil { s.markFailure(ctx, anime, providerErr) if cached, ok := s.getCached(ctx, anime.MalID); ok { - log.Printf("episodes: serving stale cache after provider failure anime_id=%d error=%v", anime.MalID, providerErr) + observability.Warn( + "episodes_provider_failed_serving_stale_cache", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + }, + providerErr, + ) return cached, nil } if jikanErr == nil { @@ -125,16 +165,44 @@ func (s *EpisodeService) fetchProviderAvailability(ctx context.Context, anime do for _, provider := range s.providers { providerID, err := s.providerID(ctx, anime, provider, titles) if err != nil { - log.Printf("episodes: provider id miss anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err) + observability.Warn( + "episodes_provider_id_miss", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "provider": provider.Name(), + }, + err, + ) continue } available, err := provider.GetEpisodeAvailabilityByProviderID(ctx, providerID) if err != nil { - log.Printf("episodes: provider availability miss anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err) + observability.Warn( + "episodes_provider_availability_miss", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "provider": provider.Name(), + }, + err, + ) continue } - log.Printf("episodes: provider availability hit anime_id=%d provider=%s sub=%d dub=%d", anime.MalID, provider.Name(), len(available.Sub), len(available.Dub)) + observability.Info( + "episodes_provider_availability_hit", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "provider": provider.Name(), + "sub": len(available.Sub), + "dub": len(available.Dub), + }, + ) return available, provider.Name(), nil } return domain.EpisodeAvailability{}, "", fmt.Errorf("no episode availability provider matched anime_id=%d", anime.MalID) @@ -152,13 +220,31 @@ func (s *EpisodeService) providerID(ctx context.Context, anime domain.Anime, pro } if strings.TrimSpace(row.ProviderShowID) != "" { s.metrics.ObserveCache("episode_provider_mapping", "hit") - log.Printf("episodes: provider id cache hit anime_id=%d provider=%s provider_id=%s", anime.MalID, provider.Name(), row.ProviderShowID) + observability.Info( + "episodes_provider_id_cache_hit", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "provider": provider.Name(), + "provider_id": row.ProviderShowID, + }, + ) return row.ProviderShowID, nil } s.metrics.ObserveCache("episode_provider_mapping", "miss") } else if !errors.Is(err, sql.ErrNoRows) { s.metrics.ObserveCache("episode_provider_mapping", "miss") - log.Printf("episodes: provider id cache read failed anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err) + observability.Warn( + "episodes_provider_id_cache_read_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "provider": provider.Name(), + }, + err, + ) } else { s.metrics.ObserveCache("episode_provider_mapping", "miss") } @@ -183,9 +269,27 @@ func (s *EpisodeService) providerID(ctx context.Context, anime domain.Anime, pro LastError: "", }) if err != nil { - log.Printf("episodes: provider id cache write failed anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err) + observability.Warn( + "episodes_provider_id_cache_write_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "provider": provider.Name(), + }, + err, + ) } - log.Printf("episodes: provider id resolved anime_id=%d provider=%s provider_id=%s", anime.MalID, provider.Name(), providerID) + observability.Info( + "episodes_provider_id_resolved", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "provider": provider.Name(), + "provider_id": providerID, + }, + ) return providerID, nil } @@ -240,11 +344,30 @@ func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpi LastError: "", }) if err != nil { - log.Printf("episodes: cache write failed anime_id=%d source=%s error=%v", anime.MalID, source, err) + observability.Warn( + "episodes_cache_write_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "source": source, + }, + err, + ) return payload, nil } - log.Printf("episodes: refresh success anime_id=%d source=%s episodes=%d next_refresh=%s", anime.MalID, source, len(episodes), payload.NextRefreshAt) + observability.Info( + "episodes_refresh_success", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "source": source, + "episodes": len(episodes), + "next_refresh": payload.NextRefreshAt, + }, + ) return payload, nil } @@ -270,10 +393,27 @@ func (s *EpisodeService) markFailure(ctx context.Context, anime domain.Anime, ca AnimeID: int64(anime.MalID), }) if err != nil { - log.Printf("episodes: failed to mark refresh failure anime_id=%d error=%v", anime.MalID, err) + observability.Warn( + "episodes_mark_failure_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + }, + err, + ) return } - log.Printf("episodes: refresh failure recorded anime_id=%d next_retry=%s error=%v", anime.MalID, next.Format(time.RFC3339), cause) + observability.Warn( + "episodes_refresh_failure_recorded", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "next_retry": next.Format(time.RFC3339), + }, + cause, + ) } func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) { @@ -285,7 +425,15 @@ func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.Can var payload domain.CanonicalEpisodeList if err := json.Unmarshal([]byte(row.Data), &payload); err != nil { s.metrics.ObserveCache("episode_availability", "miss") - log.Printf("episodes: invalid cached payload anime_id=%d error=%v", animeID, err) + observability.Warn( + "episodes_cached_payload_invalid", + "episodes", + "", + map[string]any{ + "anime_id": animeID, + }, + err, + ) return domain.CanonicalEpisodeList{}, false } s.metrics.ObserveCache("episode_availability", "hit") @@ -302,24 +450,57 @@ func (s *EpisodeService) getFreshCached(ctx context.Context, anime domain.Anime) now := s.clock.Now() if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(now) { s.metrics.ObserveCache("episode_availability_fresh", "miss") - log.Printf("episodes: cached availability due for refresh anime_id=%d next_refresh=%s", anime.MalID, row.NextRefreshAt.Time.Format(time.RFC3339)) + observability.Info( + "episodes_cache_due_for_refresh", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "next_refresh": row.NextRefreshAt.Time.Format(time.RFC3339), + }, + ) return domain.CanonicalEpisodeList{}, false } if anime.Airing && row.UpdatedAt.Before(now.Add(-airingFallbackRefreshInterval)) { s.metrics.ObserveCache("episode_availability_fresh", "miss") - log.Printf("episodes: cached availability too old for airing anime_id=%d updated_at=%s", anime.MalID, row.UpdatedAt.Format(time.RFC3339)) + observability.Info( + "episodes_cache_too_old_for_airing", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "updated_at": row.UpdatedAt.Format(time.RFC3339), + }, + ) return domain.CanonicalEpisodeList{}, false } var payload domain.CanonicalEpisodeList if err := json.Unmarshal([]byte(row.Data), &payload); err != nil { s.metrics.ObserveCache("episode_availability_fresh", "miss") - log.Printf("episodes: invalid cached payload anime_id=%d error=%v", anime.MalID, err) + observability.Warn( + "episodes_cached_payload_invalid", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + }, + err, + ) return domain.CanonicalEpisodeList{}, false } s.metrics.ObserveCache("episode_availability_fresh", "hit") - log.Printf("episodes: served cached availability anime_id=%d episodes=%d next_refresh=%s", anime.MalID, len(payload.Episodes), payload.NextRefreshAt) + observability.Info( + "episodes_cache_served", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "episodes": len(payload.Episodes), + "next_refresh": payload.NextRefreshAt, + }, + ) return payload, true } @@ -441,13 +622,31 @@ func nextBroadcastAfter(anime domain.Anime, after time.Time) time.Time { if loaded, err := time.LoadLocation(tz); err == nil { loc = loaded } else { - log.Printf("episodes: failed to parse broadcast timezone anime_id=%d timezone=%q error=%v", anime.MalID, tz, err) + observability.Warn( + "episodes_broadcast_timezone_parse_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "timezone": tz, + }, + err, + ) } } hour, minute, ok := parseBroadcastTime(anime.Broadcast.Time) if !ok { - log.Printf("episodes: failed to parse broadcast time anime_id=%d time=%q", anime.MalID, anime.Broadcast.Time) + observability.Warn( + "episodes_broadcast_time_parse_failed", + "episodes", + "", + map[string]any{ + "anime_id": anime.MalID, + "time": anime.Broadcast.Time, + }, + nil, + ) return time.Time{} } diff --git a/internal/episodes/worker.go b/internal/episodes/worker.go index 49f7504..bcefda9 100644 --- a/internal/episodes/worker.go +++ b/internal/episodes/worker.go @@ -2,7 +2,6 @@ package episodes import ( "context" - "log" "mal/internal/domain" "mal/internal/observability" "time" @@ -18,14 +17,22 @@ func RegisterWorker(lc fx.Lifecycle, svc domain.EpisodeService, metrics *observa lc.Append(fx.Hook{ OnStart: func(context.Context) error { go func() { - log.Println("episodes: availability worker started") + observability.Info("episodes_worker_start", "episodes", "", nil) ticker := time.NewTicker(workerInterval) defer ticker.Stop() for { if err := svc.RefreshTrackedDue(ctx, 25); err != nil { metrics.ObserveWorkerTick("episodes_availability", err) - log.Printf("episodes: availability worker tick failed error=%v", err) + observability.Warn( + "episodes_worker_tick_failed", + "episodes", + "", + map[string]any{ + "worker": "episodes_availability", + }, + err, + ) } else { metrics.ObserveWorkerTick("episodes_availability", nil) } @@ -33,7 +40,7 @@ func RegisterWorker(lc fx.Lifecycle, svc domain.EpisodeService, metrics *observa select { case <-ticker.C: case <-ctx.Done(): - log.Println("episodes: availability worker stopped") + observability.Info("episodes_worker_stop", "episodes", "", nil) return } }