diff --git a/cmd/server/main.go b/cmd/server/main.go index 7d5c4ca..5ef8547 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -36,7 +36,7 @@ func main() { queries := database.New(db) authService := auth.NewService(queries) - jikanClient := jikan.NewClient() + jikanClient := jikan.NewClient(queries) // Start background workers relationsWorker := worker.New(queries, jikanClient) diff --git a/internal/database/models.go b/internal/database/models.go index 00aaab4..34cd7d0 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -35,6 +35,13 @@ type AnimeRelation struct { RelationType string `json:"relation_type"` } +type JikanCache struct { + Key string `json:"key"` + Data string `json:"data"` + ExpiresAt time.Time `json:"expires_at"` + CreatedAt time.Time `json:"created_at"` +} + type NotificationPreference struct { ID string `json:"id"` UserID string `json:"user_id"` diff --git a/internal/database/querier.go b/internal/database/querier.go index e335c76..4d94fcc 100644 --- a/internal/database/querier.go +++ b/internal/database/querier.go @@ -11,11 +11,13 @@ import ( type Querier interface { CreateSession(ctx context.Context, arg CreateSessionParams) (Session, error) CreateUser(ctx context.Context, arg CreateUserParams) (User, error) + DeleteExpiredJikanCache(ctx context.Context) error DeleteSession(ctx context.Context, id string) error DeleteUserSessions(ctx context.Context, userID string) error DeleteWatchListEntry(ctx context.Context, arg DeleteWatchListEntryParams) error GetAnime(ctx context.Context, id int64) (Anime, error) GetAnimeNeedingRelationSync(ctx context.Context) ([]GetAnimeNeedingRelationSyncRow, error) + GetJikanCache(ctx context.Context, key string) (string, error) GetSession(ctx context.Context, id string) (Session, error) GetUpcomingSeasons(ctx context.Context, userID string) ([]GetUpcomingSeasonsRow, error) GetUser(ctx context.Context, id string) (User, error) @@ -24,6 +26,7 @@ type Querier interface { GetWatchListEntry(ctx context.Context, arg GetWatchListEntryParams) (WatchListEntry, error) GetWatchingAnime(ctx context.Context, userID string) ([]GetWatchingAnimeRow, error) MarkRelationsSynced(ctx context.Context, id int64) error + SetJikanCache(ctx context.Context, arg SetJikanCacheParams) error UpdateAnimeStatus(ctx context.Context, arg UpdateAnimeStatusParams) error UpsertAnime(ctx context.Context, arg UpsertAnimeParams) (Anime, error) UpsertAnimeRelation(ctx context.Context, arg UpsertAnimeRelationParams) error diff --git a/internal/database/queries.sql b/internal/database/queries.sql index 095c6d5..293f698 100644 --- a/internal/database/queries.sql +++ b/internal/database/queries.sql @@ -151,3 +151,18 @@ WHERE related.status IN ('Not yet aired', 'Currently Airing') WHERE we.user_id = sc.user_id AND we.anime_id = related.id ) ORDER BY related.id DESC; + +-- name: GetJikanCache :one +SELECT data FROM jikan_cache +WHERE key = ? AND expires_at > CURRENT_TIMESTAMP LIMIT 1; + +-- name: SetJikanCache :exec +INSERT INTO jikan_cache (key, data, expires_at) +VALUES (?, ?, ?) +ON CONFLICT (key) DO UPDATE SET + data = excluded.data, + expires_at = excluded.expires_at, + created_at = CURRENT_TIMESTAMP; + +-- name: DeleteExpiredJikanCache :exec +DELETE FROM jikan_cache WHERE expires_at <= CURRENT_TIMESTAMP; diff --git a/internal/database/queries.sql.go b/internal/database/queries.sql.go index 60f7f7c..e2bd9cc 100644 --- a/internal/database/queries.sql.go +++ b/internal/database/queries.sql.go @@ -59,6 +59,15 @@ func (q *Queries) CreateUser(ctx context.Context, arg CreateUserParams) (User, e return i, err } +const deleteExpiredJikanCache = `-- name: DeleteExpiredJikanCache :exec +DELETE FROM jikan_cache WHERE expires_at <= CURRENT_TIMESTAMP +` + +func (q *Queries) DeleteExpiredJikanCache(ctx context.Context) error { + _, err := q.db.ExecContext(ctx, deleteExpiredJikanCache) + return err +} + const deleteSession = `-- name: DeleteSession :exec DELETE FROM session WHERE id = ? ` @@ -164,6 +173,18 @@ func (q *Queries) GetAnimeNeedingRelationSync(ctx context.Context) ([]GetAnimeNe return items, nil } +const getJikanCache = `-- name: GetJikanCache :one +SELECT data FROM jikan_cache +WHERE key = ? AND expires_at > CURRENT_TIMESTAMP LIMIT 1 +` + +func (q *Queries) GetJikanCache(ctx context.Context, key string) (string, error) { + row := q.db.QueryRowContext(ctx, getJikanCache, key) + var data string + err := row.Scan(&data) + return data, err +} + const getSession = `-- name: GetSession :one SELECT id, user_id, expires_at, created_at FROM session WHERE id = ? LIMIT 1 ` @@ -468,6 +489,26 @@ func (q *Queries) MarkRelationsSynced(ctx context.Context, id int64) error { return err } +const setJikanCache = `-- name: SetJikanCache :exec +INSERT INTO jikan_cache (key, data, expires_at) +VALUES (?, ?, ?) +ON CONFLICT (key) DO UPDATE SET + data = excluded.data, + expires_at = excluded.expires_at, + created_at = CURRENT_TIMESTAMP +` + +type SetJikanCacheParams struct { + Key string `json:"key"` + Data string `json:"data"` + ExpiresAt time.Time `json:"expires_at"` +} + +func (q *Queries) SetJikanCache(ctx context.Context, arg SetJikanCacheParams) error { + _, err := q.db.ExecContext(ctx, setJikanCache, arg.Key, arg.Data, arg.ExpiresAt) + return err +} + const updateAnimeStatus = `-- name: UpdateAnimeStatus :exec UPDATE anime SET status = ? WHERE id = ? ` diff --git a/internal/jikan/anime.go b/internal/jikan/anime.go index bd44838..5e974df 100644 --- a/internal/jikan/anime.go +++ b/internal/jikan/anime.go @@ -1,10 +1,15 @@ package jikan -import "fmt" +import ( + "fmt" + "time" +) // GetAnimeByID fetches full details for a single anime func (c *Client) GetAnimeByID(id int) (Anime, error) { - if cached, ok := c.animeCache.Get(id); ok { + cacheKey := fmt.Sprintf("anime:%d", id) + var cached Anime + if c.getCache(cacheKey, &cached) { return cached, nil } @@ -14,6 +19,6 @@ func (c *Client) GetAnimeByID(id int) (Anime, error) { return Anime{}, err } - c.animeCache.Add(id, result.Data) + c.setCache(cacheKey, result.Data, time.Hour*24) return result.Data, nil } diff --git a/internal/jikan/client.go b/internal/jikan/client.go index b5f0f38..07917b7 100644 --- a/internal/jikan/client.go +++ b/internal/jikan/client.go @@ -1,51 +1,58 @@ package jikan import ( + "context" "encoding/json" "fmt" "net/http" "time" - "github.com/hashicorp/golang-lru/v2/expirable" + "mal/internal/database" ) type Client struct { - httpClient *http.Client - baseURL string - cache *expirable.LRU[string, SearchResult] - topCache *expirable.LRU[int, TopAnimeResult] - airingCache *expirable.LRU[int, TopAnimeResult] - upcomingCache *expirable.LRU[int, TopAnimeResult] - animeCache *expirable.LRU[int, Anime] - relationsCache *expirable.LRU[int, JikanRelationsResponse] - scheduleCache *expirable.LRU[string, ScheduleResult] - recsCache *expirable.LRU[int, []Anime] + httpClient *http.Client + baseURL string + db database.Querier } -func NewClient() *Client { - cache := expirable.NewLRU[string, SearchResult](500, nil, time.Hour*1) - topCache := expirable.NewLRU[int, TopAnimeResult](100, nil, time.Hour*1) - airingCache := expirable.NewLRU[int, TopAnimeResult](100, nil, time.Hour*1) - upcomingCache := expirable.NewLRU[int, TopAnimeResult](100, nil, time.Hour*1) - animeCache := expirable.NewLRU[int, Anime](1000, nil, time.Hour*24) - relationsCache := expirable.NewLRU[int, JikanRelationsResponse](1000, nil, time.Hour*24) - scheduleCache := expirable.NewLRU[string, ScheduleResult](50, nil, time.Hour*1) - recsCache := expirable.NewLRU[int, []Anime](500, nil, time.Hour*24) - +func NewClient(db database.Querier) *Client { return &Client{ - httpClient: &http.Client{Timeout: 10 * time.Second}, - baseURL: "https://api.jikan.moe/v4", - cache: cache, - topCache: topCache, - airingCache: airingCache, - upcomingCache: upcomingCache, - animeCache: animeCache, - relationsCache: relationsCache, - scheduleCache: scheduleCache, - recsCache: recsCache, + httpClient: &http.Client{Timeout: 10 * time.Second}, + baseURL: "https://api.jikan.moe/v4", + db: db, } } +func (c *Client) getCache(key string, out interface{}) bool { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + data, err := c.db.GetJikanCache(ctx, key) + if err != nil { + return false + } + + err = json.Unmarshal([]byte(data), out) + return err == nil +} + +func (c *Client) setCache(key string, data interface{}, ttl time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + bytes, err := json.Marshal(data) + if err != nil { + return + } + + _ = c.db.SetJikanCache(ctx, database.SetJikanCacheParams{ + Key: key, + Data: string(bytes), + ExpiresAt: time.Now().Add(ttl), + }) +} + // fetchWithRetry provides robust fetching respecting Jikan's strict 3 req/sec rate limit func (c *Client) fetchWithRetry(urlStr string, out interface{}) error { maxRetries := 3 diff --git a/internal/jikan/recommendations.go b/internal/jikan/recommendations.go index 62078f9..674e36c 100644 --- a/internal/jikan/recommendations.go +++ b/internal/jikan/recommendations.go @@ -1,6 +1,9 @@ package jikan -import "fmt" +import ( + "fmt" + "time" +) // RecommendationEntry represents a single recommendation type RecommendationEntry struct { @@ -23,7 +26,9 @@ type RecommendationsResponse struct { // GetRecommendations fetches full details for the top recommended anime func (c *Client) GetRecommendations(animeID int, limit int) ([]Anime, error) { - if cached, ok := c.recsCache.Get(animeID); ok { + cacheKey := fmt.Sprintf("recs:%d", animeID) + var cached []Anime + if c.getCache(cacheKey, &cached) { if len(cached) > limit { return cached[:limit], nil } @@ -71,6 +76,6 @@ func (c *Client) GetRecommendations(animeID int, limit int) ([]Anime, error) { } } - c.recsCache.Add(animeID, animes) + c.setCache(cacheKey, animes, time.Hour*24) return animes, nil } diff --git a/internal/jikan/relations.go b/internal/jikan/relations.go index 2eef6fd..4d3cc24 100644 --- a/internal/jikan/relations.go +++ b/internal/jikan/relations.go @@ -1,10 +1,15 @@ package jikan -import "fmt" +import ( + "fmt" + "time" +) // GetRelationsData fetches the raw relationships for an anime func (c *Client) GetRelationsData(id int) (JikanRelationsResponse, error) { - if cached, ok := c.relationsCache.Get(id); ok { + cacheKey := fmt.Sprintf("relations:%d", id) + var cached JikanRelationsResponse + if c.getCache(cacheKey, &cached) { return cached, nil } @@ -14,7 +19,7 @@ func (c *Client) GetRelationsData(id int) (JikanRelationsResponse, error) { return JikanRelationsResponse{}, err } - c.relationsCache.Add(id, result) + c.setCache(cacheKey, result, time.Hour*24) return result, nil } diff --git a/internal/jikan/search.go b/internal/jikan/search.go index 94999e8..3d814ad 100644 --- a/internal/jikan/search.go +++ b/internal/jikan/search.go @@ -3,6 +3,7 @@ package jikan import ( "fmt" "net/url" + "time" ) // Search returns the anime list with pagination support @@ -15,7 +16,8 @@ func (c *Client) Search(query string, page int) (SearchResult, error) { } cacheKey := fmt.Sprintf("search:%s:%d", query, page) - if cached, ok := c.cache.Get(cacheKey); ok { + var cached SearchResult + if c.getCache(cacheKey, &cached) { return cached, nil } @@ -30,7 +32,7 @@ func (c *Client) Search(query string, page int) (SearchResult, error) { HasNextPage: result.Pagination.HasNextPage, } - c.cache.Add(cacheKey, res) + c.setCache(cacheKey, res, time.Hour*1) return res, nil } @@ -39,7 +41,9 @@ func (c *Client) GetTopAnime(page int) (TopAnimeResult, error) { if page < 1 { page = 1 } - if cached, ok := c.topCache.Get(page); ok { + cacheKey := fmt.Sprintf("top:%d", page) + var cached TopAnimeResult + if c.getCache(cacheKey, &cached) { return cached, nil } @@ -54,6 +58,6 @@ func (c *Client) GetTopAnime(page int) (TopAnimeResult, error) { HasNextPage: result.Pagination.HasNextPage, } - c.topCache.Add(page, res) + c.setCache(cacheKey, res, time.Hour*1) return res, nil } diff --git a/internal/jikan/seasons.go b/internal/jikan/seasons.go index acfb212..fae5296 100644 --- a/internal/jikan/seasons.go +++ b/internal/jikan/seasons.go @@ -3,6 +3,7 @@ package jikan import ( "fmt" "strings" + "time" ) // ScheduleResult contains anime grouped by day @@ -17,7 +18,8 @@ func (c *Client) GetSchedule(day string) (ScheduleResult, error) { day = strings.ToLower(day) cacheKey := fmt.Sprintf("schedule_%s", day) - if cached, ok := c.scheduleCache.Get(cacheKey); ok { + var cached ScheduleResult + if c.getCache(cacheKey, &cached) { return cached, nil } @@ -32,7 +34,7 @@ func (c *Client) GetSchedule(day string) (ScheduleResult, error) { HasNextPage: result.Pagination.HasNextPage, } - c.scheduleCache.Add(cacheKey, res) + c.setCache(cacheKey, res, time.Hour*1) return res, nil } @@ -57,7 +59,9 @@ func (c *Client) GetSeasonsNow(page int) (TopAnimeResult, error) { if page < 1 { page = 1 } - if cached, ok := c.airingCache.Get(page); ok { + cacheKey := fmt.Sprintf("seasons_now:%d", page) + var cached TopAnimeResult + if c.getCache(cacheKey, &cached) { return cached, nil } @@ -72,7 +76,7 @@ func (c *Client) GetSeasonsNow(page int) (TopAnimeResult, error) { HasNextPage: result.Pagination.HasNextPage, } - c.airingCache.Add(page, res) + c.setCache(cacheKey, res, time.Hour*1) return res, nil } @@ -81,7 +85,9 @@ func (c *Client) GetSeasonsUpcoming(page int) (TopAnimeResult, error) { if page < 1 { page = 1 } - if cached, ok := c.upcomingCache.Get(page); ok { + cacheKey := fmt.Sprintf("seasons_upcoming:%d", page) + var cached TopAnimeResult + if c.getCache(cacheKey, &cached) { return cached, nil } @@ -96,6 +102,6 @@ func (c *Client) GetSeasonsUpcoming(page int) (TopAnimeResult, error) { HasNextPage: result.Pagination.HasNextPage, } - c.upcomingCache.Add(page, res) + c.setCache(cacheKey, res, time.Hour*1) return res, nil } diff --git a/internal/worker/relations.go b/internal/worker/relations.go index a488fb0..e7b011b 100644 --- a/internal/worker/relations.go +++ b/internal/worker/relations.go @@ -29,6 +29,9 @@ func (w *Worker) Start(ctx context.Context) { // Run once immediately w.syncRelations(ctx) + w.cleanupCache(ctx) + + cleanupCounter := 0 for { select { @@ -36,10 +39,24 @@ func (w *Worker) Start(ctx context.Context) { return case <-ticker.C: w.syncRelations(ctx) + + // Clean up cache every 60 runs (approx 1 hour) + cleanupCounter++ + if cleanupCounter >= 60 { + w.cleanupCache(ctx) + cleanupCounter = 0 + } } } } +func (w *Worker) cleanupCache(ctx context.Context) { + err := w.db.DeleteExpiredJikanCache(ctx) + if err != nil { + log.Printf("worker: failed to clean up expired jikan cache: %v", err) + } +} + func (w *Worker) syncRelations(ctx context.Context) { // Find up to 20 anime that need their relations synced animes, err := w.db.GetAnimeNeedingRelationSync(ctx) diff --git a/migrations/006_add_jikan_cache.sql b/migrations/006_add_jikan_cache.sql new file mode 100644 index 0000000..bc4852a --- /dev/null +++ b/migrations/006_add_jikan_cache.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS jikan_cache ( + key TEXT PRIMARY KEY, + data TEXT NOT NULL, + expires_at DATETIME NOT NULL, + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP +);