From 14cb3b3a0ff16741fd3bfa25d7648c5ff6072e17 Mon Sep 17 00:00:00 2001 From: mkelvers Date: Sun, 19 Apr 2026 01:11:27 +0200 Subject: [PATCH] perf(playback): speed up watch data build --- internal/features/playback/service.go | 545 +++++++++++++++++++------- internal/features/playback/types.go | 25 +- internal/server/routes.go | 2 +- 3 files changed, 416 insertions(+), 156 deletions(-) diff --git a/internal/features/playback/service.go b/internal/features/playback/service.go index ab72865..7283bbc 100644 --- a/internal/features/playback/service.go +++ b/internal/features/playback/service.go @@ -14,16 +14,24 @@ import ( "sort" "strconv" "strings" + "sync" "time" +) - "mal/internal/jikan" +const ( + showResolutionCacheTTL = 12 * time.Hour + playbackDataCacheTTL = 2 * time.Minute + providerProbeTimeout = 3 * time.Second ) type Service struct { allAnimeClient *allAnimeClient - jikanClient *jikan.Client httpClient *http.Client db database.Querier + + cacheMu sync.RWMutex + showResolution map[int]showResolutionCacheItem + playbackDataCache map[string]playbackDataCacheItem } type sourceScore struct { @@ -35,12 +43,53 @@ type sourceScore struct { refererScore int } -func NewService(jikanClient *jikan.Client, db database.Querier) *Service { +type showResolutionCacheItem struct { + ShowID string + Title string + ExpiresAt time.Time +} + +type playbackDataCacheItem struct { + Data playbackBaseData + ExpiresAt time.Time +} + +type playbackBaseData struct { + Title string + AvailableModes []string + ModeSources map[string]ModeSource + Segments []SkipSegment +} + +type modeSourceResult struct { + Mode string + Source ModeSource + OK bool +} + +type searchModeResult struct { + Mode string + Results []searchResult + Err error +} + +type directProbeResult struct { + Playable bool + ContentType string +} + +type userPlaybackState struct { + CurrentStatus string + StartTimeSeconds float64 +} + +func NewService(db database.Querier) *Service { return &Service{ - allAnimeClient: newAllAnimeClient(), - jikanClient: jikanClient, - httpClient: &http.Client{Timeout: 12 * time.Second}, - db: db, + allAnimeClient: newAllAnimeClient(), + httpClient: &http.Client{Timeout: 12 * time.Second}, + db: db, + showResolution: make(map[int]showResolutionCacheItem), + playbackDataCache: make(map[string]playbackDataCacheItem), } } @@ -59,101 +108,293 @@ func (s *Service) BuildWatchPageData(ctx context.Context, malID int, title strin normalizedEpisode = "1" } - showID, resolvedTitle, err := s.resolveShow(ctx, malID, title) - if err != nil { - return WatchPageData{}, err - } + userStateCh := s.fetchUserPlaybackStateAsync(ctx, userID, malID, normalizedEpisode) - modeSources := make(map[string]ModeSource) - for _, sourceMode := range []string{"dub", "sub"} { - resolved, resolveErr := s.resolveModeSource(ctx, showID, normalizedEpisode, sourceMode, "best") - if resolveErr != nil { - continue + cacheKey := playbackDataCacheKey(malID, normalizedEpisode) + baseData, cacheHit := s.getPlaybackBaseDataCache(cacheKey) + if !cacheHit { + showID, resolvedTitle, err := s.resolveShowCached(ctx, malID, title) + if err != nil { + return WatchPageData{}, err } - if strings.ToLower(resolved.Type) == "embed" { - continue + modeSources, segments := s.fetchPlaybackSourcesAndSegments(ctx, showID, malID, normalizedEpisode) + if len(modeSources) == 0 { + return WatchPageData{}, errors.New("no direct playable sources available") } - modeSources[sourceMode] = ModeSource{ - URL: resolved.URL, - Referer: resolved.Referer, - Subtitles: toSubtitleItems(resolved), + watchTitle := strings.TrimSpace(resolvedTitle) + if watchTitle == "" { + watchTitle = strings.TrimSpace(title) } + if watchTitle == "" { + watchTitle = fmt.Sprintf("MAL #%d", malID) + } + + baseData = playbackBaseData{ + Title: watchTitle, + AvailableModes: availableModes(modeSources), + ModeSources: modeSources, + Segments: segments, + } + + s.setPlaybackBaseDataCache(cacheKey, baseData) } - if len(modeSources) == 0 { - return WatchPageData{}, errors.New("no direct playable sources available") + initialMode := selectInitialMode(normalizedMode, baseData.ModeSources) + + userState := userPlaybackState{} + if userStateCh != nil { + userState = <-userStateCh } - availableModes := availableModes(modeSources) - initialMode := selectInitialMode(normalizedMode, modeSources) + return WatchPageData{ + MalID: malID, + Title: baseData.Title, + CurrentEpisode: normalizedEpisode, + StartTimeSeconds: userState.StartTimeSeconds, + CurrentStatus: userState.CurrentStatus, + InitialMode: initialMode, + AvailableModes: cloneStringSlice(baseData.AvailableModes), + ModeSources: cloneModeSources(baseData.ModeSources), + Segments: cloneSegments(baseData.Segments), + }, nil +} - episodes := s.fetchEpisodeList(ctx, malID) - if len(episodes) == 0 { - episodeNumbers := s.fetchModeEpisodes(ctx, showID, initialMode) - episodes = fallbackEpisodeList(episodeNumbers) +func playbackDataCacheKey(malID int, episode string) string { + return fmt.Sprintf("%d:%s", malID, episode) +} + +func (s *Service) fetchUserPlaybackStateAsync(ctx context.Context, userID string, malID int, episode string) <-chan userPlaybackState { + if userID == "" || s.db == nil { + return nil } - segments := s.fetchSkipSegments(ctx, malID, normalizedEpisode) + resultCh := make(chan userPlaybackState, 1) + go func() { + state := userPlaybackState{} - currentStatus := "" - startTimeSeconds := 0.0 - if userID != "" && s.db != nil { entry, err := s.db.GetWatchListEntry(ctx, database.GetWatchListEntryParams{ UserID: userID, AnimeID: int64(malID), }) if err == nil { - currentStatus = entry.Status - if entry.CurrentEpisode.Valid && strconv.FormatInt(entry.CurrentEpisode.Int64, 10) == normalizedEpisode && entry.CurrentTimeSeconds > 0 { - startTimeSeconds = entry.CurrentTimeSeconds + state.CurrentStatus = entry.Status + if entry.CurrentEpisode.Valid && strconv.FormatInt(entry.CurrentEpisode.Int64, 10) == episode && entry.CurrentTimeSeconds > 0 { + state.StartTimeSeconds = entry.CurrentTimeSeconds } } - if startTimeSeconds <= 0 { + if state.StartTimeSeconds <= 0 { continueEntry, continueErr := s.db.GetContinueWatchingEntry(ctx, database.GetContinueWatchingEntryParams{ UserID: userID, AnimeID: int64(malID), }) - if continueErr == nil && continueEntry.CurrentEpisode.Valid && strconv.FormatInt(continueEntry.CurrentEpisode.Int64, 10) == normalizedEpisode && continueEntry.CurrentTimeSeconds > 0 { - startTimeSeconds = continueEntry.CurrentTimeSeconds + if continueErr == nil && continueEntry.CurrentEpisode.Valid && strconv.FormatInt(continueEntry.CurrentEpisode.Int64, 10) == episode && continueEntry.CurrentTimeSeconds > 0 { + state.StartTimeSeconds = continueEntry.CurrentTimeSeconds } } + + resultCh <- state + }() + + return resultCh +} + +func (s *Service) getPlaybackBaseDataCache(key string) (playbackBaseData, bool) { + now := time.Now() + + s.cacheMu.RLock() + item, ok := s.playbackDataCache[key] + s.cacheMu.RUnlock() + if !ok { + return playbackBaseData{}, false + } + + if now.After(item.ExpiresAt) { + s.cacheMu.Lock() + current, exists := s.playbackDataCache[key] + if exists && time.Now().After(current.ExpiresAt) { + delete(s.playbackDataCache, key) + } + s.cacheMu.Unlock() + return playbackBaseData{}, false + } + + return clonePlaybackBaseData(item.Data), true +} + +func (s *Service) setPlaybackBaseDataCache(key string, data playbackBaseData) { + s.cacheMu.Lock() + s.playbackDataCache[key] = playbackDataCacheItem{ + Data: clonePlaybackBaseData(data), + ExpiresAt: time.Now().Add(playbackDataCacheTTL), + } + s.cacheMu.Unlock() +} + +func (s *Service) resolveShowCached(ctx context.Context, malID int, title string) (string, string, error) { + now := time.Now() + + s.cacheMu.RLock() + item, ok := s.showResolution[malID] + s.cacheMu.RUnlock() + + if ok && now.Before(item.ExpiresAt) && strings.TrimSpace(item.ShowID) != "" { + return item.ShowID, item.Title, nil + } + + showID, resolvedTitle, err := s.resolveShow(ctx, malID, title) + if err != nil { + return "", "", err + } + + s.cacheMu.Lock() + s.showResolution[malID] = showResolutionCacheItem{ + ShowID: showID, + Title: resolvedTitle, + ExpiresAt: time.Now().Add(showResolutionCacheTTL), + } + s.cacheMu.Unlock() + + return showID, resolvedTitle, nil +} + +func (s *Service) fetchPlaybackSourcesAndSegments(ctx context.Context, showID string, malID int, episode string) (map[string]ModeSource, []SkipSegment) { + modeCh := make(chan modeSourceResult, 2) + probeCache := make(map[string]directProbeResult) + probeCacheMu := sync.Mutex{} + + for _, mode := range []string{"dub", "sub"} { + modeValue := mode + go func() { + resolved, err := s.resolveModeSourceWithCache(ctx, showID, episode, modeValue, "best", probeCache, &probeCacheMu) + if err != nil { + modeCh <- modeSourceResult{Mode: modeValue, OK: false} + return + } + + if strings.ToLower(resolved.Type) == "embed" { + modeCh <- modeSourceResult{Mode: modeValue, OK: false} + return + } + + modeCh <- modeSourceResult{ + Mode: modeValue, + Source: ModeSource{ + URL: resolved.URL, + Referer: resolved.Referer, + Subtitles: toSubtitleItems(resolved), + }, + OK: true, + } + }() + } + + segmentsCh := make(chan []SkipSegment, 1) + go func() { + segmentsCh <- s.fetchSkipSegments(ctx, malID, episode) + }() + + modeSources := make(map[string]ModeSource) + for range 2 { + result := <-modeCh + if !result.OK { + continue + } + modeSources[result.Mode] = result.Source + } + + segments := <-segmentsCh + return modeSources, segments +} + +func clonePlaybackBaseData(data playbackBaseData) playbackBaseData { + return playbackBaseData{ + Title: data.Title, + AvailableModes: cloneStringSlice(data.AvailableModes), + ModeSources: cloneModeSources(data.ModeSources), + Segments: cloneSegments(data.Segments), + } +} + +func cloneStringSlice(items []string) []string { + if len(items) == 0 { + return nil + } + + cloned := make([]string, len(items)) + copy(cloned, items) + return cloned +} + +func cloneModeSources(modeSources map[string]ModeSource) map[string]ModeSource { + if len(modeSources) == 0 { + return nil + } + + cloned := make(map[string]ModeSource, len(modeSources)) + for mode, source := range modeSources { + cloned[mode] = ModeSource{ + URL: source.URL, + Referer: source.Referer, + Subtitles: cloneSubtitleItems(source.Subtitles), + } } - watchTitle := strings.TrimSpace(resolvedTitle) - if watchTitle == "" { - watchTitle = strings.TrimSpace(title) - } - if watchTitle == "" { - watchTitle = fmt.Sprintf("MAL #%d", malID) + return cloned +} + +func cloneSubtitleItems(items []SubtitleItem) []SubtitleItem { + if len(items) == 0 { + return nil } - return WatchPageData{ - MalID: malID, - Title: watchTitle, - CurrentEpisode: normalizedEpisode, - StartTimeSeconds: startTimeSeconds, - CurrentStatus: currentStatus, - InitialMode: initialMode, - AvailableModes: availableModes, - ModeSources: modeSources, - Episodes: episodes, - Segments: segments, - }, nil + cloned := make([]SubtitleItem, len(items)) + copy(cloned, items) + return cloned +} + +func cloneSegments(segments []SkipSegment) []SkipSegment { + if len(segments) == 0 { + return nil + } + + cloned := make([]SkipSegment, len(segments)) + copy(cloned, segments) + return cloned } func (s *Service) resolveShow(ctx context.Context, malID int, title string) (string, string, error) { malText := strconv.Itoa(malID) modeCandidates := []string{"sub", "dub"} + + resultsByMode := make(map[string][]searchResult, len(modeCandidates)) + searchCh := make(chan searchModeResult, len(modeCandidates)) + + var wg sync.WaitGroup for _, mode := range modeCandidates { - results, err := s.allAnimeClient.Search(ctx, title, mode) - if err != nil { + modeValue := mode + wg.Add(1) + go func() { + defer wg.Done() + results, err := s.allAnimeClient.Search(ctx, title, modeValue) + searchCh <- searchModeResult{Mode: modeValue, Results: results, Err: err} + }() + } + + wg.Wait() + close(searchCh) + + for result := range searchCh { + if result.Err != nil { continue } + resultsByMode[result.Mode] = result.Results + } - for _, result := range results { + for _, mode := range modeCandidates { + for _, result := range resultsByMode[mode] { if strings.TrimSpace(result.MalID) == malText && strings.TrimSpace(result.ID) != "" { return result.ID, result.Name, nil } @@ -162,8 +403,8 @@ func (s *Service) resolveShow(ctx context.Context, malID int, title string) (str if strings.TrimSpace(title) != "" { for _, mode := range modeCandidates { - results, err := s.allAnimeClient.Search(ctx, title, mode) - if err != nil || len(results) == 0 { + results := resultsByMode[mode] + if len(results) == 0 { continue } @@ -196,6 +437,33 @@ func (s *Service) resolveModeSource(ctx context.Context, showID string, episode return selected, nil } +func (s *Service) resolveModeSourceWithCache( + ctx context.Context, + showID string, + episode string, + mode string, + quality string, + probeCache map[string]directProbeResult, + probeCacheMu *sync.Mutex, +) (StreamSource, error) { + sources, err := s.allAnimeClient.GetEpisodeSources(ctx, showID, episode, mode) + if err != nil { + return StreamSource{}, err + } + + ranked, err := rankSources(sources, quality) + if err != nil { + return StreamSource{}, err + } + + selected, _, err := s.choosePlaybackSourceWithCache(ctx, ranked, probeCache, probeCacheMu) + if err != nil { + return StreamSource{}, err + } + + return selected, nil +} + func (s *Service) choosePlaybackSource(ctx context.Context, ranked []sourceScore) (StreamSource, string, error) { if len(ranked) == 0 { return StreamSource{}, "", errors.New("no ranked sources available") @@ -232,8 +500,79 @@ func (s *Service) choosePlaybackSource(ctx context.Context, ranked []sourceScore return ranked[0].source, "ranked-fallback", nil } +func (s *Service) choosePlaybackSourceWithCache( + ctx context.Context, + ranked []sourceScore, + probeCache map[string]directProbeResult, + probeCacheMu *sync.Mutex, +) (StreamSource, string, error) { + if len(ranked) == 0 { + return StreamSource{}, "", errors.New("no ranked sources available") + } + + embedCandidates := make([]StreamSource, 0) + for _, candidate := range ranked { + source := candidate.source + sourceType := strings.ToLower(source.Type) + + switch sourceType { + case "mp4", "m3u8": + return source, "direct-media", nil + case "embed": + embedCandidates = append(embedCandidates, source) + default: + playable, contentType := s.probeDirectMediaCached(ctx, source, probeCache, probeCacheMu) + if playable { + return normalizeSourceTypeFromProbe(source, contentType), "probed-media", nil + } + } + } + + for _, embed := range embedCandidates { + if s.probeEmbedSource(ctx, embed) { + return embed, "embed-probed", nil + } + } + + if len(embedCandidates) > 0 { + return embedCandidates[0], "embed-fallback", nil + } + + return ranked[0].source, "ranked-fallback", nil +} + +func (s *Service) probeDirectMediaCached( + ctx context.Context, + source StreamSource, + probeCache map[string]directProbeResult, + probeCacheMu *sync.Mutex, +) (bool, string) { + cacheKey := strings.TrimSpace(source.URL) + if cacheKey == "" { + return s.probeDirectMedia(ctx, source) + } + + probeCacheMu.Lock() + cached, ok := probeCache[cacheKey] + probeCacheMu.Unlock() + if ok { + return cached.Playable, cached.ContentType + } + + playable, contentType := s.probeDirectMedia(ctx, source) + + probeCacheMu.Lock() + probeCache[cacheKey] = directProbeResult{Playable: playable, ContentType: contentType} + probeCacheMu.Unlock() + + return playable, contentType +} + func (s *Service) probeDirectMedia(ctx context.Context, source StreamSource) (bool, string) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, source.URL, nil) + probeCtx, cancel := context.WithTimeout(ctx, providerProbeTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, source.URL, nil) if err != nil { return false, "" } @@ -278,7 +617,10 @@ func (s *Service) probeDirectMedia(ctx context.Context, source StreamSource) (bo } func (s *Service) probeEmbedSource(ctx context.Context, source StreamSource) bool { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, source.URL, nil) + probeCtx, cancel := context.WithTimeout(ctx, providerProbeTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, source.URL, nil) if err != nil { return false } @@ -388,59 +730,6 @@ func (s *Service) fetchSkipSegments(ctx context.Context, malID int, episode stri return segments } -func (s *Service) fetchEpisodeList(ctx context.Context, malID int) []EpisodeListItem { - if malID <= 0 { - return nil - } - - items := make([]EpisodeListItem, 0) - for page := 1; page <= 20; page++ { - result, err := s.jikanClient.GetEpisodes(ctx, malID, page) - if err != nil { - return items - } - - for _, episode := range result.Data { - if episode.MalID <= 0 { - continue - } - - items = append(items, EpisodeListItem{ - Number: strconv.Itoa(episode.MalID), - Title: strings.TrimSpace(episode.Title), - Filler: episode.Filler, - Recap: episode.Recap, - Order: episode.MalID, - }) - } - - if !result.Pagination.HasNextPage { - break - } - } - - return items -} - -func (s *Service) fetchModeEpisodes(ctx context.Context, showID string, mode string) []string { - episodes, err := s.allAnimeClient.GetEpisodes(ctx, showID, mode) - if err == nil && len(episodes) > 0 { - return episodes - } - - fallbackMode := "sub" - if mode == "sub" { - fallbackMode = "dub" - } - - fallbackEpisodes, fallbackErr := s.allAnimeClient.GetEpisodes(ctx, showID, fallbackMode) - if fallbackErr != nil { - return nil - } - - return fallbackEpisodes -} - func (s *Service) ProxyStream(ctx context.Context, targetURL string, referer string, rangeHeader string) (int, http.Header, []byte, io.ReadCloser, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, targetURL, nil) if err != nil { @@ -481,26 +770,6 @@ func (s *Service) ProxyStream(ctx context.Context, targetURL string, referer str return resp.StatusCode, headers, nil, resp.Body, nil } -func fallbackEpisodeList(episodeNumbers []string) []EpisodeListItem { - items := make([]EpisodeListItem, 0, len(episodeNumbers)) - for idx, number := range episodeNumbers { - trimmed := strings.TrimSpace(number) - if trimmed == "" { - continue - } - - items = append(items, EpisodeListItem{ - Number: trimmed, - Title: "", - Filler: false, - Recap: false, - Order: idx + 1, - }) - } - - return items -} - func normalizeMode(raw string) string { lower := strings.ToLower(strings.TrimSpace(raw)) if lower == "sub" || lower == "dub" { diff --git a/internal/features/playback/types.go b/internal/features/playback/types.go index b1c916c..8ac7159 100644 --- a/internal/features/playback/types.go +++ b/internal/features/playback/types.go @@ -26,14 +26,6 @@ type SubtitleItem struct { Referer string `json:"referer"` } -type EpisodeListItem struct { - Number string `json:"number"` - Title string `json:"title"` - Filler bool `json:"filler"` - Recap bool `json:"recap"` - Order int `json:"order"` -} - type SkipSegment struct { Type string `json:"type"` Start float64 `json:"start"` @@ -41,14 +33,13 @@ type SkipSegment struct { } type WatchPageData struct { - MalID int - Title string - CurrentEpisode string + MalID int + Title string + CurrentEpisode string StartTimeSeconds float64 - CurrentStatus string - InitialMode string - AvailableModes []string - ModeSources map[string]ModeSource - Episodes []EpisodeListItem - Segments []SkipSegment + CurrentStatus string + InitialMode string + AvailableModes []string + ModeSources map[string]ModeSource + Segments []SkipSegment } diff --git a/internal/server/routes.go b/internal/server/routes.go index 95e36ce..fb2013e 100644 --- a/internal/server/routes.go +++ b/internal/server/routes.go @@ -28,7 +28,7 @@ func NewRouter(cfg Config) http.Handler { animeSvc := anime.NewService(cfg.JikanClient, cfg.DB) animeHandler := anime.NewHandler(animeSvc) - playbackSvc := playback.NewService(cfg.JikanClient, cfg.DB) + playbackSvc := playback.NewService(cfg.DB) playbackHandler := playback.NewHandler(playbackSvc, cfg.JikanClient) // Serve static files