Files
mal/internal/streaming/hls.go
Mikkel Elvers a25e8f1655 feat: torrent streaming with hls transcoding (#1)
* feat: add ffmpeg for hls streaming

* feat: torrent streaming with hls transcoding

- add nyaa.si torrent search client
- add streaming service using anacrolix/torrent
- add hls transcoding via ffmpeg for browser playback
- add watch page with episode selection
- add socks5 proxy support via TORRENT_PROXY env
- switch to modernc.org/sqlite (pure go, no cgo conflicts)
- update dockerfile with ffmpeg
2026-04-07 13:23:08 +02:00

249 lines
5.7 KiB
Go

package streaming
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"path/filepath"
"sync"
"time"
)
// HLSTranscoder owns the set of on-demand HLS transcoding sessions, keyed by
// info hash.
type HLSTranscoder struct {
	// logger receives session lifecycle messages and ffmpeg's stderr output.
	logger *slog.Logger

	// sessions maps an info hash to its transcoding session.
	sessions map[string]*HLSSession

	// mu guards sessions.
	mu sync.RWMutex

	// baseDir is the directory under which every session gets its own
	// output directory.
	baseDir string
}
// HLSSession describes one ffmpeg transcoding run and the files it writes.
type HLSSession struct {
	// InfoHash is the key under which the session is registered.
	InfoHash string

	// OutputDir is the directory that holds the playlist and its segments.
	OutputDir string

	// Playlist is the path of the .m3u8 playlist file.
	Playlist string

	// cmd is the ffmpeg command backing this session.
	cmd *exec.Cmd

	// cancel cancels the context the ffmpeg command was created with.
	cancel context.CancelFunc

	// ready is closed when the wait for the first segments has finished,
	// successfully or not.
	ready chan struct{}

	// err is the outcome of that wait; it is written before ready is closed.
	err error

	// lastAccess is refreshed on every lookup and consulted by the stale
	// session cleanup.
	lastAccess time.Time

	// mu guards lastAccess.
	mu sync.Mutex
}
// NewHLSTranscoder builds a transcoder whose working directory is "mal-hls"
// inside the OS temp directory, and launches its background cleanup loop.
//
// It returns an error when ffmpeg cannot be located in PATH or when the
// working directory cannot be created.
func NewHLSTranscoder(logger *slog.Logger) (*HLSTranscoder, error) {
	// Fail fast: nothing here works without the ffmpeg binary.
	_, lookErr := exec.LookPath("ffmpeg")
	if lookErr != nil {
		return nil, fmt.Errorf("ffmpeg not found in PATH: %w", lookErr)
	}

	root := filepath.Join(os.TempDir(), "mal-hls")
	mkErr := os.MkdirAll(root, 0755)
	if mkErr != nil {
		return nil, fmt.Errorf("failed to create HLS temp dir: %w", mkErr)
	}

	transcoder := new(HLSTranscoder)
	transcoder.logger = logger
	transcoder.sessions = map[string]*HLSSession{}
	transcoder.baseDir = root

	// Stale sessions are reaped in the background.
	go transcoder.cleanupLoop()

	return transcoder, nil
}
// StartSessionWithReader starts an ffmpeg process that reads reader on stdin
// and writes an HLS playlist plus MPEG-TS segments into a per-session
// directory named after infoHash.
//
// If a session for infoHash is already registered, it is returned unchanged
// (only its last-access time is refreshed) and reader is not consumed.
//
// infoHash must be a single, local path element: it becomes a directory name
// below the transcoder's base directory, and that directory is later removed
// recursively. Anything else is rejected with an error.
//
// The returned session is not necessarily playable yet; call WaitReady to
// block until the first segments exist. A session that fails before becoming
// ready is unregistered again so that a later call can retry.
func (t *HLSTranscoder) StartSessionWithReader(infoHash string, reader io.Reader) (*HLSSession, error) {
	// "", ".", "..", rooted paths and multi-element paths would resolve to the
	// base directory itself or escape it, and StopSession/Shutdown would then
	// RemoveAll the wrong tree.
	if infoHash == "." || !filepath.IsLocal(infoHash) || filepath.Base(infoHash) != infoHash {
		return nil, fmt.Errorf("invalid info hash %q", infoHash)
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	// Return existing session if available.
	if session, ok := t.sessions[infoHash]; ok {
		session.mu.Lock()
		session.lastAccess = time.Now()
		session.mu.Unlock()
		return session, nil
	}

	// Create output directory.
	outputDir := filepath.Join(t.baseDir, infoHash)
	if err := os.MkdirAll(outputDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create output dir: %w", err)
	}
	playlist := filepath.Join(outputDir, "stream.m3u8")

	ctx, cancel := context.WithCancel(context.Background())
	session := &HLSSession{
		InfoHash:   infoHash,
		OutputDir:  outputDir,
		Playlist:   playlist,
		cancel:     cancel,
		ready:      make(chan struct{}),
		lastAccess: time.Now(),
	}

	// Start ffmpeg with pipe input.
	// Use browser-compatible H.264 main profile and AAC-LC.
	cmd := exec.CommandContext(ctx, "ffmpeg",
		"-i", "pipe:0", // Read from stdin
		// Video: H.264 with browser-compatible settings
		"-c:v", "libx264",
		"-preset", "veryfast",
		"-crf", "23",
		"-profile:v", "main",
		"-level", "4.0",
		"-pix_fmt", "yuv420p",
		// Audio: AAC-LC stereo
		"-c:a", "aac",
		"-b:a", "128k",
		"-ac", "2",
		"-ar", "44100",
		// HLS output
		"-f", "hls",
		"-hls_time", "4",
		"-hls_list_size", "0",
		"-hls_flags", "append_list+independent_segments",
		"-hls_segment_type", "mpegts",
		"-hls_segment_filename", filepath.Join(outputDir, "segment_%03d.ts"),
		"-start_number", "0",
		playlist,
	)
	// Pipe the reader to ffmpeg's stdin.
	cmd.Stdin = reader
	// Capture stderr for debugging.
	cmd.Stderr = &ffmpegLogger{logger: t.logger, infoHash: infoHash}
	// Stdin/Stderr are not *os.File, so os/exec copies them in goroutines and
	// Wait blocks until those copies end. Bound that wait so a reader stuck in
	// Read cannot keep Wait (and the monitor goroutine) alive forever once
	// ffmpeg has exited or been killed.
	cmd.WaitDelay = 5 * time.Second
	session.cmd = cmd

	if err := cmd.Start(); err != nil {
		cancel()
		os.RemoveAll(outputDir)
		return nil, fmt.Errorf("failed to start ffmpeg: %w", err)
	}
	t.sessions[infoHash] = session

	// Monitor process; exited is closed once ffmpeg has been reaped.
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		err := cmd.Wait()
		if err != nil && ctx.Err() == nil {
			t.logger.Error("ffmpeg exited with error", "hash", infoHash, "error", err)
		} else {
			t.logger.Info("ffmpeg finished", "hash", infoHash)
		}
	}()

	// Wait for the playlist and first segment in the background.
	go t.awaitSegments(ctx, session, exited)

	return session, nil
}

// awaitSegments polls every 500ms until the session's playlist and at least
// one segment file exist, then closes session.ready. It stops early with an
// error when ffmpeg exits (exited is closed), when ctx is cancelled, or after
// 60 seconds. On failure it records the error in session.err, cancels the
// session and unregisters it.
func (t *HLSTranscoder) awaitSegments(ctx context.Context, session *HLSSession, exited <-chan struct{}) {
	// Closing ready last publishes session.err to WaitReady callers.
	defer close(session.ready)

	pattern := filepath.Join(session.OutputDir, "segment_*.ts")
	segmentsReady := func() bool {
		if _, err := os.Stat(session.Playlist); err != nil {
			return false
		}
		// Glob only fails on a malformed pattern; treat that as "no segments".
		segments, _ := filepath.Glob(pattern)
		if len(segments) == 0 {
			return false
		}
		t.logger.Info("HLS segments ready", "hash", session.InfoHash, "segments", len(segments))
		return true
	}

	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	deadline := time.NewTimer(60 * time.Second)
	defer deadline.Stop()

	var failure error
	for failure == nil {
		if segmentsReady() {
			return
		}
		select {
		case <-ticker.C:
		case <-exited:
			// A short input can finish between two polls; look once more
			// before declaring failure.
			if segmentsReady() {
				return
			}
			failure = fmt.Errorf("ffmpeg exited before HLS segments were ready")
		case <-ctx.Done():
			failure = fmt.Errorf("transcoding stopped before HLS segments were ready: %w", ctx.Err())
		case <-deadline.C:
			failure = fmt.Errorf("timeout waiting for HLS segments")
		}
	}

	session.err = failure
	session.cancel()

	// Forget the failed session so the next StartSessionWithReader call starts
	// a fresh one instead of handing back this dead one. The identity check
	// keeps a replacement session registered under the same hash untouched.
	t.mu.Lock()
	if t.sessions[session.InfoHash] == session {
		delete(t.sessions, session.InfoHash)
		os.RemoveAll(session.OutputDir)
	}
	t.mu.Unlock()
}
// GetSession looks up the session registered under infoHash. On a hit the
// session's last-access time is refreshed; on a miss it returns (nil, false).
func (t *HLSTranscoder) GetSession(infoHash string) (*HLSSession, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()

	session, found := t.sessions[infoHash]
	if !found {
		return session, false
	}

	// The session has its own mutex, so the transcoder's read lock suffices.
	session.mu.Lock()
	session.lastAccess = time.Now()
	session.mu.Unlock()

	return session, true
}
// StopSession cancels the session registered under infoHash, deletes its
// output directory and unregisters it. Unknown hashes are ignored.
func (t *HLSTranscoder) StopSession(infoHash string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	session, found := t.sessions[infoHash]
	if !found {
		return
	}

	session.cancel()
	os.RemoveAll(session.OutputDir)
	delete(t.sessions, infoHash)
}
// WaitReady blocks until the session's ready channel is closed and then
// returns the session's recorded error (nil on success). If ctx is done
// first, it returns ctx.Err() instead.
func (s *HLSSession) WaitReady(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-s.ready:
	}
	return s.err
}
// cleanupLoop wakes up every five minutes and tears down every session that
// has not been accessed for more than ten minutes: the session is cancelled,
// its output directory deleted and its map entry removed. The loop has no
// exit condition.
func (t *HLSTranscoder) cleanupLoop() {
	const (
		sweepInterval = 5 * time.Minute
		maxIdle       = 10 * time.Minute
	)

	ticker := time.NewTicker(sweepInterval)
	defer ticker.Stop()

	for range ticker.C {
		t.mu.Lock()
		now := time.Now()
		for hash, session := range t.sessions {
			session.mu.Lock()
			idle := now.Sub(session.lastAccess)
			if idle <= maxIdle {
				session.mu.Unlock()
				continue
			}
			session.cancel()
			os.RemoveAll(session.OutputDir)
			delete(t.sessions, hash)
			t.logger.Info("cleaned up stale HLS session", "hash", hash)
			session.mu.Unlock()
		}
		t.mu.Unlock()
	}
}
// Shutdown cancels every registered session, deletes each session's output
// directory, empties the session map and finally removes the transcoder's
// base directory.
func (t *HLSTranscoder) Shutdown() {
	t.mu.Lock()
	defer t.mu.Unlock()

	for key, sess := range t.sessions {
		sess.cancel()
		os.RemoveAll(sess.OutputDir)
		delete(t.sessions, key)
	}

	// Remove whatever is left under the base directory as well.
	os.RemoveAll(t.baseDir)
}
// ffmpegLogger forwards whatever is written to it (ffmpeg's stderr) to a
// structured logger, tagged with the session's info hash.
type ffmpegLogger struct {
	// logger is the destination for the forwarded output.
	logger *slog.Logger

	// infoHash is attached to every log record as the "hash" attribute.
	infoHash string
}
// Write logs p verbatim as one debug record and reports the whole slice as
// written; it never returns an error.
func (l *ffmpegLogger) Write(p []byte) (n int, err error) {
	chunk := string(p)
	l.logger.Debug("ffmpeg", "hash", l.infoHash, "output", chunk)
	return len(p), nil
}