feat: use sqlite for jikan api cache with hourly cleanup
This commit is contained in:
@@ -36,7 +36,7 @@ func main() {
|
||||
|
||||
queries := database.New(db)
|
||||
authService := auth.NewService(queries)
|
||||
jikanClient := jikan.NewClient()
|
||||
jikanClient := jikan.NewClient(queries)
|
||||
|
||||
// Start background workers
|
||||
relationsWorker := worker.New(queries, jikanClient)
|
||||
|
||||
@@ -35,6 +35,13 @@ type AnimeRelation struct {
|
||||
RelationType string `json:"relation_type"`
|
||||
}
|
||||
|
||||
type JikanCache struct {
|
||||
Key string `json:"key"`
|
||||
Data string `json:"data"`
|
||||
ExpiresAt time.Time `json:"expires_at"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
|
||||
type NotificationPreference struct {
|
||||
ID string `json:"id"`
|
||||
UserID string `json:"user_id"`
|
||||
|
||||
@@ -11,11 +11,13 @@ import (
|
||||
type Querier interface {
|
||||
CreateSession(ctx context.Context, arg CreateSessionParams) (Session, error)
|
||||
CreateUser(ctx context.Context, arg CreateUserParams) (User, error)
|
||||
DeleteExpiredJikanCache(ctx context.Context) error
|
||||
DeleteSession(ctx context.Context, id string) error
|
||||
DeleteUserSessions(ctx context.Context, userID string) error
|
||||
DeleteWatchListEntry(ctx context.Context, arg DeleteWatchListEntryParams) error
|
||||
GetAnime(ctx context.Context, id int64) (Anime, error)
|
||||
GetAnimeNeedingRelationSync(ctx context.Context) ([]GetAnimeNeedingRelationSyncRow, error)
|
||||
GetJikanCache(ctx context.Context, key string) (string, error)
|
||||
GetSession(ctx context.Context, id string) (Session, error)
|
||||
GetUpcomingSeasons(ctx context.Context, userID string) ([]GetUpcomingSeasonsRow, error)
|
||||
GetUser(ctx context.Context, id string) (User, error)
|
||||
@@ -24,6 +26,7 @@ type Querier interface {
|
||||
GetWatchListEntry(ctx context.Context, arg GetWatchListEntryParams) (WatchListEntry, error)
|
||||
GetWatchingAnime(ctx context.Context, userID string) ([]GetWatchingAnimeRow, error)
|
||||
MarkRelationsSynced(ctx context.Context, id int64) error
|
||||
SetJikanCache(ctx context.Context, arg SetJikanCacheParams) error
|
||||
UpdateAnimeStatus(ctx context.Context, arg UpdateAnimeStatusParams) error
|
||||
UpsertAnime(ctx context.Context, arg UpsertAnimeParams) (Anime, error)
|
||||
UpsertAnimeRelation(ctx context.Context, arg UpsertAnimeRelationParams) error
|
||||
|
||||
@@ -151,3 +151,18 @@ WHERE related.status IN ('Not yet aired', 'Currently Airing')
|
||||
WHERE we.user_id = sc.user_id AND we.anime_id = related.id
|
||||
)
|
||||
ORDER BY related.id DESC;
|
||||
|
||||
-- name: GetJikanCache :one
|
||||
SELECT data FROM jikan_cache
|
||||
WHERE key = ? AND expires_at > CURRENT_TIMESTAMP LIMIT 1;
|
||||
|
||||
-- name: SetJikanCache :exec
|
||||
INSERT INTO jikan_cache (key, data, expires_at)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT (key) DO UPDATE SET
|
||||
data = excluded.data,
|
||||
expires_at = excluded.expires_at,
|
||||
created_at = CURRENT_TIMESTAMP;
|
||||
|
||||
-- name: DeleteExpiredJikanCache :exec
|
||||
DELETE FROM jikan_cache WHERE expires_at <= CURRENT_TIMESTAMP;
|
||||
|
||||
@@ -59,6 +59,15 @@ func (q *Queries) CreateUser(ctx context.Context, arg CreateUserParams) (User, e
|
||||
return i, err
|
||||
}
|
||||
|
||||
const deleteExpiredJikanCache = `-- name: DeleteExpiredJikanCache :exec
|
||||
DELETE FROM jikan_cache WHERE expires_at <= CURRENT_TIMESTAMP
|
||||
`
|
||||
|
||||
func (q *Queries) DeleteExpiredJikanCache(ctx context.Context) error {
|
||||
_, err := q.db.ExecContext(ctx, deleteExpiredJikanCache)
|
||||
return err
|
||||
}
|
||||
|
||||
const deleteSession = `-- name: DeleteSession :exec
|
||||
DELETE FROM session WHERE id = ?
|
||||
`
|
||||
@@ -164,6 +173,18 @@ func (q *Queries) GetAnimeNeedingRelationSync(ctx context.Context) ([]GetAnimeNe
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getJikanCache = `-- name: GetJikanCache :one
|
||||
SELECT data FROM jikan_cache
|
||||
WHERE key = ? AND expires_at > CURRENT_TIMESTAMP LIMIT 1
|
||||
`
|
||||
|
||||
func (q *Queries) GetJikanCache(ctx context.Context, key string) (string, error) {
|
||||
row := q.db.QueryRowContext(ctx, getJikanCache, key)
|
||||
var data string
|
||||
err := row.Scan(&data)
|
||||
return data, err
|
||||
}
|
||||
|
||||
const getSession = `-- name: GetSession :one
|
||||
SELECT id, user_id, expires_at, created_at FROM session WHERE id = ? LIMIT 1
|
||||
`
|
||||
@@ -468,6 +489,26 @@ func (q *Queries) MarkRelationsSynced(ctx context.Context, id int64) error {
|
||||
return err
|
||||
}
|
||||
|
||||
const setJikanCache = `-- name: SetJikanCache :exec
|
||||
INSERT INTO jikan_cache (key, data, expires_at)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT (key) DO UPDATE SET
|
||||
data = excluded.data,
|
||||
expires_at = excluded.expires_at,
|
||||
created_at = CURRENT_TIMESTAMP
|
||||
`
|
||||
|
||||
type SetJikanCacheParams struct {
|
||||
Key string `json:"key"`
|
||||
Data string `json:"data"`
|
||||
ExpiresAt time.Time `json:"expires_at"`
|
||||
}
|
||||
|
||||
func (q *Queries) SetJikanCache(ctx context.Context, arg SetJikanCacheParams) error {
|
||||
_, err := q.db.ExecContext(ctx, setJikanCache, arg.Key, arg.Data, arg.ExpiresAt)
|
||||
return err
|
||||
}
|
||||
|
||||
const updateAnimeStatus = `-- name: UpdateAnimeStatus :exec
|
||||
UPDATE anime SET status = ? WHERE id = ?
|
||||
`
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
package jikan
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// GetAnimeByID fetches full details for a single anime
|
||||
func (c *Client) GetAnimeByID(id int) (Anime, error) {
|
||||
if cached, ok := c.animeCache.Get(id); ok {
|
||||
cacheKey := fmt.Sprintf("anime:%d", id)
|
||||
var cached Anime
|
||||
if c.getCache(cacheKey, &cached) {
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
@@ -14,6 +19,6 @@ func (c *Client) GetAnimeByID(id int) (Anime, error) {
|
||||
return Anime{}, err
|
||||
}
|
||||
|
||||
c.animeCache.Add(id, result.Data)
|
||||
c.setCache(cacheKey, result.Data, time.Hour*24)
|
||||
return result.Data, nil
|
||||
}
|
||||
|
||||
@@ -1,51 +1,58 @@
|
||||
package jikan
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||
"mal/internal/database"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
httpClient *http.Client
|
||||
baseURL string
|
||||
cache *expirable.LRU[string, SearchResult]
|
||||
topCache *expirable.LRU[int, TopAnimeResult]
|
||||
airingCache *expirable.LRU[int, TopAnimeResult]
|
||||
upcomingCache *expirable.LRU[int, TopAnimeResult]
|
||||
animeCache *expirable.LRU[int, Anime]
|
||||
relationsCache *expirable.LRU[int, JikanRelationsResponse]
|
||||
scheduleCache *expirable.LRU[string, ScheduleResult]
|
||||
recsCache *expirable.LRU[int, []Anime]
|
||||
httpClient *http.Client
|
||||
baseURL string
|
||||
db database.Querier
|
||||
}
|
||||
|
||||
func NewClient() *Client {
|
||||
cache := expirable.NewLRU[string, SearchResult](500, nil, time.Hour*1)
|
||||
topCache := expirable.NewLRU[int, TopAnimeResult](100, nil, time.Hour*1)
|
||||
airingCache := expirable.NewLRU[int, TopAnimeResult](100, nil, time.Hour*1)
|
||||
upcomingCache := expirable.NewLRU[int, TopAnimeResult](100, nil, time.Hour*1)
|
||||
animeCache := expirable.NewLRU[int, Anime](1000, nil, time.Hour*24)
|
||||
relationsCache := expirable.NewLRU[int, JikanRelationsResponse](1000, nil, time.Hour*24)
|
||||
scheduleCache := expirable.NewLRU[string, ScheduleResult](50, nil, time.Hour*1)
|
||||
recsCache := expirable.NewLRU[int, []Anime](500, nil, time.Hour*24)
|
||||
|
||||
func NewClient(db database.Querier) *Client {
|
||||
return &Client{
|
||||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||
baseURL: "https://api.jikan.moe/v4",
|
||||
cache: cache,
|
||||
topCache: topCache,
|
||||
airingCache: airingCache,
|
||||
upcomingCache: upcomingCache,
|
||||
animeCache: animeCache,
|
||||
relationsCache: relationsCache,
|
||||
scheduleCache: scheduleCache,
|
||||
recsCache: recsCache,
|
||||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||
baseURL: "https://api.jikan.moe/v4",
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) getCache(key string, out interface{}) bool {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
data, err := c.db.GetJikanCache(ctx, key)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(data), out)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (c *Client) setCache(key string, data interface{}, ttl time.Duration) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
bytes, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_ = c.db.SetJikanCache(ctx, database.SetJikanCacheParams{
|
||||
Key: key,
|
||||
Data: string(bytes),
|
||||
ExpiresAt: time.Now().Add(ttl),
|
||||
})
|
||||
}
|
||||
|
||||
// fetchWithRetry provides robust fetching respecting Jikan's strict 3 req/sec rate limit
|
||||
func (c *Client) fetchWithRetry(urlStr string, out interface{}) error {
|
||||
maxRetries := 3
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package jikan
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// RecommendationEntry represents a single recommendation
|
||||
type RecommendationEntry struct {
|
||||
@@ -23,7 +26,9 @@ type RecommendationsResponse struct {
|
||||
|
||||
// GetRecommendations fetches full details for the top recommended anime
|
||||
func (c *Client) GetRecommendations(animeID int, limit int) ([]Anime, error) {
|
||||
if cached, ok := c.recsCache.Get(animeID); ok {
|
||||
cacheKey := fmt.Sprintf("recs:%d", animeID)
|
||||
var cached []Anime
|
||||
if c.getCache(cacheKey, &cached) {
|
||||
if len(cached) > limit {
|
||||
return cached[:limit], nil
|
||||
}
|
||||
@@ -71,6 +76,6 @@ func (c *Client) GetRecommendations(animeID int, limit int) ([]Anime, error) {
|
||||
}
|
||||
}
|
||||
|
||||
c.recsCache.Add(animeID, animes)
|
||||
c.setCache(cacheKey, animes, time.Hour*24)
|
||||
return animes, nil
|
||||
}
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
package jikan
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// GetRelationsData fetches the raw relationships for an anime
|
||||
func (c *Client) GetRelationsData(id int) (JikanRelationsResponse, error) {
|
||||
if cached, ok := c.relationsCache.Get(id); ok {
|
||||
cacheKey := fmt.Sprintf("relations:%d", id)
|
||||
var cached JikanRelationsResponse
|
||||
if c.getCache(cacheKey, &cached) {
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
@@ -14,7 +19,7 @@ func (c *Client) GetRelationsData(id int) (JikanRelationsResponse, error) {
|
||||
return JikanRelationsResponse{}, err
|
||||
}
|
||||
|
||||
c.relationsCache.Add(id, result)
|
||||
c.setCache(cacheKey, result, time.Hour*24)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package jikan
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Search returns the anime list with pagination support
|
||||
@@ -15,7 +16,8 @@ func (c *Client) Search(query string, page int) (SearchResult, error) {
|
||||
}
|
||||
|
||||
cacheKey := fmt.Sprintf("search:%s:%d", query, page)
|
||||
if cached, ok := c.cache.Get(cacheKey); ok {
|
||||
var cached SearchResult
|
||||
if c.getCache(cacheKey, &cached) {
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
@@ -30,7 +32,7 @@ func (c *Client) Search(query string, page int) (SearchResult, error) {
|
||||
HasNextPage: result.Pagination.HasNextPage,
|
||||
}
|
||||
|
||||
c.cache.Add(cacheKey, res)
|
||||
c.setCache(cacheKey, res, time.Hour*1)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
@@ -39,7 +41,9 @@ func (c *Client) GetTopAnime(page int) (TopAnimeResult, error) {
|
||||
if page < 1 {
|
||||
page = 1
|
||||
}
|
||||
if cached, ok := c.topCache.Get(page); ok {
|
||||
cacheKey := fmt.Sprintf("top:%d", page)
|
||||
var cached TopAnimeResult
|
||||
if c.getCache(cacheKey, &cached) {
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
@@ -54,6 +58,6 @@ func (c *Client) GetTopAnime(page int) (TopAnimeResult, error) {
|
||||
HasNextPage: result.Pagination.HasNextPage,
|
||||
}
|
||||
|
||||
c.topCache.Add(page, res)
|
||||
c.setCache(cacheKey, res, time.Hour*1)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package jikan
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ScheduleResult contains anime grouped by day
|
||||
@@ -17,7 +18,8 @@ func (c *Client) GetSchedule(day string) (ScheduleResult, error) {
|
||||
day = strings.ToLower(day)
|
||||
cacheKey := fmt.Sprintf("schedule_%s", day)
|
||||
|
||||
if cached, ok := c.scheduleCache.Get(cacheKey); ok {
|
||||
var cached ScheduleResult
|
||||
if c.getCache(cacheKey, &cached) {
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
@@ -32,7 +34,7 @@ func (c *Client) GetSchedule(day string) (ScheduleResult, error) {
|
||||
HasNextPage: result.Pagination.HasNextPage,
|
||||
}
|
||||
|
||||
c.scheduleCache.Add(cacheKey, res)
|
||||
c.setCache(cacheKey, res, time.Hour*1)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
@@ -57,7 +59,9 @@ func (c *Client) GetSeasonsNow(page int) (TopAnimeResult, error) {
|
||||
if page < 1 {
|
||||
page = 1
|
||||
}
|
||||
if cached, ok := c.airingCache.Get(page); ok {
|
||||
cacheKey := fmt.Sprintf("seasons_now:%d", page)
|
||||
var cached TopAnimeResult
|
||||
if c.getCache(cacheKey, &cached) {
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
@@ -72,7 +76,7 @@ func (c *Client) GetSeasonsNow(page int) (TopAnimeResult, error) {
|
||||
HasNextPage: result.Pagination.HasNextPage,
|
||||
}
|
||||
|
||||
c.airingCache.Add(page, res)
|
||||
c.setCache(cacheKey, res, time.Hour*1)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
@@ -81,7 +85,9 @@ func (c *Client) GetSeasonsUpcoming(page int) (TopAnimeResult, error) {
|
||||
if page < 1 {
|
||||
page = 1
|
||||
}
|
||||
if cached, ok := c.upcomingCache.Get(page); ok {
|
||||
cacheKey := fmt.Sprintf("seasons_upcoming:%d", page)
|
||||
var cached TopAnimeResult
|
||||
if c.getCache(cacheKey, &cached) {
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
@@ -96,6 +102,6 @@ func (c *Client) GetSeasonsUpcoming(page int) (TopAnimeResult, error) {
|
||||
HasNextPage: result.Pagination.HasNextPage,
|
||||
}
|
||||
|
||||
c.upcomingCache.Add(page, res)
|
||||
c.setCache(cacheKey, res, time.Hour*1)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
@@ -29,6 +29,9 @@ func (w *Worker) Start(ctx context.Context) {
|
||||
|
||||
// Run once immediately
|
||||
w.syncRelations(ctx)
|
||||
w.cleanupCache(ctx)
|
||||
|
||||
cleanupCounter := 0
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -36,10 +39,24 @@ func (w *Worker) Start(ctx context.Context) {
|
||||
return
|
||||
case <-ticker.C:
|
||||
w.syncRelations(ctx)
|
||||
|
||||
// Clean up cache every 60 runs (approx 1 hour)
|
||||
cleanupCounter++
|
||||
if cleanupCounter >= 60 {
|
||||
w.cleanupCache(ctx)
|
||||
cleanupCounter = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) cleanupCache(ctx context.Context) {
|
||||
err := w.db.DeleteExpiredJikanCache(ctx)
|
||||
if err != nil {
|
||||
log.Printf("worker: failed to clean up expired jikan cache: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) syncRelations(ctx context.Context) {
|
||||
// Find up to 20 anime that need their relations synced
|
||||
animes, err := w.db.GetAnimeNeedingRelationSync(ctx)
|
||||
|
||||
6
migrations/006_add_jikan_cache.sql
Normal file
6
migrations/006_add_jikan_cache.sql
Normal file
@@ -0,0 +1,6 @@
|
||||
CREATE TABLE IF NOT EXISTS jikan_cache (
|
||||
key TEXT PRIMARY KEY,
|
||||
data TEXT NOT NULL,
|
||||
expires_at DATETIME NOT NULL,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
Reference in New Issue
Block a user