perf(playback): speed up watch data build

This commit is contained in:
2026-04-19 01:11:27 +02:00
parent 3384ac1ecc
commit 14cb3b3a0f
3 changed files with 416 additions and 156 deletions

View File

@@ -14,16 +14,24 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
)
"mal/internal/jikan"
const (
showResolutionCacheTTL = 12 * time.Hour
playbackDataCacheTTL = 2 * time.Minute
providerProbeTimeout = 3 * time.Second
)
type Service struct {
allAnimeClient *allAnimeClient
jikanClient *jikan.Client
httpClient *http.Client
db database.Querier
cacheMu sync.RWMutex
showResolution map[int]showResolutionCacheItem
playbackDataCache map[string]playbackDataCacheItem
}
type sourceScore struct {
@@ -35,12 +43,53 @@ type sourceScore struct {
refererScore int
}
func NewService(jikanClient *jikan.Client, db database.Querier) *Service {
type showResolutionCacheItem struct {
ShowID string
Title string
ExpiresAt time.Time
}
type playbackDataCacheItem struct {
Data playbackBaseData
ExpiresAt time.Time
}
type playbackBaseData struct {
Title string
AvailableModes []string
ModeSources map[string]ModeSource
Segments []SkipSegment
}
type modeSourceResult struct {
Mode string
Source ModeSource
OK bool
}
type searchModeResult struct {
Mode string
Results []searchResult
Err error
}
type directProbeResult struct {
Playable bool
ContentType string
}
type userPlaybackState struct {
CurrentStatus string
StartTimeSeconds float64
}
func NewService(db database.Querier) *Service {
return &Service{
allAnimeClient: newAllAnimeClient(),
jikanClient: jikanClient,
httpClient: &http.Client{Timeout: 12 * time.Second},
db: db,
allAnimeClient: newAllAnimeClient(),
httpClient: &http.Client{Timeout: 12 * time.Second},
db: db,
showResolution: make(map[int]showResolutionCacheItem),
playbackDataCache: make(map[string]playbackDataCacheItem),
}
}
@@ -59,101 +108,293 @@ func (s *Service) BuildWatchPageData(ctx context.Context, malID int, title strin
normalizedEpisode = "1"
}
showID, resolvedTitle, err := s.resolveShow(ctx, malID, title)
if err != nil {
return WatchPageData{}, err
}
userStateCh := s.fetchUserPlaybackStateAsync(ctx, userID, malID, normalizedEpisode)
modeSources := make(map[string]ModeSource)
for _, sourceMode := range []string{"dub", "sub"} {
resolved, resolveErr := s.resolveModeSource(ctx, showID, normalizedEpisode, sourceMode, "best")
if resolveErr != nil {
continue
cacheKey := playbackDataCacheKey(malID, normalizedEpisode)
baseData, cacheHit := s.getPlaybackBaseDataCache(cacheKey)
if !cacheHit {
showID, resolvedTitle, err := s.resolveShowCached(ctx, malID, title)
if err != nil {
return WatchPageData{}, err
}
if strings.ToLower(resolved.Type) == "embed" {
continue
modeSources, segments := s.fetchPlaybackSourcesAndSegments(ctx, showID, malID, normalizedEpisode)
if len(modeSources) == 0 {
return WatchPageData{}, errors.New("no direct playable sources available")
}
modeSources[sourceMode] = ModeSource{
URL: resolved.URL,
Referer: resolved.Referer,
Subtitles: toSubtitleItems(resolved),
watchTitle := strings.TrimSpace(resolvedTitle)
if watchTitle == "" {
watchTitle = strings.TrimSpace(title)
}
if watchTitle == "" {
watchTitle = fmt.Sprintf("MAL #%d", malID)
}
baseData = playbackBaseData{
Title: watchTitle,
AvailableModes: availableModes(modeSources),
ModeSources: modeSources,
Segments: segments,
}
s.setPlaybackBaseDataCache(cacheKey, baseData)
}
if len(modeSources) == 0 {
return WatchPageData{}, errors.New("no direct playable sources available")
initialMode := selectInitialMode(normalizedMode, baseData.ModeSources)
userState := userPlaybackState{}
if userStateCh != nil {
userState = <-userStateCh
}
availableModes := availableModes(modeSources)
initialMode := selectInitialMode(normalizedMode, modeSources)
return WatchPageData{
MalID: malID,
Title: baseData.Title,
CurrentEpisode: normalizedEpisode,
StartTimeSeconds: userState.StartTimeSeconds,
CurrentStatus: userState.CurrentStatus,
InitialMode: initialMode,
AvailableModes: cloneStringSlice(baseData.AvailableModes),
ModeSources: cloneModeSources(baseData.ModeSources),
Segments: cloneSegments(baseData.Segments),
}, nil
}
episodes := s.fetchEpisodeList(ctx, malID)
if len(episodes) == 0 {
episodeNumbers := s.fetchModeEpisodes(ctx, showID, initialMode)
episodes = fallbackEpisodeList(episodeNumbers)
func playbackDataCacheKey(malID int, episode string) string {
return fmt.Sprintf("%d:%s", malID, episode)
}
func (s *Service) fetchUserPlaybackStateAsync(ctx context.Context, userID string, malID int, episode string) <-chan userPlaybackState {
if userID == "" || s.db == nil {
return nil
}
segments := s.fetchSkipSegments(ctx, malID, normalizedEpisode)
resultCh := make(chan userPlaybackState, 1)
go func() {
state := userPlaybackState{}
currentStatus := ""
startTimeSeconds := 0.0
if userID != "" && s.db != nil {
entry, err := s.db.GetWatchListEntry(ctx, database.GetWatchListEntryParams{
UserID: userID,
AnimeID: int64(malID),
})
if err == nil {
currentStatus = entry.Status
if entry.CurrentEpisode.Valid && strconv.FormatInt(entry.CurrentEpisode.Int64, 10) == normalizedEpisode && entry.CurrentTimeSeconds > 0 {
startTimeSeconds = entry.CurrentTimeSeconds
state.CurrentStatus = entry.Status
if entry.CurrentEpisode.Valid && strconv.FormatInt(entry.CurrentEpisode.Int64, 10) == episode && entry.CurrentTimeSeconds > 0 {
state.StartTimeSeconds = entry.CurrentTimeSeconds
}
}
if startTimeSeconds <= 0 {
if state.StartTimeSeconds <= 0 {
continueEntry, continueErr := s.db.GetContinueWatchingEntry(ctx, database.GetContinueWatchingEntryParams{
UserID: userID,
AnimeID: int64(malID),
})
if continueErr == nil && continueEntry.CurrentEpisode.Valid && strconv.FormatInt(continueEntry.CurrentEpisode.Int64, 10) == normalizedEpisode && continueEntry.CurrentTimeSeconds > 0 {
startTimeSeconds = continueEntry.CurrentTimeSeconds
if continueErr == nil && continueEntry.CurrentEpisode.Valid && strconv.FormatInt(continueEntry.CurrentEpisode.Int64, 10) == episode && continueEntry.CurrentTimeSeconds > 0 {
state.StartTimeSeconds = continueEntry.CurrentTimeSeconds
}
}
resultCh <- state
}()
return resultCh
}
func (s *Service) getPlaybackBaseDataCache(key string) (playbackBaseData, bool) {
now := time.Now()
s.cacheMu.RLock()
item, ok := s.playbackDataCache[key]
s.cacheMu.RUnlock()
if !ok {
return playbackBaseData{}, false
}
if now.After(item.ExpiresAt) {
s.cacheMu.Lock()
current, exists := s.playbackDataCache[key]
if exists && time.Now().After(current.ExpiresAt) {
delete(s.playbackDataCache, key)
}
s.cacheMu.Unlock()
return playbackBaseData{}, false
}
return clonePlaybackBaseData(item.Data), true
}
func (s *Service) setPlaybackBaseDataCache(key string, data playbackBaseData) {
s.cacheMu.Lock()
s.playbackDataCache[key] = playbackDataCacheItem{
Data: clonePlaybackBaseData(data),
ExpiresAt: time.Now().Add(playbackDataCacheTTL),
}
s.cacheMu.Unlock()
}
func (s *Service) resolveShowCached(ctx context.Context, malID int, title string) (string, string, error) {
now := time.Now()
s.cacheMu.RLock()
item, ok := s.showResolution[malID]
s.cacheMu.RUnlock()
if ok && now.Before(item.ExpiresAt) && strings.TrimSpace(item.ShowID) != "" {
return item.ShowID, item.Title, nil
}
showID, resolvedTitle, err := s.resolveShow(ctx, malID, title)
if err != nil {
return "", "", err
}
s.cacheMu.Lock()
s.showResolution[malID] = showResolutionCacheItem{
ShowID: showID,
Title: resolvedTitle,
ExpiresAt: time.Now().Add(showResolutionCacheTTL),
}
s.cacheMu.Unlock()
return showID, resolvedTitle, nil
}
func (s *Service) fetchPlaybackSourcesAndSegments(ctx context.Context, showID string, malID int, episode string) (map[string]ModeSource, []SkipSegment) {
modeCh := make(chan modeSourceResult, 2)
probeCache := make(map[string]directProbeResult)
probeCacheMu := sync.Mutex{}
for _, mode := range []string{"dub", "sub"} {
modeValue := mode
go func() {
resolved, err := s.resolveModeSourceWithCache(ctx, showID, episode, modeValue, "best", probeCache, &probeCacheMu)
if err != nil {
modeCh <- modeSourceResult{Mode: modeValue, OK: false}
return
}
if strings.ToLower(resolved.Type) == "embed" {
modeCh <- modeSourceResult{Mode: modeValue, OK: false}
return
}
modeCh <- modeSourceResult{
Mode: modeValue,
Source: ModeSource{
URL: resolved.URL,
Referer: resolved.Referer,
Subtitles: toSubtitleItems(resolved),
},
OK: true,
}
}()
}
segmentsCh := make(chan []SkipSegment, 1)
go func() {
segmentsCh <- s.fetchSkipSegments(ctx, malID, episode)
}()
modeSources := make(map[string]ModeSource)
for range 2 {
result := <-modeCh
if !result.OK {
continue
}
modeSources[result.Mode] = result.Source
}
segments := <-segmentsCh
return modeSources, segments
}
func clonePlaybackBaseData(data playbackBaseData) playbackBaseData {
return playbackBaseData{
Title: data.Title,
AvailableModes: cloneStringSlice(data.AvailableModes),
ModeSources: cloneModeSources(data.ModeSources),
Segments: cloneSegments(data.Segments),
}
}
func cloneStringSlice(items []string) []string {
if len(items) == 0 {
return nil
}
cloned := make([]string, len(items))
copy(cloned, items)
return cloned
}
func cloneModeSources(modeSources map[string]ModeSource) map[string]ModeSource {
if len(modeSources) == 0 {
return nil
}
cloned := make(map[string]ModeSource, len(modeSources))
for mode, source := range modeSources {
cloned[mode] = ModeSource{
URL: source.URL,
Referer: source.Referer,
Subtitles: cloneSubtitleItems(source.Subtitles),
}
}
watchTitle := strings.TrimSpace(resolvedTitle)
if watchTitle == "" {
watchTitle = strings.TrimSpace(title)
}
if watchTitle == "" {
watchTitle = fmt.Sprintf("MAL #%d", malID)
return cloned
}
func cloneSubtitleItems(items []SubtitleItem) []SubtitleItem {
if len(items) == 0 {
return nil
}
return WatchPageData{
MalID: malID,
Title: watchTitle,
CurrentEpisode: normalizedEpisode,
StartTimeSeconds: startTimeSeconds,
CurrentStatus: currentStatus,
InitialMode: initialMode,
AvailableModes: availableModes,
ModeSources: modeSources,
Episodes: episodes,
Segments: segments,
}, nil
cloned := make([]SubtitleItem, len(items))
copy(cloned, items)
return cloned
}
func cloneSegments(segments []SkipSegment) []SkipSegment {
if len(segments) == 0 {
return nil
}
cloned := make([]SkipSegment, len(segments))
copy(cloned, segments)
return cloned
}
func (s *Service) resolveShow(ctx context.Context, malID int, title string) (string, string, error) {
malText := strconv.Itoa(malID)
modeCandidates := []string{"sub", "dub"}
resultsByMode := make(map[string][]searchResult, len(modeCandidates))
searchCh := make(chan searchModeResult, len(modeCandidates))
var wg sync.WaitGroup
for _, mode := range modeCandidates {
results, err := s.allAnimeClient.Search(ctx, title, mode)
if err != nil {
modeValue := mode
wg.Add(1)
go func() {
defer wg.Done()
results, err := s.allAnimeClient.Search(ctx, title, modeValue)
searchCh <- searchModeResult{Mode: modeValue, Results: results, Err: err}
}()
}
wg.Wait()
close(searchCh)
for result := range searchCh {
if result.Err != nil {
continue
}
resultsByMode[result.Mode] = result.Results
}
for _, result := range results {
for _, mode := range modeCandidates {
for _, result := range resultsByMode[mode] {
if strings.TrimSpace(result.MalID) == malText && strings.TrimSpace(result.ID) != "" {
return result.ID, result.Name, nil
}
@@ -162,8 +403,8 @@ func (s *Service) resolveShow(ctx context.Context, malID int, title string) (str
if strings.TrimSpace(title) != "" {
for _, mode := range modeCandidates {
results, err := s.allAnimeClient.Search(ctx, title, mode)
if err != nil || len(results) == 0 {
results := resultsByMode[mode]
if len(results) == 0 {
continue
}
@@ -196,6 +437,33 @@ func (s *Service) resolveModeSource(ctx context.Context, showID string, episode
return selected, nil
}
func (s *Service) resolveModeSourceWithCache(
ctx context.Context,
showID string,
episode string,
mode string,
quality string,
probeCache map[string]directProbeResult,
probeCacheMu *sync.Mutex,
) (StreamSource, error) {
sources, err := s.allAnimeClient.GetEpisodeSources(ctx, showID, episode, mode)
if err != nil {
return StreamSource{}, err
}
ranked, err := rankSources(sources, quality)
if err != nil {
return StreamSource{}, err
}
selected, _, err := s.choosePlaybackSourceWithCache(ctx, ranked, probeCache, probeCacheMu)
if err != nil {
return StreamSource{}, err
}
return selected, nil
}
func (s *Service) choosePlaybackSource(ctx context.Context, ranked []sourceScore) (StreamSource, string, error) {
if len(ranked) == 0 {
return StreamSource{}, "", errors.New("no ranked sources available")
@@ -232,8 +500,79 @@ func (s *Service) choosePlaybackSource(ctx context.Context, ranked []sourceScore
return ranked[0].source, "ranked-fallback", nil
}
func (s *Service) choosePlaybackSourceWithCache(
ctx context.Context,
ranked []sourceScore,
probeCache map[string]directProbeResult,
probeCacheMu *sync.Mutex,
) (StreamSource, string, error) {
if len(ranked) == 0 {
return StreamSource{}, "", errors.New("no ranked sources available")
}
embedCandidates := make([]StreamSource, 0)
for _, candidate := range ranked {
source := candidate.source
sourceType := strings.ToLower(source.Type)
switch sourceType {
case "mp4", "m3u8":
return source, "direct-media", nil
case "embed":
embedCandidates = append(embedCandidates, source)
default:
playable, contentType := s.probeDirectMediaCached(ctx, source, probeCache, probeCacheMu)
if playable {
return normalizeSourceTypeFromProbe(source, contentType), "probed-media", nil
}
}
}
for _, embed := range embedCandidates {
if s.probeEmbedSource(ctx, embed) {
return embed, "embed-probed", nil
}
}
if len(embedCandidates) > 0 {
return embedCandidates[0], "embed-fallback", nil
}
return ranked[0].source, "ranked-fallback", nil
}
func (s *Service) probeDirectMediaCached(
ctx context.Context,
source StreamSource,
probeCache map[string]directProbeResult,
probeCacheMu *sync.Mutex,
) (bool, string) {
cacheKey := strings.TrimSpace(source.URL)
if cacheKey == "" {
return s.probeDirectMedia(ctx, source)
}
probeCacheMu.Lock()
cached, ok := probeCache[cacheKey]
probeCacheMu.Unlock()
if ok {
return cached.Playable, cached.ContentType
}
playable, contentType := s.probeDirectMedia(ctx, source)
probeCacheMu.Lock()
probeCache[cacheKey] = directProbeResult{Playable: playable, ContentType: contentType}
probeCacheMu.Unlock()
return playable, contentType
}
func (s *Service) probeDirectMedia(ctx context.Context, source StreamSource) (bool, string) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, source.URL, nil)
probeCtx, cancel := context.WithTimeout(ctx, providerProbeTimeout)
defer cancel()
req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, source.URL, nil)
if err != nil {
return false, ""
}
@@ -278,7 +617,10 @@ func (s *Service) probeDirectMedia(ctx context.Context, source StreamSource) (bo
}
func (s *Service) probeEmbedSource(ctx context.Context, source StreamSource) bool {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, source.URL, nil)
probeCtx, cancel := context.WithTimeout(ctx, providerProbeTimeout)
defer cancel()
req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, source.URL, nil)
if err != nil {
return false
}
@@ -388,59 +730,6 @@ func (s *Service) fetchSkipSegments(ctx context.Context, malID int, episode stri
return segments
}
func (s *Service) fetchEpisodeList(ctx context.Context, malID int) []EpisodeListItem {
if malID <= 0 {
return nil
}
items := make([]EpisodeListItem, 0)
for page := 1; page <= 20; page++ {
result, err := s.jikanClient.GetEpisodes(ctx, malID, page)
if err != nil {
return items
}
for _, episode := range result.Data {
if episode.MalID <= 0 {
continue
}
items = append(items, EpisodeListItem{
Number: strconv.Itoa(episode.MalID),
Title: strings.TrimSpace(episode.Title),
Filler: episode.Filler,
Recap: episode.Recap,
Order: episode.MalID,
})
}
if !result.Pagination.HasNextPage {
break
}
}
return items
}
func (s *Service) fetchModeEpisodes(ctx context.Context, showID string, mode string) []string {
episodes, err := s.allAnimeClient.GetEpisodes(ctx, showID, mode)
if err == nil && len(episodes) > 0 {
return episodes
}
fallbackMode := "sub"
if mode == "sub" {
fallbackMode = "dub"
}
fallbackEpisodes, fallbackErr := s.allAnimeClient.GetEpisodes(ctx, showID, fallbackMode)
if fallbackErr != nil {
return nil
}
return fallbackEpisodes
}
func (s *Service) ProxyStream(ctx context.Context, targetURL string, referer string, rangeHeader string) (int, http.Header, []byte, io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, targetURL, nil)
if err != nil {
@@ -481,26 +770,6 @@ func (s *Service) ProxyStream(ctx context.Context, targetURL string, referer str
return resp.StatusCode, headers, nil, resp.Body, nil
}
func fallbackEpisodeList(episodeNumbers []string) []EpisodeListItem {
items := make([]EpisodeListItem, 0, len(episodeNumbers))
for idx, number := range episodeNumbers {
trimmed := strings.TrimSpace(number)
if trimmed == "" {
continue
}
items = append(items, EpisodeListItem{
Number: trimmed,
Title: "",
Filler: false,
Recap: false,
Order: idx + 1,
})
}
return items
}
func normalizeMode(raw string) string {
lower := strings.ToLower(strings.TrimSpace(raw))
if lower == "sub" || lower == "dub" {

View File

@@ -26,14 +26,6 @@ type SubtitleItem struct {
Referer string `json:"referer"`
}
type EpisodeListItem struct {
Number string `json:"number"`
Title string `json:"title"`
Filler bool `json:"filler"`
Recap bool `json:"recap"`
Order int `json:"order"`
}
type SkipSegment struct {
Type string `json:"type"`
Start float64 `json:"start"`
@@ -41,14 +33,13 @@ type SkipSegment struct {
}
type WatchPageData struct {
MalID int
Title string
CurrentEpisode string
MalID int
Title string
CurrentEpisode string
StartTimeSeconds float64
CurrentStatus string
InitialMode string
AvailableModes []string
ModeSources map[string]ModeSource
Episodes []EpisodeListItem
Segments []SkipSegment
CurrentStatus string
InitialMode string
AvailableModes []string
ModeSources map[string]ModeSource
Segments []SkipSegment
}

View File

@@ -28,7 +28,7 @@ func NewRouter(cfg Config) http.Handler {
animeSvc := anime.NewService(cfg.JikanClient, cfg.DB)
animeHandler := anime.NewHandler(animeSvc)
playbackSvc := playback.NewService(cfg.JikanClient, cfg.DB)
playbackSvc := playback.NewService(cfg.DB)
playbackHandler := playback.NewHandler(playbackSvc, cfg.JikanClient)
// Serve static files