chore: remove unreachable worker package
This commit is contained in:
@@ -1,235 +0,0 @@
|
|||||||
package worker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"mal/integrations/jikan"
|
|
||||||
"mal/internal/db"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Worker struct {
|
|
||||||
db *db.Queries
|
|
||||||
client *jikan.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(db *db.Queries, client *jikan.Client) *Worker {
|
|
||||||
return &Worker{
|
|
||||||
db: db,
|
|
||||||
client: client,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) Start(ctx context.Context) {
|
|
||||||
log.Println("Starting relations sync worker...")
|
|
||||||
// ticker: regular sync; retryTicker: check for failed API retries
|
|
||||||
ticker := time.NewTicker(1 * time.Minute)
|
|
||||||
retryTicker := time.NewTicker(30 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
defer retryTicker.Stop()
|
|
||||||
|
|
||||||
// Run once immediately on startup
|
|
||||||
w.runAllTasks(ctx)
|
|
||||||
|
|
||||||
cleanupCounter := 0
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-w.client.RetrySignal():
|
|
||||||
w.processAnimeFetchRetries(ctx)
|
|
||||||
case <-retryTicker.C:
|
|
||||||
w.processAnimeFetchRetries(ctx)
|
|
||||||
case <-ticker.C:
|
|
||||||
w.syncRelations(ctx)
|
|
||||||
|
|
||||||
cleanupCounter++
|
|
||||||
if cleanupCounter >= 60 {
|
|
||||||
w.cleanupCache(ctx)
|
|
||||||
cleanupCounter = 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) runAllTasks(ctx context.Context) {
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(3)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
w.syncRelations(ctx)
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
w.processAnimeFetchRetries(ctx)
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
w.cleanupCache(ctx)
|
|
||||||
}()
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// retryBackoff calculates the next retry delay, doubling up to 30 minutes max
|
|
||||||
func retryBackoff(attempts int64) string {
|
|
||||||
if attempts < 1 {
|
|
||||||
attempts = 1
|
|
||||||
}
|
|
||||||
|
|
||||||
delay := time.Minute
|
|
||||||
if attempts > 1 {
|
|
||||||
shift := min(attempts-1, 6)
|
|
||||||
delay = time.Minute * time.Duration(1<<shift)
|
|
||||||
}
|
|
||||||
|
|
||||||
if delay > 30*time.Minute {
|
|
||||||
delay = 30 * time.Minute
|
|
||||||
}
|
|
||||||
|
|
||||||
minutes := max(int(delay/time.Minute), 1)
|
|
||||||
return fmt.Sprintf("+%d minutes", minutes)
|
|
||||||
}
|
|
||||||
|
|
||||||
// processAnimeFetchRetries retries failed Jikan API fetches for anime with pending entries
|
|
||||||
func (w *Worker) processAnimeFetchRetries(ctx context.Context) {
|
|
||||||
retries, err := w.db.GetDueAnimeFetchRetries(ctx, 20)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("worker: failed to load due anime fetch retries: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(retries) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
for _, retry := range retries {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(r db.AnimeFetchRetry) {
|
|
||||||
defer wg.Done()
|
|
||||||
_, err := w.client.GetAnimeByID(ctx, int(r.AnimeID))
|
|
||||||
if err != nil {
|
|
||||||
if !jikan.IsRetryableError(err) {
|
|
||||||
_ = w.db.DeleteAnimeFetchRetry(ctx, r.AnimeID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = w.db.MarkAnimeFetchRetryFailed(ctx, db.MarkAnimeFetchRetryFailedParams{
|
|
||||||
Datetime: retryBackoff(r.Attempts + 1),
|
|
||||||
LastError: err.Error(),
|
|
||||||
AnimeID: r.AnimeID,
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_ = w.db.DeleteAnimeFetchRetry(ctx, r.AnimeID)
|
|
||||||
}(retry)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) cleanupCache(ctx context.Context) {
|
|
||||||
if err := w.db.DeleteExpiredJikanCache(ctx); err != nil {
|
|
||||||
log.Printf("worker: failed to clean up expired jikan cache: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// syncRelations fetches relation data for anime that need syncing via a worker pool
|
|
||||||
func (w *Worker) syncRelations(ctx context.Context) {
|
|
||||||
animes, err := w.db.GetAnimeNeedingRelationSync(ctx)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("worker error: failed to get anime needing sync: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(animes) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use a small worker pool for Jikan API calls to respect rate limits while maintaining concurrency
|
|
||||||
const workerCount = 2
|
|
||||||
jobs := make(chan db.GetAnimeNeedingRelationSyncRow, len(animes))
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
|
|
||||||
for range workerCount {
|
|
||||||
wg.Go(func() {
|
|
||||||
for a := range jobs {
|
|
||||||
w.syncSingleAnime(ctx, a.ID)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, a := range animes {
|
|
||||||
jobs <- a
|
|
||||||
}
|
|
||||||
close(jobs)
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// syncSingleAnime fetches relations for one anime, inserts them, and marks it synced.
|
|
||||||
// For sequels, also ensures the related anime exists in the DB to enable linking.
|
|
||||||
func (w *Worker) syncSingleAnime(ctx context.Context, id int64) {
|
|
||||||
animeData, err := w.client.GetAnimeByID(ctx, int(id))
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("worker: failed to fetch anime details for %d: %v", id, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, rel := range animeData.Relations {
|
|
||||||
for _, entry := range rel.Entry {
|
|
||||||
if entry.Type == "anime" {
|
|
||||||
err := w.db.UpsertAnimeRelation(ctx, db.UpsertAnimeRelationParams{
|
|
||||||
AnimeID: id,
|
|
||||||
RelatedAnimeID: int64(entry.MalID),
|
|
||||||
RelationType: rel.Relation,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("worker: failed to insert relation %d -> %d: %v", id, entry.MalID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if rel.Relation == "Sequel" {
|
|
||||||
w.ensureAnimeExistsAndStatusUpdated(ctx, entry.MalID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = w.db.UpdateAnimeStatus(ctx, db.UpdateAnimeStatusParams{
|
|
||||||
Status: sql.NullString{String: animeData.Status, Valid: true},
|
|
||||||
ID: id,
|
|
||||||
})
|
|
||||||
_ = w.db.MarkRelationsSynced(ctx, id)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) ensureAnimeExistsAndStatusUpdated(ctx context.Context, malID int) {
|
|
||||||
animeDetails, err := w.client.GetAnimeByID(ctx, malID)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("worker: failed to fetch related anime %d: %v", malID, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = w.db.UpsertAnime(ctx, db.UpsertAnimeParams{
|
|
||||||
ID: int64(animeDetails.MalID),
|
|
||||||
TitleOriginal: animeDetails.Title,
|
|
||||||
TitleEnglish: sql.NullString{String: animeDetails.TitleEnglish, Valid: animeDetails.TitleEnglish != ""},
|
|
||||||
TitleJapanese: sql.NullString{String: animeDetails.TitleJapanese, Valid: animeDetails.TitleJapanese != ""},
|
|
||||||
ImageUrl: animeDetails.ImageURL(),
|
|
||||||
Airing: sql.NullBool{Bool: animeDetails.Airing, Valid: true},
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("worker: failed to insert related anime %d: %v", malID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = w.db.UpdateAnimeStatus(ctx, db.UpdateAnimeStatusParams{
|
|
||||||
Status: sql.NullString{String: animeDetails.Status, Valid: true},
|
|
||||||
ID: int64(animeDetails.MalID),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user