From 2e79c32afe4cfd8e2bc3675ea51833c4a58cf270 Mon Sep 17 00:00:00 2001 From: mkelvers Date: Thu, 11 Jun 2026 14:38:40 +0200 Subject: [PATCH] refactor: split getTopPicksForYou into focused helpers --- internal/anime/service.go | 297 ++++++++++++++++++++++---------------- 1 file changed, 171 insertions(+), 126 deletions(-) diff --git a/internal/anime/service.go b/internal/anime/service.go index 5e4612c..b1674fd 100644 --- a/internal/anime/service.go +++ b/internal/anime/service.go @@ -22,6 +22,20 @@ type animeService struct { repo domain.AnimeRepository } +type rankedCandidate struct { + id int + collaborativeScore float64 + profileSearchScore float64 + anime jikan.Anime + hasAnime bool +} + +type candidateStore struct { + watchlistAnimeIDs map[int]struct{} + byID map[int]rankedCandidate + mu sync.Mutex +} + func wrapAnimes(in []jikan.Anime) []domain.Anime { out := make([]domain.Anime, 0, len(in)) for _, a := range in { @@ -34,6 +48,65 @@ func NewAnimeService(jikan *jikan.Client, repo domain.AnimeRepository) *animeSer return &animeService{jikan: jikan, repo: repo} } +func newCandidateStore(watchlist []db.GetUserWatchListRow) *candidateStore { + watchlistAnimeIDs := make(map[int]struct{}, len(watchlist)) + for _, entry := range watchlist { + if entry.AnimeID <= 0 { + continue + } + watchlistAnimeIDs[int(entry.AnimeID)] = struct{}{} + } + + return &candidateStore{ + watchlistAnimeIDs: watchlistAnimeIDs, + byID: map[int]rankedCandidate{}, + } +} + +func (s *candidateStore) upsert(candidate rankedCandidate) { + if candidate.id <= 0 { + return + } + if _, exists := s.watchlistAnimeIDs[candidate.id]; exists { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + + current, ok := s.byID[candidate.id] + if !ok { + s.byID[candidate.id] = candidate + return + } + + current.collaborativeScore += candidate.collaborativeScore + current.profileSearchScore += candidate.profileSearchScore + if candidate.hasAnime { + current.anime = candidate.anime + current.hasAnime = true + } + s.byID[candidate.id] = current +} + +func (s *candidateStore) ranked() []rankedCandidate { + ranked := make([]rankedCandidate, 0, len(s.byID)) + for _, item := range s.byID { + ranked = append(ranked, item) + } + + sort.Slice(ranked, func(i, j int) bool { + left := rankedCandidateRetrievalScore(ranked[i].collaborativeScore, ranked[i].profileSearchScore) + right := rankedCandidateRetrievalScore(ranked[j].collaborativeScore, ranked[j].profileSearchScore) + if left == right { + return ranked[i].id < ranked[j].id + } + return left > right + }) + + return ranked +} + func isAiringScheduleCandidate(entry db.GetUserWatchListRow) bool { status := strings.TrimSpace(entry.Status) if status != "watching" && status != "plan_to_watch" { @@ -160,112 +233,48 @@ func (s *animeService) GetTopPicksForYou(ctx context.Context, userID string) (do return s.getTopPicksForYou(ctx, userID, forYouFullResultLimit) } -func (s *animeService) getTopPicksForYou( - ctx context.Context, - userID string, - resultLimit int, -) (domain.CatalogSectionData, error) { - if strings.TrimSpace(userID) == "" { - return domain.CatalogSectionData{Animes: []domain.Anime{}}, nil - } - - watchlist, err := s.repo.GetUserWatchList(ctx, userID) - if err != nil { - return domain.CatalogSectionData{}, err - } - - now := time.Now() - seedPool := buildRecommendationSeeds(now, watchlist) - if len(seedPool) == 0 { - return domain.CatalogSectionData{Animes: []domain.Anime{}}, nil - } - - type rankedCandidate struct { - id int - collaborativeScore float64 - profileSearchScore float64 - anime jikan.Anime - hasAnime bool - } - - watchlistAnimeIDs := make(map[int]struct{}, len(watchlist)) - for _, entry := range watchlist { - if entry.AnimeID <= 0 { - continue - } - watchlistAnimeIDs[int(entry.AnimeID)] = struct{}{} - } - - candidatesByID := map[int]rankedCandidate{} - var candidatesByIDMu sync.Mutex - upsertCandidate := func(candidate rankedCandidate) { - if candidate.id <= 0 { - return - } - if _, exists := watchlistAnimeIDs[candidate.id]; exists { - return - } - - candidatesByIDMu.Lock() - defer candidatesByIDMu.Unlock() - - current, ok := candidatesByID[candidate.id] - if !ok { - candidatesByID[candidate.id] = candidate - return - } - - current.collaborativeScore += candidate.collaborativeScore - current.profileSearchScore += candidate.profileSearchScore - if candidate.hasAnime { - current.anime = candidate.anime - current.hasAnime = true - } - candidatesByID[candidate.id] = current - } - +func (s *animeService) fetchSeedAnimes(ctx context.Context, seedPool []recommendationSeed) ([]jikan.Anime, error) { seedAnimes := make([]jikan.Anime, len(seedPool)) - var seedFetchGroup errgroup.Group - seedFetchGroup.SetLimit(4) + var g errgroup.Group + g.SetLimit(4) for i, seed := range seedPool { - seedFetchGroup.Go(func() error { - anime, fetchErr := s.jikan.GetAnimeByID(ctx, seed.animeID) - if fetchErr != nil { - return fetchErr + g.Go(func() error { + anime, err := s.jikan.GetAnimeByID(ctx, seed.animeID) + if err != nil { + return err } seedAnimes[i] = anime return nil }) } - if err := seedFetchGroup.Wait(); err != nil { - return domain.CatalogSectionData{}, err + if err := g.Wait(); err != nil { + return nil, err } - profile := buildTasteProfile(now, seedPool, seedAnimes) + return seedAnimes, nil +} - var recommendationGroup errgroup.Group - recommendationGroup.SetLimit(4) +func (s *animeService) collectCollaborativeCandidates(ctx context.Context, seedPool []recommendationSeed, store *candidateStore) error { + var g errgroup.Group + g.SetLimit(4) for _, seed := range seedPool { - recommendationGroup.Go(func() error { - recs, recErr := s.jikan.GetAnimeRecommendations(ctx, seed.animeID) - if recErr != nil { - return recErr + g.Go(func() error { + recs, err := s.jikan.GetAnimeRecommendations(ctx, seed.animeID) + if err != nil { + return err } for i, rec := range recs { if i >= forYouMaxRecommendations { break } id := rec.Entry.MalID - if id <= 0 { + if id <= 0 || id == seed.animeID { continue } - if id == seed.animeID { - continue - } - upsertCandidate(rankedCandidate{ + store.upsert(rankedCandidate{ id: id, collaborativeScore: float64(rec.Votes) * seed.weight, }) @@ -274,17 +283,17 @@ func (s *animeService) getTopPicksForYou( }) } - if err := recommendationGroup.Wait(); err != nil { - return domain.CatalogSectionData{}, err - } + return g.Wait() +} - profileQueries := buildProfileSearchQueries(profile) - var profileSearchGroup errgroup.Group - profileSearchGroup.SetLimit(3) +func (s *animeService) collectProfileSearchCandidates(ctx context.Context, profile userTasteProfile, store *candidateStore) error { + queries := buildProfileSearchQueries(profile) + var g errgroup.Group + g.SetLimit(3) - for _, query := range profileQueries { - profileSearchGroup.Go(func() error { - res, searchErr := s.jikan.SearchAdvanced( + for _, query := range queries { + g.Go(func() error { + res, err := s.jikan.SearchAdvanced( ctx, "", "", @@ -297,7 +306,7 @@ func (s *animeService) getTopPicksForYou( 1, forYouProfileSearchLimit, ) - if searchErr != nil { + if err != nil { observability.Warn( "top_pick_profile_search_failed", "anime", @@ -306,7 +315,7 @@ func (s *animeService) getTopPicksForYou( "genres": query.genreIDs, "studio_id": query.studioID, }, - searchErr, + err, ) return nil } @@ -315,7 +324,7 @@ func (s *animeService) getTopPicksForYou( if anime.MalID <= 0 { continue } - upsertCandidate(rankedCandidate{ + store.upsert(rankedCandidate{ id: anime.MalID, profileSearchScore: query.weight * profileSearchRankWeight(i), anime: anime, @@ -326,46 +335,34 @@ func (s *animeService) getTopPicksForYou( }) } - if err := profileSearchGroup.Wait(); err != nil { - return domain.CatalogSectionData{}, err - } + return g.Wait() +} - if len(candidatesByID) == 0 { - return domain.CatalogSectionData{Animes: []domain.Anime{}}, nil - } - - rankedIDs := make([]rankedCandidate, 0, len(candidatesByID)) - for _, item := range candidatesByID { - rankedIDs = append(rankedIDs, item) - } - sort.Slice(rankedIDs, func(i, j int) bool { - left := rankedCandidateRetrievalScore(rankedIDs[i].collaborativeScore, rankedIDs[i].profileSearchScore) - right := rankedCandidateRetrievalScore(rankedIDs[j].collaborativeScore, rankedIDs[j].profileSearchScore) - if left == right { - return rankedIDs[i].id < rankedIDs[j].id - } - return left > right - }) - - limit := min(len(rankedIDs), forYouCandidateFetchLimit) +func (s *animeService) scoreRankedCandidates( + ctx context.Context, + now time.Time, + profile userTasteProfile, + ranked []rankedCandidate, +) ([]recommendationCandidate, error) { + limit := min(len(ranked), forYouCandidateFetchLimit) candidates := make([]recommendationCandidate, 0, limit) var candidatesMu sync.Mutex - var detailGroup errgroup.Group - detailGroup.SetLimit(6) + var g errgroup.Group + g.SetLimit(6) for i := 0; i < limit; i++ { - item := rankedIDs[i] - detailGroup.Go(func() error { + item := ranked[i] + g.Go(func() error { anime := item.anime if !item.hasAnime || !hasTasteMetadata(anime) { - fetchedAnime, fetchErr := s.jikan.GetAnimeByID(ctx, item.id) - if fetchErr != nil { + fetchedAnime, err := s.jikan.GetAnimeByID(ctx, item.id) + if err != nil { observability.Warn( "recommendation_anime_fetch_failed", "anime", "", map[string]any{"anime_id": item.id}, - fetchErr, + err, ) return nil } @@ -386,8 +383,8 @@ func (s *animeService) getTopPicksForYou( }) } - if err := detailGroup.Wait(); err != nil { - return domain.CatalogSectionData{}, err + if err := g.Wait(); err != nil { + return nil, err } sort.Slice(candidates, func(i, j int) bool { @@ -397,6 +394,54 @@ func (s *animeService) getTopPicksForYou( return candidates[i].score > candidates[j].score }) + return candidates, nil +} + +func (s *animeService) getTopPicksForYou( + ctx context.Context, + userID string, + resultLimit int, +) (domain.CatalogSectionData, error) { + if strings.TrimSpace(userID) == "" { + return domain.CatalogSectionData{Animes: []domain.Anime{}}, nil + } + + watchlist, err := s.repo.GetUserWatchList(ctx, userID) + if err != nil { + return domain.CatalogSectionData{}, err + } + + now := time.Now() + seedPool := buildRecommendationSeeds(now, watchlist) + if len(seedPool) == 0 { + return domain.CatalogSectionData{Animes: []domain.Anime{}}, nil + } + + seedAnimes, err := s.fetchSeedAnimes(ctx, seedPool) + if err != nil { + return domain.CatalogSectionData{}, err + } + + profile := buildTasteProfile(now, seedPool, seedAnimes) + store := newCandidateStore(watchlist) + + if err := s.collectCollaborativeCandidates(ctx, seedPool, store); err != nil { + return domain.CatalogSectionData{}, err + } + if err := s.collectProfileSearchCandidates(ctx, profile, store); err != nil { + return domain.CatalogSectionData{}, err + } + + ranked := store.ranked() + if len(ranked) == 0 { + return domain.CatalogSectionData{Animes: []domain.Anime{}}, nil + } + + candidates, err := s.scoreRankedCandidates(ctx, now, profile, ranked) + if err != nil { + return domain.CatalogSectionData{}, err + } + return domain.CatalogSectionData{ Animes: rerankRecommendationCandidates(candidates, resultLimit), }, nil