From 16260ecedb89d4398f9b7ba64aa2802a9f471097 Mon Sep 17 00:00:00 2001 From: Sergey Chubaryan Date: Sun, 23 Feb 2025 20:09:55 +0300 Subject: [PATCH] add wrapper for prometheus --- cmd/backend/server/server.go | 12 +-- cmd/notifyer/config.go | 1 + cmd/notifyer/config.yaml | 1 + cmd/notifyer/event_handler.go | 90 +++++++++++++++++++++ cmd/notifyer/main.go | 98 +++++++++-------------- cmd/shortlinks/main.go | 9 ++- internal/http_server/metrics.go | 49 ++++++++++++ internal/http_server/recovery.go | 5 +- internal/http_server/request_log.go | 16 ++-- internal/integrations/prometheus.go | 117 +++++++++++----------------- 10 files changed, 245 insertions(+), 153 deletions(-) create mode 100644 cmd/notifyer/event_handler.go create mode 100644 internal/http_server/metrics.go diff --git a/cmd/backend/server/server.go b/cmd/backend/server/server.go index e76b7d4..028ee83 100644 --- a/cmd/backend/server/server.go +++ b/cmd/backend/server/server.go @@ -28,14 +28,14 @@ func NewServer(opts NewServerOpts) *httpserver.Server { r := gin.New() r.ContextWithFallback = true // Use it to allow getting values from c.Request.Context() - // r.Static("/webapp", "./webapp") + metrics := integrations.NewMetrics("backend") + serverMetrics := httpserver.NewServerMetrics(metrics) + r.GET("/health", handlers.New200OkHandler()) + r.Any("/metrics", gin.WrapH(metrics.HttpHandler())) - prometheus := integrations.NewPrometheus() - r.Any("/metrics", gin.WrapH(prometheus.GetRequestHandler())) - - r.Use(httpserver.NewRecoveryMiddleware(opts.Logger, prometheus, opts.DebugMode)) - r.Use(httpserver.NewRequestLogMiddleware(opts.Logger, opts.Tracer, prometheus)) + r.Use(httpserver.NewRecoveryMiddleware(opts.Logger, serverMetrics, opts.DebugMode)) + r.Use(httpserver.NewRequestLogMiddleware(opts.Logger, opts.Tracer, serverMetrics)) r.Use(httpserver.NewTracingMiddleware(opts.Tracer)) r.GET("/verify-user", handlers.NewUserVerifyEmailHandler(opts.Logger, opts.UserService)) diff --git a/cmd/notifyer/config.go b/cmd/notifyer/config.go index e40f597..f0c0437 100644 --- a/cmd/notifyer/config.go +++ b/cmd/notifyer/config.go @@ -8,6 +8,7 @@ func LoadConfig(filePath string) (Config, error) { type Config struct { App struct { + Port uint16 `yaml:"port"` LogFile string `yaml:"logFile"` ServiceUrl string `yaml:"serviceUrl"` } diff --git a/cmd/notifyer/config.yaml b/cmd/notifyer/config.yaml index 4de10a5..1762b99 100644 --- a/cmd/notifyer/config.yaml +++ b/cmd/notifyer/config.yaml @@ -1,4 +1,5 @@ app: + port: 8082 serviceUrl: "http://localhost:8080" kafka: brokers: diff --git a/cmd/notifyer/event_handler.go b/cmd/notifyer/event_handler.go new file mode 100644 index 0000000..9d2b58e --- /dev/null +++ b/cmd/notifyer/event_handler.go @@ -0,0 +1,90 @@ +package main + +import ( + "backend/internal/integrations" + "backend/pkg/logger" + "context" + "encoding/json" + "fmt" + "io" + + "github.com/segmentio/kafka-go" +) + +type SendEmailEvent struct { + Email string `json:"email"` + Token string `json:"token"` +} + +func NewEventHandler( + config Config, + logger logger.Logger, + metrics *integrations.Metrics, + emailer *Emailer, +) *EventHandler { + eventsCounter := metrics.NewCounter("events_counter", "total events handled") + return &EventHandler{ + config: config, + logger: logger, + emailer: emailer, + eventsCounter: eventsCounter, + } +} + +type EventHandler struct { + config Config + logger logger.Logger + emailer *Emailer + eventsCounter integrations.Counter +} + +func (e *EventHandler) eventLoop(ctx context.Context, kafkaReader *kafka.Reader) { + for { + msg, err := kafkaReader.FetchMessage(ctx) + if err == io.EOF { + e.logger.Fatal().Err(err) + } + if err != nil { + e.logger.Fatal().Err(err) + } + + select { + case <-ctx.Done(): + return + default: + } + + e.logger.Log().Msgf("event: %s", msg.Key) + e.eventsCounter.Inc() + + if err := kafkaReader.CommitMessages(ctx, msg); err != nil { + e.logger.Error().Err(err).Msg("failed to commit offset") + continue + } + + if err := e.handleEvent(ctx, msg); err != nil { + e.logger.Error().Err(err).Msg("failed to handle event") + continue + } + } +} + +func (e *EventHandler) handleEvent(ctx context.Context, msg kafka.Message) error { + event := SendEmailEvent{} + if err := json.Unmarshal(msg.Value, &event); err != nil { + return err + } + + // TODO: add context somehow + switch string(msg.Key) { + case "email_forgot_password": + return e.emailer.SendRestorePassword(event.Email, event.Token) + case "email_password_changed": + return e.emailer.SendPasswordChanged(event.Email) + case "email_verify_user": + link := fmt.Sprintf("%s/verify-user?token=%s", e.config.App.ServiceUrl, event.Token) + return e.emailer.SendVerifyUser(event.Email, link) + } + + return fmt.Errorf("unknown event type") +} diff --git a/cmd/notifyer/main.go b/cmd/notifyer/main.go index c328b89..71fb044 100644 --- a/cmd/notifyer/main.go +++ b/cmd/notifyer/main.go @@ -1,21 +1,16 @@ package main import ( + httpserver "backend/internal/http_server" + "backend/internal/integrations" "backend/pkg/logger" "context" - "encoding/json" - "fmt" - "io" "log" + "github.com/gin-gonic/gin" "github.com/segmentio/kafka-go" ) -type SendEmailEvent struct { - Email string `json:"email"` - Token string `json:"token"` -} - func main() { ctx := context.Background() @@ -24,67 +19,44 @@ func main() { log.Fatal(err.Error()) } - emailer, err := NewEmailer(config.SMTP) - if err != nil { - log.Fatal(err.Error()) - } - - r := kafka.NewReader(kafka.ReaderConfig{ - Brokers: config.Kafka.Brokers, - Topic: config.Kafka.Topic, - GroupID: config.Kafka.ConsumerGroupId, - }) - logger, err := logger.New(ctx, logger.NewLoggerOpts{ Debug: true, OutputFile: config.App.LogFile, }) if err != nil { - log.Fatal(err.Error()) + logger.Fatal().Err(err) } - logger.Printf("notifyer service started\n") - - for { - msg, err := r.FetchMessage(ctx) - if err == io.EOF { - log.Fatal("EOF") - return - } - if err != nil { - log.Fatal(err.Error()) - return - } - - log.Printf("offset: %d, partition: %d, key: %s, value: %s\n", msg.Offset, msg.Partition, string(msg.Key), string(msg.Value)) - - if err := r.CommitMessages(ctx, msg); err != nil { - log.Fatalf("failed to commit: %s\n", err.Error()) - continue - } - - if err := handleEvent(config, emailer, msg); err != nil { - log.Printf("failed to handle event: %s\n", err.Error()) - continue - } + emailer, err := NewEmailer(config.SMTP) + if err != nil { + logger.Fatal().Err(err) } -} - -func handleEvent(config Config, emailer *Emailer, msg kafka.Message) error { - event := SendEmailEvent{} - if err := json.Unmarshal(msg.Value, &event); err != nil { - return err - } - - switch string(msg.Key) { - case "email_forgot_password": - return emailer.SendRestorePassword(event.Email, event.Token) - case "email_password_changed": - return emailer.SendPasswordChanged(event.Email) - case "email_verify_user": - link := fmt.Sprintf("%s/verify-user?token=%s", config.App.ServiceUrl, event.Token) - return emailer.SendVerifyUser(event.Email, link) - } - - return fmt.Errorf("unknown event type") + + metrics := integrations.NewMetrics("notifyer") + + ginRouter := gin.New() + ginRouter.GET("/metrics", gin.WrapH(metrics.HttpHandler())) + ginRouter.GET("/health", func(ctx *gin.Context) { + ctx.Status(200) + }) + + kafkaReader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: config.Kafka.Brokers, + Topic: config.Kafka.Topic, + GroupID: config.Kafka.ConsumerGroupId, + }) + kafkaReader.SetOffset(kafka.LastOffset) + + eventHandler := NewEventHandler(config, logger, metrics, emailer) + go eventHandler.eventLoop(ctx, kafkaReader) + + logger.Log().Msg("notifyer service started") + + srv := httpserver.New( + httpserver.NewServerOpts{ + Logger: logger, + HttpServer: ginRouter, + }, + ) + srv.Run(ctx, config.App.Port) } diff --git a/cmd/shortlinks/main.go b/cmd/shortlinks/main.go index 2adbf3d..f9765ca 100644 --- a/cmd/shortlinks/main.go +++ b/cmd/shortlinks/main.go @@ -85,16 +85,17 @@ func RunServer(ctx context.Context, log logger.Logger, tracer trace.Tracer, conf gin.SetMode(gin.ReleaseMode) } - prometheus := integrations.NewPrometheus() + metrics := integrations.NewMetrics("shortlinks") + serverMetrics := httpserver.NewServerMetrics(metrics) r := gin.New() - r.Any("/metrics", gin.WrapH(prometheus.GetRequestHandler())) + r.Any("/metrics", gin.WrapH(metrics.HttpHandler())) r.GET("/health", func(ctx *gin.Context) { ctx.Status(200) }) - r.Use(httpserver.NewRecoveryMiddleware(log, prometheus, debugMode)) - r.Use(httpserver.NewRequestLogMiddleware(log, tracer, prometheus)) + r.Use(httpserver.NewRecoveryMiddleware(log, serverMetrics, debugMode)) + r.Use(httpserver.NewRequestLogMiddleware(log, tracer, serverMetrics)) r.Use(httpserver.NewTracingMiddleware(tracer)) linkGroup := r.Group("/s") diff --git a/internal/http_server/metrics.go b/internal/http_server/metrics.go new file mode 100644 index 0000000..e024d94 --- /dev/null +++ b/internal/http_server/metrics.go @@ -0,0 +1,49 @@ +package httpserver + +import ( + "backend/internal/integrations" +) + +func NewServerMetrics(p *integrations.Metrics) *ServerMetrics { + errors5xxCounter := p.NewCounter("server_responses_5xx", "5xx responses counter") + errors4xxCounter := p.NewCounter("server_responses_4xx", "4xx responses count") + requestsCounter := p.NewCounter("server_requests_total", "requests counter") + avgReqTimeHist := p.NewHistogram("server_requests_time", "requests time histogram") + panicsHist := p.NewHistogram("server_panics", "panics histogram metric") + + return &ServerMetrics{ + rpsCounter: requestsCounter, + avgReqTimeHist: avgReqTimeHist, + panicsHist: panicsHist, + errors4xxCounter: errors4xxCounter, + errors5xxCounter: errors5xxCounter, + } +} + +type ServerMetrics struct { + rpsCounter integrations.Counter + avgReqTimeHist integrations.Histogram + panicsHist integrations.Histogram + errors4xxCounter integrations.Counter + errors5xxCounter integrations.Counter +} + +func (b *ServerMetrics) AddRequest() { + b.rpsCounter.Inc() +} + +func (b *ServerMetrics) AddRequestTime(reqTime float64) { + b.avgReqTimeHist.Observe(reqTime) +} + +func (b *ServerMetrics) AddPanic() { + b.panicsHist.Observe(1) +} + +func (b *ServerMetrics) Add4xxError() { + b.errors4xxCounter.Inc() +} + +func (b *ServerMetrics) Add5xxError() { + b.errors5xxCounter.Inc() +} diff --git a/internal/http_server/recovery.go b/internal/http_server/recovery.go index 4611dab..6c5a409 100644 --- a/internal/http_server/recovery.go +++ b/internal/http_server/recovery.go @@ -3,7 +3,6 @@ package httpserver // Modified recovery from gin, use own logger import ( - "backend/internal/integrations" "backend/pkg/logger" "bytes" "errors" @@ -30,12 +29,12 @@ var ( slash = []byte("/") ) -func NewRecoveryMiddleware(logger logger.Logger, prometheus *integrations.Prometheus, debugMode bool) gin.HandlerFunc { +func NewRecoveryMiddleware(logger logger.Logger, serverMetrics *ServerMetrics, debugMode bool) gin.HandlerFunc { handle := defaultHandleRecovery return func(c *gin.Context) { defer func() { if err := recover(); err != nil { - prometheus.AddPanic() + serverMetrics.AddPanic() // Check for a broken connection, as it is not really a // condition that warrants a panic stack trace. diff --git a/internal/http_server/request_log.go b/internal/http_server/request_log.go index f3c7e5b..24c9c7a 100644 --- a/internal/http_server/request_log.go +++ b/internal/http_server/request_log.go @@ -1,7 +1,6 @@ package httpserver import ( - "backend/internal/integrations" log "backend/pkg/logger" "fmt" "time" @@ -11,10 +10,13 @@ import ( "go.opentelemetry.io/otel/trace" ) -func NewRequestLogMiddleware(logger log.Logger, tracer trace.Tracer, prometheus *integrations.Prometheus) gin.HandlerFunc { +func NewRequestLogMiddleware( + logger log.Logger, + tracer trace.Tracer, + serverMetrics *ServerMetrics, +) gin.HandlerFunc { return func(c *gin.Context) { - prometheus.RequestInc() - defer prometheus.RequestDec() + serverMetrics.AddRequest() requestId := c.GetHeader("X-Request-Id") if requestId == "" { @@ -34,7 +36,7 @@ func NewRequestLogMiddleware(logger log.Logger, tracer trace.Tracer, prometheus c.Next() latency := time.Since(start) - prometheus.AddRequestTime(float64(latency.Microseconds())) + serverMetrics.AddRequestTime(float64(latency.Microseconds())) method := c.Request.Method statusCode := c.Writer.Status() @@ -49,12 +51,12 @@ func NewRequestLogMiddleware(logger log.Logger, tracer trace.Tracer, prometheus } if statusCode >= 400 && statusCode < 500 { - prometheus.Add4xxError() + serverMetrics.Add4xxError() ctxLogger.Warning().Msg(msg) return } - prometheus.Add5xxError() + serverMetrics.Add5xxError() ctxLogger.Error().Msg(msg) } } diff --git a/internal/integrations/prometheus.go b/internal/integrations/prometheus.go index 9fecfba..31533c6 100644 --- a/internal/integrations/prometheus.go +++ b/internal/integrations/prometheus.go @@ -8,90 +8,67 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -type Prometheus struct { - reg *prometheus.Registry - rpsCounter prometheus.Counter - avgReqTimeHist prometheus.Histogram - panicsHist prometheus.Histogram - errors4xxCounter prometheus.Counter - errors5xxCounter prometheus.Counter +type Counter interface { + Inc() } -func NewPrometheus() *Prometheus { - reg := prometheus.NewRegistry() +type Gauge interface { + Set(float64) + Inc() + Dec() +} - // Add go runtime metrics and process collectors. - reg.MustRegister( +type Histogram interface { + Observe(float64) +} + +type Metrics struct { + registry *prometheus.Registry + registerer prometheus.Registerer +} + +func NewMetrics(prefix string) *Metrics { + registry := prometheus.NewRegistry() + registerer := prometheus.WrapRegistererWithPrefix(prefix, registry) + + registerer.MustRegister( collectors.NewGoCollector(), collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), ) - errors5xxCounter := prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "backend_errors_count_5xx", - Help: "5xx errors count", - }, - ) - errors4xxCounter := prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "backend_errors_count_4xx", - Help: "4xx errors count", - }, - ) - rpsCounter := prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "backend_requests_per_second", - Help: "Requests per second metric", - }, - ) - avgReqTimeHist := prometheus.NewHistogram( - prometheus.HistogramOpts{ - Name: "backend_requests_average_time", - Help: "Average time of requests", - }, - ) - panicsHist := prometheus.NewHistogram( - prometheus.HistogramOpts{ - Name: "backend_panics", - Help: "Panics histogram metric", - }, - ) - reg.MustRegister(rpsCounter, avgReqTimeHist, panicsHist, errors4xxCounter, errors5xxCounter) - - return &Prometheus{ - panicsHist: panicsHist, - avgReqTimeHist: avgReqTimeHist, - rpsCounter: rpsCounter, - errors4xxCounter: errors4xxCounter, - errors5xxCounter: errors5xxCounter, - reg: reg, + return &Metrics{ + registry: registry, + registerer: registerer, } } -func (p *Prometheus) GetRequestHandler() http.Handler { - return promhttp.HandlerFor(p.reg, promhttp.HandlerOpts{Registry: p.reg}) +func (m *Metrics) NewCounter(name, description string) Counter { + collector := prometheus.NewCounter(prometheus.CounterOpts{ + Name: name, + Help: description, + }) + m.registerer.MustRegister(collector) + return collector } -func (p *Prometheus) RequestInc() { - p.rpsCounter.Inc() +func (m *Metrics) NewGauge(name, description string) Gauge { + collector := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: name, + Help: description, + }) + m.registerer.MustRegister(collector) + return collector } -func (p *Prometheus) RequestDec() { - // p.rpsGauge.Dec() +func (m *Metrics) NewHistogram(name, description string) Histogram { + collector := prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: name, + Help: description, + }) + m.registerer.MustRegister(collector) + return collector } -func (p *Prometheus) AddRequestTime(reqTime float64) { - p.avgReqTimeHist.Observe(reqTime) -} - -func (p *Prometheus) AddPanic() { - p.panicsHist.Observe(1) -} - -func (p *Prometheus) Add4xxError() { - p.errors4xxCounter.Inc() -} - -func (p *Prometheus) Add5xxError() { - p.errors5xxCounter.Inc() +func (m *Metrics) HttpHandler() http.Handler { + return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{Registry: m.registerer}) }