feat: add jikan cache cleanup worker

This commit is contained in:
2026-06-21 17:17:57 +02:00
committed by Milas Holsting
parent 963f6e925b
commit 86d0c2b5c0
3 changed files with 153 additions and 0 deletions

View File

@@ -22,6 +22,7 @@ var Module = fx.Options(
ProvideSQLDB,
ProvideQueries,
),
fx.Invoke(RegisterJikanCacheCleanupWorker),
)
func ProvideSQLDB(cfg config.Config) (*sql.DB, error) {

View File

@@ -3,6 +3,8 @@ package database
import (
"context"
"database/sql"
"mal/internal/db"
"mal/internal/observability"
"testing"
_ "github.com/mattn/go-sqlite3"
@@ -43,3 +45,81 @@ func TestRunMigrationsCreatesHotPathIndexes(t *testing.T) {
})
}
}
func TestCleanupExpiredJikanCache(t *testing.T) {
sqlDB := newMigratedTestDB(t)
defer closeTestDB(t, sqlDB)
ctx := context.Background()
for _, row := range []struct {
key string
expiresAt string
}{
{key: "expired", expiresAt: "2000-01-01T00:00:00Z"},
{key: "fresh", expiresAt: "2999-01-01T00:00:00Z"},
} {
_, err := sqlDB.ExecContext(ctx, `INSERT INTO jikan_cache (key, data, expires_at) VALUES (?, ?, ?)`, row.key, "{}", row.expiresAt)
if err != nil {
t.Fatalf("insert %s cache row: %v", row.key, err)
}
}
cleanupExpiredJikanCache(ctx, db.New(sqlDB), observability.NewMetrics())
keys := jikanCacheKeys(ctx, t, sqlDB)
if len(keys) != 1 || keys[0] != "fresh" {
t.Fatalf("remaining cache keys = %v, want [fresh]", keys)
}
}
func newMigratedTestDB(t *testing.T) *sql.DB {
t.Helper()
sqlDB, err := sql.Open("sqlite3", ":memory:")
if err != nil {
t.Fatalf("open sqlite: %v", err)
}
sqlDB.SetMaxOpenConns(1)
if err := RunMigrations(sqlDB); err != nil {
closeTestDB(t, sqlDB)
t.Fatalf("RunMigrations: %v", err)
}
return sqlDB
}
func closeTestDB(t *testing.T, sqlDB *sql.DB) {
t.Helper()
if err := sqlDB.Close(); err != nil {
t.Errorf("close sqlite: %v", err)
}
}
func jikanCacheKeys(ctx context.Context, t *testing.T, sqlDB *sql.DB) []string {
t.Helper()
var keys []string
rows, err := sqlDB.QueryContext(ctx, `SELECT key FROM jikan_cache ORDER BY key`)
if err != nil {
t.Fatalf("query cache keys: %v", err)
}
defer func() {
if err := rows.Close(); err != nil {
t.Errorf("close rows: %v", err)
}
}()
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
t.Fatalf("scan key: %v", err)
}
keys = append(keys, key)
}
if err := rows.Err(); err != nil {
t.Fatalf("iterate keys: %v", err)
}
return keys
}

View File

@@ -0,0 +1,72 @@
package database
import (
"context"
"mal/internal/db"
"mal/internal/observability"
"time"
"go.uber.org/fx"
)
const (
jikanCacheCleanupInterval = time.Hour
jikanCacheCleanupTimeout = 30 * time.Second
jikanCacheCleanupWorker = "jikan_cache_cleanup"
)
func RegisterJikanCacheCleanupWorker(lc fx.Lifecycle, queries *db.Queries, metrics *observability.Metrics) {
ctx, cancel := context.WithCancel(context.Background())
lc.Append(fx.Hook{
OnStart: func(startCtx context.Context) error {
go func() {
<-startCtx.Done()
cancel()
}()
go runJikanCacheCleanupWorker(ctx, queries, metrics)
return nil
},
OnStop: func(context.Context) error {
cancel()
return nil
},
})
}
func runJikanCacheCleanupWorker(ctx context.Context, queries *db.Queries, metrics *observability.Metrics) {
observability.Info("jikan_cache_cleanup_worker_start", "database", "", nil)
ticker := time.NewTicker(jikanCacheCleanupInterval)
defer ticker.Stop()
for {
cleanupExpiredJikanCache(ctx, queries, metrics)
select {
case <-ticker.C:
case <-ctx.Done():
observability.Info("jikan_cache_cleanup_worker_stop", "database", "", nil)
return
}
}
}
func cleanupExpiredJikanCache(ctx context.Context, queries *db.Queries, metrics *observability.Metrics) {
cleanupCtx, cancel := context.WithTimeout(ctx, jikanCacheCleanupTimeout)
defer cancel()
err := queries.DeleteExpiredJikanCache(cleanupCtx)
metrics.ObserveWorkerTick(jikanCacheCleanupWorker, err)
if err != nil {
observability.Warn(
"jikan_cache_cleanup_failed",
"database",
"",
map[string]any{
"worker": jikanCacheCleanupWorker,
},
err,
)
}
}