feat: add jikan client observability, dedup, and stale-while-revalidate
This commit is contained in:
@@ -41,10 +41,13 @@ func (c *Client) GetAnimeByID(ctx context.Context, id int) (Anime, error) {
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
var result AnimeResponse
|
||||
reqURL := fmt.Sprintf("%s/anime/%d/full", c.baseURL, id)
|
||||
if c.getStaleCache(ctx, cacheKey, &cached) && cached.MalID != 0 {
|
||||
c.refreshAnimeByIDAsync(id)
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
|
||||
anime, err := c.refreshAnimeByID(ctx, id)
|
||||
if err != nil {
|
||||
var stale Anime
|
||||
if c.getStaleCache(ctx, cacheKey, &stale) {
|
||||
return stale, nil
|
||||
@@ -52,11 +55,57 @@ func (c *Client) GetAnimeByID(ctx context.Context, id int) (Anime, error) {
|
||||
return Anime{}, err
|
||||
}
|
||||
|
||||
ttl := time.Hour * 24
|
||||
if result.Data.Status == "Finished Airing" {
|
||||
ttl = time.Hour * 24 * 30
|
||||
return anime, nil
|
||||
}
|
||||
|
||||
func (c *Client) refreshAnimeByID(ctx context.Context, id int) (Anime, error) {
|
||||
cacheKey := fmt.Sprintf("anime:%d", id)
|
||||
|
||||
value, err, _ := c.sf.Do("refresh:"+cacheKey, func() (any, error) {
|
||||
var cached Anime
|
||||
if c.getCache(ctx, cacheKey, &cached) && cached.MalID != 0 {
|
||||
return cached, nil
|
||||
}
|
||||
|
||||
var result AnimeResponse
|
||||
reqURL := fmt.Sprintf("%s/anime/%d/full", c.baseURL, id)
|
||||
|
||||
if err := c.fetchWithRetry(ctx, reqURL, &result); err != nil {
|
||||
return Anime{}, err
|
||||
}
|
||||
|
||||
ttl := time.Hour * 24
|
||||
if result.Data.Status == "Finished Airing" {
|
||||
ttl = time.Hour * 24 * 30
|
||||
}
|
||||
|
||||
c.setCache(ctx, cacheKey, result.Data, ttl)
|
||||
return result.Data, nil
|
||||
})
|
||||
if err != nil {
|
||||
return Anime{}, err
|
||||
}
|
||||
|
||||
c.setCache(ctx, cacheKey, result.Data, ttl)
|
||||
return result.Data, nil
|
||||
if anime, ok := value.(Anime); ok && anime.MalID != 0 {
|
||||
return anime, nil
|
||||
}
|
||||
|
||||
return Anime{}, fmt.Errorf("jikan: empty response for %s", cacheKey)
|
||||
}
|
||||
|
||||
func (c *Client) refreshAnimeByIDAsync(id int) {
|
||||
select {
|
||||
case c.refreshSem <- struct{}{}:
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() { <-c.refreshSem }()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, _ = c.refreshAnimeByID(ctx, id)
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -5,14 +5,19 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"mal/internal/db"
|
||||
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
@@ -22,8 +27,12 @@ type Client struct {
|
||||
retrySignal chan struct{} // signals retry worker to process queued retries
|
||||
mu sync.Mutex
|
||||
lastReqTime time.Time // rate limiting: last request timestamp
|
||||
sf singleflight.Group
|
||||
refreshSem chan struct{}
|
||||
}
|
||||
|
||||
// jikanSlowLogThreshold is the duration below which successful "refresh"
// cache operations and successful upstream requests are not logged unless
// tracing is enabled.
const jikanSlowLogThreshold = 750 * time.Millisecond
|
||||
|
||||
func NewClient(queries *db.Queries) *Client {
|
||||
return &Client{
|
||||
httpClient: &http.Client{
|
||||
@@ -38,6 +47,7 @@ func NewClient(queries *db.Queries) *Client {
|
||||
baseURL: "https://api.jikan.moe/v4",
|
||||
db: queries,
|
||||
retrySignal: make(chan struct{}, 1),
|
||||
refreshSem: make(chan struct{}, 4),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,6 +133,55 @@ func waitForRetry(ctx context.Context, delay time.Duration) error {
|
||||
}
|
||||
}
|
||||
|
||||
// jikanTraceEnabled reports whether the MAL_JIKAN_TRACE environment variable
// is set to "1", "true" or "yes". Surrounding whitespace and letter case are
// ignored. The variable is read on every call.
func jikanTraceEnabled() bool {
	switch strings.ToLower(strings.TrimSpace(os.Getenv("MAL_JIKAN_TRACE"))) {
	case "1", "true", "yes":
		return true
	default:
		return false
	}
}
|
||||
|
||||
func logJikanCache(cacheKey string, source string, startedAt time.Time, err error) {
|
||||
duration := time.Since(startedAt)
|
||||
if !jikanTraceEnabled() && err == nil && source == "fresh" && duration < 50*time.Millisecond {
|
||||
return
|
||||
}
|
||||
if !jikanTraceEnabled() && err == nil && source == "refresh" && duration < jikanSlowLogThreshold {
|
||||
return
|
||||
}
|
||||
|
||||
errorValue := ""
|
||||
if err != nil {
|
||||
errorValue = err.Error()
|
||||
}
|
||||
|
||||
log.Printf(
|
||||
"jikan_cache key=%s source=%s duration_ms=%.2f error=%s",
|
||||
strconv.Quote(cacheKey),
|
||||
source,
|
||||
float64(duration.Microseconds())/1000,
|
||||
strconv.Quote(errorValue),
|
||||
)
|
||||
}
|
||||
|
||||
func logJikanUpstream(urlStr string, statusCode int, attempts int, startedAt time.Time, err error) {
|
||||
duration := time.Since(startedAt)
|
||||
if !jikanTraceEnabled() && err == nil && statusCode < http.StatusBadRequest && duration < jikanSlowLogThreshold {
|
||||
return
|
||||
}
|
||||
|
||||
errorValue := ""
|
||||
if err != nil {
|
||||
errorValue = err.Error()
|
||||
}
|
||||
|
||||
log.Printf(
|
||||
"jikan_upstream url=%s status=%d attempts=%d duration_ms=%.2f error=%s",
|
||||
strconv.Quote(urlStr),
|
||||
statusCode,
|
||||
attempts,
|
||||
float64(duration.Microseconds())/1000,
|
||||
strconv.Quote(errorValue),
|
||||
)
|
||||
}
|
||||
|
||||
func truncateErrorMessage(message string) string {
|
||||
if len(message) <= 400 {
|
||||
return message
|
||||
@@ -258,73 +317,143 @@ func isEmptyResult(out any) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// getWithCache fetches URL with cache-aside pattern: checks cache first, falls back to stale on error.
|
||||
func (c *Client) getWithCache(ctx context.Context, cacheKey string, ttl time.Duration, url string, out any) error {
|
||||
if c.getCache(ctx, cacheKey, out) {
|
||||
if !isEmptyResult(out) {
|
||||
// cloneResponseTarget returns a pointer to a new zero value of the type that
// out points to, so that a decode can target it without touching out itself.
//
// The second result is false, and the first nil, when out is a nil interface
// or is not a pointer. A typed nil pointer is accepted because only the
// pointer's type is inspected, never its value.
func cloneResponseTarget(out any) (any, bool) {
	if out == nil {
		// reflect.TypeOf(nil) is nil and must not be dereferenced.
		return nil, false
	}

	outType := reflect.TypeOf(out)
	if outType.Kind() != reflect.Pointer {
		return nil, false
	}

	// Elem never returns nil for a pointer type, so no further check is needed.
	return reflect.New(outType.Elem()).Interface(), true
}
|
||||
|
||||
func (c *Client) refreshWithCache(ctx context.Context, cacheKey string, ttl time.Duration, url string, out any) error {
|
||||
value, err, _ := c.sf.Do("refresh:"+cacheKey, func() (any, error) {
|
||||
if c.getCache(ctx, cacheKey, out) {
|
||||
if !isEmptyResult(out) {
|
||||
return json.Marshal(out)
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.fetchWithRetry(ctx, url, out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Don't cache empty results to avoid caching failures
|
||||
if isEmptyResult(out) {
|
||||
return nil, fmt.Errorf("jikan: empty response for %s", cacheKey)
|
||||
}
|
||||
|
||||
c.setCache(ctx, cacheKey, out, ttl)
|
||||
return json.Marshal(out)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if bytes, ok := value.([]byte); ok {
|
||||
if err := json.Unmarshal(bytes, out); err == nil && !isEmptyResult(out) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
var stale any
|
||||
hasStale := c.getStaleCache(ctx, cacheKey, &stale)
|
||||
return fmt.Errorf("jikan: empty response for %s", cacheKey)
|
||||
}
|
||||
|
||||
if err := c.fetchWithRetry(ctx, url, out); err != nil {
|
||||
if hasStale {
|
||||
staleBytes, marshalErr := json.Marshal(stale)
|
||||
if marshalErr == nil {
|
||||
unmarshalErr := json.Unmarshal(staleBytes, out)
|
||||
if unmarshalErr == nil && !isEmptyResult(out) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
}
|
||||
func (c *Client) refreshWithCacheAsync(cacheKey string, ttl time.Duration, url string, out any) {
|
||||
target, ok := cloneResponseTarget(out)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case c.refreshSem <- struct{}{}:
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() { <-c.refreshSem }()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_ = c.refreshWithCache(ctx, cacheKey, ttl, url, target)
|
||||
}()
|
||||
}
|
||||
|
||||
// getWithCache fetches URL with a stale-while-revalidate DB cache strategy.
|
||||
func (c *Client) getWithCache(ctx context.Context, cacheKey string, ttl time.Duration, url string, out any) error {
|
||||
startedAt := time.Now()
|
||||
if c.getCache(ctx, cacheKey, out) {
|
||||
if !isEmptyResult(out) {
|
||||
logJikanCache(cacheKey, "fresh", startedAt, nil)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if c.getStaleCache(ctx, cacheKey, out) && !isEmptyResult(out) {
|
||||
logJikanCache(cacheKey, "stale", startedAt, nil)
|
||||
c.refreshWithCacheAsync(cacheKey, ttl, url, out)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := c.refreshWithCache(ctx, cacheKey, ttl, url, out); err != nil {
|
||||
if c.getStaleCache(ctx, cacheKey, out) && !isEmptyResult(out) {
|
||||
logJikanCache(cacheKey, "stale_after_error", startedAt, err)
|
||||
return nil
|
||||
}
|
||||
logJikanCache(cacheKey, "miss", startedAt, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Don't cache empty results to avoid caching failures
|
||||
if isEmptyResult(out) {
|
||||
return fmt.Errorf("jikan: empty response for %s", cacheKey)
|
||||
}
|
||||
|
||||
c.setCache(ctx, cacheKey, out, ttl)
|
||||
logJikanCache(cacheKey, "refresh", startedAt, nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
// fetchWithRetry makes HTTP request with exponential backoff retry on transient failures.
|
||||
func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) error {
|
||||
maxRetries := 5
|
||||
startedAt := time.Now()
|
||||
attempts := 0
|
||||
logAndReturn := func(statusCode int, err error) error {
|
||||
logJikanUpstream(urlStr, statusCode, attempts, startedAt, err)
|
||||
return err
|
||||
}
|
||||
|
||||
for attempt := range maxRetries {
|
||||
attempts = attempt + 1
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())
|
||||
return logAndReturn(0, fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err()))
|
||||
default:
|
||||
}
|
||||
|
||||
if err := c.waitRateLimit(ctx); err != nil {
|
||||
return err
|
||||
return logAndReturn(0, err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create jikan request: %w", err)
|
||||
return logAndReturn(0, fmt.Errorf("failed to create jikan request: %w", err))
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return fmt.Errorf("request canceled while retrying jikan request: %w", err)
|
||||
return logAndReturn(0, fmt.Errorf("request canceled while retrying jikan request: %w", err))
|
||||
}
|
||||
if attempt < maxRetries-1 && IsRetryableError(err) {
|
||||
if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil {
|
||||
return retryErr
|
||||
return logAndReturn(0, retryErr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf("jikan api error: %w", err)
|
||||
return logAndReturn(0, fmt.Errorf("jikan api error: %w", err))
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
@@ -341,7 +470,7 @@ func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) err
|
||||
delay := max(retryAfter, retryDelay(attempt))
|
||||
|
||||
if retryErr := waitForRetry(ctx, delay); retryErr != nil {
|
||||
return retryErr
|
||||
return logAndReturn(resp.StatusCode, retryErr)
|
||||
}
|
||||
|
||||
continue
|
||||
@@ -350,24 +479,24 @@ func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) err
|
||||
// Best-effort decode (often useful for debugging), but still treat non-200 as error.
|
||||
_ = json.NewDecoder(resp.Body).Decode(out)
|
||||
_ = resp.Body.Close()
|
||||
return apiErr
|
||||
return logAndReturn(resp.StatusCode, apiErr)
|
||||
}
|
||||
|
||||
err = json.NewDecoder(resp.Body).Decode(out)
|
||||
_ = resp.Body.Close()
|
||||
if err == nil {
|
||||
return nil
|
||||
return logAndReturn(resp.StatusCode, nil)
|
||||
}
|
||||
|
||||
if attempt < maxRetries-1 {
|
||||
if retryErr := waitForRetry(ctx, retryDelay(attempt)); retryErr != nil {
|
||||
return retryErr
|
||||
return logAndReturn(resp.StatusCode, retryErr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to decode jikan response: %w", err)
|
||||
return logAndReturn(resp.StatusCode, fmt.Errorf("failed to decode jikan response: %w", err))
|
||||
}
|
||||
|
||||
return fmt.Errorf("max retries exceeded for %s", urlStr)
|
||||
return logAndReturn(0, fmt.Errorf("max retries exceeded for %s", urlStr))
|
||||
}
|
||||
|
||||
93
integrations/jikan/client_test.go
Normal file
93
integrations/jikan/client_test.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package jikan
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"mal/internal/db"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
// roundTripFunc adapts an ordinary function to the http.RoundTripper
// interface so a test can stub out the HTTP transport.
type roundTripFunc func(*http.Request) (*http.Response, error)

// RoundTrip implements http.RoundTripper by delegating to the function itself.
func (rt roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	return rt(req)
}
|
||||
|
||||
func TestGetWithCacheReturnsStaleAndRefreshesAsync(t *testing.T) {
|
||||
sqlDB, err := sql.Open("sqlite3", ":memory:")
|
||||
if err != nil {
|
||||
t.Fatalf("open sqlite: %v", err)
|
||||
}
|
||||
defer sqlDB.Close()
|
||||
sqlDB.SetMaxOpenConns(1)
|
||||
|
||||
_, err = sqlDB.Exec(`
|
||||
CREATE TABLE jikan_cache (
|
||||
key TEXT PRIMARY KEY,
|
||||
data TEXT NOT NULL,
|
||||
expires_at DATETIME NOT NULL,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
`)
|
||||
if err != nil {
|
||||
t.Fatalf("create cache table: %v", err)
|
||||
}
|
||||
|
||||
queries := db.New(sqlDB)
|
||||
client := NewClient(queries)
|
||||
stale := TopAnimeResponse{Data: []Anime{{MalID: 1, Title: "stale"}}}
|
||||
staleBytes, err := json.Marshal(stale)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal stale response: %v", err)
|
||||
}
|
||||
|
||||
_, err = sqlDB.Exec(
|
||||
`INSERT INTO jikan_cache (key, data, expires_at) VALUES (?, ?, ?)`,
|
||||
"top:1",
|
||||
string(staleBytes),
|
||||
time.Now().Add(-time.Hour),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("insert stale cache: %v", err)
|
||||
}
|
||||
|
||||
client.httpClient = &http.Client{
|
||||
Transport: roundTripFunc(func(*http.Request) (*http.Response, error) {
|
||||
body := `{"data":[{"mal_id":2,"title":"fresh"}]}`
|
||||
return &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(strings.NewReader(body)),
|
||||
Header: make(http.Header),
|
||||
}, nil
|
||||
}),
|
||||
}
|
||||
|
||||
var got TopAnimeResponse
|
||||
if err := client.getWithCache(context.Background(), "top:1", time.Hour, "https://example.test/top", &got); err != nil {
|
||||
t.Fatalf("getWithCache: %v", err)
|
||||
}
|
||||
if len(got.Data) != 1 || got.Data[0].Title != "stale" {
|
||||
t.Fatalf("got %+v, want stale cache response", got.Data)
|
||||
}
|
||||
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
var refreshed TopAnimeResponse
|
||||
if client.getCache(context.Background(), "top:1", &refreshed) && len(refreshed.Data) == 1 && refreshed.Data[0].Title == "fresh" {
|
||||
return
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
var rawData string
|
||||
var rawExpires string
|
||||
_ = sqlDB.QueryRow(`SELECT data, expires_at FROM jikan_cache WHERE key = ?`, "top:1").Scan(&rawData, &rawExpires)
|
||||
t.Fatalf("cache was not refreshed asynchronously; data=%s expires_at=%s", rawData, rawExpires)
|
||||
}
|
||||
Reference in New Issue
Block a user