diff --git a/README.md b/README.md index 49bf75e..373f26e 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ docker exec mal ./cmd/user | `ENV` | _(empty)_ | Set to `production` to enable secure session cookies | | `MIGRATIONS_DIR` | _(auto-discovered)_ | Optional explicit path to migration files | | `PLAYBACK_PROXY_SECRET` | _(required)_ | HMAC secret for signed playback proxy tokens (min 32 chars) | +| `MAL_JIKAN_TRACE` | `false` | Log all Jikan cache/upstream timings when enabled | ## Testing diff --git a/integrations/jikan/anime.go b/integrations/jikan/anime.go index 54ee529..37b87ca 100644 --- a/integrations/jikan/anime.go +++ b/integrations/jikan/anime.go @@ -41,10 +41,13 @@ func (c *Client) GetAnimeByID(ctx context.Context, id int) (Anime, error) { return cached, nil } - var result AnimeResponse - reqURL := fmt.Sprintf("%s/anime/%d/full", c.baseURL, id) + if c.getStaleCache(ctx, cacheKey, &cached) && cached.MalID != 0 { + c.refreshAnimeByIDAsync(id) + return cached, nil + } - if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil { + anime, err := c.refreshAnimeByID(ctx, id) + if err != nil { var stale Anime if c.getStaleCache(ctx, cacheKey, &stale) { return stale, nil @@ -52,11 +55,57 @@ func (c *Client) GetAnimeByID(ctx context.Context, id int) (Anime, error) { return Anime{}, err } - ttl := time.Hour * 24 - if result.Data.Status == "Finished Airing" { - ttl = time.Hour * 24 * 30 + return anime, nil +} + +func (c *Client) refreshAnimeByID(ctx context.Context, id int) (Anime, error) { + cacheKey := fmt.Sprintf("anime:%d", id) + + value, err, _ := c.sf.Do("refresh:"+cacheKey, func() (any, error) { + var cached Anime + if c.getCache(ctx, cacheKey, &cached) && cached.MalID != 0 { + return cached, nil + } + + var result AnimeResponse + reqURL := fmt.Sprintf("%s/anime/%d/full", c.baseURL, id) + + if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil { + return Anime{}, err + } + + ttl := time.Hour * 24 + if result.Data.Status == "Finished Airing" { + ttl = time.Hour * 24 * 30 + } + + c.setCache(ctx, cacheKey, result.Data, ttl) + return result.Data, nil + }) + if err != nil { + return Anime{}, err } - c.setCache(ctx, cacheKey, result.Data, ttl) - return result.Data, nil + if anime, ok := value.(Anime); ok && anime.MalID != 0 { + return anime, nil + } + + return Anime{}, fmt.Errorf("jikan: empty response for %s", cacheKey) +} + +func (c *Client) refreshAnimeByIDAsync(id int) { + select { + case c.refreshSem <- struct{}{}: + default: + return + } + + go func() { + defer func() { <-c.refreshSem }() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + _, _ = c.refreshAnimeByID(ctx, id) + }() } diff --git a/integrations/jikan/client.go b/integrations/jikan/client.go index 31d6ae4..3ea010d 100644 --- a/integrations/jikan/client.go +++ b/integrations/jikan/client.go @@ -5,14 +5,19 @@ import ( "encoding/json" "errors" "fmt" + "log" "net" "net/http" + "os" + "reflect" "strconv" "strings" "sync" "time" "mal/internal/db" + + "golang.org/x/sync/singleflight" ) type Client struct { @@ -22,8 +27,12 @@ type Client struct { retrySignal chan struct{} // signals retry worker to process queued retries mu sync.Mutex lastReqTime time.Time // rate limiting: last request timestamp + sf singleflight.Group + refreshSem chan struct{} } +const jikanSlowLogThreshold = 750 * time.Millisecond + func NewClient(queries *db.Queries) *Client { return &Client{ httpClient: &http.Client{ @@ -38,6 +47,7 @@ func NewClient(queries *db.Queries) *Client { baseURL: "https://api.jikan.moe/v4", db: queries, retrySignal: make(chan struct{}, 1), + refreshSem: make(chan struct{}, 4), } } @@ -123,6 +133,55 @@ func waitForRetry(ctx context.Context, delay time.Duration) error { } } +func jikanTraceEnabled() bool { + value := strings.ToLower(strings.TrimSpace(os.Getenv("MAL_JIKAN_TRACE"))) + return value == "1" || value == "true" || value == "yes" +} + +func logJikanCache(cacheKey string, source string, startedAt time.Time, err error) { + duration := time.Since(startedAt) + if !jikanTraceEnabled() && err == nil && source == "fresh" && duration < 50*time.Millisecond { + return + } + if !jikanTraceEnabled() && err == nil && source == "refresh" && duration < jikanSlowLogThreshold { + return + } + + errorValue := "" + if err != nil { + errorValue = err.Error() + } + + log.Printf( + "jikan_cache key=%s source=%s duration_ms=%.2f error=%s", + strconv.Quote(cacheKey), + source, + float64(duration.Microseconds())/1000, + strconv.Quote(errorValue), + ) +} + +func logJikanUpstream(urlStr string, statusCode int, attempts int, startedAt time.Time, err error) { + duration := time.Since(startedAt) + if !jikanTraceEnabled() && err == nil && statusCode < http.StatusBadRequest && duration < jikanSlowLogThreshold { + return + } + + errorValue := "" + if err != nil { + errorValue = err.Error() + } + + log.Printf( + "jikan_upstream url=%s status=%d attempts=%d duration_ms=%.2f error=%s", + strconv.Quote(urlStr), + statusCode, + attempts, + float64(duration.Microseconds())/1000, + strconv.Quote(errorValue), + ) +} + func truncateErrorMessage(message string) string { if len(message) <= 400 { return message @@ -258,73 +317,143 @@ func isEmptyResult(out any) bool { return false } -// getWithCache fetches URL with cache-aside pattern: checks cache first, falls back to stale on error. -func (c *Client) getWithCache(ctx context.Context, cacheKey string, ttl time.Duration, url string, out any) error { - if c.getCache(ctx, cacheKey, out) { - if !isEmptyResult(out) { +func cloneResponseTarget(out any) (any, bool) { + if out == nil { + return nil, false + } + + outType := reflect.TypeOf(out) + if outType.Kind() != reflect.Pointer || outType.Elem() == nil { + return nil, false + } + + return reflect.New(outType.Elem()).Interface(), true +} + +func (c *Client) refreshWithCache(ctx context.Context, cacheKey string, ttl time.Duration, url string, out any) error { + value, err, _ := c.sf.Do("refresh:"+cacheKey, func() (any, error) { + if c.getCache(ctx, cacheKey, out) { + if !isEmptyResult(out) { + return json.Marshal(out) + } + } + + if err := c.fetchWithRetry(ctx, url, out); err != nil { + return nil, err + } + + // Don't cache empty results to avoid caching failures + if isEmptyResult(out) { + return nil, fmt.Errorf("jikan: empty response for %s", cacheKey) + } + + c.setCache(ctx, cacheKey, out, ttl) + return json.Marshal(out) + }) + if err != nil { + return err + } + + if bytes, ok := value.([]byte); ok { + if err := json.Unmarshal(bytes, out); err == nil && !isEmptyResult(out) { return nil } } - var stale any - hasStale := c.getStaleCache(ctx, cacheKey, &stale) + return fmt.Errorf("jikan: empty response for %s", cacheKey) +} - if err := c.fetchWithRetry(ctx, url, out); err != nil { - if hasStale { - staleBytes, marshalErr := json.Marshal(stale) - if marshalErr == nil { - unmarshalErr := json.Unmarshal(staleBytes, out) - if unmarshalErr == nil && !isEmptyResult(out) { - return nil - } - } - if !errors.Is(err, context.Canceled) { - } +func (c *Client) refreshWithCacheAsync(cacheKey string, ttl time.Duration, url string, out any) { + target, ok := cloneResponseTarget(out) + if !ok { + return + } + + select { + case c.refreshSem <- struct{}{}: + default: + return + } + + go func() { + defer func() { <-c.refreshSem }() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + _ = c.refreshWithCache(ctx, cacheKey, ttl, url, target) + }() +} + +// getWithCache fetches URL with a stale-while-revalidate DB cache strategy. +func (c *Client) getWithCache(ctx context.Context, cacheKey string, ttl time.Duration, url string, out any) error { + startedAt := time.Now() + if c.getCache(ctx, cacheKey, out) { + if !isEmptyResult(out) { + logJikanCache(cacheKey, "fresh", startedAt, nil) + return nil } + } + + if c.getStaleCache(ctx, cacheKey, out) && !isEmptyResult(out) { + logJikanCache(cacheKey, "stale", startedAt, nil) + c.refreshWithCacheAsync(cacheKey, ttl, url, out) + return nil + } + + if err := c.refreshWithCache(ctx, cacheKey, ttl, url, out); err != nil { + if c.getStaleCache(ctx, cacheKey, out) && !isEmptyResult(out) { + logJikanCache(cacheKey, "stale_after_error", startedAt, err) + return nil + } + logJikanCache(cacheKey, "miss", startedAt, err) return err } - // Don't cache empty results to avoid caching failures - if isEmptyResult(out) { - return fmt.Errorf("jikan: empty response for %s", cacheKey) - } - - c.setCache(ctx, cacheKey, out, ttl) + logJikanCache(cacheKey, "refresh", startedAt, nil) return nil } // fetchWithRetry makes HTTP request with exponential backoff retry on transient failures. func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) error { maxRetries := 5 + startedAt := time.Now() + attempts := 0 + logAndReturn := func(statusCode int, err error) error { + logJikanUpstream(urlStr, statusCode, attempts, startedAt, err) + return err + } + for attempt := range maxRetries { + attempts = attempt + 1 select { case <-ctx.Done(): - return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err()) + return logAndReturn(0, fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())) default: } if err := c.waitRateLimit(ctx); err != nil { - return err + return logAndReturn(0, err) } req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil) if err != nil { - return fmt.Errorf("failed to create jikan request: %w", err) + return logAndReturn(0, fmt.Errorf("failed to create jikan request: %w", err)) } resp, err := c.httpClient.Do(req) if err != nil { if errors.Is(err, context.Canceled) { - return fmt.Errorf("request canceled while retrying jikan request: %w", err) + return logAndReturn(0, fmt.Errorf("request canceled while retrying jikan request: %w", err)) } if attempt < maxRetries-1 && IsRetryableError(err) { if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil { - return retryErr + return logAndReturn(0, retryErr) } continue } - return fmt.Errorf("jikan api error: %w", err) + return logAndReturn(0, fmt.Errorf("jikan api error: %w", err)) } if resp.StatusCode != http.StatusOK { @@ -341,7 +470,7 @@ func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) err delay := max(retryAfter, retryDelay(attempt)) if retryErr := waitForRetry(ctx, delay); retryErr != nil { - return retryErr + return logAndReturn(resp.StatusCode, retryErr) } continue @@ -350,24 +479,24 @@ func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) err // Best-effort decode (often useful for debugging), but still treat non-200 as error. _ = json.NewDecoder(resp.Body).Decode(out) _ = resp.Body.Close() - return apiErr + return logAndReturn(resp.StatusCode, apiErr) } err = json.NewDecoder(resp.Body).Decode(out) _ = resp.Body.Close() if err == nil { - return nil + return logAndReturn(resp.StatusCode, nil) } if attempt < maxRetries-1 { if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil { - return retryErr + return logAndReturn(resp.StatusCode, retryErr) } continue } - return fmt.Errorf("failed to decode jikan response: %w", err) + return logAndReturn(resp.StatusCode, fmt.Errorf("failed to decode jikan response: %w", err)) } - return fmt.Errorf("max retries exceeded for %s", urlStr) + return logAndReturn(0, fmt.Errorf("max retries exceeded for %s", urlStr)) } diff --git a/integrations/jikan/client_test.go b/integrations/jikan/client_test.go new file mode 100644 index 0000000..c34a993 --- /dev/null +++ b/integrations/jikan/client_test.go @@ -0,0 +1,93 @@ +package jikan + +import ( + "context" + "database/sql" + "encoding/json" + "io" + "mal/internal/db" + "net/http" + "strings" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (fn roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return fn(req) +} + +func TestGetWithCacheReturnsStaleAndRefreshesAsync(t *testing.T) { + sqlDB, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + defer sqlDB.Close() + sqlDB.SetMaxOpenConns(1) + + _, err = sqlDB.Exec(` + CREATE TABLE jikan_cache ( + key TEXT PRIMARY KEY, + data TEXT NOT NULL, + expires_at DATETIME NOT NULL, + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + `) + if err != nil { + t.Fatalf("create cache table: %v", err) + } + + queries := db.New(sqlDB) + client := NewClient(queries) + stale := TopAnimeResponse{Data: []Anime{{MalID: 1, Title: "stale"}}} + staleBytes, err := json.Marshal(stale) + if err != nil { + t.Fatalf("marshal stale response: %v", err) + } + + _, err = sqlDB.Exec( + `INSERT INTO jikan_cache (key, data, expires_at) VALUES (?, ?, ?)`, + "top:1", + string(staleBytes), + time.Now().Add(-time.Hour), + ) + if err != nil { + t.Fatalf("insert stale cache: %v", err) + } + + client.httpClient = &http.Client{ + Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { + body := `{"data":[{"mal_id":2,"title":"fresh"}]}` + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(body)), + Header: make(http.Header), + }, nil + }), + } + + var got TopAnimeResponse + if err := client.getWithCache(context.Background(), "top:1", time.Hour, "https://example.test/top", &got); err != nil { + t.Fatalf("getWithCache: %v", err) + } + if len(got.Data) != 1 || got.Data[0].Title != "stale" { + t.Fatalf("got %+v, want stale cache response", got.Data) + } + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + var refreshed TopAnimeResponse + if client.getCache(context.Background(), "top:1", &refreshed) && len(refreshed.Data) == 1 && refreshed.Data[0].Title == "fresh" { + return + } + time.Sleep(10 * time.Millisecond) + } + + var rawData string + var rawExpires string + _ = sqlDB.QueryRow(`SELECT data, expires_at FROM jikan_cache WHERE key = ?`, "top:1").Scan(&rawData, &rawExpires) + t.Fatalf("cache was not refreshed asynchronously; data=%s expires_at=%s", rawData, rawExpires) +}