diff --git a/api/anime/handler.go b/api/anime/handler.go index d1e36c3..a1904fd 100644 --- a/api/anime/handler.go +++ b/api/anime/handler.go @@ -12,7 +12,6 @@ import ( "time" "mal/integrations/jikan" - ctxpkg "mal/internal/context" "mal/internal/db" "mal/internal/middleware" "mal/templates" @@ -21,8 +20,11 @@ import ( ) type Handler struct { - jikanClient *jikan.Client - db database.Querier + service *Service +} + +func NewHandler(service *Service) *Handler { + return &Handler{service: service} } type quickSearchResult struct { @@ -32,9 +34,16 @@ type quickSearchResult struct { Image string `json:"image"` } -func renderNotFoundPage(r *http.Request, w http.ResponseWriter) { - w.WriteHeader(http.StatusNotFound) - if err := templates.GetRenderer().ExecuteTemplate(r.Context(), w, "not_found.gohtml", map[string]any{ +func (h *Handler) HandleCatalog(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + renderNotFoundPage(r, w) + return + } + + user := middleware.GetUser(r.Context()) + + if err := templates.GetRenderer().ExecuteTemplate(r.Context(), w, "index.gohtml", map[string]any{ + "User": user, "CurrentPath": r.URL.Path, }); err != nil { log.Printf("render error: %v", err) @@ -42,102 +51,88 @@ func renderNotFoundPage(r *http.Request, w http.ResponseWriter) { } } -func writeInlineLoadError(w http.ResponseWriter, message string) { - w.Header().Set("Content-Type", "text/html") - _, _ = w.Write([]byte(`
` + html.EscapeString(message) + `
`)) +func (h *Handler) HandleCatalogAiring(w http.ResponseWriter, r *http.Request) { + h.renderCatalogSection(w, r, "Airing") } -func parsePageParam(r *http.Request) int { - page, _ := strconv.Atoi(r.URL.Query().Get("page")) - if page < 1 { - return 1 - } - return page +func (h *Handler) HandleCatalogPopular(w http.ResponseWriter, r *http.Request) { + h.renderCatalogSection(w, r, "Popular") } -func NewHandler(jikanClient *jikan.Client, db database.Querier) *Handler { - return &Handler{jikanClient: jikanClient, db: db} +func (h *Handler) HandleCatalogContinue(w http.ResponseWriter, r *http.Request) { + h.renderCatalogSection(w, r, "Continue") } -func (h *Handler) HandleCatalog(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/" { - renderNotFoundPage(r, w) - return - } - - var ( - animes jikan.TopAnimeResult - currentlyAiring jikan.TopAnimeResult - cw []database.GetContinueWatchingEntriesRow - watchlist []database.GetUserWatchListRow - ) - - g, gCtx := errgroup.WithContext(r.Context()) - - g.Go(func() error { - var err error - animes, err = h.jikanClient.GetTopAnime(gCtx, 1) - return err - }) - - g.Go(func() error { - var err error - currentlyAiring, err = h.jikanClient.GetSeasonsNow(gCtx, 1) - if err != nil { - log.Printf("seasons now error: %v", err) - return nil // non-fatal - } - return nil - }) - +func (h *Handler) renderCatalogSection(w http.ResponseWriter, r *http.Request, section string) { user := middleware.GetUser(r.Context()) + userID := "" if user != nil { - g.Go(func() error { - var err error - cw, err = h.db.GetContinueWatchingEntries(gCtx, user.ID) - return err - }) - g.Go(func() error { - var err error - watchlist, err = h.db.GetUserWatchList(gCtx, user.ID) - return err - }) + userID = user.ID } - if err := g.Wait(); err != nil { - log.Printf("catalog fetch error: %v", err) - http.Error(w, "Failed to fetch catalog data", http.StatusInternalServerError) + data, err := h.service.GetCatalogSection(r.Context(), userID, section) + if err != nil { + log.Printf("catalog %s error: %v", section, err) + if section != "Continue" { + writeInlineLoadError(w, "Failed to load "+section) + } return } - if len(animes.Animes) > 6 { - animes.Animes = animes.Animes[:6] - } - if len(currentlyAiring.Animes) > 6 { - currentlyAiring.Animes = currentlyAiring.Animes[:6] - } + data["User"] = user + data["Section"] = section - watchlistMap := make(map[int64]bool) - watchlistIDs := make([]int64, len(watchlist)) - for i, entry := range watchlist { - watchlistMap[entry.AnimeID] = true - watchlistIDs[i] = entry.AnimeID + if err := templates.GetRenderer().ExecuteFragment(r.Context(), w, "index.gohtml", "catalog_section", data); err != nil { + log.Printf("fragment render error: %v", err) } +} - if err := templates.GetRenderer().ExecuteTemplate(r.Context(), w, "index.gohtml", map[string]any{ - "MostPopular": animes.Animes, - "CurrentlyAiring": currentlyAiring.Animes, - "ContinueWatching": cw, - "User": user, - "CurrentPath": r.URL.Path, - "WatchlistMap": watchlistMap, - "WatchlistIDs": watchlistIDs, +func (h *Handler) HandleDiscover(w http.ResponseWriter, r *http.Request) { + user := middleware.GetUser(r.Context()) + + if err := templates.GetRenderer().ExecuteTemplate(r.Context(), w, "discover.gohtml", map[string]any{ + "User": user, + "CurrentPath": r.URL.Path, }); err != nil { log.Printf("render error: %v", err) http.Error(w, "Internal Server Error", http.StatusInternalServerError) } } +func (h *Handler) HandleDiscoverTrending(w http.ResponseWriter, r *http.Request) { + h.renderDiscoverSection(w, r, "Trending") +} + +func (h *Handler) HandleDiscoverUpcoming(w http.ResponseWriter, r *http.Request) { + h.renderDiscoverSection(w, r, "Upcoming") +} + +func (h *Handler) HandleDiscoverTop(w http.ResponseWriter, r *http.Request) { + h.renderDiscoverSection(w, r, "Top") +} + +func (h *Handler) renderDiscoverSection(w http.ResponseWriter, r *http.Request, section string) { + user := middleware.GetUser(r.Context()) + userID := "" + if user != nil { + userID = user.ID + } + + data, err := h.service.GetDiscoverSection(r.Context(), userID, section) + if err != nil { + log.Printf("discover %s error: %v", section, err) + writeInlineLoadError(w, "Failed to load "+section) + return + } + + data["User"] = user + data["Section"] = section + + if err := templates.GetRenderer().ExecuteFragment(r.Context(), w, "discover.gohtml", "discover_section", data); err != nil { + log.Printf("fragment render error: %v", err) + } +} + func (h *Handler) HandleBrowse(w http.ResponseWriter, r *http.Request) { user := middleware.GetUser(r.Context()) @@ -155,16 +150,12 @@ func (h *Handler) HandleBrowse(w http.ResponseWriter, r *http.Request) { } } - pageStr := r.URL.Query().Get("page") - page, _ := strconv.Atoi(pageStr) - if page < 1 { - page = 1 - } + page := parsePageParam(r) ctx, cancel := context.WithTimeout(r.Context(), 20*time.Second) defer cancel() - res, err := h.jikanClient.SearchAdvanced(ctx, q, animeType, status, orderBy, sort, genres, page, 24) + res, err := h.service.jikanClient.SearchAdvanced(ctx, q, animeType, status, orderBy, sort, genres, page, 24) if err != nil { if errors.Is(err, context.Canceled) { return @@ -175,7 +166,7 @@ func (h *Handler) HandleBrowse(w http.ResponseWriter, r *http.Request) { if r.Header.Get("HX-Request") == "true" { watchlistMap := make(map[int]bool) if user != nil { - watchlist, _ := h.db.GetUserWatchList(ctx, user.ID) + watchlist, _ := h.service.db.GetUserWatchList(ctx, user.ID) for _, entry := range watchlist { watchlistMap[int(entry.AnimeID)] = true } @@ -202,7 +193,7 @@ func (h *Handler) HandleBrowse(w http.ResponseWriter, r *http.Request) { return } - genresList, err := h.jikanClient.GetAnimeGenres(ctx) + genresList, err := h.service.jikanClient.GetAnimeGenres(ctx) if err != nil { if !errors.Is(err, context.Canceled) { log.Printf("genres error: %v", err) @@ -212,7 +203,7 @@ func (h *Handler) HandleBrowse(w http.ResponseWriter, r *http.Request) { watchlistMap := make(map[int]bool) var watchlistIDs []int64 if user != nil { - watchlist, _ := h.db.GetUserWatchList(ctx, user.ID) + watchlist, _ := h.service.db.GetUserWatchList(ctx, user.ID) watchlistIDs = make([]int64, len(watchlist)) for i, entry := range watchlist { watchlistMap[int(entry.AnimeID)] = true @@ -243,18 +234,6 @@ func (h *Handler) HandleBrowse(w http.ResponseWriter, r *http.Request) { } } -func (h *Handler) HandleSearch(w http.ResponseWriter, r *http.Request) { - renderNotFoundPage(r, w) -} - -func (h *Handler) HandleAPISearch(w http.ResponseWriter, r *http.Request) { - http.Error(w, "Not implemented yet", http.StatusNotImplemented) -} - -func (h *Handler) HandleAPICatalog(w http.ResponseWriter, r *http.Request) { - http.Error(w, "Not implemented yet", http.StatusNotImplemented) -} - func (h *Handler) HandleAnimeDetails(w http.ResponseWriter, r *http.Request) { idStr := strings.TrimPrefix(r.URL.Path, "/anime/") idStr = strings.TrimSuffix(idStr, "/") @@ -264,28 +243,32 @@ func (h *Handler) HandleAnimeDetails(w http.ResponseWriter, r *http.Request) { return } + user := middleware.GetUser(r.Context()) + + // If it's an HTMX request for a specific section, handle it + section := r.URL.Query().Get("section") + if section != "" && r.Header.Get("HX-Request") == "true" { + h.renderAnimeDetailsSection(w, r, id, section) + return + } + var ( - anime jikan.Anime - characters []jikan.CharacterEntry - recommendations []jikan.RecommendationEntry - watchlist []database.GetUserWatchListRow - status string - episodesCount int + anime jikan.Anime + status string + episodesCount int + watchlistIDs []int64 ) g, gCtx := errgroup.WithContext(r.Context()) g.Go(func() error { var err error - anime, err = h.jikanClient.GetAnimeByID(gCtx, id) + anime, err = h.service.jikanClient.GetAnimeByID(gCtx, id) if err == nil && anime.Airing { - // If airing, we want to know how many episodes are released so far. - // The episodes endpoint with page 1 gives us the last visible page in pagination. - eps, err := h.jikanClient.GetEpisodes(gCtx, id, 1) + eps, err := h.service.jikanClient.GetEpisodes(gCtx, id, 1) if err == nil { if eps.Pagination.LastVisiblePage > 1 { - // Fetch last page to get the true count - lastEps, err := h.jikanClient.GetEpisodes(gCtx, id, eps.Pagination.LastVisiblePage) + lastEps, err := h.service.jikanClient.GetEpisodes(gCtx, id, eps.Pagination.LastVisiblePage) if err == nil && len(lastEps.Data) > 0 { lastEp := lastEps.Data[len(lastEps.Data)-1] count, _ := strconv.Atoi(lastEp.Episode) @@ -301,28 +284,9 @@ func (h *Handler) HandleAnimeDetails(w http.ResponseWriter, r *http.Request) { return err }) - g.Go(func() error { - var err error - characters, err = h.jikanClient.GetAnimeCharacters(gCtx, id) - if err != nil { - log.Printf("characters fetch error: %v", err) - } - return nil - }) - - g.Go(func() error { - var err error - recommendations, err = h.jikanClient.GetAnimeRecommendations(gCtx, id) - if err != nil { - log.Printf("recommendations fetch error: %v", err) - } - return nil - }) - - user := middleware.GetUser(r.Context()) if user != nil { g.Go(func() error { - entry, err := h.db.GetWatchListEntry(gCtx, database.GetWatchListEntryParams{ + entry, err := h.service.db.GetWatchListEntry(gCtx, database.GetWatchListEntryParams{ UserID: user.ID, AnimeID: int64(id), }) @@ -332,9 +296,14 @@ func (h *Handler) HandleAnimeDetails(w http.ResponseWriter, r *http.Request) { return nil }) g.Go(func() error { - var err error - watchlist, err = h.db.GetUserWatchList(gCtx, user.ID) - return err + watchlist, err := h.service.db.GetUserWatchList(gCtx, user.ID) + if err == nil { + watchlistIDs = make([]int64, len(watchlist)) + for i, e := range watchlist { + watchlistIDs[i] = e.AnimeID + } + } + return nil }) } @@ -344,26 +313,50 @@ func (h *Handler) HandleAnimeDetails(w http.ResponseWriter, r *http.Request) { return } - watchlistIDs := make([]int64, len(watchlist)) - for i, e := range watchlist { - watchlistIDs[i] = e.AnimeID - } - if err := templates.GetRenderer().ExecuteTemplate(r.Context(), w, "anime.gohtml", map[string]any{ - "Anime": anime, - "Characters": characters, - "Recommendations": recommendations, - "User": user, - "Status": status, - "CurrentPath": r.URL.Path, - "WatchlistIDs": watchlistIDs, - "EpisodesCount": episodesCount, + "Anime": anime, + "User": user, + "Status": status, + "CurrentPath": r.URL.Path, + "WatchlistIDs": watchlistIDs, + "EpisodesCount": episodesCount, }); err != nil { log.Printf("render error: %v", err) http.Error(w, "Internal Server Error", http.StatusInternalServerError) } } +func (h *Handler) renderAnimeDetailsSection(w http.ResponseWriter, r *http.Request, id int, section string) { + ctx := r.Context() + var data any + var err error + + switch section { + case "characters": + data, err = h.service.jikanClient.GetAnimeCharacters(ctx, id) + case "recommendations": + data, err = h.service.jikanClient.GetAnimeRecommendations(ctx, id) + default: + http.Error(w, "Invalid section", http.StatusBadRequest) + return + } + + if err != nil { + log.Printf("anime details %s error: %v", section, err) + writeInlineLoadError(w, "Failed to load "+section) + return + } + + tplName := "anime_characters" + if section == "recommendations" { + tplName = "anime_recommendations" + } + + if err := templates.GetRenderer().ExecuteFragment(ctx, w, "anime.gohtml", tplName, data); err != nil { + log.Printf("fragment render error: %v", err) + } +} + func (h *Handler) HandleHTMLWatchOrder(w http.ResponseWriter, r *http.Request) { animeIdStr := r.URL.Query().Get("animeId") id, err := strconv.Atoi(animeIdStr) @@ -372,7 +365,7 @@ func (h *Handler) HandleHTMLWatchOrder(w http.ResponseWriter, r *http.Request) { return } - relations, err := h.jikanClient.GetFullRelations(r.Context(), id) + relations, err := h.service.jikanClient.GetFullRelations(r.Context(), id) if err != nil { log.Printf("watch order error: %v", err) http.Error(w, `` + html.EscapeString(message) + `
`)) +} + +func parsePageParam(r *http.Request) int { + page, _ := strconv.Atoi(r.URL.Query().Get("page")) + if page < 1 { + return 1 + } + return page +} diff --git a/api/anime/service.go b/api/anime/service.go new file mode 100644 index 0000000..0ff80c0 --- /dev/null +++ b/api/anime/service.go @@ -0,0 +1,138 @@ +package anime + +import ( + "context" + "mal/integrations/jikan" + "mal/internal/db" + + "golang.org/x/sync/errgroup" +) + +type Service struct { + jikanClient *jikan.Client + db database.Querier +} + +func NewService(jikanClient *jikan.Client, db database.Querier) *Service { + return &Service{jikanClient: jikanClient, db: db} +} + +func (s *Service) GetCatalogSection(ctx context.Context, userID string, section string) (map[string]any, error) { + var ( + res jikan.TopAnimeResult + cw []database.GetContinueWatchingEntriesRow + watchlist []database.GetUserWatchListRow + err error + ) + + g, gCtx := errgroup.WithContext(ctx) + + g.Go(func() error { + switch section { + case "Airing": + res, err = s.jikanClient.GetSeasonsNow(gCtx, 1) + case "Popular": + res, err = s.jikanClient.GetTopAnime(gCtx, 1) + } + return err + }) + + if userID != "" { + g.Go(func() error { + if section == "Continue" { + var err error + cw, err = s.db.GetContinueWatchingEntries(gCtx, userID) + return err + } + return nil + }) + g.Go(func() error { + var err error + watchlist, err = s.db.GetUserWatchList(gCtx, userID) + return err + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + animes := res.Animes + if len(animes) > 6 { + animes = animes[:6] + } + + watchlistMap := make(map[int64]bool) + for _, entry := range watchlist { + watchlistMap[entry.AnimeID] = true + } + + return map[string]any{ + "Animes": animes, + "ContinueWatching": cw, + "WatchlistMap": watchlistMap, + }, nil +} + +func (s *Service) GetDiscoverSection(ctx context.Context, userID string, section string) (map[string]any, error) { + var ( + res jikan.TopAnimeResult + watchlist []database.GetUserWatchListRow + err error + ) + + g, gCtx := errgroup.WithContext(ctx) + + g.Go(func() error { + switch section { + case "Trending": + res, err = s.jikanClient.GetSeasonsNow(gCtx, 1) + case "Upcoming": + res, err = s.jikanClient.GetSeasonsUpcoming(gCtx, 1) + case "Top": + res, err = s.jikanClient.GetTopAnime(gCtx, 1) + } + return err + }) + + if userID != "" { + g.Go(func() error { + var err error + watchlist, err = s.db.GetUserWatchList(gCtx, userID) + return err + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + animes := res.Animes + if len(animes) > 8 { + animes = animes[:8] + } + + watchlistMap := make(map[int64]bool) + for _, entry := range watchlist { + watchlistMap[entry.AnimeID] = true + } + + return map[string]any{ + "Animes": animes, + "WatchlistMap": watchlistMap, + }, nil +} + +func (s *Service) filterUnique(animes []jikan.Anime, seen map[int]bool, limit int) []jikan.Anime { + unique := make([]jikan.Anime, 0) + for _, a := range animes { + if !seen[a.MalID] { + seen[a.MalID] = true + unique = append(unique, a) + } + if len(unique) >= limit { + break + } + } + return unique +} diff --git a/internal/server/routes.go b/internal/server/routes.go index 1905dea..3ccf8aa 100644 --- a/internal/server/routes.go +++ b/internal/server/routes.go @@ -51,7 +51,8 @@ func NewRouter(cfg Config) http.Handler { middleware.InitAuth(cfg.AuthService) - animeHandler := anime.NewHandler(cfg.JikanClient, cfg.DB) + animeSvc := anime.NewService(cfg.JikanClient, cfg.DB) + animeHandler := anime.NewHandler(animeSvc) playbackSvc := playback.NewService(cfg.DB, cfg.SQLDB, playback.Config{ ProxyTokenSecret: cfg.PlaybackProxySecret, @@ -67,9 +68,15 @@ func NewRouter(cfg Config) http.Handler { mux.Handle("/dist/", http.StripPrefix("/dist/", withMimeTypes(dist))) mux.HandleFunc("/", animeHandler.HandleCatalog) + mux.HandleFunc("/api/catalog/airing", animeHandler.HandleCatalogAiring) + mux.HandleFunc("/api/catalog/popular", animeHandler.HandleCatalogPopular) + mux.HandleFunc("/api/catalog/continue", animeHandler.HandleCatalogContinue) mux.HandleFunc("/search", animeHandler.HandleSearch) mux.HandleFunc("/browse", animeHandler.HandleBrowse) mux.HandleFunc("/discover", animeHandler.HandleDiscover) + mux.HandleFunc("/api/discover/trending", animeHandler.HandleDiscoverTrending) + mux.HandleFunc("/api/discover/upcoming", animeHandler.HandleDiscoverUpcoming) + mux.HandleFunc("/api/discover/top", animeHandler.HandleDiscoverTop) mux.HandleFunc("/api/search-quick", animeHandler.HandleQuickSearch) mux.HandleFunc("/api/jikan/random/anime", animeHandler.HandleRandomAnime) mux.HandleFunc("/anime/", func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/worker/relations.go b/internal/worker/relations.go index db44887..15ab8c2 100644 --- a/internal/worker/relations.go +++ b/internal/worker/relations.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "log" + "sync" "time" "mal/integrations/jikan" @@ -31,9 +32,7 @@ func (w *Worker) Start(ctx context.Context) { defer retryTicker.Stop() // Run once immediately - w.syncRelations(ctx) - w.processAnimeFetchRetries(ctx) - w.cleanupCache(ctx) + w.runAllTasks(ctx) cleanupCounter := 0 @@ -48,7 +47,6 @@ func (w *Worker) Start(ctx context.Context) { case <-ticker.C: w.syncRelations(ctx) - // Clean up cache every 60 runs (approx 1 hour) cleanupCounter++ if cleanupCounter >= 60 { w.cleanupCache(ctx) @@ -58,6 +56,28 @@ func (w *Worker) Start(ctx context.Context) { } } +func (w *Worker) runAllTasks(ctx context.Context) { + var wg sync.WaitGroup + wg.Add(3) + + go func() { + defer wg.Done() + w.syncRelations(ctx) + }() + + go func() { + defer wg.Done() + w.processAnimeFetchRetries(ctx) + }() + + go func() { + defer wg.Done() + w.cleanupCache(ctx) + }() + + wg.Wait() +} + func retryBackoff(attempts int64) string { if attempts < 1 { attempts = 1 @@ -88,44 +108,38 @@ func (w *Worker) processAnimeFetchRetries(ctx context.Context) { return } + var wg sync.WaitGroup for _, retry := range retries { - _, err := w.client.GetAnimeByID(ctx, int(retry.AnimeID)) - if err != nil { - if !jikan.IsRetryableError(err) { - deleteErr := w.db.DeleteAnimeFetchRetry(ctx, retry.AnimeID) - if deleteErr != nil { - log.Printf("worker: failed deleting non-retryable anime retry %d: %v", retry.AnimeID, deleteErr) + wg.Add(1) + go func(r database.AnimeFetchRetry) { + defer wg.Done() + _, err := w.client.GetAnimeByID(ctx, int(r.AnimeID)) + if err != nil { + if !jikan.IsRetryableError(err) { + _ = w.db.DeleteAnimeFetchRetry(ctx, r.AnimeID) + return } - continue - } - updateErr := w.db.MarkAnimeFetchRetryFailed(ctx, database.MarkAnimeFetchRetryFailedParams{ - Datetime: retryBackoff(retry.Attempts + 1), - LastError: err.Error(), - AnimeID: retry.AnimeID, - }) - if updateErr != nil { - log.Printf("worker: failed updating anime fetch retry %d: %v", retry.AnimeID, updateErr) + _ = w.db.MarkAnimeFetchRetryFailed(ctx, database.MarkAnimeFetchRetryFailedParams{ + Datetime: retryBackoff(r.Attempts + 1), + LastError: err.Error(), + AnimeID: r.AnimeID, + }) + return } - continue - } - - deleteErr := w.db.DeleteAnimeFetchRetry(ctx, retry.AnimeID) - if deleteErr != nil { - log.Printf("worker: failed deleting successful anime retry %d: %v", retry.AnimeID, deleteErr) - } + _ = w.db.DeleteAnimeFetchRetry(ctx, r.AnimeID) + }(retry) } + wg.Wait() } func (w *Worker) cleanupCache(ctx context.Context) { - err := w.db.DeleteExpiredJikanCache(ctx) - if err != nil { + if err := w.db.DeleteExpiredJikanCache(ctx); err != nil { log.Printf("worker: failed to clean up expired jikan cache: %v", err) } } func (w *Worker) syncRelations(ctx context.Context) { - // Find up to 20 anime that need their relations synced animes, err := w.db.GetAnimeNeedingRelationSync(ctx) if err != nil { log.Printf("worker error: failed to get anime needing sync: %v", err) @@ -133,108 +147,85 @@ func (w *Worker) syncRelations(ctx context.Context) { } if len(animes) == 0 { - return // silent heartbeat + return } - for _, a := range animes { - func() { - animeData, err := w.client.GetAnimeByID(ctx, int(a.ID)) - if err != nil { - log.Printf("worker: failed to fetch anime details for %d: %v", a.ID, err) - // Sleep a bit on error to respect rate limits, but DO NOT mark as synced - // so it will be retried on the next worker run. - time.Sleep(2 * time.Second) - return - } + // Use a small worker pool for Jikan API calls to respect rate limits while maintaining concurrency + const workerCount = 2 + jobs := make(chan database.GetAnimeNeedingRelationSyncRow, len(animes)) + var wg sync.WaitGroup - // If we got here, we successfully fetched the data, so we mark it as synced. - defer func() { - err := w.db.MarkRelationsSynced(ctx, a.ID) - if err != nil { - log.Printf("worker: failed to mark relations synced for %d: %v", a.ID, err) - } - time.Sleep(400 * time.Millisecond) - }() - - for _, rel := range animeData.Relations { - for _, entry := range rel.Entry { - if entry.Type == "anime" { - // We just insert the relation. - err := w.db.UpsertAnimeRelation(ctx, database.UpsertAnimeRelationParams{ - AnimeID: a.ID, - RelatedAnimeID: int64(entry.MalID), - RelationType: rel.Relation, - }) - if err != nil { - log.Printf("worker: failed to insert relation %d -> %d: %v", a.ID, entry.MalID, err) - } - - // If it's a Sequel, we should also make sure the related anime is tracked - if rel.Relation == "Sequel" { - w.ensureAnimeExistsAndStatusUpdated(ctx, entry.MalID) - } - } - } - } - - err = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{ - Status: sql.NullString{String: animeData.Status, Valid: true}, - ID: a.ID, - }) - if err != nil { - log.Printf("worker: failed to update status for %d: %v", a.ID, err) + for i := 0; i < workerCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for a := range jobs { + w.syncSingleAnime(ctx, a.ID) } }() } + + for _, a := range animes { + jobs <- a + } + close(jobs) + wg.Wait() +} + +func (w *Worker) syncSingleAnime(ctx context.Context, id int64) { + animeData, err := w.client.GetAnimeByID(ctx, int(id)) + if err != nil { + log.Printf("worker: failed to fetch anime details for %d: %v", id, err) + return + } + + for _, rel := range animeData.Relations { + for _, entry := range rel.Entry { + if entry.Type == "anime" { + err := w.db.UpsertAnimeRelation(ctx, database.UpsertAnimeRelationParams{ + AnimeID: id, + RelatedAnimeID: int64(entry.MalID), + RelationType: rel.Relation, + }) + if err != nil { + log.Printf("worker: failed to insert relation %d -> %d: %v", id, entry.MalID, err) + } + + if rel.Relation == "Sequel" { + w.ensureAnimeExistsAndStatusUpdated(ctx, entry.MalID) + } + } + } + } + + _ = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{ + Status: sql.NullString{String: animeData.Status, Valid: true}, + ID: id, + }) + _ = w.db.MarkRelationsSynced(ctx, id) } func (w *Worker) ensureAnimeExistsAndStatusUpdated(ctx context.Context, malID int) { - // check if we have it - _, err := w.db.GetAnime(ctx, int64(malID)) + animeDetails, err := w.client.GetAnimeByID(ctx, malID) if err != nil { - // we don't have it, let's fetch it - animeDetails, err := w.client.GetAnimeByID(ctx, malID) - if err != nil { - log.Printf("worker: failed to fetch related anime %d: %v", malID, err) - return - } - - _, err = w.db.UpsertAnime(ctx, database.UpsertAnimeParams{ - ID: int64(animeDetails.MalID), - TitleOriginal: animeDetails.Title, - TitleEnglish: sql.NullString{String: animeDetails.TitleEnglish, Valid: animeDetails.TitleEnglish != ""}, - TitleJapanese: sql.NullString{String: animeDetails.TitleJapanese, Valid: animeDetails.TitleJapanese != ""}, - ImageUrl: animeDetails.ImageURL(), - Airing: sql.NullBool{Bool: animeDetails.Airing, Valid: true}, - }) - if err != nil { - log.Printf("worker: failed to insert related anime %d: %v", malID, err) - return - } - - err = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{ - Status: sql.NullString{String: animeDetails.Status, Valid: true}, - ID: int64(animeDetails.MalID), - }) - if err != nil { - log.Printf("worker: failed to update status for related anime %d: %v", malID, err) - } - - time.Sleep(400 * time.Millisecond) - } else { - // We have it, but maybe status is outdated. Fetching every time might be too much, - // but since it's a Sequel to something they watched, we could fetch it. - // For now, let's just let the worker naturally pick it up if it gets added to watchlist, - // OR we can explicitly fetch its details to keep sequels up to date. - animeDetails, err := w.client.GetAnimeByID(ctx, malID) - if err == nil { - if err := w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{ - Status: sql.NullString{String: animeDetails.Status, Valid: true}, - ID: int64(animeDetails.MalID), - }); err != nil { - log.Printf("worker: failed to update status for anime %d: %v", animeDetails.MalID, err) - } - } - time.Sleep(400 * time.Millisecond) + log.Printf("worker: failed to fetch related anime %d: %v", malID, err) + return } + + _, err = w.db.UpsertAnime(ctx, database.UpsertAnimeParams{ + ID: int64(animeDetails.MalID), + TitleOriginal: animeDetails.Title, + TitleEnglish: sql.NullString{String: animeDetails.TitleEnglish, Valid: animeDetails.TitleEnglish != ""}, + TitleJapanese: sql.NullString{String: animeDetails.TitleJapanese, Valid: animeDetails.TitleJapanese != ""}, + ImageUrl: animeDetails.ImageURL(), + Airing: sql.NullBool{Bool: animeDetails.Airing, Valid: true}, + }) + if err != nil { + log.Printf("worker: failed to insert related anime %d: %v", malID, err) + } + + _ = w.db.UpdateAnimeStatus(ctx, database.UpdateAnimeStatusParams{ + Status: sql.NullString{String: animeDetails.Status, Valid: true}, + ID: int64(animeDetails.MalID), + }) } diff --git a/templates/renderer.go b/templates/renderer.go index e0f8ff7..7406d33 100644 --- a/templates/renderer.go +++ b/templates/renderer.go @@ -85,10 +85,19 @@ func GetRenderer() *Renderer { "toFloat": func(a int) float64 { return float64(a) }, - "seq": func(start, end int) []int { - res := make([]int, 0, end-start) - for i := start; i < end; i++ { - res = append(res, i) + "seq": func(v any) []int { + var count int + switch n := v.(type) { + case int: + count = n + case int64: + count = int(n) + default: + count = 0 + } + res := make([]int, count) + for i := 0; i < count; i++ { + res[i] = i } return res },