feat: add proxy transport with dns caching, subtitle handler, and stream pre-warm
This commit is contained in:
@@ -5,8 +5,11 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"mal/internal/domain"
|
"mal/internal/domain"
|
||||||
|
"mal/pkg/net/proxytransport"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
@@ -14,10 +17,19 @@ import (
|
|||||||
type PlaybackHandler struct {
|
type PlaybackHandler struct {
|
||||||
svc domain.PlaybackService
|
svc domain.PlaybackService
|
||||||
animeSvc domain.AnimeService
|
animeSvc domain.AnimeService
|
||||||
|
|
||||||
|
proxyClient *http.Client
|
||||||
|
streamingClient *http.Client
|
||||||
|
subtitleCache sync.Map
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPlaybackHandler(svc domain.PlaybackService, animeSvc domain.AnimeService) *PlaybackHandler {
|
func NewPlaybackHandler(svc domain.PlaybackService, animeSvc domain.AnimeService) *PlaybackHandler {
|
||||||
return &PlaybackHandler{svc: svc, animeSvc: animeSvc}
|
return &PlaybackHandler{
|
||||||
|
svc: svc,
|
||||||
|
animeSvc: animeSvc,
|
||||||
|
proxyClient: proxytransport.NewClient(),
|
||||||
|
streamingClient: proxytransport.NewStreamingClient(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *PlaybackHandler) Register(r *gin.Engine) {
|
func (h *PlaybackHandler) Register(r *gin.Engine) {
|
||||||
@@ -26,6 +38,7 @@ func (h *PlaybackHandler) Register(r *gin.Engine) {
|
|||||||
r.POST("/api/watch-progress", h.HandleSaveProgress)
|
r.POST("/api/watch-progress", h.HandleSaveProgress)
|
||||||
r.GET("/api/watch/thumbnails/:animeId", h.HandleEpisodeThumbnails)
|
r.GET("/api/watch/thumbnails/:animeId", h.HandleEpisodeThumbnails)
|
||||||
r.GET("/watch/proxy/stream", h.HandleProxyStream)
|
r.GET("/watch/proxy/stream", h.HandleProxyStream)
|
||||||
|
r.GET("/watch/proxy/subtitle", h.HandleProxySubtitle)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *PlaybackHandler) HandleWatchPage(c *gin.Context) {
|
func (h *PlaybackHandler) HandleWatchPage(c *gin.Context) {
|
||||||
@@ -170,9 +183,9 @@ func (h *PlaybackHandler) HandleProxyStream(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0")
|
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0")
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
resp, err := h.streamingClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("proxy fetch error: %v", err)
|
log.Printf("proxy stream fetch error: %v", err)
|
||||||
c.Status(http.StatusBadGateway)
|
c.Status(http.StatusBadGateway)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -184,3 +197,77 @@ func (h *PlaybackHandler) HandleProxyStream(c *gin.Context) {
|
|||||||
c.Status(resp.StatusCode)
|
c.Status(resp.StatusCode)
|
||||||
_, _ = io.Copy(c.Writer, resp.Body)
|
_, _ = io.Copy(c.Writer, resp.Body)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type cachedSubtitle struct {
|
||||||
|
data []byte
|
||||||
|
contentType string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *PlaybackHandler) HandleProxySubtitle(c *gin.Context) {
|
||||||
|
token := c.Query("token")
|
||||||
|
if token == "" {
|
||||||
|
c.Status(http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
targetURL, referer, err := h.svc.ResolveProxyToken(token)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("proxy subtitle token error: %v", err)
|
||||||
|
c.Status(http.StatusForbidden)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if cached, ok := h.subtitleCache.Load(targetURL); ok {
|
||||||
|
entry := cached.(cachedSubtitle)
|
||||||
|
c.Data(http.StatusOK, entry.contentType, entry.data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(c.Request.Context(), http.MethodGet, targetURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
c.Status(http.StatusBadGateway)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if referer != "" {
|
||||||
|
req.Header.Set("Referer", referer)
|
||||||
|
}
|
||||||
|
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0")
|
||||||
|
|
||||||
|
resp, err := h.proxyClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("proxy subtitle fetch error: %v", err)
|
||||||
|
c.Status(http.StatusBadGateway)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
|
||||||
|
body, err := io.ReadAll(io.LimitReader(resp.Body, 2*1024*1024))
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("proxy subtitle read error: %v", err)
|
||||||
|
c.Status(http.StatusBadGateway)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
contentType := resp.Header.Get("Content-Type")
|
||||||
|
if contentType == "" {
|
||||||
|
contentType = detectSubtitleType(targetURL)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.subtitleCache.Store(targetURL, cachedSubtitle{data: body, contentType: contentType})
|
||||||
|
|
||||||
|
c.Data(http.StatusOK, contentType, body)
|
||||||
|
}
|
||||||
|
|
||||||
|
func detectSubtitleType(url string) string {
|
||||||
|
lower := strings.ToLower(url)
|
||||||
|
switch {
|
||||||
|
case strings.Contains(lower, ".vtt"):
|
||||||
|
return "text/vtt"
|
||||||
|
case strings.Contains(lower, ".srt"):
|
||||||
|
return "text/plain; charset=utf-8"
|
||||||
|
case strings.Contains(lower, ".ass") || strings.Contains(lower, ".ssa"):
|
||||||
|
return "text/plain; charset=utf-8"
|
||||||
|
default:
|
||||||
|
return "text/plain; charset=utf-8"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -222,9 +222,10 @@ func (s *playbackService) BuildWatchData(ctx context.Context, animeID int, title
|
|||||||
|
|
||||||
var subtitleItems []SubtitleItem
|
var subtitleItems []SubtitleItem
|
||||||
for _, sub := range result.Subtitles {
|
for _, sub := range result.Subtitles {
|
||||||
|
subToken, _ := s.SignProxyToken(sub.URL, result.Referer, "subtitle")
|
||||||
subtitleItems = append(subtitleItems, SubtitleItem{
|
subtitleItems = append(subtitleItems, SubtitleItem{
|
||||||
Lang: sub.Label,
|
Lang: sub.Label,
|
||||||
URL: sub.URL,
|
Token: subToken,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -239,6 +240,8 @@ func (s *playbackService) BuildWatchData(ctx context.Context, animeID int, title
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go s.warmStreamURL(result.URL, result.Referer)
|
||||||
|
|
||||||
// 6. Resolve relations/seasons
|
// 6. Resolve relations/seasons
|
||||||
relations, _ := s.jikan.GetFullRelations(ctx, animeID)
|
relations, _ := s.jikan.GetFullRelations(ctx, animeID)
|
||||||
type SeasonEntry struct {
|
type SeasonEntry struct {
|
||||||
@@ -368,3 +371,24 @@ func (s *playbackService) fetchSkipSegments(ctx context.Context, malID int, epis
|
|||||||
|
|
||||||
return segments
|
return segments
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *playbackService) warmStreamURL(targetURL, referer string) {
|
||||||
|
req, err := http.NewRequest(http.MethodGet, targetURL, nil)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if referer != "" {
|
||||||
|
req.Header.Set("Referer", referer)
|
||||||
|
}
|
||||||
|
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
|
resp, err := s.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
}
|
||||||
|
|||||||
86
pkg/net/proxytransport/transport.go
Normal file
86
pkg/net/proxytransport/transport.go
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
package proxytransport
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var dnsCache sync.Map
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
time.Sleep(5 * time.Minute)
|
||||||
|
dnsCache.Range(func(key, _ any) bool {
|
||||||
|
dnsCache.Delete(key)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTransport(dialTimeout, tlsTimeout, headerTimeout time.Duration) *http.Transport {
|
||||||
|
return &http.Transport{
|
||||||
|
MaxIdleConns: 100,
|
||||||
|
MaxIdleConnsPerHost: 10,
|
||||||
|
IdleConnTimeout: 90 * time.Second,
|
||||||
|
TLSHandshakeTimeout: tlsTimeout,
|
||||||
|
ResponseHeaderTimeout: headerTimeout,
|
||||||
|
ExpectContinueTimeout: 1 * time.Second,
|
||||||
|
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||||
|
host, port, err := net.SplitHostPort(addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ips, ok := dnsCache.Load(host)
|
||||||
|
if !ok {
|
||||||
|
resolved, err := net.DefaultResolver.LookupIPAddr(ctx, host)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("proxy dns lookup: %w", err)
|
||||||
|
}
|
||||||
|
dnsCache.Store(host, resolved)
|
||||||
|
ips = resolved
|
||||||
|
}
|
||||||
|
|
||||||
|
return dialIPs(ctx, network, host, port, ips.([]net.IPAddr), dialTimeout)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func dialIPs(ctx context.Context, network, host, port string, ips []net.IPAddr, timeout time.Duration) (net.Conn, error) {
|
||||||
|
var firstErr error
|
||||||
|
for _, ip := range ips {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
dialer := net.Dialer{Timeout: timeout}
|
||||||
|
conn, err := dialer.DialContext(ctx, network, net.JoinHostPort(ip.String(), port))
|
||||||
|
if err == nil {
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
if firstErr == nil {
|
||||||
|
firstErr = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("proxy dial %s: %w", host, firstErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClient() *http.Client {
|
||||||
|
return &http.Client{
|
||||||
|
Transport: newTransport(10*time.Second, 10*time.Second, 30*time.Second),
|
||||||
|
Timeout: 60 * time.Second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStreamingClient() *http.Client {
|
||||||
|
return &http.Client{
|
||||||
|
Transport: newTransport(10*time.Second, 10*time.Second, 15*time.Second),
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user