From 9e0f2231b5dbfafb179e9827404cbe69d6cea83b Mon Sep 17 00:00:00 2001 From: mkelvers Date: Thu, 18 Jun 2026 20:25:30 +0200 Subject: [PATCH] fix: stop stale request retries --- integrations/jikan/client.go | 9 +++++++++ integrations/jikan/rate/limiter.go | 20 ++++++++++++++++++-- integrations/jikan/transport/client.go | 19 ++++++++++++++++--- internal/anime/details_handler.go | 4 ++++ internal/playback/handler/proxy_stream.go | 3 +++ 5 files changed, 50 insertions(+), 5 deletions(-) diff --git a/integrations/jikan/client.go b/integrations/jikan/client.go index 3cfddb1..148aa80 100644 --- a/integrations/jikan/client.go +++ b/integrations/jikan/client.go @@ -3,6 +3,7 @@ package jikan import ( "context" "encoding/json" + "errors" "fmt" "reflect" "sync" @@ -98,6 +99,10 @@ func jikanCacheLogLevel(source string, err error) observability.LogLevel { } func (c *Client) logJikanCache(cacheKey string, source string, startedAt time.Time, err error) { + if isContextError(err) { + return + } + duration := time.Since(startedAt) if c.shouldSkipJikanCacheLog(source, duration, err) { return @@ -125,6 +130,10 @@ func truncateErrorMessage(message string) string { return message[:400] } +func isContextError(err error) bool { + return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) +} + // notifyRetryWorker signals the retry worker, non-blocking. func (c *Client) notifyRetryWorker() { select { diff --git a/integrations/jikan/rate/limiter.go b/integrations/jikan/rate/limiter.go index 5a0a4e1..705abf2 100644 --- a/integrations/jikan/rate/limiter.go +++ b/integrations/jikan/rate/limiter.go @@ -2,7 +2,6 @@ package rate import ( "context" - "fmt" "sync" "time" ) @@ -19,6 +18,12 @@ func NewLimiter(interval time.Duration) *Limiter { // Wait enforces minimum spacing between upstream Jikan requests. func (l *Limiter) Wait(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + waitUntil := l.reserve(time.Now()) if waitUntil.IsZero() { return nil @@ -31,7 +36,8 @@ func (l *Limiter) Wait(ctx context.Context) error { case <-timer.C: return nil case <-ctx.Done(): - return fmt.Errorf("request canceled while waiting for rate limit: %w", ctx.Err()) + l.release(waitUntil) + return ctx.Err() } } @@ -48,3 +54,13 @@ func (l *Limiter) reserve(now time.Time) time.Time { l.nextReqTime = l.nextReqTime.Add(l.interval) return waitUntil } + +func (l *Limiter) release(waitUntil time.Time) { + l.mu.Lock() + defer l.mu.Unlock() + + reservationEnd := waitUntil.Add(l.interval) + if l.nextReqTime.Equal(reservationEnd) { + l.nextReqTime = waitUntil + } +} diff --git a/integrations/jikan/transport/client.go b/integrations/jikan/transport/client.go index 0c09aa2..972d35e 100644 --- a/integrations/jikan/transport/client.go +++ b/integrations/jikan/transport/client.go @@ -67,6 +67,9 @@ func IsRetryableError(err error) bool { if err == nil { return false } + if errors.Is(err, context.Canceled) { + return false + } var apiErr *APIError if errors.As(err, &apiErr) { @@ -92,6 +95,9 @@ func (c *Client) FetchWithRetry(ctx context.Context, urlStr string, out any) err attempts := 0 endpoint := metricsEndpoint(urlStr) logAndReturn := func(statusCode int, err error) error { + if isDoneContextError(ctx, err) { + return err + } c.Metrics.ObserveJikanRequest(endpoint, statusCode, time.Since(startedAt), err) c.logUpstream(urlStr, statusCode, attempts, startedAt, err) return err @@ -127,7 +133,7 @@ func (c *Client) FetchWithRetry(ctx context.Context, urlStr string, out any) err func (c *Client) prepareRetryAttempt(ctx context.Context) error { select { case <-ctx.Done(): - return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err()) + return ctx.Err() default: } @@ -150,8 +156,11 @@ func (c *Client) doRequest(ctx context.Context, urlStr string) (*http.Response, } func handleRequestRetry(ctx context.Context, err error, attempt int, maxRetries int) (bool, error) { + if ctx.Err() != nil { + return false, ctx.Err() + } if errors.Is(err, context.Canceled) { - return false, fmt.Errorf("request canceled while retrying jikan request: %w", err) + return false, err } if attempt >= maxRetries-1 || !IsRetryableError(err) { @@ -253,10 +262,14 @@ func waitForRetry(ctx context.Context, delay time.Duration) error { case <-timer.C: return nil case <-ctx.Done(): - return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err()) + return ctx.Err() } } +func isDoneContextError(ctx context.Context, err error) bool { + return err != nil && ctx.Err() != nil && errors.Is(err, ctx.Err()) +} + func (c *Client) logUpstream(urlStr string, statusCode int, attempts int, startedAt time.Time, err error) { duration := time.Since(startedAt) traceEnabled := c.TraceEnabled != nil && c.TraceEnabled() diff --git a/internal/anime/details_handler.go b/internal/anime/details_handler.go index e981467..5dca2dc 100644 --- a/internal/anime/details_handler.go +++ b/internal/anime/details_handler.go @@ -2,6 +2,7 @@ package anime import ( "context" + "errors" "fmt" "mal/integrations/jikan" "mal/internal/domain" @@ -181,6 +182,9 @@ func (h *AnimeHandler) handleAnimeDetailsSection(c *gin.Context, id int, section data, tplName, err := h.loadAnimeDetailsSection(sectionCtx, id, section) if err != nil { + if errors.Is(err, context.Canceled) { + return + } observability.Warn( "anime_section_fetch_failed", "anime", diff --git a/internal/playback/handler/proxy_stream.go b/internal/playback/handler/proxy_stream.go index a638fb7..b847649 100644 --- a/internal/playback/handler/proxy_stream.go +++ b/internal/playback/handler/proxy_stream.go @@ -48,6 +48,9 @@ func (h *PlaybackHandler) HandleProxyStream(c *gin.Context) { copyProxyHeaders(c.Writer.Header(), resp.Header) c.Status(resp.StatusCode) if n, err := io.Copy(c.Writer, resp.Body); err != nil { + if errors.Is(err, context.Canceled) || c.Request.Context().Err() != nil { + return + } observability.WarnContext(c.Request.Context(), "proxy_stream_copy_failed", "playback", "", map[string]any{"target_url": targetURL, "bytes_copied": n}, err) } }