refactor: remove metrics from server and database
This commit is contained in:
@@ -24,9 +24,6 @@ var publicRoutes = []publicRoute{
|
|||||||
{path: "/static", prefix: true},
|
{path: "/static", prefix: true},
|
||||||
{path: "/dist", prefix: true},
|
{path: "/dist", prefix: true},
|
||||||
|
|
||||||
// Observability endpoints.
|
|
||||||
{method: http.MethodGet, path: "/metrics"},
|
|
||||||
|
|
||||||
// Auth API.
|
// Auth API.
|
||||||
{method: http.MethodPost, path: "/api/auth/login"},
|
{method: http.MethodPost, path: "/api/auth/login"},
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,8 +33,8 @@ func ProvideSQLDB(cfg config.Config) (*sql.DB, error) {
|
|||||||
return dbConn, nil
|
return dbConn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ProvideQueries(sqlDB *sql.DB, metrics *observability.Metrics) *db.Queries {
|
func ProvideQueries(sqlDB *sql.DB) *db.Queries {
|
||||||
return db.New(observability.InstrumentDB(sqlDB, metrics))
|
return db.New(sqlDB)
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunMigrations(sqlDB *sql.DB) error {
|
func RunMigrations(sqlDB *sql.DB) error {
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"mal/internal/db"
|
"mal/internal/db"
|
||||||
"mal/internal/observability"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
@@ -64,7 +63,7 @@ func TestCleanupExpiredJikanCache(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanupExpiredJikanCache(ctx, db.New(sqlDB), observability.NewMetrics())
|
cleanupExpiredJikanCache(ctx, db.New(sqlDB))
|
||||||
|
|
||||||
keys := jikanCacheKeys(ctx, t, sqlDB)
|
keys := jikanCacheKeys(ctx, t, sqlDB)
|
||||||
if len(keys) != 1 || keys[0] != "fresh" {
|
if len(keys) != 1 || keys[0] != "fresh" {
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ import (
|
|||||||
"mal/integrations/jikan"
|
"mal/integrations/jikan"
|
||||||
"mal/internal/config"
|
"mal/internal/config"
|
||||||
"mal/internal/db"
|
"mal/internal/db"
|
||||||
"mal/internal/observability"
|
|
||||||
errlog "mal/pkg"
|
errlog "mal/pkg"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -31,7 +30,7 @@ func applyAnimeDurationSecondsBackfill(ctx context.Context, sqlDB *sql.DB) error
|
|||||||
return fmt.Errorf("list anime missing duration_seconds: %w", err)
|
return fmt.Errorf("list anime missing duration_seconds: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
client := jikan.NewClient(config.Config{}, db.New(sqlDB), observability.NewMetrics())
|
client := jikan.NewClient(config.Config{}, db.New(sqlDB))
|
||||||
for _, row := range toUpdate {
|
for _, row := range toUpdate {
|
||||||
anime, err := client.GetAnimeByID(ctx, int(row.id))
|
anime, err := client.GetAnimeByID(ctx, int(row.id))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ const (
|
|||||||
jikanCacheCleanupWorker = "jikan_cache_cleanup"
|
jikanCacheCleanupWorker = "jikan_cache_cleanup"
|
||||||
)
|
)
|
||||||
|
|
||||||
func RegisterJikanCacheCleanupWorker(lc fx.Lifecycle, queries *db.Queries, metrics *observability.Metrics) {
|
func RegisterJikanCacheCleanupWorker(lc fx.Lifecycle, queries *db.Queries) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
@@ -24,7 +24,7 @@ func RegisterJikanCacheCleanupWorker(lc fx.Lifecycle, queries *db.Queries, metri
|
|||||||
<-startCtx.Done()
|
<-startCtx.Done()
|
||||||
cancel()
|
cancel()
|
||||||
}()
|
}()
|
||||||
go runJikanCacheCleanupWorker(ctx, queries, metrics)
|
go runJikanCacheCleanupWorker(ctx, queries)
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
OnStop: func(context.Context) error {
|
OnStop: func(context.Context) error {
|
||||||
@@ -34,14 +34,14 @@ func RegisterJikanCacheCleanupWorker(lc fx.Lifecycle, queries *db.Queries, metri
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func runJikanCacheCleanupWorker(ctx context.Context, queries *db.Queries, metrics *observability.Metrics) {
|
func runJikanCacheCleanupWorker(ctx context.Context, queries *db.Queries) {
|
||||||
observability.Info("jikan_cache_cleanup_worker_start", "database", "", nil)
|
observability.Info("jikan_cache_cleanup_worker_start", "database", "", nil)
|
||||||
|
|
||||||
ticker := time.NewTicker(jikanCacheCleanupInterval)
|
ticker := time.NewTicker(jikanCacheCleanupInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
cleanupExpiredJikanCache(ctx, queries, metrics)
|
cleanupExpiredJikanCache(ctx, queries)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
@@ -52,12 +52,11 @@ func runJikanCacheCleanupWorker(ctx context.Context, queries *db.Queries, metric
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func cleanupExpiredJikanCache(ctx context.Context, queries *db.Queries, metrics *observability.Metrics) {
|
func cleanupExpiredJikanCache(ctx context.Context, queries *db.Queries) {
|
||||||
cleanupCtx, cancel := context.WithTimeout(ctx, jikanCacheCleanupTimeout)
|
cleanupCtx, cancel := context.WithTimeout(ctx, jikanCacheCleanupTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
err := queries.DeleteExpiredJikanCache(cleanupCtx)
|
err := queries.DeleteExpiredJikanCache(cleanupCtx)
|
||||||
metrics.ObserveWorkerTick(jikanCacheCleanupWorker, err)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
observability.Warn(
|
observability.Warn(
|
||||||
"jikan_cache_cleanup_failed",
|
"jikan_cache_cleanup_failed",
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
)
|
)
|
||||||
|
|
||||||
func RequestLogger(metrics *observability.Metrics) gin.HandlerFunc {
|
func RequestLogger() gin.HandlerFunc {
|
||||||
return func(c *gin.Context) {
|
return func(c *gin.Context) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
path := c.Request.URL.Path
|
path := c.Request.URL.Path
|
||||||
@@ -21,8 +21,6 @@ func RequestLogger(metrics *observability.Metrics) gin.HandlerFunc {
|
|||||||
}
|
}
|
||||||
|
|
||||||
duration := time.Since(start)
|
duration := time.Since(start)
|
||||||
metrics.ObserveHTTPRequest(c.Request.Method, route, c.Writer.Status(), duration)
|
|
||||||
|
|
||||||
status := c.Writer.Status()
|
status := c.Writer.Status()
|
||||||
fields := map[string]any{
|
fields := map[string]any{
|
||||||
"client_ip": c.ClientIP(),
|
"client_ip": c.ClientIP(),
|
||||||
|
|||||||
@@ -16,22 +16,20 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var Module = fx.Options(
|
var Module = fx.Options(
|
||||||
fx.Provide(observability.NewMetrics),
|
|
||||||
fx.Provide(ProvideRouter),
|
fx.Provide(ProvideRouter),
|
||||||
fx.Invoke(RunServer),
|
fx.Invoke(RunServer),
|
||||||
)
|
)
|
||||||
|
|
||||||
func ProvideRouter(cfg config.Config, htmlRender render.HTMLRender, metrics *observability.Metrics) *gin.Engine {
|
func ProvideRouter(cfg config.Config, htmlRender render.HTMLRender) *gin.Engine {
|
||||||
if cfg.GinMode == "" {
|
if cfg.GinMode == "" {
|
||||||
gin.SetMode(gin.ReleaseMode)
|
gin.SetMode(gin.ReleaseMode)
|
||||||
} else {
|
} else {
|
||||||
gin.SetMode(cfg.GinMode)
|
gin.SetMode(cfg.GinMode)
|
||||||
}
|
}
|
||||||
r := gin.New()
|
r := gin.New()
|
||||||
r.Use(CORSMiddlewareWithConfig(cfg), RequestContextMiddleware(), audit.ContextMiddleware(), RequestLogger(metrics), gin.Recovery())
|
r.Use(CORSMiddlewareWithConfig(cfg), RequestContextMiddleware(), audit.ContextMiddleware(), RequestLogger(), gin.Recovery())
|
||||||
r.Static("/static", "./static")
|
r.Static("/static", "./static")
|
||||||
r.Static("/dist", "./dist")
|
r.Static("/dist", "./dist")
|
||||||
r.GET("/metrics", gin.WrapH(metrics.Handler()))
|
|
||||||
r.GET("/debug/pprof", gin.WrapH(http.DefaultServeMux))
|
r.GET("/debug/pprof", gin.WrapH(http.DefaultServeMux))
|
||||||
r.GET("/debug/pprof/*action", gin.WrapH(http.DefaultServeMux))
|
r.GET("/debug/pprof/*action", gin.WrapH(http.DefaultServeMux))
|
||||||
r.HTMLRender = htmlRender
|
r.HTMLRender = htmlRender
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"mal/internal/config"
|
"mal/internal/config"
|
||||||
"mal/internal/observability"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -39,7 +38,7 @@ func TestNewHTTPServer_TimeoutsAndAddr(t *testing.T) {
|
|||||||
func TestProvideRouterRegistersPprof(t *testing.T) {
|
func TestProvideRouterRegistersPprof(t *testing.T) {
|
||||||
gin.SetMode(gin.TestMode)
|
gin.SetMode(gin.TestMode)
|
||||||
|
|
||||||
router := ProvideRouter(config.Config{GinMode: gin.TestMode}, nil, observability.NewMetrics())
|
router := ProvideRouter(config.Config{GinMode: gin.TestMode}, nil)
|
||||||
req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/debug/pprof/", nil)
|
req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/debug/pprof/", nil)
|
||||||
rec := httptest.NewRecorder()
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
@@ -63,7 +62,7 @@ func TestRequestLoggerUsesMatchedRoute(t *testing.T) {
|
|||||||
|
|
||||||
router := gin.New()
|
router := gin.New()
|
||||||
router.Use(RequestContextMiddleware())
|
router.Use(RequestContextMiddleware())
|
||||||
router.Use(RequestLogger(observability.NewMetrics()))
|
router.Use(RequestLogger())
|
||||||
router.GET("/anime/:id", func(c *gin.Context) {
|
router.GET("/anime/:id", func(c *gin.Context) {
|
||||||
c.String(http.StatusOK, "ok")
|
c.String(http.StatusOK, "ok")
|
||||||
})
|
})
|
||||||
@@ -105,7 +104,7 @@ func TestRequestLoggerSkipsSuccessfulStreamProxy(t *testing.T) {
|
|||||||
|
|
||||||
router := gin.New()
|
router := gin.New()
|
||||||
router.Use(RequestContextMiddleware())
|
router.Use(RequestContextMiddleware())
|
||||||
router.Use(RequestLogger(observability.NewMetrics()))
|
router.Use(RequestLogger())
|
||||||
router.GET("/watch/proxy/stream", func(c *gin.Context) {
|
router.GET("/watch/proxy/stream", func(c *gin.Context) {
|
||||||
c.String(http.StatusOK, "segment")
|
c.String(http.StatusOK, "segment")
|
||||||
})
|
})
|
||||||
@@ -132,7 +131,7 @@ func TestRequestLoggerLogsFailedStreamProxy(t *testing.T) {
|
|||||||
|
|
||||||
router := gin.New()
|
router := gin.New()
|
||||||
router.Use(RequestContextMiddleware())
|
router.Use(RequestContextMiddleware())
|
||||||
router.Use(RequestLogger(observability.NewMetrics()))
|
router.Use(RequestLogger())
|
||||||
router.GET("/watch/proxy/stream", func(c *gin.Context) {
|
router.GET("/watch/proxy/stream", func(c *gin.Context) {
|
||||||
c.Status(http.StatusBadGateway)
|
c.Status(http.StatusBadGateway)
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user