diff --git a/internal/database/database.go b/internal/database/database.go index 347a947..0b4e484 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -22,6 +22,7 @@ var Module = fx.Options( ProvideSQLDB, ProvideQueries, ), + fx.Invoke(RegisterJikanCacheCleanupWorker), ) func ProvideSQLDB(cfg config.Config) (*sql.DB, error) { diff --git a/internal/database/database_test.go b/internal/database/database_test.go index 74af708..c32e6a4 100644 --- a/internal/database/database_test.go +++ b/internal/database/database_test.go @@ -3,6 +3,8 @@ package database import ( "context" "database/sql" + "mal/internal/db" + "mal/internal/observability" "testing" _ "github.com/mattn/go-sqlite3" @@ -43,3 +45,81 @@ func TestRunMigrationsCreatesHotPathIndexes(t *testing.T) { }) } } + +func TestCleanupExpiredJikanCache(t *testing.T) { + sqlDB := newMigratedTestDB(t) + defer closeTestDB(t, sqlDB) + + ctx := context.Background() + for _, row := range []struct { + key string + expiresAt string + }{ + {key: "expired", expiresAt: "2000-01-01T00:00:00Z"}, + {key: "fresh", expiresAt: "2999-01-01T00:00:00Z"}, + } { + _, err := sqlDB.ExecContext(ctx, `INSERT INTO jikan_cache (key, data, expires_at) VALUES (?, ?, ?)`, row.key, "{}", row.expiresAt) + if err != nil { + t.Fatalf("insert %s cache row: %v", row.key, err) + } + } + + cleanupExpiredJikanCache(ctx, db.New(sqlDB), observability.NewMetrics()) + + keys := jikanCacheKeys(ctx, t, sqlDB) + if len(keys) != 1 || keys[0] != "fresh" { + t.Fatalf("remaining cache keys = %v, want [fresh]", keys) + } +} + +func newMigratedTestDB(t *testing.T) *sql.DB { + t.Helper() + + sqlDB, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + sqlDB.SetMaxOpenConns(1) + + if err := RunMigrations(sqlDB); err != nil { + closeTestDB(t, sqlDB) + t.Fatalf("RunMigrations: %v", err) + } + + return sqlDB +} + +func closeTestDB(t *testing.T, sqlDB *sql.DB) { + t.Helper() + + if err := sqlDB.Close(); err != nil { + t.Errorf("close sqlite: %v", err) + } +} + +func jikanCacheKeys(ctx context.Context, t *testing.T, sqlDB *sql.DB) []string { + t.Helper() + + var keys []string + rows, err := sqlDB.QueryContext(ctx, `SELECT key FROM jikan_cache ORDER BY key`) + if err != nil { + t.Fatalf("query cache keys: %v", err) + } + defer func() { + if err := rows.Close(); err != nil { + t.Errorf("close rows: %v", err) + } + }() + for rows.Next() { + var key string + if err := rows.Scan(&key); err != nil { + t.Fatalf("scan key: %v", err) + } + keys = append(keys, key) + } + if err := rows.Err(); err != nil { + t.Fatalf("iterate keys: %v", err) + } + + return keys +} diff --git a/internal/database/jikan_cache_cleanup.go b/internal/database/jikan_cache_cleanup.go new file mode 100644 index 0000000..2d1b0f7 --- /dev/null +++ b/internal/database/jikan_cache_cleanup.go @@ -0,0 +1,72 @@ +package database + +import ( + "context" + "mal/internal/db" + "mal/internal/observability" + "time" + + "go.uber.org/fx" +) + +const ( + jikanCacheCleanupInterval = time.Hour + jikanCacheCleanupTimeout = 30 * time.Second + jikanCacheCleanupWorker = "jikan_cache_cleanup" +) + +func RegisterJikanCacheCleanupWorker(lc fx.Lifecycle, queries *db.Queries, metrics *observability.Metrics) { + ctx, cancel := context.WithCancel(context.Background()) + + lc.Append(fx.Hook{ + OnStart: func(startCtx context.Context) error { + go func() { + <-startCtx.Done() + cancel() + }() + go runJikanCacheCleanupWorker(ctx, queries, metrics) + return nil + }, + OnStop: func(context.Context) error { + cancel() + return nil + }, + }) +} + +func runJikanCacheCleanupWorker(ctx context.Context, queries *db.Queries, metrics *observability.Metrics) { + observability.Info("jikan_cache_cleanup_worker_start", "database", "", nil) + + ticker := time.NewTicker(jikanCacheCleanupInterval) + defer ticker.Stop() + + for { + cleanupExpiredJikanCache(ctx, queries, metrics) + + select { + case <-ticker.C: + case <-ctx.Done(): + observability.Info("jikan_cache_cleanup_worker_stop", "database", "", nil) + return + } + } +} + +func cleanupExpiredJikanCache(ctx context.Context, queries *db.Queries, metrics *observability.Metrics) { + cleanupCtx, cancel := context.WithTimeout(ctx, jikanCacheCleanupTimeout) + defer cancel() + + err := queries.DeleteExpiredJikanCache(cleanupCtx) + metrics.ObserveWorkerTick(jikanCacheCleanupWorker, err) + if err != nil { + observability.Warn( + "jikan_cache_cleanup_failed", + "database", + "", + map[string]any{ + "worker": jikanCacheCleanupWorker, + }, + err, + ) + } +}