diff --git a/internal/worker/relations.go b/internal/worker/relations.go deleted file mode 100644 index 5b6d13a..0000000 --- a/internal/worker/relations.go +++ /dev/null @@ -1,235 +0,0 @@ -package worker - -import ( - "context" - "database/sql" - "fmt" - "log" - "sync" - "time" - - "mal/integrations/jikan" - "mal/internal/db" -) - -type Worker struct { - db *db.Queries - client *jikan.Client -} - -func New(db *db.Queries, client *jikan.Client) *Worker { - return &Worker{ - db: db, - client: client, - } -} - -func (w *Worker) Start(ctx context.Context) { - log.Println("Starting relations sync worker...") - // ticker: regular sync; retryTicker: check for failed API retries - ticker := time.NewTicker(1 * time.Minute) - retryTicker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - defer retryTicker.Stop() - - // Run once immediately on startup - w.runAllTasks(ctx) - - cleanupCounter := 0 - - for { - select { - case <-ctx.Done(): - return - case <-w.client.RetrySignal(): - w.processAnimeFetchRetries(ctx) - case <-retryTicker.C: - w.processAnimeFetchRetries(ctx) - case <-ticker.C: - w.syncRelations(ctx) - - cleanupCounter++ - if cleanupCounter >= 60 { - w.cleanupCache(ctx) - cleanupCounter = 0 - } - } - } -} - -func (w *Worker) runAllTasks(ctx context.Context) { - var wg sync.WaitGroup - wg.Add(3) - - go func() { - defer wg.Done() - w.syncRelations(ctx) - }() - - go func() { - defer wg.Done() - w.processAnimeFetchRetries(ctx) - }() - - go func() { - defer wg.Done() - w.cleanupCache(ctx) - }() - - wg.Wait() -} - -// retryBackoff calculates the next retry delay, doubling up to 30 minutes max -func retryBackoff(attempts int64) string { - if attempts < 1 { - attempts = 1 - } - - delay := time.Minute - if attempts > 1 { - shift := min(attempts-1, 6) - delay = time.Minute * time.Duration(1< 30*time.Minute { - delay = 30 * time.Minute - } - - minutes := max(int(delay/time.Minute), 1) - return fmt.Sprintf("+%d minutes", minutes) -} - -// processAnimeFetchRetries retries failed Jikan API fetches for anime with pending entries -func (w *Worker) processAnimeFetchRetries(ctx context.Context) { - retries, err := w.db.GetDueAnimeFetchRetries(ctx, 20) - if err != nil { - log.Printf("worker: failed to load due anime fetch retries: %v", err) - return - } - - if len(retries) == 0 { - return - } - - var wg sync.WaitGroup - for _, retry := range retries { - wg.Add(1) - go func(r db.AnimeFetchRetry) { - defer wg.Done() - _, err := w.client.GetAnimeByID(ctx, int(r.AnimeID)) - if err != nil { - if !jikan.IsRetryableError(err) { - _ = w.db.DeleteAnimeFetchRetry(ctx, r.AnimeID) - return - } - - _ = w.db.MarkAnimeFetchRetryFailed(ctx, db.MarkAnimeFetchRetryFailedParams{ - Datetime: retryBackoff(r.Attempts + 1), - LastError: err.Error(), - AnimeID: r.AnimeID, - }) - return - } - _ = w.db.DeleteAnimeFetchRetry(ctx, r.AnimeID) - }(retry) - } - wg.Wait() -} - -func (w *Worker) cleanupCache(ctx context.Context) { - if err := w.db.DeleteExpiredJikanCache(ctx); err != nil { - log.Printf("worker: failed to clean up expired jikan cache: %v", err) - } -} - -// syncRelations fetches relation data for anime that need syncing via a worker pool -func (w *Worker) syncRelations(ctx context.Context) { - animes, err := w.db.GetAnimeNeedingRelationSync(ctx) - if err != nil { - log.Printf("worker error: failed to get anime needing sync: %v", err) - return - } - - if len(animes) == 0 { - return - } - - // Use a small worker pool for Jikan API calls to respect rate limits while maintaining concurrency - const workerCount = 2 - jobs := make(chan db.GetAnimeNeedingRelationSyncRow, len(animes)) - var wg sync.WaitGroup - - for range workerCount { - wg.Go(func() { - for a := range jobs { - w.syncSingleAnime(ctx, a.ID) - } - }) - } - - for _, a := range animes { - jobs <- a - } - close(jobs) - wg.Wait() -} - -// syncSingleAnime fetches relations for one anime, inserts them, and marks it synced. -// For sequels, also ensures the related anime exists in the DB to enable linking. -func (w *Worker) syncSingleAnime(ctx context.Context, id int64) { - animeData, err := w.client.GetAnimeByID(ctx, int(id)) - if err != nil { - log.Printf("worker: failed to fetch anime details for %d: %v", id, err) - return - } - - for _, rel := range animeData.Relations { - for _, entry := range rel.Entry { - if entry.Type == "anime" { - err := w.db.UpsertAnimeRelation(ctx, db.UpsertAnimeRelationParams{ - AnimeID: id, - RelatedAnimeID: int64(entry.MalID), - RelationType: rel.Relation, - }) - if err != nil { - log.Printf("worker: failed to insert relation %d -> %d: %v", id, entry.MalID, err) - } - - if rel.Relation == "Sequel" { - w.ensureAnimeExistsAndStatusUpdated(ctx, entry.MalID) - } - } - } - } - - _ = w.db.UpdateAnimeStatus(ctx, db.UpdateAnimeStatusParams{ - Status: sql.NullString{String: animeData.Status, Valid: true}, - ID: id, - }) - _ = w.db.MarkRelationsSynced(ctx, id) -} - -func (w *Worker) ensureAnimeExistsAndStatusUpdated(ctx context.Context, malID int) { - animeDetails, err := w.client.GetAnimeByID(ctx, malID) - if err != nil { - log.Printf("worker: failed to fetch related anime %d: %v", malID, err) - return - } - - _, err = w.db.UpsertAnime(ctx, db.UpsertAnimeParams{ - ID: int64(animeDetails.MalID), - TitleOriginal: animeDetails.Title, - TitleEnglish: sql.NullString{String: animeDetails.TitleEnglish, Valid: animeDetails.TitleEnglish != ""}, - TitleJapanese: sql.NullString{String: animeDetails.TitleJapanese, Valid: animeDetails.TitleJapanese != ""}, - ImageUrl: animeDetails.ImageURL(), - Airing: sql.NullBool{Bool: animeDetails.Airing, Valid: true}, - }) - if err != nil { - log.Printf("worker: failed to insert related anime %d: %v", malID, err) - } - - _ = w.db.UpdateAnimeStatus(ctx, db.UpdateAnimeStatusParams{ - Status: sql.NullString{String: animeDetails.Status, Valid: true}, - ID: int64(animeDetails.MalID), - }) -}