refactor: extract anime service layer and optimize API calls
This commit is contained in:
@@ -51,7 +51,8 @@ func NewRouter(cfg Config) http.Handler {
|
||||
|
||||
middleware.InitAuth(cfg.AuthService)
|
||||
|
||||
animeHandler := anime.NewHandler(cfg.JikanClient, cfg.DB)
|
||||
animeSvc := anime.NewService(cfg.JikanClient, cfg.DB)
|
||||
animeHandler := anime.NewHandler(animeSvc)
|
||||
|
||||
playbackSvc := playback.NewService(cfg.DB, cfg.SQLDB, playback.Config{
|
||||
ProxyTokenSecret: cfg.PlaybackProxySecret,
|
||||
@@ -67,9 +68,15 @@ func NewRouter(cfg Config) http.Handler {
|
||||
mux.Handle("/dist/", http.StripPrefix("/dist/", withMimeTypes(dist)))
|
||||
|
||||
mux.HandleFunc("/", animeHandler.HandleCatalog)
|
||||
mux.HandleFunc("/api/catalog/airing", animeHandler.HandleCatalogAiring)
|
||||
mux.HandleFunc("/api/catalog/popular", animeHandler.HandleCatalogPopular)
|
||||
mux.HandleFunc("/api/catalog/continue", animeHandler.HandleCatalogContinue)
|
||||
mux.HandleFunc("/search", animeHandler.HandleSearch)
|
||||
mux.HandleFunc("/browse", animeHandler.HandleBrowse)
|
||||
mux.HandleFunc("/discover", animeHandler.HandleDiscover)
|
||||
mux.HandleFunc("/api/discover/trending", animeHandler.HandleDiscoverTrending)
|
||||
mux.HandleFunc("/api/discover/upcoming", animeHandler.HandleDiscoverUpcoming)
|
||||
mux.HandleFunc("/api/discover/top", animeHandler.HandleDiscoverTop)
|
||||
mux.HandleFunc("/api/search-quick", animeHandler.HandleQuickSearch)
|
||||
mux.HandleFunc("/api/jikan/random/anime", animeHandler.HandleRandomAnime)
|
||||
mux.HandleFunc("/anime/", func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"mal/integrations/jikan"
|
||||
@@ -31,9 +32,7 @@ func (w *Worker) Start(ctx context.Context) {
|
||||
defer retryTicker.Stop()
|
||||
|
||||
// Run once immediately
|
||||
w.syncRelations(ctx)
|
||||
w.processAnimeFetchRetries(ctx)
|
||||
w.cleanupCache(ctx)
|
||||
w.runAllTasks(ctx)
|
||||
|
||||
cleanupCounter := 0
|
||||
|
||||
@@ -48,7 +47,6 @@ func (w *Worker) Start(ctx context.Context) {
|
||||
case <-ticker.C:
|
||||
w.syncRelations(ctx)
|
||||
|
||||
// Clean up cache every 60 runs (approx 1 hour)
|
||||
cleanupCounter++
|
||||
if cleanupCounter >= 60 {
|
||||
w.cleanupCache(ctx)
|
||||
@@ -58,6 +56,28 @@ func (w *Worker) Start(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) runAllTasks(ctx context.Context) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
w.syncRelations(ctx)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
w.processAnimeFetchRetries(ctx)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
w.cleanupCache(ctx)
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func retryBackoff(attempts int64) string {
|
||||
if attempts < 1 {
|
||||
attempts = 1
|
||||
@@ -88,44 +108,38 @@ func (w *Worker) processAnimeFetchRetries(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, retry := range retries {
|
||||
_, err := w.client.GetAnimeByID(ctx, int(retry.AnimeID))
|
||||
if err != nil {
|
||||
if !jikan.IsRetryableError(err) {
|
||||
deleteErr := w.db.DeleteAnimeFetchRetry(ctx, retry.AnimeID)
|
||||
if deleteErr != nil {
|
||||
log.Printf("worker: failed deleting non-retryable anime retry %d: %v", retry.AnimeID, deleteErr)
|
||||
wg.Add(1)
|
||||
go func(r database.AnimeFetchRetry) {
|
||||
defer wg.Done()
|
||||
_, err := w.client.GetAnimeByID(ctx, int(r.AnimeID))
|
||||
if err != nil {
|
||||
if !jikan.IsRetryableError(err) {
|
||||
_ = w.db.DeleteAnimeFetchRetry(ctx, r.AnimeID)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
updateErr := w.db.MarkAnimeFetchRetryFailed(ctx, database.MarkAnimeFetchRetryFailedParams{
|
||||
Datetime: retryBackoff(retry.Attempts + 1),
|
||||
LastError: err.Error(),
|
||||
AnimeID: retry.AnimeID,
|
||||
})
|
||||
if updateErr != nil {
|
||||
log.Printf("worker: failed updating anime fetch retry %d: %v", retry.AnimeID, updateErr)
|
||||
_ = w.db.MarkAnimeFetchRetryFailed(ctx, database.MarkAnimeFetchRetryFailedParams{
|
||||
Datetime: retryBackoff(r.Attempts + 1),
|
||||
LastError: err.Error(),
|
||||
AnimeID: r.AnimeID,
|
||||
})
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
deleteErr := w.db.DeleteAnimeFetchRetry(ctx, retry.AnimeID)
|
||||
if deleteErr != nil {
|
||||
log.Printf("worker: failed deleting successful anime retry %d: %v", retry.AnimeID, deleteErr)
|
||||
}
|
||||
_ = w.db.DeleteAnimeFetchRetry(ctx, r.AnimeID)
|
||||
}(retry)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (w *Worker) cleanupCache(ctx context.Context) {
|
||||
err := w.db.DeleteExpiredJikanCache(ctx)
|
||||
if err != nil {
|
||||
if err := w.db.DeleteExpiredJikanCache(ctx); err != nil {
|
||||
log.Printf("worker: failed to clean up expired jikan cache: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) syncRelations(ctx context.Context) {
|
||||
// Find up to 20 anime that need their relations synced
|
||||
animes, err := w.db.GetAnimeNeedingRelationSync(ctx)
|
||||
if err != nil {
|
||||
log.Printf("worker error: failed to get anime needing sync: %v", err)
|
||||
@@ -133,108 +147,85 @@ func (w *Worker) syncRelations(ctx context.Context) {
|
||||
}
|
||||
|
||||
if len(animes) == 0 {
|
||||
return // silent heartbeat
|
||||
return
|
||||
}
|
||||
|
||||
for _, a := range animes {
|
||||
func() {
|
||||
animeData, err := w.client.GetAnimeByID(ctx, int(a.ID))
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to fetch anime details for %d: %v", a.ID, err)
|
||||
// Sleep a bit on error to respect rate limits, but DO NOT mark as synced
|
||||
// so it will be retried on the next worker run.
|
||||
time.Sleep(2 * time.Second)
|
||||
return
|
||||
}
|
||||
// Use a small worker pool for Jikan API calls to respect rate limits while maintaining concurrency
|
||||
const workerCount = 2
|
||||
jobs := make(chan database.GetAnimeNeedingRelationSyncRow, len(animes))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// If we got here, we successfully fetched the data, so we mark it as synced.
|
||||
defer func() {
|
||||
err := w.db.MarkRelationsSynced(ctx, a.ID)
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to mark relations synced for %d: %v", a.ID, err)
|
||||
}
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
}()
|
||||
|
||||
for _, rel := range animeData.Relations {
|
||||
for _, entry := range rel.Entry {
|
||||
if entry.Type == "anime" {
|
||||
// We just insert the relation.
|
||||
err := w.db.UpsertAnimeRelation(ctx, database.UpsertAnimeRelationParams{
|
||||
AnimeID: a.ID,
|
||||
RelatedAnimeID: int64(entry.MalID),
|
||||
RelationType: rel.Relation,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to insert relation %d -> %d: %v", a.ID, entry.MalID, err)
|
||||
}
|
||||
|
||||
// If it's a Sequel, we should also make sure the related anime is tracked
|
||||
if rel.Relation == "Sequel" {
|
||||
w.ensureAnimeExistsAndStatusUpdated(ctx, entry.MalID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{
|
||||
Status: sql.NullString{String: animeData.Status, Valid: true},
|
||||
ID: a.ID,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to update status for %d: %v", a.ID, err)
|
||||
for i := 0; i < workerCount; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for a := range jobs {
|
||||
w.syncSingleAnime(ctx, a.ID)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for _, a := range animes {
|
||||
jobs <- a
|
||||
}
|
||||
close(jobs)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (w *Worker) syncSingleAnime(ctx context.Context, id int64) {
|
||||
animeData, err := w.client.GetAnimeByID(ctx, int(id))
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to fetch anime details for %d: %v", id, err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, rel := range animeData.Relations {
|
||||
for _, entry := range rel.Entry {
|
||||
if entry.Type == "anime" {
|
||||
err := w.db.UpsertAnimeRelation(ctx, database.UpsertAnimeRelationParams{
|
||||
AnimeID: id,
|
||||
RelatedAnimeID: int64(entry.MalID),
|
||||
RelationType: rel.Relation,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to insert relation %d -> %d: %v", id, entry.MalID, err)
|
||||
}
|
||||
|
||||
if rel.Relation == "Sequel" {
|
||||
w.ensureAnimeExistsAndStatusUpdated(ctx, entry.MalID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{
|
||||
Status: sql.NullString{String: animeData.Status, Valid: true},
|
||||
ID: id,
|
||||
})
|
||||
_ = w.db.MarkRelationsSynced(ctx, id)
|
||||
}
|
||||
|
||||
func (w *Worker) ensureAnimeExistsAndStatusUpdated(ctx context.Context, malID int) {
|
||||
// check if we have it
|
||||
_, err := w.db.GetAnime(ctx, int64(malID))
|
||||
animeDetails, err := w.client.GetAnimeByID(ctx, malID)
|
||||
if err != nil {
|
||||
// we don't have it, let's fetch it
|
||||
animeDetails, err := w.client.GetAnimeByID(ctx, malID)
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to fetch related anime %d: %v", malID, err)
|
||||
return
|
||||
}
|
||||
|
||||
_, err = w.db.UpsertAnime(ctx, database.UpsertAnimeParams{
|
||||
ID: int64(animeDetails.MalID),
|
||||
TitleOriginal: animeDetails.Title,
|
||||
TitleEnglish: sql.NullString{String: animeDetails.TitleEnglish, Valid: animeDetails.TitleEnglish != ""},
|
||||
TitleJapanese: sql.NullString{String: animeDetails.TitleJapanese, Valid: animeDetails.TitleJapanese != ""},
|
||||
ImageUrl: animeDetails.ImageURL(),
|
||||
Airing: sql.NullBool{Bool: animeDetails.Airing, Valid: true},
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to insert related anime %d: %v", malID, err)
|
||||
return
|
||||
}
|
||||
|
||||
err = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{
|
||||
Status: sql.NullString{String: animeDetails.Status, Valid: true},
|
||||
ID: int64(animeDetails.MalID),
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to update status for related anime %d: %v", malID, err)
|
||||
}
|
||||
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
} else {
|
||||
// We have it, but maybe status is outdated. Fetching every time might be too much,
|
||||
// but since it's a Sequel to something they watched, we could fetch it.
|
||||
// For now, let's just let the worker naturally pick it up if it gets added to watchlist,
|
||||
// OR we can explicitly fetch its details to keep sequels up to date.
|
||||
animeDetails, err := w.client.GetAnimeByID(ctx, malID)
|
||||
if err == nil {
|
||||
if err := w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{
|
||||
Status: sql.NullString{String: animeDetails.Status, Valid: true},
|
||||
ID: int64(animeDetails.MalID),
|
||||
}); err != nil {
|
||||
log.Printf("worker: failed to update status for anime %d: %v", animeDetails.MalID, err)
|
||||
}
|
||||
}
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
log.Printf("worker: failed to fetch related anime %d: %v", malID, err)
|
||||
return
|
||||
}
|
||||
|
||||
_, err = w.db.UpsertAnime(ctx, database.UpsertAnimeParams{
|
||||
ID: int64(animeDetails.MalID),
|
||||
TitleOriginal: animeDetails.Title,
|
||||
TitleEnglish: sql.NullString{String: animeDetails.TitleEnglish, Valid: animeDetails.TitleEnglish != ""},
|
||||
TitleJapanese: sql.NullString{String: animeDetails.TitleJapanese, Valid: animeDetails.TitleJapanese != ""},
|
||||
ImageUrl: animeDetails.ImageURL(),
|
||||
Airing: sql.NullBool{Bool: animeDetails.Airing, Valid: true},
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to insert related anime %d: %v", malID, err)
|
||||
}
|
||||
|
||||
_ = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{
|
||||
Status: sql.NullString{String: animeDetails.Status, Valid: true},
|
||||
ID: int64(animeDetails.MalID),
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user