feat: add observability metrics
This commit is contained in:
@@ -16,6 +16,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"mal/internal/db"
|
"mal/internal/db"
|
||||||
|
"mal/internal/observability"
|
||||||
|
|
||||||
"golang.org/x/sync/singleflight"
|
"golang.org/x/sync/singleflight"
|
||||||
)
|
)
|
||||||
@@ -29,6 +30,7 @@ type Client struct {
|
|||||||
lastReqTime time.Time // rate limiting: last request timestamp
|
lastReqTime time.Time // rate limiting: last request timestamp
|
||||||
sf singleflight.Group
|
sf singleflight.Group
|
||||||
refreshSem chan struct{}
|
refreshSem chan struct{}
|
||||||
|
metrics *observability.Metrics
|
||||||
|
|
||||||
// Random anime pool for DDoS-proof truly random "Surprise Me"
|
// Random anime pool for DDoS-proof truly random "Surprise Me"
|
||||||
randomPool []Anime
|
randomPool []Anime
|
||||||
@@ -38,7 +40,7 @@ type Client struct {
|
|||||||
|
|
||||||
const jikanSlowLogThreshold = 750 * time.Millisecond
|
const jikanSlowLogThreshold = 750 * time.Millisecond
|
||||||
|
|
||||||
func NewClient(queries *db.Queries) *Client {
|
func NewClient(queries *db.Queries, metrics *observability.Metrics) *Client {
|
||||||
return &Client{
|
return &Client{
|
||||||
httpClient: &http.Client{
|
httpClient: &http.Client{
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
@@ -51,6 +53,7 @@ func NewClient(queries *db.Queries) *Client {
|
|||||||
},
|
},
|
||||||
baseURL: "https://api.jikan.moe/v4",
|
baseURL: "https://api.jikan.moe/v4",
|
||||||
db: queries,
|
db: queries,
|
||||||
|
metrics: metrics,
|
||||||
retrySignal: make(chan struct{}, 1),
|
retrySignal: make(chan struct{}, 1),
|
||||||
refreshSem: make(chan struct{}, 4),
|
refreshSem: make(chan struct{}, 4),
|
||||||
randomPool: make([]Anime, 0),
|
randomPool: make([]Anime, 0),
|
||||||
@@ -262,11 +265,18 @@ func (c *Client) getCache(parentCtx context.Context, key string, out any) bool {
|
|||||||
|
|
||||||
data, err := c.db.GetJikanCache(ctx, key)
|
data, err := c.db.GetJikanCache(ctx, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
c.metrics.ObserveCache("jikan", "miss")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
err = json.Unmarshal([]byte(data), out)
|
err = json.Unmarshal([]byte(data), out)
|
||||||
return err == nil
|
if err != nil {
|
||||||
|
c.metrics.ObserveCache("jikan", "miss")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
c.metrics.ObserveCache("jikan", "hit")
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// getStaleCache retrieves expired-but-available cache by key.
|
// getStaleCache retrieves expired-but-available cache by key.
|
||||||
@@ -276,11 +286,18 @@ func (c *Client) getStaleCache(parentCtx context.Context, key string, out any) b
|
|||||||
|
|
||||||
data, err := c.db.GetJikanCacheStale(ctx, key)
|
data, err := c.db.GetJikanCacheStale(ctx, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
c.metrics.ObserveCache("jikan_stale", "miss")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
err = json.Unmarshal([]byte(data), out)
|
err = json.Unmarshal([]byte(data), out)
|
||||||
return err == nil
|
if err != nil {
|
||||||
|
c.metrics.ObserveCache("jikan_stale", "miss")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
c.metrics.ObserveCache("jikan_stale", "hit")
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// setCache stores data in cache with specified TTL.
|
// setCache stores data in cache with specified TTL.
|
||||||
@@ -425,7 +442,9 @@ func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) err
|
|||||||
maxRetries := 5
|
maxRetries := 5
|
||||||
startedAt := time.Now()
|
startedAt := time.Now()
|
||||||
attempts := 0
|
attempts := 0
|
||||||
|
endpoint := metricsEndpoint(urlStr)
|
||||||
logAndReturn := func(statusCode int, err error) error {
|
logAndReturn := func(statusCode int, err error) error {
|
||||||
|
c.metrics.ObserveJikanRequest(endpoint, statusCode, time.Since(startedAt), err)
|
||||||
logJikanUpstream(urlStr, statusCode, attempts, startedAt, err)
|
logJikanUpstream(urlStr, statusCode, attempts, startedAt, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -506,3 +525,38 @@ func (c *Client) fetchWithRetry(ctx context.Context, urlStr string, out any) err
|
|||||||
|
|
||||||
return logAndReturn(0, fmt.Errorf("max retries exceeded for %s", urlStr))
|
return logAndReturn(0, fmt.Errorf("max retries exceeded for %s", urlStr))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// metricsEndpoint reduces an upstream request URL to a low-cardinality
// endpoint label for metrics.
//
// The query string and fragment are dropped, the Jikan base URL (or, for any
// other absolute URL, the scheme and authority) is removed, empty path
// segments are skipped, and every numeric segment is replaced by "{id}".
// A blank input yields "unknown"; a URL with no remaining path yields "/".
func metricsEndpoint(urlStr string) string {
	endpoint := strings.TrimSpace(urlStr)
	if endpoint == "" {
		return "unknown"
	}

	// Strip per-request parts first so that a "://" inside a query value is
	// never mistaken for a URL scheme below.
	if idx := strings.IndexAny(endpoint, "?#"); idx >= 0 {
		endpoint = endpoint[:idx]
	}

	const jikanPrefix = "https://api.jikan.moe/v4"
	if strings.HasPrefix(endpoint, jikanPrefix) {
		endpoint = endpoint[len(jikanPrefix):]
	} else if _, rest, ok := strings.Cut(endpoint, "://"); ok {
		// Any other absolute URL: keep only the path so host names and ports
		// do not end up in the label.
		if slash := strings.IndexByte(rest, '/'); slash >= 0 {
			endpoint = rest[slash:]
		} else {
			endpoint = ""
		}
	}

	segments := strings.Split(endpoint, "/")
	out := make([]string, 0, len(segments))
	for _, segment := range segments {
		if segment == "" {
			continue
		}

		// A range error still means the segment is numeric, just too large
		// for int; treat it as an ID as well.
		_, err := strconv.Atoi(segment)
		numErr, isNumErr := err.(*strconv.NumError)
		if err == nil || (isNumErr && numErr.Err == strconv.ErrRange) {
			out = append(out, "{id}")
			continue
		}
		out = append(out, segment)
	}

	// With no segments left this evaluates to "/".
	return "/" + strings.Join(out, "/")
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
"mal/internal/db"
|
"mal/internal/db"
|
||||||
|
"mal/internal/observability"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -41,7 +42,7 @@ func TestGetWithCacheReturnsStaleAndRefreshesAsync(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
queries := db.New(sqlDB)
|
queries := db.New(sqlDB)
|
||||||
client := NewClient(queries)
|
client := NewClient(queries, observability.NewMetrics())
|
||||||
stale := TopAnimeResponse{Data: []Anime{{MalID: 1, Title: "stale"}}}
|
stale := TopAnimeResponse{Data: []Anime{{MalID: 1, Title: "stale"}}}
|
||||||
staleBytes, err := json.Marshal(stale)
|
staleBytes, err := json.Marshal(stale)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
package jikan
|
package jikan
|
||||||
|
|
||||||
import (
|
import "go.uber.org/fx"
|
||||||
"go.uber.org/fx"
|
|
||||||
)
|
|
||||||
|
|
||||||
var Module = fx.Options(
|
var Module = fx.Options(
|
||||||
fx.Provide(NewClient),
|
fx.Provide(NewClient),
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ func AuthMiddleware(svc domain.AuthService) gin.HandlerFunc {
|
|||||||
if path == "/login" || path == "/logout" ||
|
if path == "/login" || path == "/logout" ||
|
||||||
strings.HasPrefix(path, "/static") ||
|
strings.HasPrefix(path, "/static") ||
|
||||||
strings.HasPrefix(path, "/dist") ||
|
strings.HasPrefix(path, "/dist") ||
|
||||||
|
path == "/metrics" ||
|
||||||
path == "/api/auth/login" {
|
path == "/api/auth/login" {
|
||||||
c.Next()
|
c.Next()
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
"mal/internal/db"
|
"mal/internal/db"
|
||||||
"mal/internal/domain"
|
"mal/internal/domain"
|
||||||
episodeService "mal/internal/episodes/service"
|
episodeService "mal/internal/episodes/service"
|
||||||
|
"mal/internal/observability"
|
||||||
|
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
)
|
)
|
||||||
@@ -22,10 +23,10 @@ var Module = fx.Options(
|
|||||||
fx.Provide(
|
fx.Provide(
|
||||||
episodeAvailabilityEnabled,
|
episodeAvailabilityEnabled,
|
||||||
fx.Annotate(
|
fx.Annotate(
|
||||||
func(queries *db.Queries, jikanClient *jikan.Client, providers []domain.EpisodeAvailabilityProvider, enabled bool) domain.EpisodeService {
|
func(queries *db.Queries, jikanClient *jikan.Client, providers []domain.EpisodeAvailabilityProvider, enabled bool, metrics *observability.Metrics) domain.EpisodeService {
|
||||||
return episodeService.NewEpisodeService(queries, jikanClient, providers, enabled)
|
return episodeService.NewEpisodeService(queries, jikanClient, providers, enabled, metrics)
|
||||||
},
|
},
|
||||||
fx.ParamTags(``, ``, ``, ``),
|
fx.ParamTags(``, ``, ``, ``, ``),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
fx.Provide(func(p *allanime.AllAnimeProvider) []domain.EpisodeAvailabilityProvider {
|
fx.Provide(func(p *allanime.AllAnimeProvider) []domain.EpisodeAvailabilityProvider {
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"mal/integrations/jikan"
|
"mal/integrations/jikan"
|
||||||
"mal/internal/db"
|
"mal/internal/db"
|
||||||
"mal/internal/domain"
|
"mal/internal/domain"
|
||||||
|
"mal/internal/observability"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -34,19 +35,21 @@ type EpisodeService struct {
|
|||||||
providers []domain.EpisodeAvailabilityProvider
|
providers []domain.EpisodeAvailabilityProvider
|
||||||
clock Clock
|
clock Clock
|
||||||
enabled bool
|
enabled bool
|
||||||
|
metrics *observability.Metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEpisodeService(queries *db.Queries, jikanClient *jikan.Client, providers []domain.EpisodeAvailabilityProvider, enabled bool) domain.EpisodeService {
|
func NewEpisodeService(queries *db.Queries, jikanClient *jikan.Client, providers []domain.EpisodeAvailabilityProvider, enabled bool, metrics *observability.Metrics) domain.EpisodeService {
|
||||||
return NewEpisodeServiceWithClock(queries, jikanClient, providers, enabled, realClock{})
|
return NewEpisodeServiceWithClock(queries, jikanClient, providers, enabled, realClock{}, metrics)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEpisodeServiceWithClock(queries *db.Queries, jikanClient *jikan.Client, providers []domain.EpisodeAvailabilityProvider, enabled bool, clock Clock) *EpisodeService {
|
func NewEpisodeServiceWithClock(queries *db.Queries, jikanClient *jikan.Client, providers []domain.EpisodeAvailabilityProvider, enabled bool, clock Clock, metrics *observability.Metrics) *EpisodeService {
|
||||||
return &EpisodeService{
|
return &EpisodeService{
|
||||||
queries: queries,
|
queries: queries,
|
||||||
jikan: jikanClient,
|
jikan: jikanClient,
|
||||||
providers: providers,
|
providers: providers,
|
||||||
clock: clock,
|
clock: clock,
|
||||||
enabled: enabled,
|
enabled: enabled,
|
||||||
|
metrics: metrics,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -143,14 +146,20 @@ func (s *EpisodeService) providerID(ctx context.Context, anime domain.Anime, pro
|
|||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if row.FailedUntil.Valid && row.FailedUntil.Time.After(s.clock.Now()) {
|
if row.FailedUntil.Valid && row.FailedUntil.Time.After(s.clock.Now()) {
|
||||||
|
s.metrics.ObserveCache("episode_provider_mapping", "hit")
|
||||||
return "", fmt.Errorf("cached provider mapping failure active until %s: %s", row.FailedUntil.Time.Format(time.RFC3339), row.LastError)
|
return "", fmt.Errorf("cached provider mapping failure active until %s: %s", row.FailedUntil.Time.Format(time.RFC3339), row.LastError)
|
||||||
}
|
}
|
||||||
if strings.TrimSpace(row.ProviderShowID) != "" {
|
if strings.TrimSpace(row.ProviderShowID) != "" {
|
||||||
|
s.metrics.ObserveCache("episode_provider_mapping", "hit")
|
||||||
log.Printf("episodes: provider id cache hit anime_id=%d provider=%s provider_id=%s", anime.MalID, provider.Name(), row.ProviderShowID)
|
log.Printf("episodes: provider id cache hit anime_id=%d provider=%s provider_id=%s", anime.MalID, provider.Name(), row.ProviderShowID)
|
||||||
return row.ProviderShowID, nil
|
return row.ProviderShowID, nil
|
||||||
}
|
}
|
||||||
|
s.metrics.ObserveCache("episode_provider_mapping", "miss")
|
||||||
} else if !errors.Is(err, sql.ErrNoRows) {
|
} else if !errors.Is(err, sql.ErrNoRows) {
|
||||||
|
s.metrics.ObserveCache("episode_provider_mapping", "miss")
|
||||||
log.Printf("episodes: provider id cache read failed anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err)
|
log.Printf("episodes: provider id cache read failed anime_id=%d provider=%s error=%v", anime.MalID, provider.Name(), err)
|
||||||
|
} else {
|
||||||
|
s.metrics.ObserveCache("episode_provider_mapping", "miss")
|
||||||
}
|
}
|
||||||
|
|
||||||
providerID, err := provider.ResolveEpisodeProviderID(ctx, anime.MalID, titles)
|
providerID, err := provider.ResolveEpisodeProviderID(ctx, anime.MalID, titles)
|
||||||
@@ -256,31 +265,38 @@ func (s *EpisodeService) markFailure(ctx context.Context, anime domain.Anime, ca
|
|||||||
func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) {
|
func (s *EpisodeService) getCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) {
|
||||||
row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID))
|
row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.metrics.ObserveCache("episode_availability", "miss")
|
||||||
return domain.CanonicalEpisodeList{}, false
|
return domain.CanonicalEpisodeList{}, false
|
||||||
}
|
}
|
||||||
var payload domain.CanonicalEpisodeList
|
var payload domain.CanonicalEpisodeList
|
||||||
if err := json.Unmarshal([]byte(row.Data), &payload); err != nil {
|
if err := json.Unmarshal([]byte(row.Data), &payload); err != nil {
|
||||||
|
s.metrics.ObserveCache("episode_availability", "miss")
|
||||||
log.Printf("episodes: invalid cached payload anime_id=%d error=%v", animeID, err)
|
log.Printf("episodes: invalid cached payload anime_id=%d error=%v", animeID, err)
|
||||||
return domain.CanonicalEpisodeList{}, false
|
return domain.CanonicalEpisodeList{}, false
|
||||||
}
|
}
|
||||||
|
s.metrics.ObserveCache("episode_availability", "hit")
|
||||||
return payload, true
|
return payload, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *EpisodeService) getFreshCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) {
|
func (s *EpisodeService) getFreshCached(ctx context.Context, animeID int) (domain.CanonicalEpisodeList, bool) {
|
||||||
row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID))
|
row, err := s.queries.GetEpisodeAvailabilityCache(ctx, int64(animeID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||||
return domain.CanonicalEpisodeList{}, false
|
return domain.CanonicalEpisodeList{}, false
|
||||||
}
|
}
|
||||||
if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(s.clock.Now()) {
|
if row.NextRefreshAt.Valid && !row.NextRefreshAt.Time.After(s.clock.Now()) {
|
||||||
|
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||||
log.Printf("episodes: cached availability due for refresh anime_id=%d next_refresh=%s", animeID, row.NextRefreshAt.Time.Format(time.RFC3339))
|
log.Printf("episodes: cached availability due for refresh anime_id=%d next_refresh=%s", animeID, row.NextRefreshAt.Time.Format(time.RFC3339))
|
||||||
return domain.CanonicalEpisodeList{}, false
|
return domain.CanonicalEpisodeList{}, false
|
||||||
}
|
}
|
||||||
|
|
||||||
var payload domain.CanonicalEpisodeList
|
var payload domain.CanonicalEpisodeList
|
||||||
if err := json.Unmarshal([]byte(row.Data), &payload); err != nil {
|
if err := json.Unmarshal([]byte(row.Data), &payload); err != nil {
|
||||||
|
s.metrics.ObserveCache("episode_availability_fresh", "miss")
|
||||||
log.Printf("episodes: invalid cached payload anime_id=%d error=%v", animeID, err)
|
log.Printf("episodes: invalid cached payload anime_id=%d error=%v", animeID, err)
|
||||||
return domain.CanonicalEpisodeList{}, false
|
return domain.CanonicalEpisodeList{}, false
|
||||||
}
|
}
|
||||||
|
s.metrics.ObserveCache("episode_availability_fresh", "hit")
|
||||||
log.Printf("episodes: served cached availability anime_id=%d episodes=%d next_refresh=%s", animeID, len(payload.Episodes), payload.NextRefreshAt)
|
log.Printf("episodes: served cached availability anime_id=%d episodes=%d next_refresh=%s", animeID, len(payload.Episodes), payload.NextRefreshAt)
|
||||||
return payload, true
|
return payload, true
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"log"
|
"log"
|
||||||
"mal/internal/domain"
|
"mal/internal/domain"
|
||||||
|
"mal/internal/observability"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
@@ -11,7 +12,7 @@ import (
|
|||||||
|
|
||||||
const workerInterval = time.Minute
|
const workerInterval = time.Minute
|
||||||
|
|
||||||
func RegisterWorker(lc fx.Lifecycle, svc domain.EpisodeService) {
|
func RegisterWorker(lc fx.Lifecycle, svc domain.EpisodeService, metrics *observability.Metrics) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
@@ -23,7 +24,10 @@ func RegisterWorker(lc fx.Lifecycle, svc domain.EpisodeService) {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
if err := svc.RefreshTrackedDue(ctx, 25); err != nil {
|
if err := svc.RefreshTrackedDue(ctx, 25); err != nil {
|
||||||
|
metrics.ObserveWorkerTick("episodes_availability", err)
|
||||||
log.Printf("episodes: availability worker tick failed error=%v", err)
|
log.Printf("episodes: availability worker tick failed error=%v", err)
|
||||||
|
} else {
|
||||||
|
metrics.ObserveWorkerTick("episodes_availability", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
|||||||
292
internal/observability/metrics.go
Normal file
292
internal/observability/metrics.go
Normal file
@@ -0,0 +1,292 @@
|
|||||||
|
package observability
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// defaultDurationBuckets holds the ascending histogram upper bounds shared by
// the latency histograms. Observations are recorded in seconds, so the bounds
// range from 5ms to 10s.
var defaultDurationBuckets = []float64{
	0.005, 0.01, 0.025, 0.05,
	0.1, 0.25, 0.5,
	1, 2.5, 5, 10,
}
|
||||||
|
|
||||||
|
// counterSample is one counter time series: a concrete label set and the
// number of increments recorded for it.
type counterSample struct {
	// labels maps label name to label value for this series.
	labels map[string]string
	// value is the running count.
	value uint64
}
|
||||||
|
|
||||||
|
// histogramSample is one histogram time series for a concrete label set.
type histogramSample struct {
	// labels maps label name to label value for this series.
	labels map[string]string
	// buckets holds one cumulative counter per configured upper bound.
	buckets []uint64
	// count is the total number of observations.
	count uint64
	// sum is the sum of all observed values.
	sum float64
}
|
||||||
|
|
||||||
|
type counterVec struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
labelNames []string
|
||||||
|
samples map[string]*counterSample
|
||||||
|
}
|
||||||
|
|
||||||
|
type histogramVec struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
labelNames []string
|
||||||
|
bounds []float64
|
||||||
|
samples map[string]*histogramSample
|
||||||
|
}
|
||||||
|
|
||||||
|
type Metrics struct {
|
||||||
|
httpRequests *counterVec
|
||||||
|
httpRequestLatency *histogramVec
|
||||||
|
jikanRequests *counterVec
|
||||||
|
jikanRequestErrors *counterVec
|
||||||
|
jikanLatency *histogramVec
|
||||||
|
workerTicks *counterVec
|
||||||
|
cacheOperations *counterVec
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMetrics() *Metrics {
|
||||||
|
return &Metrics{
|
||||||
|
httpRequests: newCounterVec("method", "route", "status"),
|
||||||
|
httpRequestLatency: newHistogramVec(defaultDurationBuckets, "method", "route", "status"),
|
||||||
|
jikanRequests: newCounterVec("endpoint", "status"),
|
||||||
|
jikanRequestErrors: newCounterVec("endpoint", "status"),
|
||||||
|
jikanLatency: newHistogramVec(defaultDurationBuckets, "endpoint", "status"),
|
||||||
|
workerTicks: newCounterVec("worker", "result"),
|
||||||
|
cacheOperations: newCounterVec("cache", "result"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Metrics) Handler() http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
m.writePrometheus(w)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Metrics) ObserveHTTPRequest(method string, route string, status int, duration time.Duration) {
|
||||||
|
statusLabel := strconv.Itoa(status)
|
||||||
|
m.httpRequests.Inc(method, route, statusLabel)
|
||||||
|
m.httpRequestLatency.Observe(duration.Seconds(), method, route, statusLabel)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Metrics) ObserveJikanRequest(endpoint string, status int, duration time.Duration, err error) {
|
||||||
|
statusLabel := strconv.Itoa(status)
|
||||||
|
m.jikanRequests.Inc(endpoint, statusLabel)
|
||||||
|
m.jikanLatency.Observe(duration.Seconds(), endpoint, statusLabel)
|
||||||
|
if err != nil || status >= http.StatusBadRequest {
|
||||||
|
m.jikanRequestErrors.Inc(endpoint, statusLabel)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Metrics) ObserveWorkerTick(worker string, err error) {
|
||||||
|
if err != nil {
|
||||||
|
m.workerTicks.Inc(worker, "failure")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.workerTicks.Inc(worker, "success")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Metrics) ObserveCache(cache string, result string) {
|
||||||
|
m.cacheOperations.Inc(cache, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Metrics) writePrometheus(w http.ResponseWriter) {
|
||||||
|
writeCounterMetric(w, "mal_http_requests_total", "Total HTTP requests by method, route, and status.", m.httpRequests.snapshot())
|
||||||
|
writeHistogramMetric(w, "mal_http_request_duration_seconds", "HTTP request latency in seconds.", m.httpRequestLatency.snapshot(), m.httpRequestLatency.bounds)
|
||||||
|
writeCounterMetric(w, "mal_jikan_upstream_requests_total", "Total upstream Jikan requests by endpoint and status.", m.jikanRequests.snapshot())
|
||||||
|
writeCounterMetric(w, "mal_jikan_upstream_errors_total", "Total upstream Jikan errors by endpoint and status.", m.jikanRequestErrors.snapshot())
|
||||||
|
writeHistogramMetric(w, "mal_jikan_upstream_request_duration_seconds", "Upstream Jikan request latency in seconds.", m.jikanLatency.snapshot(), m.jikanLatency.bounds)
|
||||||
|
writeCounterMetric(w, "mal_worker_ticks_total", "Total background worker ticks by worker and result.", m.workerTicks.snapshot())
|
||||||
|
writeCounterMetric(w, "mal_cache_operations_total", "Total cache hits and misses by cache name.", m.cacheOperations.snapshot())
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCounterVec(labelNames ...string) *counterVec {
|
||||||
|
return &counterVec{
|
||||||
|
labelNames: append([]string(nil), labelNames...),
|
||||||
|
samples: make(map[string]*counterSample),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *counterVec) Inc(labelValues ...string) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
key, labels := buildLabelKey(c.labelNames, labelValues)
|
||||||
|
sample, ok := c.samples[key]
|
||||||
|
if !ok {
|
||||||
|
sample = &counterSample{labels: labels}
|
||||||
|
c.samples[key] = sample
|
||||||
|
}
|
||||||
|
sample.value++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *counterVec) snapshot() []counterSample {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
keys := sortedCounterSampleKeys(c.samples)
|
||||||
|
out := make([]counterSample, 0, len(keys))
|
||||||
|
for _, key := range keys {
|
||||||
|
sample := c.samples[key]
|
||||||
|
out = append(out, counterSample{
|
||||||
|
labels: copyLabels(sample.labels),
|
||||||
|
value: sample.value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHistogramVec(bounds []float64, labelNames ...string) *histogramVec {
|
||||||
|
return &histogramVec{
|
||||||
|
labelNames: append([]string(nil), labelNames...),
|
||||||
|
bounds: append([]float64(nil), bounds...),
|
||||||
|
samples: make(map[string]*histogramSample),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *histogramVec) Observe(value float64, labelValues ...string) {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
|
||||||
|
key, labels := buildLabelKey(h.labelNames, labelValues)
|
||||||
|
sample, ok := h.samples[key]
|
||||||
|
if !ok {
|
||||||
|
sample = &histogramSample{
|
||||||
|
labels: labels,
|
||||||
|
buckets: make([]uint64, len(h.bounds)),
|
||||||
|
}
|
||||||
|
h.samples[key] = sample
|
||||||
|
}
|
||||||
|
|
||||||
|
sample.count++
|
||||||
|
sample.sum += value
|
||||||
|
for idx, bound := range h.bounds {
|
||||||
|
if value <= bound {
|
||||||
|
sample.buckets[idx]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *histogramVec) snapshot() []histogramSample {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
|
||||||
|
keys := sortedHistogramSampleKeys(h.samples)
|
||||||
|
out := make([]histogramSample, 0, len(keys))
|
||||||
|
for _, key := range keys {
|
||||||
|
sample := h.samples[key]
|
||||||
|
buckets := make([]uint64, len(sample.buckets))
|
||||||
|
copy(buckets, sample.buckets)
|
||||||
|
out = append(out, histogramSample{
|
||||||
|
labels: copyLabels(sample.labels),
|
||||||
|
buckets: buckets,
|
||||||
|
count: sample.count,
|
||||||
|
sum: sample.sum,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildLabelKey pairs labelNames with labelValues and returns both a map key
// that identifies the label set and the label set itself as a name-to-value
// map.
//
// Values are quoted with strconv.Quote before being joined with the 0xff
// separator byte. Quoted output never contains a raw 0xff byte, so two
// different label sets cannot produce the same key even when a value contains
// the separator.
//
// It panics when the two slices differ in length. That is a programming error
// at the call site rather than a runtime condition.
func buildLabelKey(labelNames []string, labelValues []string) (string, map[string]string) {
	if len(labelNames) != len(labelValues) {
		panic("label cardinality mismatch")
	}

	labels := make(map[string]string, len(labelNames))
	var key strings.Builder
	for idx, name := range labelNames {
		value := labelValues[idx]
		labels[name] = value

		if idx > 0 {
			key.WriteByte(0xff)
		}
		key.WriteString(name)
		key.WriteByte(0xff)
		key.WriteString(strconv.Quote(value))
	}
	return key.String(), labels
}
|
||||||
|
|
||||||
|
// copyLabels returns a new map with the same entries as labels. The result is
// never nil, so callers may add entries to it even when labels is nil.
func copyLabels(labels map[string]string) map[string]string {
	clone := make(map[string]string, len(labels))
	for name := range labels {
		clone[name] = labels[name]
	}
	return clone
}
|
||||||
|
|
||||||
|
func sortedCounterSampleKeys(samples map[string]*counterSample) []string {
|
||||||
|
keys := make([]string, 0, len(samples))
|
||||||
|
for key := range samples {
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
|
||||||
|
func sortedHistogramSampleKeys(samples map[string]*histogramSample) []string {
|
||||||
|
keys := make([]string, 0, len(samples))
|
||||||
|
for key := range samples {
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeCounterMetric(w http.ResponseWriter, name string, help string, samples []counterSample) {
|
||||||
|
_, _ = fmt.Fprintf(w, "# HELP %s %s\n", name, help)
|
||||||
|
_, _ = fmt.Fprintf(w, "# TYPE %s counter\n", name)
|
||||||
|
for _, sample := range samples {
|
||||||
|
_, _ = fmt.Fprintf(w, "%s%s %d\n", name, formatLabels(sample.labels), sample.value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeHistogramMetric(w http.ResponseWriter, name string, help string, samples []histogramSample, bounds []float64) {
|
||||||
|
_, _ = fmt.Fprintf(w, "# HELP %s %s\n", name, help)
|
||||||
|
_, _ = fmt.Fprintf(w, "# TYPE %s histogram\n", name)
|
||||||
|
for _, sample := range samples {
|
||||||
|
for idx, bound := range bounds {
|
||||||
|
labels := copyLabels(sample.labels)
|
||||||
|
labels["le"] = formatFloat(bound)
|
||||||
|
_, _ = fmt.Fprintf(w, "%s_bucket%s %d\n", name, formatLabels(labels), sample.buckets[idx])
|
||||||
|
}
|
||||||
|
labels := copyLabels(sample.labels)
|
||||||
|
labels["le"] = "+Inf"
|
||||||
|
_, _ = fmt.Fprintf(w, "%s_bucket%s %d\n", name, formatLabels(labels), sample.count)
|
||||||
|
_, _ = fmt.Fprintf(w, "%s_sum%s %s\n", name, formatLabels(sample.labels), formatFloat(sample.sum))
|
||||||
|
_, _ = fmt.Fprintf(w, "%s_count%s %d\n", name, formatLabels(sample.labels), sample.count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// labelValueEscaper applies the only escapes the Prometheus text exposition
// format defines for label values: backslash, double quote and line feed.
var labelValueEscaper = strings.NewReplacer(
	`\`, `\\`,
	`"`, `\"`,
	"\n", `\n`,
)

// formatLabels renders labels as a Prometheus label block such as
// {method="GET",status="200"}, with names in ascending order. It returns the
// empty string when there are no labels.
//
// Values are escaped by the exposition-format rules rather than Go's %q verb,
// which emits sequences such as \t or \x00 that the format does not define.
// Invalid UTF-8 in a value is replaced with U+FFFD, since the format requires
// valid UTF-8.
func formatLabels(labels map[string]string) string {
	if len(labels) == 0 {
		return ""
	}

	names := make([]string, 0, len(labels))
	for name := range labels {
		names = append(names, name)
	}
	sort.Strings(names)

	var out strings.Builder
	out.WriteByte('{')
	for idx, name := range names {
		if idx > 0 {
			out.WriteByte(',')
		}
		value := strings.ToValidUTF8(labels[name], "\uFFFD")
		out.WriteString(name)
		out.WriteString(`="`)
		out.WriteString(labelValueEscaper.Replace(value))
		out.WriteByte('"')
	}
	out.WriteByte('}')
	return out.String()
}
|
||||||
|
|
||||||
|
// formatFloat renders value in plain decimal notation (no exponent) using the
// fewest digits that still round-trip to the same float64.
func formatFloat(value float64) string {
	digits := strconv.AppendFloat(nil, value, 'f', -1, 64)
	return string(digits)
}
|
||||||
47
internal/observability/metrics_test.go
Normal file
47
internal/observability/metrics_test.go
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
package observability
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMetricsHandlerRendersPrometheusFamilies(t *testing.T) {
|
||||||
|
metrics := NewMetrics()
|
||||||
|
metrics.ObserveHTTPRequest(http.MethodGet, "/anime/:id", http.StatusOK, 125*time.Millisecond)
|
||||||
|
metrics.ObserveJikanRequest("/anime/{id}", http.StatusTooManyRequests, 800*time.Millisecond, assertErr{})
|
||||||
|
metrics.ObserveWorkerTick("episodes_availability", nil)
|
||||||
|
metrics.ObserveCache("jikan", "hit")
|
||||||
|
metrics.ObserveCache("episode_availability", "miss")
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
metrics.Handler().ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
body, err := io.ReadAll(rec.Result().Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("read body: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
text := string(body)
|
||||||
|
assertContains(t, text, `mal_http_requests_total{method="GET",route="/anime/:id",status="200"} 1`)
|
||||||
|
assertContains(t, text, `mal_http_request_duration_seconds_count{method="GET",route="/anime/:id",status="200"} 1`)
|
||||||
|
assertContains(t, text, `mal_jikan_upstream_requests_total{endpoint="/anime/{id}",status="429"} 1`)
|
||||||
|
assertContains(t, text, `mal_jikan_upstream_errors_total{endpoint="/anime/{id}",status="429"} 1`)
|
||||||
|
assertContains(t, text, `mal_worker_ticks_total{result="success",worker="episodes_availability"} 1`)
|
||||||
|
assertContains(t, text, `mal_cache_operations_total{cache="episode_availability",result="miss"} 1`)
|
||||||
|
}
|
||||||
|
|
||||||
|
// assertErr is a stateless error value used by the tests wherever a non-nil
// error is needed.
type assertErr struct{}

// Error implements the error interface and always reports "boom".
func (assertErr) Error() string {
	const message = "boom"
	return message
}
|
||||||
|
|
||||||
|
// assertContains fails the test immediately when want is not a substring of
// text, printing the full text to make the missing line easy to spot.
func assertContains(t *testing.T, text string, want string) {
	t.Helper()

	if strings.Contains(text, want) {
		return
	}
	t.Fatalf("missing metric line %q in:\n%s", want, text)
}
|
||||||
@@ -2,13 +2,14 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
"mal/internal/observability"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
|
|
||||||
func RequestLogger() gin.HandlerFunc {
|
func RequestLogger(metrics *observability.Metrics) gin.HandlerFunc {
|
||||||
return func(c *gin.Context) {
|
return func(c *gin.Context) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
path := c.Request.URL.Path
|
path := c.Request.URL.Path
|
||||||
@@ -21,6 +22,9 @@ func RequestLogger() gin.HandlerFunc {
|
|||||||
route = path
|
route = path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
duration := time.Since(start)
|
||||||
|
metrics.ObserveHTTPRequest(c.Request.Method, route, c.Writer.Status(), duration)
|
||||||
|
|
||||||
log.Printf(
|
log.Printf(
|
||||||
"http_request method=%s route=%s path=%s query=%s status=%d duration_ms=%.2f bytes=%d client_ip=%s errors=%s",
|
"http_request method=%s route=%s path=%s query=%s status=%d duration_ms=%.2f bytes=%d client_ip=%s errors=%s",
|
||||||
c.Request.Method,
|
c.Request.Method,
|
||||||
@@ -28,7 +32,7 @@ func RequestLogger() gin.HandlerFunc {
|
|||||||
strconv.Quote(path),
|
strconv.Quote(path),
|
||||||
strconv.Quote(query),
|
strconv.Quote(query),
|
||||||
c.Writer.Status(),
|
c.Writer.Status(),
|
||||||
float64(time.Since(start).Microseconds())/1000,
|
float64(duration.Microseconds())/1000,
|
||||||
c.Writer.Size(),
|
c.Writer.Size(),
|
||||||
strconv.Quote(c.ClientIP()),
|
strconv.Quote(c.ClientIP()),
|
||||||
strconv.Quote(c.Errors.ByType(gin.ErrorTypePrivate).String()),
|
strconv.Quote(c.Errors.ByType(gin.ErrorTypePrivate).String()),
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package server
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
"log"
|
||||||
|
"mal/internal/observability"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
@@ -13,18 +14,20 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var Module = fx.Options(
|
var Module = fx.Options(
|
||||||
|
fx.Provide(observability.NewMetrics),
|
||||||
fx.Provide(ProvideRouter),
|
fx.Provide(ProvideRouter),
|
||||||
fx.Invoke(RunServer),
|
fx.Invoke(RunServer),
|
||||||
)
|
)
|
||||||
|
|
||||||
func ProvideRouter(htmlRender render.HTMLRender) *gin.Engine {
|
func ProvideRouter(htmlRender render.HTMLRender, metrics *observability.Metrics) *gin.Engine {
|
||||||
if os.Getenv("GIN_MODE") == "" {
|
if os.Getenv("GIN_MODE") == "" {
|
||||||
gin.SetMode(gin.ReleaseMode)
|
gin.SetMode(gin.ReleaseMode)
|
||||||
}
|
}
|
||||||
r := gin.New()
|
r := gin.New()
|
||||||
r.Use(CORSMiddleware(), RequestLogger(), gin.Recovery())
|
r.Use(CORSMiddleware(), RequestLogger(metrics), gin.Recovery())
|
||||||
r.Static("/static", "./static")
|
r.Static("/static", "./static")
|
||||||
r.Static("/dist", "./dist")
|
r.Static("/dist", "./dist")
|
||||||
|
r.GET("/metrics", gin.WrapH(metrics.Handler()))
|
||||||
r.HTMLRender = htmlRender
|
r.HTMLRender = htmlRender
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"mal/internal/observability"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -42,7 +43,7 @@ func TestRequestLoggerUsesMatchedRoute(t *testing.T) {
|
|||||||
defer log.SetOutput(previousOutput)
|
defer log.SetOutput(previousOutput)
|
||||||
|
|
||||||
router := gin.New()
|
router := gin.New()
|
||||||
router.Use(RequestLogger())
|
router.Use(RequestLogger(observability.NewMetrics()))
|
||||||
router.GET("/anime/:id", func(c *gin.Context) {
|
router.GET("/anime/:id", func(c *gin.Context) {
|
||||||
c.String(http.StatusOK, "ok")
|
c.String(http.StatusOK, "ok")
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user