fix: stop stale request retries

This commit is contained in:
2026-06-18 20:25:30 +02:00
committed by Milas Holsting
parent aed61b8b61
commit 9e0f2231b5
5 changed files with 50 additions and 5 deletions

View File

@@ -3,6 +3,7 @@ package jikan
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"sync"
@@ -98,6 +99,10 @@ func jikanCacheLogLevel(source string, err error) observability.LogLevel {
}
func (c *Client) logJikanCache(cacheKey string, source string, startedAt time.Time, err error) {
if isContextError(err) {
return
}
duration := time.Since(startedAt)
if c.shouldSkipJikanCacheLog(source, duration, err) {
return
@@ -125,6 +130,10 @@ func truncateErrorMessage(message string) string {
return message[:400]
}
func isContextError(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}
// notifyRetryWorker signals the retry worker, non-blocking.
func (c *Client) notifyRetryWorker() {
select {

View File

@@ -2,7 +2,6 @@ package rate
import (
"context"
"fmt"
"sync"
"time"
)
@@ -19,6 +18,12 @@ func NewLimiter(interval time.Duration) *Limiter {
// Wait enforces minimum spacing between upstream Jikan requests.
func (l *Limiter) Wait(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
waitUntil := l.reserve(time.Now())
if waitUntil.IsZero() {
return nil
@@ -31,7 +36,8 @@ func (l *Limiter) Wait(ctx context.Context) error {
case <-timer.C:
return nil
case <-ctx.Done():
return fmt.Errorf("request canceled while waiting for rate limit: %w", ctx.Err())
l.release(waitUntil)
return ctx.Err()
}
}
@@ -48,3 +54,13 @@ func (l *Limiter) reserve(now time.Time) time.Time {
l.nextReqTime = l.nextReqTime.Add(l.interval)
return waitUntil
}
func (l *Limiter) release(waitUntil time.Time) {
l.mu.Lock()
defer l.mu.Unlock()
reservationEnd := waitUntil.Add(l.interval)
if l.nextReqTime.Equal(reservationEnd) {
l.nextReqTime = waitUntil
}
}

View File

@@ -67,6 +67,9 @@ func IsRetryableError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.Canceled) {
return false
}
var apiErr *APIError
if errors.As(err, &apiErr) {
@@ -92,6 +95,9 @@ func (c *Client) FetchWithRetry(ctx context.Context, urlStr string, out any) err
attempts := 0
endpoint := metricsEndpoint(urlStr)
logAndReturn := func(statusCode int, err error) error {
if isDoneContextError(ctx, err) {
return err
}
c.Metrics.ObserveJikanRequest(endpoint, statusCode, time.Since(startedAt), err)
c.logUpstream(urlStr, statusCode, attempts, startedAt, err)
return err
@@ -127,7 +133,7 @@ func (c *Client) FetchWithRetry(ctx context.Context, urlStr string, out any) err
func (c *Client) prepareRetryAttempt(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())
return ctx.Err()
default:
}
@@ -150,8 +156,11 @@ func (c *Client) doRequest(ctx context.Context, urlStr string) (*http.Response,
}
func handleRequestRetry(ctx context.Context, err error, attempt int, maxRetries int) (bool, error) {
if ctx.Err() != nil {
return false, ctx.Err()
}
if errors.Is(err, context.Canceled) {
return false, fmt.Errorf("request canceled while retrying jikan request: %w", err)
return false, err
}
if attempt >= maxRetries-1 || !IsRetryableError(err) {
@@ -253,10 +262,14 @@ func waitForRetry(ctx context.Context, delay time.Duration) error {
case <-timer.C:
return nil
case <-ctx.Done():
return fmt.Errorf("request canceled while retrying jikan request: %w", ctx.Err())
return ctx.Err()
}
}
func isDoneContextError(ctx context.Context, err error) bool {
return err != nil && ctx.Err() != nil && errors.Is(err, ctx.Err())
}
func (c *Client) logUpstream(urlStr string, statusCode int, attempts int, startedAt time.Time, err error) {
duration := time.Since(startedAt)
traceEnabled := c.TraceEnabled != nil && c.TraceEnabled()

View File

@@ -2,6 +2,7 @@ package anime
import (
"context"
"errors"
"fmt"
"mal/integrations/jikan"
"mal/internal/domain"
@@ -181,6 +182,9 @@ func (h *AnimeHandler) handleAnimeDetailsSection(c *gin.Context, id int, section
data, tplName, err := h.loadAnimeDetailsSection(sectionCtx, id, section)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
observability.Warn(
"anime_section_fetch_failed",
"anime",

View File

@@ -48,6 +48,9 @@ func (h *PlaybackHandler) HandleProxyStream(c *gin.Context) {
copyProxyHeaders(c.Writer.Header(), resp.Header)
c.Status(resp.StatusCode)
if n, err := io.Copy(c.Writer, resp.Body); err != nil {
if errors.Is(err, context.Canceled) || c.Request.Context().Err() != nil {
return
}
observability.WarnContext(c.Request.Context(), "proxy_stream_copy_failed", "playback", "", map[string]any{"target_url": targetURL, "bytes_copied": n}, err)
}
}