refactor: update imports to use new db package
This commit is contained in:
@@ -289,7 +289,7 @@ func (h *Handler) HandleAnimeDetails(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
if user != nil {
|
if user != nil {
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
entry, err := h.service.db.GetWatchListEntry(gCtx, database.GetWatchListEntryParams{
|
entry, err := h.service.db.GetWatchListEntry(gCtx, db.GetWatchListEntryParams{
|
||||||
UserID: user.ID,
|
UserID: user.ID,
|
||||||
AnimeID: int64(id),
|
AnimeID: int64(id),
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -10,18 +10,18 @@ import (
|
|||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
jikanClient *jikan.Client
|
jikanClient *jikan.Client
|
||||||
db database.Querier
|
db db.Querier
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService(jikanClient *jikan.Client, db database.Querier) *Service {
|
func NewService(jikanClient *jikan.Client, db db.Querier) *Service {
|
||||||
return &Service{jikanClient: jikanClient, db: db}
|
return &Service{jikanClient: jikanClient, db: db}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) GetCatalogSection(ctx context.Context, userID string, section string) (map[string]any, error) {
|
func (s *Service) GetCatalogSection(ctx context.Context, userID string, section string) (map[string]any, error) {
|
||||||
var (
|
var (
|
||||||
res jikan.TopAnimeResult
|
res jikan.TopAnimeResult
|
||||||
cw []database.GetContinueWatchingEntriesRow
|
cw []db.GetContinueWatchingEntriesRow
|
||||||
watchlist []database.GetUserWatchListRow
|
watchlist []db.GetUserWatchListRow
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -77,7 +77,7 @@ func (s *Service) GetCatalogSection(ctx context.Context, userID string, section
|
|||||||
func (s *Service) GetDiscoverSection(ctx context.Context, userID string, section string) (map[string]any, error) {
|
func (s *Service) GetDiscoverSection(ctx context.Context, userID string, section string) (map[string]any, error) {
|
||||||
var (
|
var (
|
||||||
res jikan.TopAnimeResult
|
res jikan.TopAnimeResult
|
||||||
watchlist []database.GetUserWatchListRow
|
watchlist []db.GetUserWatchListRow
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -24,10 +24,10 @@ var (
|
|||||||
const bcryptCost = 12
|
const bcryptCost = 12
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
db database.Querier
|
db db.Querier
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService(db database.Querier) *Service {
|
func NewService(db db.Querier) *Service {
|
||||||
return &Service{db: db}
|
return &Service{db: db}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -43,7 +43,7 @@ func generateSessionToken() (string, error) {
|
|||||||
return generateToken(32)
|
return generateToken(32)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Login(ctx context.Context, username, password string) (*database.Session, error) {
|
func (s *Service) Login(ctx context.Context, username, password string) (*db.Session, error) {
|
||||||
user, err := s.db.GetUserByUsername(ctx, username)
|
user, err := s.db.GetUserByUsername(ctx, username)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
@@ -62,7 +62,7 @@ func (s *Service) Login(ctx context.Context, username, password string) (*databa
|
|||||||
}
|
}
|
||||||
|
|
||||||
expiresAt := time.Now().Add(30 * 24 * time.Hour) // 30 days
|
expiresAt := time.Now().Add(30 * 24 * time.Hour) // 30 days
|
||||||
session, err := s.db.CreateSession(ctx, database.CreateSessionParams{
|
session, err := s.db.CreateSession(ctx, db.CreateSessionParams{
|
||||||
ID: token,
|
ID: token,
|
||||||
UserID: user.ID,
|
UserID: user.ID,
|
||||||
ExpiresAt: expiresAt,
|
ExpiresAt: expiresAt,
|
||||||
@@ -74,7 +74,7 @@ func (s *Service) Login(ctx context.Context, username, password string) (*databa
|
|||||||
return &session, nil
|
return &session, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) ValidateSession(ctx context.Context, sessionID string) (*database.User, error) {
|
func (s *Service) ValidateSession(ctx context.Context, sessionID string) (*db.User, error) {
|
||||||
session, err := s.db.GetSession(ctx, sessionID)
|
session, err := s.db.GetSession(ctx, sessionID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"mal/integrations/jikan"
|
"mal/integrations/jikan"
|
||||||
database "mal/internal/db"
|
"mal/internal/db"
|
||||||
"mal/internal/middleware"
|
"mal/internal/middleware"
|
||||||
"mal/templates"
|
"mal/templates"
|
||||||
)
|
)
|
||||||
@@ -79,7 +79,7 @@ func (h *Handler) HandleWatchPage(w http.ResponseWriter, r *http.Request) {
|
|||||||
currentEpID := r.URL.Query().Get("ep")
|
currentEpID := r.URL.Query().Get("ep")
|
||||||
if currentEpID == "" {
|
if currentEpID == "" {
|
||||||
if user != nil {
|
if user != nil {
|
||||||
entry, err := h.svc.db.GetWatchListEntry(r.Context(), database.GetWatchListEntryParams{
|
entry, err := h.svc.db.GetWatchListEntry(r.Context(), db.GetWatchListEntryParams{
|
||||||
UserID: user.ID,
|
UserID: user.ID,
|
||||||
AnimeID: int64(id),
|
AnimeID: int64(id),
|
||||||
})
|
})
|
||||||
@@ -269,9 +269,9 @@ func (h *Handler) HandleSaveProgress(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// We fetch the anime info to seed the DB if it's the first time saving progress for this show
|
// We fetch the anime info to seed the DB if it's the first time saving progress for this show
|
||||||
anime, err := h.jikanClient.GetAnimeByID(r.Context(), int(req.MalID))
|
anime, err := h.jikanClient.GetAnimeByID(r.Context(), int(req.MalID))
|
||||||
var seed *database.UpsertAnimeParams
|
var seed *db.UpsertAnimeParams
|
||||||
if err == nil {
|
if err == nil {
|
||||||
seed = &database.UpsertAnimeParams{
|
seed = &db.UpsertAnimeParams{
|
||||||
ID: int64(anime.MalID),
|
ID: int64(anime.MalID),
|
||||||
TitleOriginal: anime.Title,
|
TitleOriginal: anime.Title,
|
||||||
TitleEnglish: sql.NullString{String: anime.TitleEnglish, Valid: anime.TitleEnglish != ""},
|
TitleEnglish: sql.NullString{String: anime.TitleEnglish, Valid: anime.TitleEnglish != ""},
|
||||||
@@ -315,9 +315,9 @@ func (h *Handler) HandleCompleteAnime(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// Seed anime info if needed
|
// Seed anime info if needed
|
||||||
anime, err := h.jikanClient.GetAnimeByID(r.Context(), int(req.MalID))
|
anime, err := h.jikanClient.GetAnimeByID(r.Context(), int(req.MalID))
|
||||||
var seed *database.UpsertAnimeParams
|
var seed *db.UpsertAnimeParams
|
||||||
if err == nil {
|
if err == nil {
|
||||||
seed = &database.UpsertAnimeParams{
|
seed = &db.UpsertAnimeParams{
|
||||||
ID: int64(anime.MalID),
|
ID: int64(anime.MalID),
|
||||||
TitleOriginal: anime.Title,
|
TitleOriginal: anime.Title,
|
||||||
TitleEnglish: sql.NullString{String: anime.TitleEnglish, Valid: anime.TitleEnglish != ""},
|
TitleEnglish: sql.NullString{String: anime.TitleEnglish, Valid: anime.TitleEnglish != ""},
|
||||||
|
|||||||
@@ -12,12 +12,12 @@ import (
|
|||||||
"mal/internal/db"
|
"mal/internal/db"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Service) SaveProgress(ctx context.Context, userID string, animeID int64, episode int, timeSeconds float64, animeSeed *database.UpsertAnimeParams) error {
|
func (s *Service) SaveProgress(ctx context.Context, userID string, animeID int64, episode int, timeSeconds float64, animeSeed *db.UpsertAnimeParams) error {
|
||||||
if strings.TrimSpace(userID) == "" || animeID <= 0 || episode <= 0 {
|
if strings.TrimSpace(userID) == "" || animeID <= 0 || episode <= 0 {
|
||||||
return errors.New("invalid save progress input")
|
return errors.New("invalid save progress input")
|
||||||
}
|
}
|
||||||
|
|
||||||
txQueries, tx, err := database.BeginTx(ctx, s.sqlDB)
|
txQueries, tx, err := db.BeginTx(ctx, s.sqlDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -30,7 +30,7 @@ func (s *Service) SaveProgress(ctx context.Context, userID string, animeID int64
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
watchListEntry, watchListErr := txQueries.GetWatchListEntry(ctx, database.GetWatchListEntryParams{
|
watchListEntry, watchListErr := txQueries.GetWatchListEntry(ctx, db.GetWatchListEntryParams{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
AnimeID: animeID,
|
AnimeID: animeID,
|
||||||
})
|
})
|
||||||
@@ -40,7 +40,7 @@ func (s *Service) SaveProgress(ctx context.Context, userID string, animeID int64
|
|||||||
|
|
||||||
isCompleted := watchListErr == nil && watchListEntry.Status == "completed"
|
isCompleted := watchListErr == nil && watchListEntry.Status == "completed"
|
||||||
if !isCompleted {
|
if !isCompleted {
|
||||||
if err := txQueries.SaveWatchProgress(ctx, database.SaveWatchProgressParams{
|
if err := txQueries.SaveWatchProgress(ctx, db.SaveWatchProgressParams{
|
||||||
CurrentEpisode: sql.NullInt64{Int64: int64(episode), Valid: true},
|
CurrentEpisode: sql.NullInt64{Int64: int64(episode), Valid: true},
|
||||||
CurrentTimeSeconds: timeSeconds,
|
CurrentTimeSeconds: timeSeconds,
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
@@ -55,7 +55,7 @@ func (s *Service) SaveProgress(ctx context.Context, userID string, animeID int64
|
|||||||
durationSeconds = animeSeed.DurationSeconds
|
durationSeconds = animeSeed.DurationSeconds
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := txQueries.UpsertContinueWatchingEntry(ctx, database.UpsertContinueWatchingEntryParams{
|
if _, err := txQueries.UpsertContinueWatchingEntry(ctx, db.UpsertContinueWatchingEntryParams{
|
||||||
ID: uuid.New().String(),
|
ID: uuid.New().String(),
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
AnimeID: animeID,
|
AnimeID: animeID,
|
||||||
@@ -73,19 +73,19 @@ func (s *Service) SaveProgress(ctx context.Context, userID string, animeID int64
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) CompleteAnime(ctx context.Context, userID string, animeID int64, episode int, animeSeed *database.UpsertAnimeParams) error {
|
func (s *Service) CompleteAnime(ctx context.Context, userID string, animeID int64, episode int, animeSeed *db.UpsertAnimeParams) error {
|
||||||
if strings.TrimSpace(userID) == "" || animeID <= 0 || episode <= 0 {
|
if strings.TrimSpace(userID) == "" || animeID <= 0 || episode <= 0 {
|
||||||
return errors.New("invalid complete anime input")
|
return errors.New("invalid complete anime input")
|
||||||
}
|
}
|
||||||
|
|
||||||
txQueries, tx, err := database.BeginTx(ctx, s.sqlDB)
|
txQueries, tx, err := db.BeginTx(ctx, s.sqlDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer tx.Rollback()
|
defer tx.Rollback()
|
||||||
|
|
||||||
watchListEntry, watchListErr := txQueries.GetWatchListEntry(ctx, database.GetWatchListEntryParams{
|
watchListEntry, watchListErr := txQueries.GetWatchListEntry(ctx, db.GetWatchListEntryParams{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
AnimeID: animeID,
|
AnimeID: animeID,
|
||||||
})
|
})
|
||||||
@@ -102,7 +102,7 @@ func (s *Service) CompleteAnime(ctx context.Context, userID string, animeID int6
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := txQueries.UpsertWatchListEntry(ctx, database.UpsertWatchListEntryParams{
|
if _, err := txQueries.UpsertWatchListEntry(ctx, db.UpsertWatchListEntryParams{
|
||||||
ID: uuid.New().String(),
|
ID: uuid.New().String(),
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
AnimeID: animeID,
|
AnimeID: animeID,
|
||||||
@@ -113,7 +113,7 @@ func (s *Service) CompleteAnime(ctx context.Context, userID string, animeID int6
|
|||||||
return fmt.Errorf("failed to mark watchlist as completed: %w", err)
|
return fmt.Errorf("failed to mark watchlist as completed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := txQueries.SaveWatchProgress(ctx, database.SaveWatchProgressParams{
|
if err := txQueries.SaveWatchProgress(ctx, db.SaveWatchProgressParams{
|
||||||
CurrentEpisode: sql.NullInt64{Int64: 0, Valid: false},
|
CurrentEpisode: sql.NullInt64{Int64: 0, Valid: false},
|
||||||
CurrentTimeSeconds: 0,
|
CurrentTimeSeconds: 0,
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
@@ -123,7 +123,7 @@ func (s *Service) CompleteAnime(ctx context.Context, userID string, animeID int6
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := txQueries.DeleteContinueWatchingEntry(ctx, database.DeleteContinueWatchingEntryParams{
|
if err := txQueries.DeleteContinueWatchingEntry(ctx, db.DeleteContinueWatchingEntryParams{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
AnimeID: animeID,
|
AnimeID: animeID,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ type Service struct {
|
|||||||
allAnimeClient *allAnimeClient
|
allAnimeClient *allAnimeClient
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
sqlDB *sql.DB
|
sqlDB *sql.DB
|
||||||
db database.Querier
|
db db.Querier
|
||||||
proxyTokens *proxyTokenSigner
|
proxyTokens *proxyTokenSigner
|
||||||
proxyHostMu sync.RWMutex
|
proxyHostMu sync.RWMutex
|
||||||
proxyHostCache map[string]proxyHostCacheItem
|
proxyHostCache map[string]proxyHostCacheItem
|
||||||
@@ -93,7 +93,7 @@ type userPlaybackState struct {
|
|||||||
StartTimeSeconds float64
|
StartTimeSeconds float64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService(db database.Querier, sqlDB *sql.DB, cfg Config) *Service {
|
func NewService(db db.Querier, sqlDB *sql.DB, cfg Config) *Service {
|
||||||
proxyTokens, err := newProxyTokenSigner(cfg.ProxyTokenSecret)
|
proxyTokens, err := newProxyTokenSigner(cfg.ProxyTokenSecret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("failed to initialize proxy token signer: %v", err))
|
panic(fmt.Sprintf("failed to initialize proxy token signer: %v", err))
|
||||||
@@ -215,7 +215,7 @@ func (s *Service) fetchUserPlaybackStateAsync(ctx context.Context, userID string
|
|||||||
go func() {
|
go func() {
|
||||||
state := userPlaybackState{}
|
state := userPlaybackState{}
|
||||||
|
|
||||||
entry, err := s.db.GetWatchListEntry(ctx, database.GetWatchListEntryParams{
|
entry, err := s.db.GetWatchListEntry(ctx, db.GetWatchListEntryParams{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
AnimeID: int64(malID),
|
AnimeID: int64(malID),
|
||||||
})
|
})
|
||||||
@@ -227,7 +227,7 @@ func (s *Service) fetchUserPlaybackStateAsync(ctx context.Context, userID string
|
|||||||
}
|
}
|
||||||
|
|
||||||
if state.StartTimeSeconds <= 0 {
|
if state.StartTimeSeconds <= 0 {
|
||||||
continueEntry, continueErr := s.db.GetContinueWatchingEntry(ctx, database.GetContinueWatchingEntryParams{
|
continueEntry, continueErr := s.db.GetContinueWatchingEntry(ctx, db.GetContinueWatchingEntryParams{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
AnimeID: int64(malID),
|
AnimeID: int64(malID),
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
database "mal/internal/db"
|
"mal/internal/db"
|
||||||
"mal/internal/middleware"
|
"mal/internal/middleware"
|
||||||
"mal/templates"
|
"mal/templates"
|
||||||
)
|
)
|
||||||
@@ -123,8 +123,8 @@ func (h *Handler) HandleGetWatchlist(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
watchlistByStatus := make(map[string][]database.GetUserWatchListRow)
|
watchlistByStatus := make(map[string][]db.GetUserWatchListRow)
|
||||||
allEntries := make([]database.GetUserWatchListRow, 0)
|
allEntries := make([]db.GetUserWatchListRow, 0)
|
||||||
watchlistIDs := make([]int64, len(entries))
|
watchlistIDs := make([]int64, len(entries))
|
||||||
|
|
||||||
for i, entry := range entries {
|
for i, entry := range entries {
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
db database.Querier
|
db db.Querier
|
||||||
sqlDB *sql.DB
|
sqlDB *sql.DB
|
||||||
jikanClient *jikan.Client
|
jikanClient *jikan.Client
|
||||||
}
|
}
|
||||||
@@ -32,7 +32,7 @@ var validStatuses = map[string]struct{}{
|
|||||||
"on_hold": {},
|
"on_hold": {},
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService(db database.Querier, sqlDB *sql.DB, jikanClient *jikan.Client) *Service {
|
func NewService(db db.Querier, sqlDB *sql.DB, jikanClient *jikan.Client) *Service {
|
||||||
return &Service{db: db, sqlDB: sqlDB, jikanClient: jikanClient}
|
return &Service{db: db, sqlDB: sqlDB, jikanClient: jikanClient}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -47,7 +47,7 @@ func (s *Service) ensureAnimeExists(ctx context.Context, animeID int64) error {
|
|||||||
return fmt.Errorf("failed to fetch anime from jikan: %w", err)
|
return fmt.Errorf("failed to fetch anime from jikan: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = s.db.UpsertAnime(ctx, database.UpsertAnimeParams{
|
_, err = s.db.UpsertAnime(ctx, db.UpsertAnimeParams{
|
||||||
ID: int64(anime.MalID),
|
ID: int64(anime.MalID),
|
||||||
TitleOriginal: anime.Title,
|
TitleOriginal: anime.Title,
|
||||||
TitleEnglish: sql.NullString{String: anime.TitleEnglish, Valid: anime.TitleEnglish != ""},
|
TitleEnglish: sql.NullString{String: anime.TitleEnglish, Valid: anime.TitleEnglish != ""},
|
||||||
@@ -86,7 +86,7 @@ func (s *Service) AddToWatchlist(ctx context.Context, userID string, animeID int
|
|||||||
}
|
}
|
||||||
|
|
||||||
entryID := uuid.New().String()
|
entryID := uuid.New().String()
|
||||||
_, err := s.db.UpsertWatchListEntry(ctx, database.UpsertWatchListEntryParams{
|
_, err := s.db.UpsertWatchListEntry(ctx, db.UpsertWatchListEntryParams{
|
||||||
ID: entryID,
|
ID: entryID,
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
AnimeID: animeID,
|
AnimeID: animeID,
|
||||||
@@ -101,28 +101,28 @@ func (s *Service) AddToWatchlist(ctx context.Context, userID string, animeID int
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) RemoveEntry(ctx context.Context, userID string, animeID int64) (database.Anime, error) {
|
func (s *Service) RemoveEntry(ctx context.Context, userID string, animeID int64) (db.Anime, error) {
|
||||||
if animeID <= 0 {
|
if animeID <= 0 {
|
||||||
return database.Anime{}, ErrInvalidAnimeID
|
return db.Anime{}, ErrInvalidAnimeID
|
||||||
}
|
}
|
||||||
|
|
||||||
anime, err := s.db.GetAnime(ctx, animeID)
|
anime, err := s.db.GetAnime(ctx, animeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return database.Anime{}, fmt.Errorf("anime not found: %w", err)
|
return db.Anime{}, fmt.Errorf("anime not found: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.db.DeleteWatchListEntry(ctx, database.DeleteWatchListEntryParams{
|
err = s.db.DeleteWatchListEntry(ctx, db.DeleteWatchListEntryParams{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
AnimeID: animeID,
|
AnimeID: animeID,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return database.Anime{}, fmt.Errorf("failed to delete from watchlist: %w", err)
|
return db.Anime{}, fmt.Errorf("failed to delete from watchlist: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return anime, nil
|
return anime, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) GetUserWatchlist(ctx context.Context, userID string) ([]database.GetUserWatchListRow, error) {
|
func (s *Service) GetUserWatchlist(ctx context.Context, userID string) ([]db.GetUserWatchListRow, error) {
|
||||||
entries, err := s.db.GetUserWatchList(ctx, userID)
|
entries, err := s.db.GetUserWatchList(ctx, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to fetch watchlist: %w", err)
|
return nil, fmt.Errorf("failed to fetch watchlist: %w", err)
|
||||||
@@ -130,7 +130,7 @@ func (s *Service) GetUserWatchlist(ctx context.Context, userID string) ([]databa
|
|||||||
return entries, nil
|
return entries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) GetContinueWatching(ctx context.Context, userID string) ([]database.GetContinueWatchingEntriesRow, error) {
|
func (s *Service) GetContinueWatching(ctx context.Context, userID string) ([]db.GetContinueWatchingEntriesRow, error) {
|
||||||
if strings.TrimSpace(userID) == "" {
|
if strings.TrimSpace(userID) == "" {
|
||||||
return nil, errors.New("invalid user id")
|
return nil, errors.New("invalid user id")
|
||||||
}
|
}
|
||||||
@@ -152,12 +152,12 @@ func (s *Service) DeleteContinueWatching(ctx context.Context, userID string, ani
|
|||||||
return ErrInvalidAnimeID
|
return ErrInvalidAnimeID
|
||||||
}
|
}
|
||||||
|
|
||||||
params := database.DeleteContinueWatchingEntryParams{
|
params := db.DeleteContinueWatchingEntryParams{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
AnimeID: animeID,
|
AnimeID: animeID,
|
||||||
}
|
}
|
||||||
|
|
||||||
clearProgress := database.SaveWatchProgressParams{
|
clearProgress := db.SaveWatchProgressParams{
|
||||||
CurrentEpisode: sql.NullInt64{Valid: false},
|
CurrentEpisode: sql.NullInt64{Valid: false},
|
||||||
CurrentTimeSeconds: 0,
|
CurrentTimeSeconds: 0,
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
@@ -171,7 +171,7 @@ func (s *Service) DeleteContinueWatching(ctx context.Context, userID string, ani
|
|||||||
return s.db.SaveWatchProgress(ctx, clearProgress)
|
return s.db.SaveWatchProgress(ctx, clearProgress)
|
||||||
}
|
}
|
||||||
|
|
||||||
txQueries, tx, err := database.BeginTx(ctx, s.sqlDB)
|
txQueries, tx, err := db.BeginTx(ctx, s.sqlDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ import (
|
|||||||
|
|
||||||
"mal/api/auth"
|
"mal/api/auth"
|
||||||
"mal/integrations/jikan"
|
"mal/integrations/jikan"
|
||||||
"mal/internal/db/sqlite"
|
"mal/internal/db"
|
||||||
"mal/internal/server"
|
"mal/internal/server"
|
||||||
"mal/internal/worker"
|
"mal/internal/worker"
|
||||||
"mal/pkg/middleware"
|
"mal/pkg/middleware"
|
||||||
@@ -23,13 +23,13 @@ import (
|
|||||||
func main() {
|
func main() {
|
||||||
_ = godotenv.Load()
|
_ = godotenv.Load()
|
||||||
|
|
||||||
db, err := sqlite.Open(sqlite.GetDBFile())
|
dbConn, err := db.Open(db.GetDBFile())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to open db: %v", err)
|
log.Fatalf("failed to open db: %v", err)
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer dbConn.Close()
|
||||||
|
|
||||||
queries, err := sqlite.Init(db)
|
queries, err := db.Init(dbConn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to initialize database: %v", err)
|
log.Fatalf("failed to initialize database: %v", err)
|
||||||
}
|
}
|
||||||
@@ -43,7 +43,7 @@ func main() {
|
|||||||
|
|
||||||
app := server.Config{
|
app := server.Config{
|
||||||
DB: queries,
|
DB: queries,
|
||||||
SQLDB: db,
|
SQLDB: dbConn,
|
||||||
JikanClient: jikanClient,
|
JikanClient: jikanClient,
|
||||||
AuthService: auth.NewService(queries),
|
AuthService: auth.NewService(queries),
|
||||||
PlaybackProxySecret: playbackSecret(),
|
PlaybackProxySecret: playbackSecret(),
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"golang.org/x/crypto/bcrypt"
|
"golang.org/x/crypto/bcrypt"
|
||||||
"mal/internal/db/sqlite"
|
"mal/internal/db"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -21,14 +21,14 @@ func main() {
|
|||||||
username := os.Args[1]
|
username := os.Args[1]
|
||||||
password := os.Args[2]
|
password := os.Args[2]
|
||||||
|
|
||||||
db, err := sqlite.Open(sqlite.GetDBFile())
|
dbConn, err := db.Open(db.GetDBFile())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to open db: %v", err)
|
log.Fatalf("failed to open db: %v", err)
|
||||||
}
|
}
|
||||||
defer db.Close()
|
defer dbConn.Close()
|
||||||
|
|
||||||
var existingID string
|
var existingID string
|
||||||
err = db.QueryRow("SELECT id FROM user WHERE username = ?", username).Scan(&existingID)
|
err = dbConn.QueryRow("SELECT id FROM user WHERE username = ?", username).Scan(&existingID)
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
log.Fatalf("database error: %v", err)
|
log.Fatalf("database error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -49,7 +49,7 @@ func main() {
|
|||||||
log.Fatalf("failed to hash password: %v", err)
|
log.Fatalf("failed to hash password: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec("UPDATE user SET password_hash = ? WHERE id = ?", string(hash), existingID)
|
_, err = dbConn.Exec("UPDATE user SET password_hash = ? WHERE id = ?", string(hash), existingID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to update user: %v", err)
|
log.Fatalf("failed to update user: %v", err)
|
||||||
}
|
}
|
||||||
@@ -64,7 +64,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
id := uuid.New().String()
|
id := uuid.New().String()
|
||||||
_, err = db.Exec("INSERT INTO user (id, username, password_hash) VALUES (?, ?, ?)", id, username, string(hash))
|
_, err = dbConn.Exec("INSERT INTO user (id, username, password_hash) VALUES (?, ?, ?)", id, username, string(hash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to create user: %v", err)
|
log.Fatalf("failed to create user: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,13 +19,13 @@ import (
|
|||||||
type Client struct {
|
type Client struct {
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
baseURL string
|
baseURL string
|
||||||
db database.Querier
|
db db.Querier
|
||||||
retrySignal chan struct{}
|
retrySignal chan struct{}
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
lastReqTime time.Time
|
lastReqTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(db database.Querier) *Client {
|
func NewClient(db db.Querier) *Client {
|
||||||
return &Client{
|
return &Client{
|
||||||
httpClient: &http.Client{
|
httpClient: &http.Client{
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
@@ -157,7 +157,7 @@ func (c *Client) EnqueueAnimeFetchRetry(parentCtx context.Context, animeID int,
|
|||||||
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
|
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
err := c.db.EnqueueAnimeFetchRetry(ctx, database.EnqueueAnimeFetchRetryParams{
|
err := c.db.EnqueueAnimeFetchRetry(ctx, db.EnqueueAnimeFetchRetryParams{
|
||||||
AnimeID: int64(animeID),
|
AnimeID: int64(animeID),
|
||||||
LastError: truncateErrorMessage(cause.Error()),
|
LastError: truncateErrorMessage(cause.Error()),
|
||||||
})
|
})
|
||||||
@@ -228,7 +228,7 @@ func (c *Client) setCache(parentCtx context.Context, key string, data any, ttl t
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = c.db.SetJikanCache(ctx, database.SetJikanCacheParams{
|
_ = c.db.SetJikanCache(ctx, db.SetJikanCacheParams{
|
||||||
Key: key,
|
Key: key,
|
||||||
Data: string(bytes),
|
Data: string(bytes),
|
||||||
ExpiresAt: time.Now().Add(ttl),
|
ExpiresAt: time.Now().Add(ttl),
|
||||||
|
|||||||
@@ -73,8 +73,8 @@ func RequireAuth(next http.Handler) http.Handler {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetUser(ctx context.Context) *database.User {
|
func GetUser(ctx context.Context) *db.User {
|
||||||
user, ok := ctx.Value(ctxpkg.UserKey).(*database.User)
|
user, ok := ctx.Value(ctxpkg.UserKey).(*db.User)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
DB *database.Queries
|
DB *db.Queries
|
||||||
SQLDB *sql.DB
|
SQLDB *sql.DB
|
||||||
JikanClient *jikan.Client
|
JikanClient *jikan.Client
|
||||||
AuthService *auth.Service
|
AuthService *auth.Service
|
||||||
|
|||||||
@@ -13,11 +13,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Worker struct {
|
type Worker struct {
|
||||||
db *database.Queries
|
db *db.Queries
|
||||||
client *jikan.Client
|
client *jikan.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(db *database.Queries, client *jikan.Client) *Worker {
|
func New(db *db.Queries, client *jikan.Client) *Worker {
|
||||||
return &Worker{
|
return &Worker{
|
||||||
db: db,
|
db: db,
|
||||||
client: client,
|
client: client,
|
||||||
@@ -111,7 +111,7 @@ func (w *Worker) processAnimeFetchRetries(ctx context.Context) {
|
|||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, retry := range retries {
|
for _, retry := range retries {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(r database.AnimeFetchRetry) {
|
go func(r db.AnimeFetchRetry) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
_, err := w.client.GetAnimeByID(ctx, int(r.AnimeID))
|
_, err := w.client.GetAnimeByID(ctx, int(r.AnimeID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -120,7 +120,7 @@ func (w *Worker) processAnimeFetchRetries(ctx context.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = w.db.MarkAnimeFetchRetryFailed(ctx, database.MarkAnimeFetchRetryFailedParams{
|
_ = w.db.MarkAnimeFetchRetryFailed(ctx, db.MarkAnimeFetchRetryFailedParams{
|
||||||
Datetime: retryBackoff(r.Attempts + 1),
|
Datetime: retryBackoff(r.Attempts + 1),
|
||||||
LastError: err.Error(),
|
LastError: err.Error(),
|
||||||
AnimeID: r.AnimeID,
|
AnimeID: r.AnimeID,
|
||||||
@@ -152,15 +152,17 @@ func (w *Worker) syncRelations(ctx context.Context) {
|
|||||||
|
|
||||||
// Use a small worker pool for Jikan API calls to respect rate limits while maintaining concurrency
|
// Use a small worker pool for Jikan API calls to respect rate limits while maintaining concurrency
|
||||||
const workerCount = 2
|
const workerCount = 2
|
||||||
jobs := make(chan database.GetAnimeNeedingRelationSyncRow, len(animes))
|
jobs := make(chan db.GetAnimeNeedingRelationSyncRow, len(animes))
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for range workerCount {
|
for range workerCount {
|
||||||
wg.Go(func() {
|
go func() {
|
||||||
|
wg.Add(1)
|
||||||
|
defer wg.Done()
|
||||||
for a := range jobs {
|
for a := range jobs {
|
||||||
w.syncSingleAnime(ctx, a.ID)
|
w.syncSingleAnime(ctx, a.ID)
|
||||||
}
|
}
|
||||||
})
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, a := range animes {
|
for _, a := range animes {
|
||||||
@@ -180,7 +182,7 @@ func (w *Worker) syncSingleAnime(ctx context.Context, id int64) {
|
|||||||
for _, rel := range animeData.Relations {
|
for _, rel := range animeData.Relations {
|
||||||
for _, entry := range rel.Entry {
|
for _, entry := range rel.Entry {
|
||||||
if entry.Type == "anime" {
|
if entry.Type == "anime" {
|
||||||
err := w.db.UpsertAnimeRelation(ctx, database.UpsertAnimeRelationParams{
|
err := w.db.UpsertAnimeRelation(ctx, db.UpsertAnimeRelationParams{
|
||||||
AnimeID: id,
|
AnimeID: id,
|
||||||
RelatedAnimeID: int64(entry.MalID),
|
RelatedAnimeID: int64(entry.MalID),
|
||||||
RelationType: rel.Relation,
|
RelationType: rel.Relation,
|
||||||
@@ -196,7 +198,7 @@ func (w *Worker) syncSingleAnime(ctx context.Context, id int64) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{
|
_ = w.db.UpdateAnimeStatus(ctx, db.UpdateAnimeStatusParams{
|
||||||
Status: sql.NullString{String: animeData.Status, Valid: true},
|
Status: sql.NullString{String: animeData.Status, Valid: true},
|
||||||
ID: id,
|
ID: id,
|
||||||
})
|
})
|
||||||
@@ -210,7 +212,7 @@ func (w *Worker) ensureAnimeExistsAndStatusUpdated(ctx context.Context, malID in
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = w.db.UpsertAnime(ctx, database.UpsertAnimeParams{
|
_, err = w.db.UpsertAnime(ctx, db.UpsertAnimeParams{
|
||||||
ID: int64(animeDetails.MalID),
|
ID: int64(animeDetails.MalID),
|
||||||
TitleOriginal: animeDetails.Title,
|
TitleOriginal: animeDetails.Title,
|
||||||
TitleEnglish: sql.NullString{String: animeDetails.TitleEnglish, Valid: animeDetails.TitleEnglish != ""},
|
TitleEnglish: sql.NullString{String: animeDetails.TitleEnglish, Valid: animeDetails.TitleEnglish != ""},
|
||||||
@@ -222,7 +224,7 @@ func (w *Worker) ensureAnimeExistsAndStatusUpdated(ctx context.Context, malID in
|
|||||||
log.Printf("worker: failed to insert related anime %d: %v", malID, err)
|
log.Printf("worker: failed to insert related anime %d: %v", malID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{
|
_ = w.db.UpdateAnimeStatus(ctx, db.UpdateAnimeStatusParams{
|
||||||
Status: sql.NullString{String: animeDetails.Status, Valid: true},
|
Status: sql.NullString{String: animeDetails.Status, Valid: true},
|
||||||
ID: int64(animeDetails.MalID),
|
ID: int64(animeDetails.MalID),
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user