404 lines
10 KiB
Go
404 lines
10 KiB
Go
package playback
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"mal/internal/db"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hashicorp/golang-lru/v2"
|
|
)
|
|
|
|
const (
|
|
providerProbeTimeout = 3 * time.Second
|
|
)
|
|
|
|
type Service struct {
|
|
allAnimeClient *allAnimeClient
|
|
httpClient *http.Client
|
|
sqlDB *sql.DB
|
|
db db.Querier
|
|
proxyTokens *proxyTokenSigner
|
|
proxyHostCache *lru.Cache[string, proxyHostCacheItem]
|
|
|
|
cacheMu sync.RWMutex
|
|
showResolution *lru.Cache[int, showResolutionCacheItem]
|
|
playbackDataCache *lru.Cache[string, playbackDataCacheItem]
|
|
}
|
|
|
|
type Config struct {
|
|
ProxyTokenSecret string
|
|
}
|
|
|
|
type sourceScore struct {
|
|
source StreamSource
|
|
total int
|
|
typeScore int
|
|
providerScore int
|
|
qualityScore int
|
|
refererScore int
|
|
}
|
|
|
|
type showResolutionCacheItem struct {
|
|
ShowID string
|
|
Title string
|
|
}
|
|
|
|
type playbackDataCacheItem struct {
|
|
Data playbackBaseData
|
|
}
|
|
|
|
type playbackBaseData struct {
|
|
Title string
|
|
AvailableModes []string
|
|
ModeSources map[string]ModeSource
|
|
Segments []SkipSegment
|
|
FallbackEpisodes map[string]int
|
|
}
|
|
|
|
type modeSourceResult struct {
|
|
Mode string
|
|
Source ModeSource
|
|
OK bool
|
|
}
|
|
|
|
type searchModeResult struct {
|
|
Mode string
|
|
Results []searchResult
|
|
Err error
|
|
}
|
|
|
|
type directProbeResult struct {
|
|
Playable bool
|
|
ContentType string
|
|
}
|
|
|
|
type proxyHostCacheItem struct {
|
|
Allowed bool
|
|
}
|
|
|
|
type userPlaybackState struct {
|
|
CurrentStatus string
|
|
StartTimeSeconds float64
|
|
}
|
|
|
|
// NewService initializes the playback service with db and sql connections.
|
|
func NewService(db db.Querier, sqlDB *sql.DB, cfg Config) (*Service, error) {
|
|
proxyTokens, err := newProxyTokenSigner(cfg.ProxyTokenSecret)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize proxy token signer: %w", err)
|
|
}
|
|
|
|
showResolution, err := lru.New[int, showResolutionCacheItem](5000)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
playbackDataCache, err := lru.New[string, playbackDataCacheItem](500)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
proxyHostCache, err := lru.New[string, proxyHostCacheItem](1000)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Service{
|
|
allAnimeClient: newAllAnimeClient(),
|
|
httpClient: &http.Client{Timeout: 12 * time.Second},
|
|
sqlDB: sqlDB,
|
|
db: db,
|
|
proxyTokens: proxyTokens,
|
|
proxyHostCache: proxyHostCache,
|
|
showResolution: showResolution,
|
|
playbackDataCache: playbackDataCache,
|
|
}, nil
|
|
}
|
|
|
|
// BuildWatchPageData resolves show metadata and sources for a given MAL ID and episode.
|
|
func (s *Service) BuildWatchPageData(ctx context.Context, malID int, titleCandidates []string, episode string, mode string, userID string) (WatchPageData, error) {
|
|
if malID <= 0 {
|
|
return WatchPageData{}, errors.New("invalid mal id")
|
|
}
|
|
|
|
normalizedMode := normalizeMode(mode)
|
|
if normalizedMode == "" {
|
|
normalizedMode = "dub"
|
|
}
|
|
|
|
normalizedEpisode := strings.TrimSpace(episode)
|
|
if normalizedEpisode == "" {
|
|
normalizedEpisode = "1"
|
|
}
|
|
|
|
userStateCh := s.fetchUserPlaybackStateAsync(ctx, userID, malID, normalizedEpisode)
|
|
|
|
cacheKey := playbackDataCacheKey(malID, normalizedEpisode)
|
|
baseData, cacheHit := s.getPlaybackBaseDataCache(cacheKey)
|
|
if !cacheHit {
|
|
showID, resolvedTitle, err := s.resolveShowCached(ctx, malID, titleCandidates)
|
|
if err != nil {
|
|
return WatchPageData{}, err
|
|
}
|
|
|
|
modeSources, segments := s.fetchPlaybackSourcesAndSegments(ctx, showID, malID, normalizedEpisode)
|
|
if len(modeSources) == 0 {
|
|
return WatchPageData{}, errors.New("no direct playable sources available")
|
|
}
|
|
|
|
fallbackEpisodes := make(map[string]int)
|
|
if counts, err := s.allAnimeClient.GetAvailableEpisodes(ctx, showID); err == nil {
|
|
fallbackEpisodes["sub"] = len(counts.Sub)
|
|
fallbackEpisodes["dub"] = len(counts.Dub)
|
|
fallbackEpisodes["raw"] = len(counts.Raw)
|
|
}
|
|
|
|
watchTitle := strings.TrimSpace(resolvedTitle)
|
|
if watchTitle == "" {
|
|
watchTitle = firstNonEmptyTitle(titleCandidates)
|
|
}
|
|
if watchTitle == "" {
|
|
watchTitle = fmt.Sprintf("MAL #%d", malID)
|
|
}
|
|
|
|
baseData = playbackBaseData{
|
|
Title: watchTitle,
|
|
AvailableModes: availableModes(modeSources),
|
|
ModeSources: modeSources,
|
|
Segments: segments,
|
|
FallbackEpisodes: fallbackEpisodes,
|
|
}
|
|
|
|
s.setPlaybackBaseDataCache(cacheKey, baseData)
|
|
}
|
|
|
|
initialMode := selectInitialMode(normalizedMode, baseData.ModeSources)
|
|
|
|
clientModeSources, err := s.buildClientModeSources(baseData.ModeSources)
|
|
if err != nil {
|
|
return WatchPageData{}, err
|
|
}
|
|
|
|
if _, ok := clientModeSources[initialMode]; !ok {
|
|
return WatchPageData{}, errors.New("stream mode unavailable")
|
|
}
|
|
|
|
segments := baseData.Segments
|
|
if segments == nil {
|
|
segments = []SkipSegment{}
|
|
}
|
|
|
|
userState := userPlaybackState{}
|
|
if userStateCh != nil {
|
|
userState = <-userStateCh
|
|
}
|
|
|
|
return WatchPageData{
|
|
MalID: malID,
|
|
Title: baseData.Title,
|
|
CurrentEpisode: normalizedEpisode,
|
|
StartTimeSeconds: userState.StartTimeSeconds,
|
|
CurrentStatus: userState.CurrentStatus,
|
|
InitialMode: initialMode,
|
|
AvailableModes: cloneSlice(baseData.AvailableModes),
|
|
ModeSources: clientModeSources,
|
|
Segments: cloneSlice(segments),
|
|
FallbackEpisodes: baseData.FallbackEpisodes,
|
|
}, nil
|
|
}
|
|
|
|
func playbackDataCacheKey(malID int, episode string) string {
|
|
return fmt.Sprintf("%d:%s", malID, episode)
|
|
}
|
|
|
|
func (s *Service) fetchUserPlaybackStateAsync(ctx context.Context, userID string, malID int, episode string) <-chan userPlaybackState {
|
|
if userID == "" || s.db == nil {
|
|
return nil
|
|
}
|
|
|
|
resultCh := make(chan userPlaybackState, 1)
|
|
go func() {
|
|
state := userPlaybackState{}
|
|
|
|
entry, err := s.db.GetWatchListEntry(ctx, db.GetWatchListEntryParams{
|
|
UserID: userID,
|
|
AnimeID: int64(malID),
|
|
})
|
|
if err == nil {
|
|
state.CurrentStatus = entry.Status
|
|
if entry.CurrentEpisode.Valid && strconv.FormatInt(entry.CurrentEpisode.Int64, 10) == episode && entry.CurrentTimeSeconds > 0 {
|
|
state.StartTimeSeconds = entry.CurrentTimeSeconds
|
|
}
|
|
}
|
|
|
|
if state.StartTimeSeconds <= 0 {
|
|
continueEntry, continueErr := s.db.GetContinueWatchingEntry(ctx, db.GetContinueWatchingEntryParams{
|
|
UserID: userID,
|
|
AnimeID: int64(malID),
|
|
})
|
|
if continueErr == nil && continueEntry.CurrentEpisode.Valid && strconv.FormatInt(continueEntry.CurrentEpisode.Int64, 10) == episode && continueEntry.CurrentTimeSeconds > 0 {
|
|
state.StartTimeSeconds = continueEntry.CurrentTimeSeconds
|
|
}
|
|
}
|
|
|
|
resultCh <- state
|
|
}()
|
|
|
|
return resultCh
|
|
}
|
|
|
|
func (s *Service) getPlaybackBaseDataCache(key string) (playbackBaseData, bool) {
|
|
item, ok := s.playbackDataCache.Get(key)
|
|
if !ok {
|
|
return playbackBaseData{}, false
|
|
}
|
|
return clonePlaybackBaseData(item.Data), true
|
|
}
|
|
|
|
func (s *Service) setPlaybackBaseDataCache(key string, data playbackBaseData) {
|
|
s.playbackDataCache.Add(key, playbackDataCacheItem{
|
|
Data: clonePlaybackBaseData(data),
|
|
})
|
|
}
|
|
|
|
func (s *Service) resolveShowCached(ctx context.Context, malID int, titleCandidates []string) (string, string, error) {
|
|
if item, ok := s.showResolution.Get(malID); ok && strings.TrimSpace(item.ShowID) != "" {
|
|
return item.ShowID, item.Title, nil
|
|
}
|
|
|
|
showID, resolvedTitle, err := s.resolveShow(ctx, malID, titleCandidates)
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
|
|
s.showResolution.Add(malID, showResolutionCacheItem{
|
|
ShowID: showID,
|
|
Title: resolvedTitle,
|
|
})
|
|
|
|
return showID, resolvedTitle, nil
|
|
}
|
|
|
|
// fetchPlaybackSourcesAndSegments resolves sources for both dub and sub modes concurrently.
|
|
func (s *Service) fetchPlaybackSourcesAndSegments(ctx context.Context, showID string, malID int, episode string) (map[string]ModeSource, []SkipSegment) {
|
|
modeCh := make(chan modeSourceResult, 2)
|
|
probeCache := make(map[string]directProbeResult)
|
|
probeCacheMu := sync.Mutex{}
|
|
|
|
// parallel fetch for both modes
|
|
for _, mode := range []string{"dub", "sub"} {
|
|
modeValue := mode
|
|
go func() {
|
|
resolved, err := s.resolveModeSourceWithCache(ctx, showID, episode, modeValue, "best", probeCache, &probeCacheMu)
|
|
if err != nil {
|
|
log.Printf("playback source resolution failed for mode=%s showID=%s episode=%s: %v", modeValue, showID, episode, err)
|
|
modeCh <- modeSourceResult{Mode: modeValue, OK: false}
|
|
return
|
|
}
|
|
|
|
if strings.ToLower(resolved.Type) == "embed" {
|
|
modeCh <- modeSourceResult{Mode: modeValue, OK: false}
|
|
return
|
|
}
|
|
|
|
modeCh <- modeSourceResult{
|
|
Mode: modeValue,
|
|
Source: ModeSource{
|
|
URL: resolved.URL,
|
|
Referer: resolved.Referer,
|
|
Subtitles: toSubtitleItems(resolved),
|
|
Qualities: toQualities(resolved.AvailableQualities),
|
|
},
|
|
OK: true,
|
|
}
|
|
}()
|
|
}
|
|
|
|
segmentsCh := make(chan []SkipSegment, 1)
|
|
go func() {
|
|
segmentsCh <- s.fetchSkipSegments(ctx, malID, episode)
|
|
}()
|
|
|
|
modeSources := make(map[string]ModeSource)
|
|
// collect results from both mode goroutines
|
|
for range 2 {
|
|
result := <-modeCh
|
|
if !result.OK {
|
|
continue
|
|
}
|
|
modeSources[result.Mode] = result.Source
|
|
}
|
|
|
|
segments := <-segmentsCh
|
|
return modeSources, segments
|
|
}
|
|
|
|
func clonePlaybackBaseData(data playbackBaseData) playbackBaseData {
|
|
return playbackBaseData{
|
|
Title: data.Title,
|
|
AvailableModes: cloneSlice(data.AvailableModes),
|
|
ModeSources: cloneModeSources(data.ModeSources),
|
|
Segments: cloneSlice(data.Segments),
|
|
FallbackEpisodes: data.FallbackEpisodes,
|
|
}
|
|
}
|
|
|
|
// GetEpisodeMetadata fetches episode notes and thumbnails from AllAnime.
|
|
func (s *Service) GetEpisodeMetadata(ctx context.Context, malID int, episode string) (map[string]any, error) {
|
|
showID, _, err := s.resolveShowCached(ctx, malID, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.allAnimeClient.GetEpisodeMetadata(ctx, showID, episode)
|
|
}
|
|
|
|
func toQualities(sources []StreamSource) []string {
|
|
seen := make(map[string]struct{})
|
|
var qualities []string
|
|
for _, s := range sources {
|
|
q := strings.TrimSpace(s.Quality)
|
|
if q == "" || q == "auto" {
|
|
continue
|
|
}
|
|
if _, ok := seen[q]; !ok {
|
|
seen[q] = struct{}{}
|
|
qualities = append(qualities, q)
|
|
}
|
|
}
|
|
return qualities
|
|
}
|
|
|
|
func cloneSlice[T any](items []T) []T {
|
|
if items == nil {
|
|
return []T{}
|
|
}
|
|
if len(items) == 0 {
|
|
return []T{}
|
|
}
|
|
cloned := make([]T, len(items))
|
|
copy(cloned, items)
|
|
return cloned
|
|
}
|
|
|
|
func cloneModeSources(modeSources map[string]ModeSource) map[string]ModeSource {
|
|
if len(modeSources) == 0 {
|
|
return nil
|
|
}
|
|
cloned := make(map[string]ModeSource, len(modeSources))
|
|
for mode, source := range modeSources {
|
|
cloned[mode] = ModeSource{
|
|
URL: source.URL,
|
|
Referer: source.Referer,
|
|
Subtitles: cloneSlice(source.Subtitles),
|
|
Qualities: cloneSlice(source.Qualities),
|
|
}
|
|
}
|
|
return cloned
|
|
}
|