Merge branch 'master' into feature/email-verification

This commit is contained in:
Sergey Chubaryan 2025-02-15 09:51:19 +03:00
commit a56b0eed56
29 changed files with 400 additions and 597 deletions

View File

@ -186,7 +186,7 @@ func (a *App) Run(p RunParams) {
}() }()
} }
srv := server.New( srv := server.NewServer(
server.NewServerOpts{ server.NewServerOpts{
DebugMode: debugMode, DebugMode: debugMode,
Logger: logger, Logger: logger,

View File

@ -1,5 +1,5 @@
port: 8080 port: 8080
postgres_url: "postgres://postgres:postgres@localhost:5432/postgres" postgres_url: "postgres://postgres:postgres@localhost:5432/postgres"
jwt_signing_key: "./jwt_signing_key" jwt_signing_key: "./config_defaults/jwt_signing_key"
kafka_url: "localhost:9092" kafka_url: "localhost:9092"
kafka_topic: "backend_events" kafka_topic: "backend_events"

View File

@ -2,16 +2,17 @@ package handlers
import ( import (
"backend/internal/core/services" "backend/internal/core/services"
httpserver "backend/internal/http_server"
"backend/pkg/logger" "backend/pkg/logger"
"encoding/json" "context"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
type createUserInput struct { type createUserInput struct {
Email string `json:"email"` Email string `json:"email" validate:"required,email"`
Password string `json:"password"` Password string `json:"password" validate:"required"`
Name string `json:"name"` Name string `json:"name" validate:"required"`
} }
type createUserOutput struct { type createUserOutput struct {
@ -20,54 +21,27 @@ type createUserOutput struct {
Name string `json:"name"` Name string `json:"name"`
} }
func NewUserCreateHandler(logger logger.Logger, userService services.UserService) gin.HandlerFunc { func NewUserCreateHandler(log logger.Logger, userService services.UserService) gin.HandlerFunc {
return func(c *gin.Context) { return httpserver.WrapGin(log,
ctxLogger := logger.WithContext(c) func(ctx context.Context, input createUserInput) (createUserOutput, error) {
user, err := userService.CreateUser(ctx,
params := createUserInput{}
if err := c.ShouldBindJSON(&params); err != nil {
ctxLogger.Error().Err(err).Msg("bad input body model")
c.Data(400, "plain/text", []byte(err.Error()))
return
}
dto, err := userService.CreateUser(
c,
services.UserCreateParams{ services.UserCreateParams{
Email: params.Email, Email: input.Email,
Password: params.Password, Password: input.Password,
Name: params.Name, Name: input.Name,
}, },
) )
if err == services.ErrUserExists {
ctxLogger.Error().Err(err).Msg("user already exists") out := createUserOutput{}
c.Data(400, "plain/text", []byte(err.Error()))
return
}
if err == services.ErrUserBadPassword {
ctxLogger.Error().Err(err).Msg("password does not satisfy requirements")
c.Data(400, "plain/text", []byte(err.Error()))
return
}
if err != nil { if err != nil {
ctxLogger.Error().Err(err).Msg("unexpected create user error") return out, err
c.Data(500, "plain/text", []byte(err.Error()))
return
} }
resultBody, err := json.Marshal( return createUserOutput{
createUserOutput{ Id: user.Id,
Id: dto.Id, Email: user.Email,
Email: dto.Email, Name: user.Name,
Name: dto.Name, }, nil
}, },
) )
if err != nil {
ctxLogger.Error().Err(err).Msg("marshal user model error")
c.Data(500, "plain/text", []byte(err.Error()))
return
}
c.Data(200, "application/json", resultBody)
}
} }

View File

@ -2,14 +2,15 @@ package handlers
import ( import (
"backend/internal/core/services" "backend/internal/core/services"
httpserver "backend/internal/http_server"
"backend/pkg/logger" "backend/pkg/logger"
"encoding/json" "context"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
type loginUserInput struct { type loginUserInput struct {
Login string `json:"email"` Login string `json:"email" validate:"required,email"`
Password string `json:"password"` Password string `json:"password"`
} }
@ -17,43 +18,17 @@ type loginUserOutput struct {
Token string `json:"token"` Token string `json:"token"`
} }
func NewUserLoginHandler(logger logger.Logger, userService services.UserService) gin.HandlerFunc { func NewUserLoginHandler(log logger.Logger, userService services.UserService) gin.HandlerFunc {
return func(c *gin.Context) { return httpserver.WrapGin(log,
ctxLogger := logger.WithContext(c).WithPrefix("NewUserLoginHandler") func(ctx context.Context, input loginUserInput) (loginUserOutput, error) {
token, err := userService.AuthenticateUser(ctx, input.Login, input.Password)
params := loginUserInput{}
if err := c.ShouldBindJSON(&params); err != nil {
ctxLogger.Error().Err(err).Msg("bad input body model")
c.AbortWithError(400, err)
return
}
token, err := userService.AuthenticateUser(c, params.Login, params.Password)
if err == services.ErrUserNotExists {
ctxLogger.Error().Err(err).Msg("user does not exist")
c.AbortWithError(400, err)
return
}
if err == services.ErrUserWrongPassword {
ctxLogger.Error().Err(err).Msg("wrong password")
c.AbortWithError(400, err)
return
}
if err != nil { if err != nil {
ctxLogger.Error().Err(err).Msg("AuthenticateUser internal error") return loginUserOutput{}, err
c.AbortWithError(500, err)
return
} }
resultBody, err := json.Marshal(loginUserOutput{ return loginUserOutput{
Token: token, Token: token,
}) }, nil
if err != nil { },
ctxLogger.Error().Err(err).Msg("marshal json internal error") )
c.AbortWithError(500, err)
return
}
c.Data(200, "application/json", resultBody)
}
} }

View File

@ -1,56 +0,0 @@
package middleware
import (
"backend/internal/integrations"
log "backend/pkg/logger"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
)
func NewRequestLogMiddleware(logger log.Logger, tracer trace.Tracer, prometheus *integrations.Prometheus) gin.HandlerFunc {
return func(c *gin.Context) {
prometheus.RequestInc()
defer prometheus.RequestDec()
requestId := c.GetHeader("X-Request-Id")
if requestId == "" {
requestId = uuid.New().String()
}
c.Header("X-Request-Id", requestId)
c.Header("Access-Control-Allow-Origin", "*")
log.SetCtxRequestId(c, requestId)
path := c.Request.URL.Path
if c.Request.URL.RawQuery != "" {
path = path + "?" + c.Request.URL.RawQuery
}
start := time.Now()
c.Next()
latency := time.Since(start)
prometheus.AddRequestTime(float64(latency.Microseconds()))
method := c.Request.Method
statusCode := c.Writer.Status()
if statusCode >= 200 && statusCode < 400 {
return
}
ctxLogger := logger.WithContext(c)
if statusCode >= 400 && statusCode < 500 {
prometheus.Add4xxError()
ctxLogger.Warning().Msgf("Request %s %s %d %v", method, path, statusCode, latency)
return
}
prometheus.Add5xxError()
ctxLogger.Error().Msgf("Request %s %s %d %v", method, path, statusCode, latency)
}
}

View File

@ -5,21 +5,14 @@ import (
"backend/cmd/backend/server/middleware" "backend/cmd/backend/server/middleware"
"backend/cmd/backend/server/utils" "backend/cmd/backend/server/utils"
"backend/internal/core/services" "backend/internal/core/services"
httpserver "backend/internal/http_server"
"backend/internal/integrations" "backend/internal/integrations"
"backend/pkg/logger" "backend/pkg/logger"
"context"
"fmt"
"net"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
type Server struct {
logger logger.Logger
ginEngine *gin.Engine
}
type NewServerOpts struct { type NewServerOpts struct {
DebugMode bool DebugMode bool
Logger logger.Logger Logger logger.Logger
@ -28,7 +21,7 @@ type NewServerOpts struct {
Tracer trace.Tracer Tracer trace.Tracer
} }
func New(opts NewServerOpts) *Server { func NewServer(opts NewServerOpts) *httpserver.Server {
if !opts.DebugMode { if !opts.DebugMode {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
} }
@ -36,21 +29,25 @@ func New(opts NewServerOpts) *Server {
r := gin.New() r := gin.New()
r.ContextWithFallback = true // Use it to allow getting values from c.Request.Context() r.ContextWithFallback = true // Use it to allow getting values from c.Request.Context()
r.Static("/webapp", "./webapp") // r.Static("/webapp", "./webapp")
r.GET("/health", handlers.NewDummyHandler()) r.GET("/health", handlers.NewDummyHandler())
prometheus := integrations.NewPrometheus() prometheus := integrations.NewPrometheus()
r.Any("/metrics", gin.WrapH(prometheus.GetRequestHandler())) r.Any("/metrics", gin.WrapH(prometheus.GetRequestHandler()))
r.Use(middleware.NewRecoveryMiddleware(opts.Logger, prometheus, opts.DebugMode)) r.Use(httpserver.NewRecoveryMiddleware(opts.Logger, prometheus, opts.DebugMode))
r.Use(middleware.NewRequestLogMiddleware(opts.Logger, opts.Tracer, prometheus)) r.Use(httpserver.NewRequestLogMiddleware(opts.Logger, opts.Tracer, prometheus))
r.Use(middleware.NewTracingMiddleware(opts.Tracer)) r.Use(httpserver.NewTracingMiddleware(opts.Tracer))
userGroup := r.Group("/user") v1 := r.Group("/v1")
userGroup := v1.Group("/user")
{
userGroup.POST("/create", handlers.NewUserCreateHandler(opts.Logger, opts.UserService)) userGroup.POST("/create", handlers.NewUserCreateHandler(opts.Logger, opts.UserService))
userGroup.POST("/login", handlers.NewUserLoginHandler(opts.Logger, opts.UserService)) userGroup.POST("/login", handlers.NewUserLoginHandler(opts.Logger, opts.UserService))
}
dummyGroup := r.Group("/dummy") dummyGroup := v1.Group("/dummy")
{ {
dummyGroup.Use(middleware.NewAuthMiddleware(opts.UserService)) dummyGroup.Use(middleware.NewAuthMiddleware(opts.UserService))
dummyGroup.GET("", handlers.NewDummyHandler()) dummyGroup.GET("", handlers.NewDummyHandler())
@ -60,29 +57,10 @@ func New(opts NewServerOpts) *Server {
}) })
} }
return &Server{ return httpserver.New(
logger: opts.Logger, httpserver.NewServerOpts{
ginEngine: r, Logger: opts.Logger,
} HttpServer: r,
} },
)
func (s *Server) Run(ctx context.Context, port uint16) {
listenAddr := fmt.Sprintf("0.0.0.0:%d", port)
s.logger.Log().Msgf("server listening on %s", listenAddr)
listener, err := (&net.ListenConfig{}).Listen(ctx, "tcp", listenAddr)
if err != nil {
s.logger.Fatal().Err(err).Msg("can not create network listener")
}
go func() {
<-ctx.Done()
s.logger.Log().Msg("stopping tcp listener...")
listener.Close()
}()
err = s.ginEngine.RunListener(listener)
if err != nil && err == net.ErrClosed {
s.logger.Fatal().Err(err).Msg("server stopped with error")
}
} }

31
cmd/shortlinks/grpc.go Normal file
View File

@ -0,0 +1,31 @@
package main
import (
"backend/internal/core/services"
"backend/internal/grpc_server/shortlinks"
httpserver "backend/internal/http_server"
"backend/pkg/logger"
"context"
)
func NewShortlinksGrpc(log logger.Logger, shortlinkService services.ShortlinkService, host string) *ShortlinksGrpc {
return &ShortlinksGrpc{
handler: NewCreateHandler(log, shortlinkService, host),
}
}
type ShortlinksGrpc struct {
shortlinks.UnimplementedShortlinksServer
handler httpserver.Handler[shortlinkCreateInput, shortlinkCreateOutput]
}
func (s *ShortlinksGrpc) Create(ctx context.Context, req *shortlinks.CreateRequest) (*shortlinks.CreateResponse, error) {
output, err := s.handler(ctx, shortlinkCreateInput{req.Url})
if err != nil {
return nil, err
}
return &shortlinks.CreateResponse{
Link: output.Link,
}, nil
}

View File

@ -2,91 +2,50 @@ package main
import ( import (
"backend/internal/core/services" "backend/internal/core/services"
"backend/internal/grpc_server/shortlinks" httpserver "backend/internal/http_server"
"backend/pkg/logger" "backend/pkg/logger"
"context" "context"
"encoding/json"
"fmt" "fmt"
"net/url" "net/url"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
type shortlinkCreateInput struct {
Url string `json:"url"`
}
type shortlinkCreateOutput struct { type shortlinkCreateOutput struct {
Link string `json:"link"` Link string `json:"link"`
} }
type ShortlinksGrpc struct { func NewCreateHandler(
shortlinks.UnimplementedShortlinksServer log logger.Logger,
log logger.Logger shortlinkService services.ShortlinkService,
host string host string,
shortlinkService services.ShortlinkService ) httpserver.Handler[shortlinkCreateInput, shortlinkCreateOutput] {
} return func(ctx context.Context, input shortlinkCreateInput) (shortlinkCreateOutput, error) {
output := shortlinkCreateOutput{}
func (s *ShortlinksGrpc) Create(ctx context.Context, req *shortlinks.CreateRequest) (*shortlinks.CreateResponse, error) { u, err := url.Parse(input.Url)
ctxLogger := s.log.WithContext(ctx)
rawUrl := req.GetUrl()
if rawUrl == "" {
ctxLogger.Error().Msg("url query param missing")
return nil, fmt.Errorf("url query param missing")
}
u, err := url.Parse(rawUrl)
if err != nil { if err != nil {
ctxLogger.Error().Err(err).Msg("error parsing url param") return output, err
return nil, err
}
u.Scheme = "https"
linkId, err := s.shortlinkService.CreateShortlink(ctx, u.String())
if err != nil {
ctxLogger.Error().Err(err).Msg("err creating shortlink")
return nil, err
}
return &shortlinks.CreateResponse{
Link: fmt.Sprintf("%s/s/%s", s.host, linkId),
}, nil
}
func NewShortlinkCreateHandler(logger logger.Logger, shortlinkService services.ShortlinkService, host string) gin.HandlerFunc {
return func(ctx *gin.Context) {
ctxLogger := logger.WithContext(ctx)
rawUrl := ctx.Query("url")
if rawUrl == "" {
ctxLogger.Error().Msg("url query param missing")
ctx.AbortWithError(400, fmt.Errorf("url query param missing"))
return
}
u, err := url.Parse(rawUrl)
if err != nil {
ctxLogger.Error().Err(err).Msg("error parsing url param")
ctx.Data(400, "plain/text", []byte(err.Error()))
return
} }
u.Scheme = "https" u.Scheme = "https"
linkId, err := shortlinkService.CreateShortlink(ctx, u.String()) linkId, err := shortlinkService.CreateShortlink(ctx, u.String())
if err != nil { if err != nil {
ctxLogger.Error().Err(err).Msg("err creating shortlink") return output, err
ctx.Data(500, "plain/text", []byte(err.Error()))
return
} }
resultBody, err := json.Marshal(shortlinkCreateOutput{ return shortlinkCreateOutput{
Link: fmt.Sprintf("%s/s/%s", host, linkId), Link: fmt.Sprintf("%s/s/%s", host, linkId),
}) }, nil
if err != nil {
ctxLogger.Error().Err(err).Msg("err marshalling shortlink")
ctx.AbortWithError(500, err)
return
} }
}
ctx.Data(200, "application/json", resultBody) func NewShortlinkCreateHandler(log logger.Logger, shortlinkService services.ShortlinkService, host string) gin.HandlerFunc {
} return httpserver.WrapGin(log, NewCreateHandler(log, shortlinkService, host))
} }
func NewShortlinkResolveHandler(logger logger.Logger, shortlinkService services.ShortlinkService) gin.HandlerFunc { func NewShortlinkResolveHandler(logger logger.Logger, shortlinkService services.ShortlinkService) gin.HandlerFunc {

View File

@ -6,7 +6,6 @@ import (
grpcserver "backend/internal/grpc_server" grpcserver "backend/internal/grpc_server"
"backend/internal/grpc_server/shortlinks" "backend/internal/grpc_server/shortlinks"
httpserver "backend/internal/http_server" httpserver "backend/internal/http_server"
"backend/internal/http_server/middleware"
"backend/internal/integrations" "backend/internal/integrations"
"backend/pkg/cache" "backend/pkg/cache"
"backend/pkg/logger" "backend/pkg/logger"
@ -94,22 +93,19 @@ func RunServer(ctx context.Context, log logger.Logger, tracer trace.Tracer, conf
ctx.Status(200) ctx.Status(200)
}) })
r.Use(middleware.NewRecoveryMiddleware(log, prometheus, debugMode)) r.Use(httpserver.NewRecoveryMiddleware(log, prometheus, debugMode))
r.Use(middleware.NewRequestLogMiddleware(log, tracer, prometheus)) r.Use(httpserver.NewRequestLogMiddleware(log, tracer, prometheus))
r.Use(middleware.NewTracingMiddleware(tracer)) r.Use(httpserver.NewTracingMiddleware(tracer))
linkGroup := r.Group("/s") linkGroup := r.Group("/s")
linkGroup.POST("/new", NewShortlinkCreateHandler(log, shortlinkService, host)) linkGroup.POST("/new", NewShortlinkCreateHandler(log, shortlinkService, host))
linkGroup.GET("/:linkId", NewShortlinkResolveHandler(log, shortlinkService)) linkGroup.GET("/:linkId", NewShortlinkResolveHandler(log, shortlinkService))
grpcObj := &ShortlinksGrpc{
log: log,
host: host,
shortlinkService: shortlinkService,
}
grpcUnderlying := grpc.NewServer() grpcUnderlying := grpc.NewServer()
shortlinks.RegisterShortlinksServer(grpcUnderlying, grpcObj) shortlinks.RegisterShortlinksServer(
grpcUnderlying,
NewShortlinksGrpc(log, shortlinkService, host),
)
httpServer := httpserver.New( httpServer := httpserver.New(
httpserver.NewServerOpts{ httpserver.NewServerOpts{

View File

@ -34,6 +34,6 @@ scrape_configs:
- job_name: 'machine' - job_name: 'machine'
scrape_interval: 2s scrape_interval: 2s
static_configs: static_configs:
- targets: ['host.docker.internal:9100'] - targets: ['node_exporter:9100']
labels: labels:
group: 'backend' group: 'backend'

View File

@ -54,17 +54,22 @@ services:
- prometheus-volume:/etc/prometheus - prometheus-volume:/etc/prometheus
- ./deploy/prometheus.yml:/etc/prometheus/prometheus.yml - ./deploy/prometheus.yml:/etc/prometheus/prometheus.yml
# node_exporter: node_exporter:
# image: quay.io/prometheus/node-exporter:latest image: quay.io/prometheus/node-exporter:latest
# command: pid: host
# - '--path.rootfs=/host' command:
# ports: - '--path.procfs=/host/proc'
# - 9100:9100 - '--path.rootfs=/rootfs'
# extra_hosts: - '--path.sysfs=/host/sys'
# - "host.docker.internal:host-gateway" - '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)'
# pid: host volumes:
# volumes: - /proc:/host/proc:ro
# - '/:/host:ro,rslave' - /sys:/host/sys:ro
- /:/rootfs:ro
extra_hosts:
- "host.docker.internal:host-gateway"
ports:
- 9100:9100
otel-collector: otel-collector:
image: otel/opentelemetry-collector-contrib:0.108.0 image: otel/opentelemetry-collector-contrib:0.108.0
@ -97,12 +102,17 @@ services:
- tempo-init - tempo-init
kafka: kafka:
image: apache/kafka:3.8.0 image: &kafkaImage apache/kafka:3.8.0
healthcheck:
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server http://127.0.0.1:9092 || exit 1"]
interval: 1s
timeout: 30s
retries: 30
environment: environment:
KAFKA_NODE_ID: 1 KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
@ -113,18 +123,48 @@ services:
KAFKA_NUM_PARTITIONS: 3 KAFKA_NUM_PARTITIONS: 3
ports: ports:
- 9092:9092 - 9092:9092
# - 9093:9093
# backend: kafka-init:
# build: . image: *kafkaImage
# # dockerfile: ./dockerfile depends_on:
# volumes: kafka:
# - ./:/app condition: service_healthy
# ports: entrypoint: >
# - 8080:8080 /bin/bash -c "/opt/kafka/bin/kafka-topics.sh --bootstrap-server http://kafka:9092 --create --topic events --partitions 6"
minio:
image: quay.io/minio/minio:latest
command: ["server", "/data", "--console-address", ":9001"]
healthcheck:
test: 'mc ready local'
interval: 1s
environment:
MINIO_ROOT_USER: miniouser
MINIO_ROOT_PASSWORD: miniouser
MINIO_ACCESS_KEY: miniokey
MINIO_SECRET_KEY: miniokey
ports:
- 9000:9000
- 9001:9001
volumes:
- minio-volume:/data
minio-init:
image: quay.io/minio/mc:latest
depends_on:
- minio
entrypoint: >
/bin/sh -c "
/usr/bin/mc alias set myminio http://minio:9000 miniouser miniouser;
/usr/bin/mc mb minio/bucket;
/usr/bin/mc anonymous set public minio/bucket;
exit 0;
"
volumes: volumes:
postgres-volume: postgres-volume:
grafana-volume: grafana-volume:
tempo-volume: tempo-volume:
prometheus-volume: prometheus-volume:
minio-volume:

View File

@ -1,155 +0,0 @@
package middleware
// Modified recovery from gin, use own logger
import (
"backend/internal/integrations"
"backend/pkg/logger"
"bytes"
"errors"
"fmt"
"net"
"net/http"
"net/http/httputil"
"os"
"runtime"
"strings"
"time"
"github.com/gin-gonic/gin"
)
const (
reset = "\033[0m"
)
var (
dunno = []byte("???")
centerDot = []byte("·")
dot = []byte(".")
slash = []byte("/")
)
func NewRecoveryMiddleware(logger logger.Logger, prometheus *integrations.Prometheus, debugMode bool) gin.HandlerFunc {
handle := defaultHandleRecovery
return func(c *gin.Context) {
defer func() {
if err := recover(); err != nil {
prometheus.AddPanic()
// Check for a broken connection, as it is not really a
// condition that warrants a panic stack trace.
var brokenPipe bool
if ne, ok := err.(*net.OpError); ok {
var se *os.SyscallError
if errors.As(ne, &se) {
seStr := strings.ToLower(se.Error())
if strings.Contains(seStr, "broken pipe") ||
strings.Contains(seStr, "connection reset by peer") {
brokenPipe = true
}
}
}
if logger != nil {
stack := stack(3)
httpRequest, _ := httputil.DumpRequest(c.Request, false)
headers := strings.Split(string(httpRequest), "\r\n")
for idx, header := range headers {
current := strings.Split(header, ":")
if current[0] == "Authorization" {
headers[idx] = current[0] + ": *"
}
}
headersToStr := strings.Join(headers, "\r\n")
if brokenPipe {
logger.Printf("%s\n%s%s", err, headersToStr, reset)
} else if debugMode {
logger.Printf("[Recovery] %s panic recovered:\n%s\n%s\n%s%s",
timeFormat(time.Now()), headersToStr, err, stack, reset)
} else {
logger.Printf("[Recovery] %s panic recovered:\n%s\n%s%s",
timeFormat(time.Now()), err, stack, reset)
}
}
if brokenPipe {
// If the connection is dead, we can't write a status to it.
c.Error(err.(error)) //nolint: errcheck
c.Abort()
} else {
handle(c, err)
}
}
}()
c.Next()
}
}
func defaultHandleRecovery(c *gin.Context, _ any) {
c.AbortWithStatus(http.StatusInternalServerError)
}
// stack returns a nicely formatted stack frame, skipping skip frames.
func stack(skip int) []byte {
buf := new(bytes.Buffer) // the returned data
// As we loop, we open files and read them. These variables record the currently
// loaded file.
var lines [][]byte
var lastFile string
for i := skip; ; i++ { // Skip the expected number of frames
pc, file, line, ok := runtime.Caller(i)
if !ok {
break
}
// Print this much at least. If we can't find the source, it won't show.
fmt.Fprintf(buf, "%s:%d (0x%x)\n", file, line, pc)
if file != lastFile {
data, err := os.ReadFile(file)
if err != nil {
continue
}
lines = bytes.Split(data, []byte{'\n'})
lastFile = file
}
fmt.Fprintf(buf, "\t%s: %s\n", function(pc), source(lines, line))
}
return buf.Bytes()
}
// source returns a space-trimmed slice of the n'th line.
func source(lines [][]byte, n int) []byte {
n-- // in stack trace, lines are 1-indexed but our array is 0-indexed
if n < 0 || n >= len(lines) {
return dunno
}
return bytes.TrimSpace(lines[n])
}
// function returns, if possible, the name of the function containing the PC.
func function(pc uintptr) []byte {
fn := runtime.FuncForPC(pc)
if fn == nil {
return dunno
}
name := []byte(fn.Name())
// The name includes the path name to the package, which is unnecessary
// since the file name is already included. Plus, it has center dots.
// That is, we see
// runtime/debug.*T·ptrmethod
// and want
// *T.ptrmethod
// Also the package path might contain dot (e.g. code.google.com/...),
// so first eliminate the path prefix
if lastSlash := bytes.LastIndex(name, slash); lastSlash >= 0 {
name = name[lastSlash+1:]
}
if period := bytes.Index(name, dot); period >= 0 {
name = name[period+1:]
}
name = bytes.ReplaceAll(name, centerDot, dot)
return name
}
// timeFormat returns a customized time string for logger.
func timeFormat(t time.Time) string {
return t.Format("2006/01/02 - 15:04:05")
}

View File

@ -1,33 +0,0 @@
package middleware
import (
"fmt"
"github.com/gin-gonic/gin"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
func NewTracingMiddleware(tracer trace.Tracer) gin.HandlerFunc {
prop := otel.GetTextMapPropagator()
return func(c *gin.Context) {
savedCtx := c.Request.Context()
defer func() {
c.Request = c.Request.WithContext(savedCtx)
}()
ctx := prop.Extract(savedCtx, propagation.HeaderCarrier(c.Request.Header))
ctx, span := tracer.Start(ctx, fmt.Sprintf("%s %s", c.Request.Method, c.Request.URL.Path))
defer span.End()
traceId := span.SpanContext().TraceID()
c.Header("X-Trace-Id", traceId.String())
c.Request = c.Request.WithContext(ctx)
c.Next()
}
}

View File

@ -1,4 +1,4 @@
package middleware package httpserver
// Modified recovery from gin, use own logger // Modified recovery from gin, use own logger

View File

@ -1,4 +1,4 @@
package middleware package httpserver
import ( import (
"backend/internal/integrations" "backend/internal/integrations"

View File

@ -1,4 +1,4 @@
package middleware package httpserver
import ( import (
"fmt" "fmt"

View File

@ -0,0 +1,79 @@
package httpserver
import (
"backend/pkg/logger"
"context"
"encoding/json"
"github.com/gin-gonic/gin"
)
type Handler[Input, Output interface{}] func(ctx context.Context, input Input) (Output, error)
type ResponseOk struct {
Status string `json:"status"`
Result interface{} `json:"result"`
}
type ResponseError struct {
Status string `json:"status"`
Error struct {
Id string `json:"id"`
Message string `json:"message"`
} `json:"error"`
}
func WrapGin[In, Out interface{}](log logger.Logger, handler Handler[In, Out]) gin.HandlerFunc {
return func(c *gin.Context) {
log := log.WithContext(c)
var input In
if err := c.ShouldBindJSON(&input); err != nil {
response := ResponseError{
Status: "error",
Error: struct {
Id string `json:"id"`
Message string `json:"message"`
}{
Id: "WrongBody",
Message: err.Error(),
},
}
body, _ := json.Marshal(response)
c.Data(400, "application/json", body)
return
}
var response interface{}
output, err := handler(c, input)
if err != nil {
log.Error().Err(err).Msg("error in request handler")
response = ResponseError{
Status: "error",
Error: struct {
Id string `json:"id"`
Message string `json:"message"`
}{
Id: "-",
Message: err.Error(),
},
}
} else {
response = ResponseOk{
Status: "success",
Result: output,
}
}
body, err := json.Marshal(response)
if err != nil {
log.Error().Err(err).Msg("marshal response error")
c.Data(500, "plain/text", []byte(err.Error()))
return
}
c.Data(200, "application/json", body)
}
}

View File

@ -16,7 +16,7 @@ func NewKafka(addr, topic string) *Kafka {
Addr: kafka.TCP(addr), Addr: kafka.TCP(addr),
Topic: topic, Topic: topic,
Balancer: &kafka.RoundRobin{}, Balancer: &kafka.RoundRobin{},
AllowAutoTopicCreation: true, AllowAutoTopicCreation: false,
BatchSize: 100, BatchSize: 100,
BatchTimeout: 100 * time.Millisecond, BatchTimeout: 100 * time.Millisecond,
} }

View File

@ -1,84 +0,0 @@
import random
import string
from locust import HttpUser, FastHttpUser
class Auth():
token: string
def __init__(self, token):
self.token = token
class User():
email: string
name: string
password: string
def __init__(self, email, password, name, token = ""):
self.email = email
self.password = password
self.name = name
self.token = token
class BackendApi():
http: FastHttpUser
def __init__(self, http: FastHttpUser):
self.http = http
def user_create(self) -> User:
email = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) + '@test.test'
name = ''.join(random.choices(string.ascii_letters, k=10))
password = 'Abcdef1!!1'
response = self.http.client.post(
"/user/create",
json={
"email": email,
"password": password,
"name": name,
},
)
if response.status_code != 200:
raise AssertionError('can not create user')
return User(email, password, name)
def user_login(self, user: User) -> Auth:
response = self.http.client.post(
"/user/login",
json={
"email": user.email,
"password": user.password,
},
)
if response.status_code != 200:
raise AssertionError('can not login user')
token = response.json()['token']
if token == '':
raise AssertionError('empty user token')
return Auth(token)
def dummy_get(self, auth: Auth):
headers = {"X-Auth": auth.token}
response = self.http.client.get("/dummy", headers=headers)
if response.status_code != 200:
raise AssertionError('something wrong')
def health_get(self):
response = self.http.client.get("/health")
if response.status_code != 200:
raise AssertionError('something wrong')
def shortlink_create(self, url: string) -> string:
response = self.http.client.post("/s/new?url=" + url)
if response.status_code != 200:
raise AssertionError('can not login user')
link = response.json()['link']
if link == '':
raise AssertionError('empty user token')
return link

88
tests/api.py Normal file
View File

@ -0,0 +1,88 @@
import random
import string
import requests
class Requests():
def __init__(self, baseUrl):
self.baseUrl = baseUrl
def post(self, path, json = {}):
return requests.post(self.baseUrl + path, json=json)
class Auth():
token: string
def __init__(self, token):
self.token = token
class User():
id: string
email: string
name: string
password: string
def __init__(self, email, password, name, id="", token = ""):
self.email = email
self.password = password
self.name = name
self.token = token
class BackendApi():
def __init__(self, httpClient):
self.httpClient = httpClient
def parse_response(self, response):
if response.status != 200:
raise AssertionError('something wrong')
json = response.json()
if json['status'] == 'success':
if 'result' in json:
return json['result']
return None
error = json['error']
raise AssertionError(error['id'], error['message'])
def user_create(self, user: User | None) -> User:
if user == None:
email = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) + '@test.test'
name = ''.join(random.choices(string.ascii_letters, k=10))
password = 'Abcdef1!!1'
user = User(email, password, name)
res = self.parse_response(
self.httpClient.post(
"/v1/user/create", json={
"email": user.email,
"password": user.password,
"name": user.name,
}
)
)
return User(res['email'], res['password'], res['name'], res['id'])
def user_login(self, user: User) -> Auth:
res = self.parse_response(
self.httpClient.post(
"/v1/user/login", json={
"email": user.email+"a",
"password": user.password,
},
)
)
return Auth(res['status'])
def dummy_get(self, auth: Auth):
headers = {"X-Auth": auth.token}
response = self.httpClient.get("/v1/dummy", headers=headers)
if response.status_code != 200:
raise AssertionError('something wrong')
def health_get(self):
response = self.httpClient.get("/health")
if response.status_code != 200:
raise AssertionError('something wrong')

View File

@ -0,0 +1,14 @@
from api import BackendApi, Requests
import requests
backendUrl = "http://localhost:8080"
class TestUser:
def test_create_user(self):
api = BackendApi(Requests(backendUrl))
api.user_create()
def test_login_user(self):
api = BackendApi(Requests(backendUrl))
user = api.user_create()
api.user_login(user)

View File

@ -15,7 +15,10 @@ install:
requirements: requirements:
pip freeze > requirements.txt pip freeze > requirements.txt
run-web: run-integration:
locust -f tests,loads --class-picker --host http://localhost:8080 --processes 16 python3 -m pytest integration/
run-performance-web:
locust -f performance --class-picker --host http://localhost:8080 --processes 16

View File

@ -3,15 +3,13 @@ from locust import FastHttpUser, task
from api import BackendApi, Auth from api import BackendApi, Auth
class DummyGet(FastHttpUser): class DummyGet(FastHttpUser):
api: BackendApi def on_start(self):
auth: Auth self.api = BackendApi(self.client)
user = self.api.user_create()
self.auth = self.api.user_login(user)
@task @task
def dummy_test(self): def dummy_test(self):
self.api.dummy_get(self.auth) self.api.dummy_get(self.auth)
def on_start(self):
self.api = BackendApi(self)
user = self.api.user_create()
self.auth = self.api.user_login(user)

View File

@ -3,11 +3,9 @@ from locust import FastHttpUser, task
from api import BackendApi from api import BackendApi
class HealthGet(FastHttpUser): class HealthGet(FastHttpUser):
api: BackendApi def on_start(self):
self.api = BackendApi(self.client)
@task @task
def user_create_test(self): def user_create_test(self):
self.api.health_get() self.api.health_get()
def on_start(self):
self.api = BackendApi(self)

View File

@ -1,9 +1,9 @@
from locust import LoadTestShape from locust import LoadTestShape
class LowLoad(LoadTestShape): class LowLoad(LoadTestShape):
time_limit = 600 time_limit = 60
spawn_rate = 5 spawn_rate = 2
max_users = 100 max_users = 10
def tick(self) -> (tuple[float, int] | None): def tick(self) -> (tuple[float, int] | None):
user_count = self.spawn_rate * self.get_run_time() user_count = self.spawn_rate * self.get_run_time()

View File

@ -3,11 +3,9 @@ from locust import FastHttpUser, task
from api import BackendApi from api import BackendApi
class ShortlinkCreate(FastHttpUser): class ShortlinkCreate(FastHttpUser):
api: BackendApi def on_start(self):
self.api = BackendApi(self.client)
@task @task
def user_create_test(self): def user_create_test(self):
self.api.shortlink_create("https://ya.ru") self.api.shortlink_create("https://example.com")
def on_start(self):
self.api = BackendApi(self)

View File

@ -3,23 +3,18 @@ from locust import FastHttpUser, task
from api import BackendApi, User from api import BackendApi, User
class UserCreate(FastHttpUser): class UserCreate(FastHttpUser):
api: BackendApi def on_start(self):
self.api = BackendApi(self.client)
@task @task
def user_create_test(self): def user_create_test(self):
self.api.user_create() self.api.user_create()
class UserLogin(FastHttpUser):
def on_start(self): def on_start(self):
self.api = BackendApi(self) self.api = BackendApi(self)
self.user = self.api.user_create()
class UserLogin(FastHttpUser):
api: BackendApi
user: User
@task @task
def user_create_test(self): def user_create_test(self):
self.api.user_login(self.user) self.api.user_login(self.user)
def on_start(self):
self.api = BackendApi(self)
self.user = self.api.user_create()

View File

@ -4,6 +4,7 @@ certifi==2024.7.4
charset-normalizer==3.3.2 charset-normalizer==3.3.2
click==8.1.7 click==8.1.7
ConfigArgParse==1.7 ConfigArgParse==1.7
exceptiongroup==1.2.2
Flask==3.0.3 Flask==3.0.3
Flask-Cors==4.0.1 Flask-Cors==4.0.1
Flask-Login==0.6.3 Flask-Login==0.6.3
@ -11,12 +12,16 @@ gevent==24.2.1
geventhttpclient==2.3.1 geventhttpclient==2.3.1
greenlet==3.0.3 greenlet==3.0.3
idna==3.7 idna==3.7
iniconfig==2.0.0
itsdangerous==2.2.0 itsdangerous==2.2.0
Jinja2==3.1.4 Jinja2==3.1.4
locust==2.31.3 locust==2.31.3
MarkupSafe==2.1.5 MarkupSafe==2.1.5
msgpack==1.0.8 msgpack==1.0.8
packaging==24.2
pluggy==1.5.0
psutil==6.0.0 psutil==6.0.0
pytest==8.3.4
pyzmq==26.1.0 pyzmq==26.1.0
requests==2.32.3 requests==2.32.3
tomli==2.0.1 tomli==2.0.1