refactor: migrate episodes logs to observability
This commit is contained in:
@@ -6,7 +6,6 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"mal/integrations/jikan"
|
||||
"mal/internal/db"
|
||||
"mal/internal/domain"
|
||||
@@ -84,11 +83,27 @@ func (s *EpisodeService) RefreshTrackedDue(ctx context.Context, limit int) error
|
||||
for _, id := range ids {
|
||||
anime, err := s.jikan.GetAnimeByID(ctx, int(id))
|
||||
if err != nil {
|
||||
log.Printf("episodes: failed to fetch anime for refresh anime_id=%d error=%v", id, err)
|
||||
observability.Warn(
|
||||
"episodes_refresh_fetch_anime_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": int64(id),
|
||||
},
|
||||
err,
|
||||
)
|
||||
continue
|
||||
}
|
||||
if _, err := s.refresh(ctx, anime); err != nil {
|
||||
log.Printf("episodes: refresh failed anime_id=%d error=%v", id, err)
|
||||
observability.Warn(
|
||||
"episodes_refresh_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": int64(id),
|
||||
},
|
||||
err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,18 +112,43 @@ func (s *EpisodeService) RefreshTrackedDue(ctx context.Context, limit int) error
|
||||
|
||||
func (s *EpisodeService) refresh(ctx context.Context, anime domain.Anime) (domain.CanonicalEpisodeList, error) {
|
||||
now := s.clock.Now()
|
||||
log.Printf("episodes: refresh start anime_id=%d title=%q airing=%t", anime.MalID, anime.DisplayTitle(), anime.Airing)
|
||||
observability.Info(
|
||||
"episodes_refresh_start",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"title": anime.DisplayTitle(),
|
||||
"airing": anime.Airing,
|
||||
},
|
||||
)
|
||||
|
||||
jikanEpisodes, jikanErr := s.jikan.GetAllEpisodes(ctx, anime.MalID)
|
||||
if jikanErr != nil {
|
||||
log.Printf("episodes: jikan episode metadata failed anime_id=%d error=%v", anime.MalID, jikanErr)
|
||||
observability.Warn(
|
||||
"episodes_jikan_metadata_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
},
|
||||
jikanErr,
|
||||
)
|
||||
}
|
||||
|
||||
providerAvailability, source, providerErr := s.fetchProviderAvailability(ctx, anime)
|
||||
if providerErr != nil {
|
||||
s.markFailure(ctx, anime, providerErr)
|
||||
if cached, ok := s.getCached(ctx, anime.MalID); ok {
|
||||
log.Printf("episodes: serving stale cache after provider failure anime_id=%d error=%v", anime.MalID, providerErr)
|
||||
observability.Warn(
|
||||
"episodes_provider_failed_serving_stale_cache",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
},
|
||||
providerErr,
|
||||
)
|
||||
return cached, nil
|
||||
}
|
||||
if jikanErr == nil {
|
||||
@@ -125,16 +165,44 @@ func (s *EpisodeService) fetchProviderAvailability(ctx context.Context, anime do
|
||||
for _, provider := range s.providers {
|
||||
providerID, err := s.providerID(ctx, anime, provider, titles)
|
||||
if err != nil {
|
||||
log.Printf("episodes: provider id miss anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err)
|
||||
observability.Warn(
|
||||
"episodes_provider_id_miss",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"provider": provider.Name(),
|
||||
},
|
||||
err,
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
available, err := provider.GetEpisodeAvailabilityByProviderID(ctx, providerID)
|
||||
if err != nil {
|
||||
log.Printf("episodes: provider availability miss anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err)
|
||||
observability.Warn(
|
||||
"episodes_provider_availability_miss",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"provider": provider.Name(),
|
||||
},
|
||||
err,
|
||||
)
|
||||
continue
|
||||
}
|
||||
log.Printf("episodes: provider availability hit anime_id=%d provider=%s sub=%d dub=%d", anime.MalID, provider.Name(), len(available.Sub), len(available.Dub))
|
||||
observability.Info(
|
||||
"episodes_provider_availability_hit",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"provider": provider.Name(),
|
||||
"sub": len(available.Sub),
|
||||
"dub": len(available.Dub),
|
||||
},
|
||||
)
|
||||
return available, provider.Name(), nil
|
||||
}
|
||||
return domain.EpisodeAvailability{}, "", fmt.Errorf("no episode availability provider matched anime_id=%d", anime.MalID)
|
||||
@@ -152,13 +220,31 @@ func (s *EpisodeService) providerID(ctx context.Context, anime domain.Anime, pro
|
||||
}
|
||||
if strings.TrimSpace(row.ProviderShowID) != "" {
|
||||
s.metrics.ObserveCache("episode_provider_mapping", "hit")
|
||||
log.Printf("episodes: provider id cache hit anime_id=%d provider=%s provider_id=%s", anime.MalID, provider.Name(), row.ProviderShowID)
|
||||
observability.Info(
|
||||
"episodes_provider_id_cache_hit",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"provider": provider.Name(),
|
||||
"provider_id": row.ProviderShowID,
|
||||
},
|
||||
)
|
||||
return row.ProviderShowID, nil
|
||||
}
|
||||
s.metrics.ObserveCache("episode_provider_mapping", "miss")
|
||||
} else if !errors.Is(err, sql.ErrNoRows) {
|
||||
s.metrics.ObserveCache("episode_provider_mapping", "miss")
|
||||
log.Printf("episodes: provider id cache read failed anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err)
|
||||
observability.Warn(
|
||||
"episodes_provider_id_cache_read_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"provider": provider.Name(),
|
||||
},
|
||||
err,
|
||||
)
|
||||
} else {
|
||||
s.metrics.ObserveCache("episode_provider_mapping", "miss")
|
||||
}
|
||||
@@ -183,9 +269,27 @@ func (s *EpisodeService) providerID(ctx context.Context, anime domain.Anime, pro
|
||||
LastError: "",
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("episodes: provider id cache write failed anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err)
|
||||
observability.Warn(
|
||||
"episodes_provider_id_cache_write_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"provider": provider.Name(),
|
||||
},
|
||||
err,
|
||||
)
|
||||
}
|
||||
log.Printf("episodes: provider id resolved anime_id=%d provider=%s provider_id=%s", anime.MalID, provider.Name(), providerID)
|
||||
observability.Info(
|
||||
"episodes_provider_id_resolved",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"provider": provider.Name(),
|
||||
"provider_id": providerID,
|
||||
},
|
||||
)
|
||||
return providerID, nil
|
||||
}
|
||||
|
||||
@@ -240,11 +344,30 @@ func (s *EpisodeService) store(ctx context.Context, anime domain.Anime, jikanEpi
|
||||
LastError: "",
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("episodes: cache write failed anime_id=%d source=%s error=%v", anime.MalID, source, err)
|
||||
observability.Warn(
|
||||
"episodes_cache_write_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"source": source,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
log.Printf("episodes: refresh success anime_id=%d source=%s episodes=%d next_refresh=%s", anime.MalID, source, len(episodes), payload.NextRefreshAt)
|
||||
observability.Info(
|
||||
"episodes_refresh_success",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"source": source,
|
||||
"episodes": len(episodes),
|
||||
"next_refresh": payload.NextRefreshAt,
|
||||
},
|
||||
)
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
@@ -270,10 +393,27 @@ func (s *EpisodeService) markFailure(ctx context.Context, anime domain.Anime, ca
|
||||
AnimeID: int64(anime.MalID),
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("episodes: failed to mark refresh failure anime_id=%d error=%v", anime.MalID, err)
|
||||
observability.Warn(
|
||||
"episodes_mark_failure_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return
|
||||
}
|
||||
log.Printf("episodes: refresh failure recorded anime_id=%d next_retry=%s error=%v", anime.MalID, next.Format(time.RFC3339), cause)
|
||||
observability.Warn(
|
||||
"episodes_refresh_failure_recorded",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"next_retry": next.Format(time.RFC3339),
|
||||
},
|
||||
cause,
|
||||
)
|
||||
}
|
||||
|
||||
func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) {
|
||||
@@ -285,7 +425,15 @@ func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.Can
|
||||
var payload domain.CanonicalEpisodeList
|
||||
if err := json.Unmarshal([]byte(row.Data), &payload); err != nil {
|
||||
s.metrics.ObserveCache("episode_availability", "miss")
|
||||
log.Printf("episodes: invalid cached payload anime_id=%d error=%v", animeID, err)
|
||||
observability.Warn(
|
||||
"episodes_cached_payload_invalid",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": animeID,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
s.metrics.ObserveCache("episode_availability", "hit")
|
||||
@@ -302,24 +450,57 @@ func (s *EpisodeService) getFreshCached(ctx context.Context, anime domain.Anime)
|
||||
now := s.clock.Now()
|
||||
if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(now) {
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
log.Printf("episodes: cached availability due for refresh anime_id=%d next_refresh=%s", anime.MalID, row.NextRefreshAt.Time.Format(time.RFC3339))
|
||||
observability.Info(
|
||||
"episodes_cache_due_for_refresh",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"next_refresh": row.NextRefreshAt.Time.Format(time.RFC3339),
|
||||
},
|
||||
)
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
|
||||
if anime.Airing && row.UpdatedAt.Before(now.Add(-airingFallbackRefreshInterval)) {
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
log.Printf("episodes: cached availability too old for airing anime_id=%d updated_at=%s", anime.MalID, row.UpdatedAt.Format(time.RFC3339))
|
||||
observability.Info(
|
||||
"episodes_cache_too_old_for_airing",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"updated_at": row.UpdatedAt.Format(time.RFC3339),
|
||||
},
|
||||
)
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
|
||||
var payload domain.CanonicalEpisodeList
|
||||
if err := json.Unmarshal([]byte(row.Data), &payload); err != nil {
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||
log.Printf("episodes: invalid cached payload anime_id=%d error=%v", anime.MalID, err)
|
||||
observability.Warn(
|
||||
"episodes_cached_payload_invalid",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
},
|
||||
err,
|
||||
)
|
||||
return domain.CanonicalEpisodeList{}, false
|
||||
}
|
||||
s.metrics.ObserveCache("episode_availability_fresh", "hit")
|
||||
log.Printf("episodes: served cached availability anime_id=%d episodes=%d next_refresh=%s", anime.MalID, len(payload.Episodes), payload.NextRefreshAt)
|
||||
observability.Info(
|
||||
"episodes_cache_served",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"episodes": len(payload.Episodes),
|
||||
"next_refresh": payload.NextRefreshAt,
|
||||
},
|
||||
)
|
||||
return payload, true
|
||||
}
|
||||
|
||||
@@ -441,13 +622,31 @@ func nextBroadcastAfter(anime domain.Anime, after time.Time) time.Time {
|
||||
if loaded, err := time.LoadLocation(tz); err == nil {
|
||||
loc = loaded
|
||||
} else {
|
||||
log.Printf("episodes: failed to parse broadcast timezone anime_id=%d timezone=%q error=%v", anime.MalID, tz, err)
|
||||
observability.Warn(
|
||||
"episodes_broadcast_timezone_parse_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"timezone": tz,
|
||||
},
|
||||
err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
hour, minute, ok := parseBroadcastTime(anime.Broadcast.Time)
|
||||
if !ok {
|
||||
log.Printf("episodes: failed to parse broadcast time anime_id=%d time=%q", anime.MalID, anime.Broadcast.Time)
|
||||
observability.Warn(
|
||||
"episodes_broadcast_time_parse_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"anime_id": anime.MalID,
|
||||
"time": anime.Broadcast.Time,
|
||||
},
|
||||
nil,
|
||||
)
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ package episodes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"mal/internal/domain"
|
||||
"mal/internal/observability"
|
||||
"time"
|
||||
@@ -18,14 +17,22 @@ func RegisterWorker(lc fx.Lifecycle, svc domain.EpisodeService, metrics *observa
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(context.Context) error {
|
||||
go func() {
|
||||
log.Println("episodes: availability worker started")
|
||||
observability.Info("episodes_worker_start", "episodes", "", nil)
|
||||
ticker := time.NewTicker(workerInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
if err := svc.RefreshTrackedDue(ctx, 25); err != nil {
|
||||
metrics.ObserveWorkerTick("episodes_availability", err)
|
||||
log.Printf("episodes: availability worker tick failed error=%v", err)
|
||||
observability.Warn(
|
||||
"episodes_worker_tick_failed",
|
||||
"episodes",
|
||||
"",
|
||||
map[string]any{
|
||||
"worker": "episodes_availability",
|
||||
},
|
||||
err,
|
||||
)
|
||||
} else {
|
||||
metrics.ObserveWorkerTick("episodes_availability", nil)
|
||||
}
|
||||
@@ -33,7 +40,7 @@ func RegisterWorker(lc fx.Lifecycle, svc domain.EpisodeService, metrics *observa
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-ctx.Done():
|
||||
log.Println("episodes: availability worker stopped")
|
||||
observability.Info("episodes_worker_stop", "episodes", "", nil)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user