feat: time database queries
This commit is contained in:
@@ -32,8 +32,8 @@ func ProvideSQLDB(cfg config.Config) (*sql.DB, error) {
|
||||
return dbConn, nil
|
||||
}
|
||||
|
||||
func ProvideQueries(sqlDB *sql.DB) *db.Queries {
|
||||
return db.New(sqlDB)
|
||||
func ProvideQueries(sqlDB *sql.DB, metrics *observability.Metrics) *db.Queries {
|
||||
return db.New(observability.InstrumentDB(sqlDB, metrics))
|
||||
}
|
||||
|
||||
func RunMigrations(sqlDB *sql.DB) error {
|
||||
|
||||
51
internal/observability/db.go
Normal file
51
internal/observability/db.go
Normal file
@@ -0,0 +1,51 @@
|
||||
package observability
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"time"
|
||||
)
|
||||
|
||||
type instrumentedDB struct {
|
||||
db interface {
|
||||
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
|
||||
PrepareContext(context.Context, string) (*sql.Stmt, error)
|
||||
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
|
||||
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
|
||||
}
|
||||
metrics *Metrics
|
||||
}
|
||||
|
||||
func InstrumentDB(db interface {
|
||||
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
|
||||
PrepareContext(context.Context, string) (*sql.Stmt, error)
|
||||
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
|
||||
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
|
||||
}, metrics *Metrics) *instrumentedDB {
|
||||
return &instrumentedDB{db: db, metrics: metrics}
|
||||
}
|
||||
|
||||
func (db *instrumentedDB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
|
||||
start := time.Now()
|
||||
result, err := db.db.ExecContext(ctx, query, args...)
|
||||
db.metrics.ObserveDBQuery("exec", time.Since(start), err)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (db *instrumentedDB) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
|
||||
return db.db.PrepareContext(ctx, query)
|
||||
}
|
||||
|
||||
func (db *instrumentedDB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
|
||||
start := time.Now()
|
||||
rows, err := db.db.QueryContext(ctx, query, args...)
|
||||
db.metrics.ObserveDBQuery("query", time.Since(start), err)
|
||||
return rows, err
|
||||
}
|
||||
|
||||
func (db *instrumentedDB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
|
||||
start := time.Now()
|
||||
row := db.db.QueryRowContext(ctx, query, args...)
|
||||
db.metrics.ObserveDBQuery("query_row", time.Since(start), nil)
|
||||
return row
|
||||
}
|
||||
@@ -56,6 +56,7 @@ type Metrics struct {
|
||||
jikanRequests *counterVec
|
||||
jikanRequestErrors *counterVec
|
||||
jikanLatency *histogramVec
|
||||
dbQueryLatency *histogramVec
|
||||
workerTicks *counterVec
|
||||
cacheOperations *counterVec
|
||||
}
|
||||
@@ -67,6 +68,7 @@ func NewMetrics() *Metrics {
|
||||
jikanRequests: newCounterVec("endpoint", "status"),
|
||||
jikanRequestErrors: newCounterVec("endpoint", "status"),
|
||||
jikanLatency: newHistogramVec(defaultDurationBuckets, "endpoint", "status"),
|
||||
dbQueryLatency: newHistogramVec(defaultDurationBuckets, "operation", "result"),
|
||||
workerTicks: newCounterVec("worker", "result"),
|
||||
cacheOperations: newCounterVec("cache", "result"),
|
||||
}
|
||||
@@ -95,6 +97,14 @@ func (m *Metrics) ObserveJikanRequest(endpoint string, status int, duration time
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Metrics) ObserveDBQuery(operation string, duration time.Duration, err error) {
|
||||
result := "success"
|
||||
if err != nil {
|
||||
result = "error"
|
||||
}
|
||||
m.dbQueryLatency.Observe(duration.Seconds(), operation, result)
|
||||
}
|
||||
|
||||
func (m *Metrics) ObserveWorkerTick(worker string, err error) {
|
||||
if err != nil {
|
||||
m.workerTicks.Inc(worker, "failure")
|
||||
@@ -113,6 +123,7 @@ func (m *Metrics) writePrometheus(w http.ResponseWriter) {
|
||||
writeCounterMetric(w, "mal_jikan_upstream_requests_total", "Total upstream Jikan requests by endpoint and status.", m.jikanRequests.snapshot())
|
||||
writeCounterMetric(w, "mal_jikan_upstream_errors_total", "Total upstream Jikan errors by endpoint and status.", m.jikanRequestErrors.snapshot())
|
||||
writeHistogramMetric(w, "mal_jikan_upstream_request_duration_seconds", "Upstream Jikan request latency in seconds.", m.jikanLatency.snapshot(), m.jikanLatency.bounds)
|
||||
writeHistogramMetric(w, "mal_db_query_duration_seconds", "Database query latency in seconds.", m.dbQueryLatency.snapshot(), m.dbQueryLatency.bounds)
|
||||
writeCounterMetric(w, "mal_worker_ticks_total", "Total background worker ticks by worker and result.", m.workerTicks.snapshot())
|
||||
writeCounterMetric(w, "mal_cache_operations_total", "Total cache hits and misses by cache name.", m.cacheOperations.snapshot())
|
||||
}
|
||||
|
||||
@@ -2,18 +2,22 @@ package observability
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
func TestMetricsHandlerRendersPrometheusFamilies(t *testing.T) {
|
||||
metrics := NewMetrics()
|
||||
metrics.ObserveHTTPRequest(http.MethodGet, "/anime/:id", http.StatusOK, 125*time.Millisecond)
|
||||
metrics.ObserveJikanRequest("/anime/{id}", http.StatusTooManyRequests, 800*time.Millisecond, assertErr{})
|
||||
metrics.ObserveDBQuery("query", 25*time.Millisecond, nil)
|
||||
metrics.ObserveWorkerTick("episodes_availability", nil)
|
||||
metrics.ObserveCache("jikan", "hit")
|
||||
metrics.ObserveCache("episode_availability", "miss")
|
||||
@@ -32,10 +36,39 @@ func TestMetricsHandlerRendersPrometheusFamilies(t *testing.T) {
|
||||
assertContains(t, text, `mal_http_request_duration_seconds_count{method="GET",route="/anime/:id",status="200"} 1`)
|
||||
assertContains(t, text, `mal_jikan_upstream_requests_total{endpoint="/anime/{id}",status="429"} 1`)
|
||||
assertContains(t, text, `mal_jikan_upstream_errors_total{endpoint="/anime/{id}",status="429"} 1`)
|
||||
assertContains(t, text, `mal_db_query_duration_seconds_count{operation="query",result="success"} 1`)
|
||||
assertContains(t, text, `mal_worker_ticks_total{result="success",worker="episodes_availability"} 1`)
|
||||
assertContains(t, text, `mal_cache_operations_total{cache="episode_availability",result="miss"} 1`)
|
||||
}
|
||||
|
||||
func TestInstrumentDBRecordsQueryLatency(t *testing.T) {
|
||||
metrics := NewMetrics()
|
||||
sqlDB, err := sql.Open("sqlite3", ":memory:")
|
||||
if err != nil {
|
||||
t.Fatalf("open sqlite: %v", err)
|
||||
}
|
||||
defer func() { _ = sqlDB.Close() }()
|
||||
|
||||
instrumented := InstrumentDB(sqlDB, metrics)
|
||||
if _, err := instrumented.ExecContext(context.Background(), `CREATE TABLE item (id INTEGER PRIMARY KEY)`); err != nil {
|
||||
t.Fatalf("create table: %v", err)
|
||||
}
|
||||
if _, err := instrumented.QueryContext(context.Background(), `SELECT id FROM item`); err != nil {
|
||||
t.Fatalf("query table: %v", err)
|
||||
}
|
||||
if _, err := instrumented.QueryContext(context.Background(), `SELECT id FROM missing_table`); err == nil {
|
||||
t.Fatal("expected missing table query to fail")
|
||||
}
|
||||
|
||||
samples := metrics.dbQueryLatency.snapshot()
|
||||
if len(samples) != 3 {
|
||||
t.Fatalf("db samples = %d, want 3", len(samples))
|
||||
}
|
||||
assertHistogramSample(t, samples, "exec", "success")
|
||||
assertHistogramSample(t, samples, "query", "success")
|
||||
assertHistogramSample(t, samples, "query", "error")
|
||||
}
|
||||
|
||||
type assertErr struct{}
|
||||
|
||||
func (assertErr) Error() string { return "boom" }
|
||||
@@ -46,3 +79,13 @@ func assertContains(t *testing.T, text string, want string) {
|
||||
t.Fatalf("missing metric line %q in:\n%s", want, text)
|
||||
}
|
||||
}
|
||||
|
||||
func assertHistogramSample(t *testing.T, samples []histogramSample, operation string, result string) {
|
||||
t.Helper()
|
||||
for _, sample := range samples {
|
||||
if sample.labels["operation"] == operation && sample.labels["result"] == result && sample.count == 1 {
|
||||
return
|
||||
}
|
||||
}
|
||||
t.Fatalf("missing db histogram sample operation=%q result=%q in %#v", operation, result, samples)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user