diff --git a/internal/playback/progress.go b/internal/playback/progress.go new file mode 100644 index 0000000..e9750de --- /dev/null +++ b/internal/playback/progress.go @@ -0,0 +1,150 @@ +package playback + +import ( + "context" + "database/sql" + "encoding/json" + "strconv" + + "github.com/google/uuid" + + "mal/internal/db" + "mal/internal/domain" + "mal/internal/observability" +) + +func (s *playbackService) loadWatchProgress(ctx context.Context, userID string, animeID int, totalEpisodes int, episode string) (float64, string, []int64) { + if userID == "" { + return 0, "", nil + } + + entry, err := s.repo.GetWatchListEntry(ctx, db.GetWatchListEntryParams{ + UserID: userID, + AnimeID: int64(animeID), + }) + + watchlistStatus := "" + var watchlistIDs []int64 + startTime := 0.0 + if err == nil { + watchlistStatus = entry.Status + watchlistIDs = []int64{entry.AnimeID} + if resumeTimeForEpisode(entry.CurrentEpisode, entry.CurrentTimeSeconds, totalEpisodes, episode) > 0 { + startTime = entry.CurrentTimeSeconds + } + } + + if startTime > 0 { + return startTime, watchlistStatus, watchlistIDs + } + + cwEntry, err := s.repo.GetContinueWatchingEntry(ctx, db.GetContinueWatchingEntryParams{ + UserID: userID, + AnimeID: int64(animeID), + }) + if err == nil { + startTime = resumeTimeForEpisode(cwEntry.CurrentEpisode, cwEntry.CurrentTimeSeconds, totalEpisodes, episode) + } + + return startTime, watchlistStatus, watchlistIDs +} + +func resumeTimeForEpisode(currentEpisode sql.NullInt64, currentTimeSeconds float64, totalEpisodes int, requestedEpisode string) float64 { + if !currentEpisode.Valid { + return 0 + } + + if strconv.FormatInt(currentEpisode.Int64, 10) == requestedEpisode { + return currentTimeSeconds + } + + if totalEpisodes > 0 && requestedEpisode == strconv.Itoa(totalEpisodes) && currentEpisode.Int64 == int64(totalEpisodes) { + return currentTimeSeconds + } + + return 0 +} + +func (s *playbackService) CompleteAnime(ctx context.Context, userID string, animeID int64) error { + if err := s.repo.InTx(ctx, func(txCtx context.Context, repo domain.PlaybackRepository) error { + entry, err := repo.GetWatchListEntry(txCtx, db.GetWatchListEntryParams{ + UserID: userID, + AnimeID: animeID, + }) + if err != nil || entry.Status != "completed" { + _, err = repo.UpsertWatchListEntry(txCtx, db.UpsertWatchListEntryParams{ + ID: uuid.New().String(), + UserID: userID, + AnimeID: animeID, + Status: "completed", + CurrentEpisode: entry.CurrentEpisode, + CurrentTimeSeconds: entry.CurrentTimeSeconds, + }) + if err != nil { + return err + } + } + + return nil + }); err != nil { + return err + } + + if err := s.auditSvc.Record(ctx, domain.AuditEvent{ + UserID: userID, + Action: "watch_completed", + ResourceType: "anime", + ResourceID: strconv.FormatInt(animeID, 10), + }); err != nil { + observability.Warn( + "audit_record_failed", + "playback", + "", + map[string]any{"user_id": userID, "anime_id": animeID, "action": "watch_completed"}, + err, + ) + } + return nil +} + +func (s *playbackService) SaveProgress(ctx context.Context, userID string, animeID int64, episode int, timeSeconds float64) error { + _, err := s.repo.UpsertContinueWatchingEntry(ctx, db.UpsertContinueWatchingEntryParams{ + ID: uuid.New().String(), + UserID: userID, + AnimeID: animeID, + CurrentEpisode: sql.NullInt64{Int64: int64(episode), Valid: true}, + CurrentTimeSeconds: timeSeconds, + DurationSeconds: sql.NullFloat64{Valid: false}, + }) + if err != nil { + return err + } + + metadataBytes, marshalErr := json.Marshal(struct { + Episode int `json:"episode"` + TimeSeconds float64 `json:"time_seconds"` + }{Episode: episode, TimeSeconds: timeSeconds}) + if marshalErr == nil { + _ = s.auditSvc.Record(ctx, domain.AuditEvent{ + UserID: userID, + Action: "watch_progress_saved", + ResourceType: "anime", + ResourceID: strconv.FormatInt(animeID, 10), + MetadataJSON: metadataBytes, + }) + } else { + _ = s.auditSvc.Record(ctx, domain.AuditEvent{ + UserID: userID, + Action: "watch_progress_saved", + ResourceType: "anime", + ResourceID: strconv.FormatInt(animeID, 10), + }) + } + observability.Info("watch_progress_saved", "playback", "", map[string]any{ + "anime_id": animeID, + "episode": episode, + "time_seconds": timeSeconds, + "user_id": userID, + }) + return nil +} diff --git a/internal/playback/service.go b/internal/playback/service.go index 1aee006..ac1f2c4 100644 --- a/internal/playback/service.go +++ b/internal/playback/service.go @@ -3,19 +3,12 @@ package playback import ( "context" - "database/sql" - "encoding/json" "fmt" "mal/integrations/jikan" - "mal/internal/db" "mal/internal/domain" - "mal/internal/observability" netutil "mal/pkg/net" "net/http" - "strconv" "time" - - "github.com/google/uuid" ) type playbackService struct { @@ -65,142 +58,6 @@ func (s *playbackService) ResolveProxyToken(token string, scope string) (string, return target.targetURL, target.referer, nil } -func (s *playbackService) loadWatchProgress(ctx context.Context, userID string, animeID int, totalEpisodes int, episode string) (float64, string, []int64) { - if userID == "" { - return 0, "", nil - } - - entry, err := s.repo.GetWatchListEntry(ctx, db.GetWatchListEntryParams{ - UserID: userID, - AnimeID: int64(animeID), - }) - - watchlistStatus := "" - var watchlistIDs []int64 - startTime := 0.0 - if err == nil { - watchlistStatus = entry.Status - watchlistIDs = []int64{entry.AnimeID} - if resumeTimeForEpisode(entry.CurrentEpisode, entry.CurrentTimeSeconds, totalEpisodes, episode) > 0 { - startTime = entry.CurrentTimeSeconds - } - } - - if startTime > 0 { - return startTime, watchlistStatus, watchlistIDs - } - - cwEntry, err := s.repo.GetContinueWatchingEntry(ctx, db.GetContinueWatchingEntryParams{ - UserID: userID, - AnimeID: int64(animeID), - }) - if err == nil { - startTime = resumeTimeForEpisode(cwEntry.CurrentEpisode, cwEntry.CurrentTimeSeconds, totalEpisodes, episode) - } - - return startTime, watchlistStatus, watchlistIDs -} - -func resumeTimeForEpisode(currentEpisode sql.NullInt64, currentTimeSeconds float64, totalEpisodes int, requestedEpisode string) float64 { - if !currentEpisode.Valid { - return 0 - } - - if strconv.FormatInt(currentEpisode.Int64, 10) == requestedEpisode { - return currentTimeSeconds - } - - if totalEpisodes > 0 && requestedEpisode == strconv.Itoa(totalEpisodes) && currentEpisode.Int64 == int64(totalEpisodes) { - return currentTimeSeconds - } - - return 0 -} - -func (s *playbackService) CompleteAnime(ctx context.Context, userID string, animeID int64) error { - if err := s.repo.InTx(ctx, func(txCtx context.Context, repo domain.PlaybackRepository) error { - entry, err := repo.GetWatchListEntry(txCtx, db.GetWatchListEntryParams{ - UserID: userID, - AnimeID: animeID, - }) - if err != nil || entry.Status != "completed" { - _, err = repo.UpsertWatchListEntry(txCtx, db.UpsertWatchListEntryParams{ - ID: uuid.New().String(), - UserID: userID, - AnimeID: animeID, - Status: "completed", - CurrentEpisode: entry.CurrentEpisode, - CurrentTimeSeconds: entry.CurrentTimeSeconds, - }) - if err != nil { - return err - } - } - - return nil - }); err != nil { - return err - } - - if err := s.auditSvc.Record(ctx, domain.AuditEvent{ - UserID: userID, - Action: "watch_completed", - ResourceType: "anime", - ResourceID: strconv.FormatInt(animeID, 10), - }); err != nil { - observability.Warn( - "audit_record_failed", - "playback", - "", - map[string]any{"user_id": userID, "anime_id": animeID, "action": "watch_completed"}, - err, - ) - } - return nil -} - -func (s *playbackService) SaveProgress(ctx context.Context, userID string, animeID int64, episode int, timeSeconds float64) error { - _, err := s.repo.UpsertContinueWatchingEntry(ctx, db.UpsertContinueWatchingEntryParams{ - ID: uuid.New().String(), - UserID: userID, - AnimeID: animeID, - CurrentEpisode: sql.NullInt64{Int64: int64(episode), Valid: true}, - CurrentTimeSeconds: timeSeconds, - DurationSeconds: sql.NullFloat64{Valid: false}, - }) - if err != nil { - return err - } - - metadataBytes, marshalErr := json.Marshal(struct { - Episode int `json:"episode"` - TimeSeconds float64 `json:"time_seconds"` - }{Episode: episode, TimeSeconds: timeSeconds}) - if marshalErr == nil { - _ = s.auditSvc.Record(ctx, domain.AuditEvent{ - UserID: userID, - Action: "watch_progress_saved", - ResourceType: "anime", - ResourceID: strconv.FormatInt(animeID, 10), - MetadataJSON: metadataBytes, - }) - } else { - _ = s.auditSvc.Record(ctx, domain.AuditEvent{ - UserID: userID, - Action: "watch_progress_saved", - ResourceType: "anime", - ResourceID: strconv.FormatInt(animeID, 10), - }) - } - observability.Info("watch_progress_saved", "playback", "", map[string]any{ - "anime_id": animeID, - "episode": episode, - "time_seconds": timeSeconds, - "user_id": userID, - }) - return nil -} - func (s *playbackService) warmStreamURL(targetURL, referer string) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel()