diff --git a/docker-compose.yaml b/docker-compose.yaml index be1f190..6f8f840 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -95,7 +95,7 @@ services: environment: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: broker,controller - KAFKA_LISTENERS: PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT @@ -105,6 +105,9 @@ services: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_NUM_PARTITIONS: 3 + ports: + - 9092:9092 + # - 9093:9093 # backend: # build: . diff --git a/helper/go.mod b/helper/go.mod new file mode 100644 index 0000000..f8f530c --- /dev/null +++ b/helper/go.mod @@ -0,0 +1,12 @@ +module helper + +go 1.22.5 + +require gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df + +require ( + github.com/klauspost/compress v1.15.9 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/segmentio/kafka-go v0.4.47 // indirect + gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect +) diff --git a/helper/go.sum b/helper/go.sum new file mode 100644 index 0000000..0d25688 --- /dev/null +++ b/helper/go.sum @@ -0,0 +1,63 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gGKlE2+asNV9m7xrywl36YYNnBG5ZQ0r/BOOxqPpmk= +gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod h1:m7x9LTH6d71AHyAX77c9yqWCCa3UKHcVEj9y7hAtKDk= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df h1:n7WqCuqOuCbNr617RXOY0AWRXxgwEyPp2z+p0+hgMuE= +gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df/go.mod h1:LRQQ+SO6ZHR7tOkpBDuZnXENFzX8qRjMDMyPD6BRkCw= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helper/main.go b/helper/main.go new file mode 100644 index 0000000..19fe077 --- /dev/null +++ b/helper/main.go @@ -0,0 +1,88 @@ +package main + +import ( + "context" + "io" + "log" + + "github.com/segmentio/kafka-go" +) + +// type emailHelper struct { +// dialer *gomail.Dialer +// } + +// func (e *emailHelper) SendEmailForgotPassword(email, token string) { +// link := "https://nucrea.ru?token=" + token + +// const MSG_TEXT = ` +// +// +// +// +//

This message was sent because you forgot a password

+//

To change a password, use this link

+// +// +// ` +// msgText := strings.ReplaceAll(MSG_TEXT, "{{Link}}", link) + +// m := gomail.NewMessage() +// m.SetHeader("From", "email") +// m.SetHeader("To", email) +// m.SetHeader("Subject", "Hello!") +// m.SetBody("text/html", msgText) + +// if err := d.DialAndSend(m); err != nil { +// panic(err) +// } +// } + +func main() { + const ( + SMTP_SERVER = "" + SMTP_PORT = 0 + SMTP_LOGIN = "" + SMTP_PASSWORD = "" + ) + + ctx := context.Background() + + // d := gomail.NewDialer(SMTP_SERVER, SMTP_PORT, SMTP_LOGIN, SMTP_PASSWORD) + + // conn, err := kafka.DialLeader(ctx, "", "") + // if err != nil { + // panic(err) + // } + // defer conn.Close() + + log.Println("starting reader...") + + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "topic-A", + // Partition: 0, + GroupID: "consumer-group-id", + }) + // r.SetOffset(kafka.FirstOffset) + + log.Println("reader started") + + for { + msg, err := r.FetchMessage(ctx) + if err == io.EOF { + log.Fatal("EOF") + return + } + if err != nil { + log.Fatal(err.Error()) + return + } + + log.Printf("offset: %d, partition: %d, key: %s, value: %s\n", msg.Offset, msg.Partition, string(msg.Key), string(msg.Value)) + + if err := r.CommitMessages(ctx, msg); err != nil { + log.Fatalf("failed to commit: %s\n", err.Error()) + } + } +} diff --git a/misc/config.yaml b/misc/config.yaml index 4d101a5..2fbd62d 100644 --- a/misc/config.yaml +++ b/misc/config.yaml @@ -1,3 +1,5 @@ port: 8080 postgres_url: "postgres://postgres:postgres@localhost:5432/postgres" -jwt_signing_key: "./misc/jwt_signing_key" \ No newline at end of file +jwt_signing_key: "./misc/jwt_signing_key" +kafka_url: "localhost:9092" +kafka_topic: "backend_events" \ No newline at end of file diff --git a/src/app.go b/src/app.go index a3cfe43..38933d6 100644 --- a/src/app.go +++ b/src/app.go @@ -92,6 +92,8 @@ func (a *App) Run(p RunParams) { logger.Fatal().Err(err).Msg("failed connecting to postgres") } + kafka := integrations.NewKafka(conf.GetKafkaUrl(), conf.GetKafkaTopic()) + var key *rsa.PrivateKey { keyRawBytes, err := os.ReadFile(conf.GetJwtSigningKey()) @@ -135,9 +137,9 @@ func (a *App) Run(p RunParams) { passwordUtil = utils.NewPasswordUtil() userRepo = repos.NewUserRepo(sqlDb, tracer) - emailRepo = repos.NewEmailRepo() actionTokenRepo = repos.NewActionTokenRepo(sqlDb) shortlinkRepo = repos.NewShortlinkRepo(sqlDb, tracer) + eventRepo = repos.NewEventRepo(kafka) userCache = cache.NewCacheInmemSharded[models.UserDTO](cache.ShardingTypeInteger) jwtCache = cache.NewCacheInmemSharded[string](cache.ShardingTypeJWT) @@ -170,7 +172,7 @@ func (a *App) Run(p RunParams) { UserRepo: userRepo, UserCache: userCache, JwtCache: jwtCache, - EmailRepo: emailRepo, + EventRepo: *eventRepo, ActionTokenRepo: actionTokenRepo, }, ) diff --git a/src/config/config.go b/src/config/config.go index adba7e8..19d69f9 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -4,12 +4,16 @@ type IConfig interface { GetPort() uint16 GetPostgresUrl() string GetJwtSigningKey() string + GetKafkaUrl() string + GetKafkaTopic() string } type Config struct { Port uint16 `yaml:"port"` PostgresUrl string `yaml:"postgres_url"` JwtSigningKey string `yaml:"jwt_signing_key" validate:"file"` + KafkaUrl string `yaml:"kafka_url"` + KafkaTopic string `yaml:"kafka_topic"` } func (c *Config) GetPort() uint16 { @@ -23,3 +27,11 @@ func (c *Config) GetPostgresUrl() string { func (c *Config) GetJwtSigningKey() string { return c.JwtSigningKey } + +func (c *Config) GetKafkaUrl() string { + return c.KafkaUrl +} + +func (c *Config) GetKafkaTopic() string { + return c.KafkaTopic +} diff --git a/src/core/repos/email_repo.go b/src/core/repos/email_repo.go deleted file mode 100644 index e1eb0b4..0000000 --- a/src/core/repos/email_repo.go +++ /dev/null @@ -1,48 +0,0 @@ -package repos - -import ( - "strings" - - "gopkg.in/gomail.v2" -) - -const MSG_TEXT = ` - - - - -

This message was sent because you forgot a password

-

To change a password, use this link

- - -` - -type EmailRepo interface { - SendEmailForgotPassword(email, token string) -} - -func NewEmailRepo() EmailRepo { - return &emailRepo{} -} - -type emailRepo struct { - // mail *gomail.Dialer -} - -func (e *emailRepo) SendEmailForgotPassword(email, token string) { - link := "https://nucrea.ru?token=" + token - msgText := strings.ReplaceAll(MSG_TEXT, "{{Link}}", link) - - m := gomail.NewMessage() - m.SetHeader("From", "email") - m.SetHeader("To", email) - m.SetHeader("Subject", "Hello!") - m.SetBody("text/html", msgText) - - d := gomail.NewDialer("smtp.yandex.ru", 587, "login", "password") - - // Send the email to Bob, Cora and Dan. - if err := d.DialAndSend(m); err != nil { - panic(err) - } -} diff --git a/src/core/repos/event_repo.go b/src/core/repos/event_repo.go new file mode 100644 index 0000000..1ae8216 --- /dev/null +++ b/src/core/repos/event_repo.go @@ -0,0 +1,33 @@ +package repos + +import ( + "backend/src/integrations" + "context" + "encoding/json" +) + +func NewEventRepo(kafka *integrations.Kafka) *EventRepo { + return &EventRepo{ + kafka: kafka, + } +} + +type EventRepo struct { + kafka *integrations.Kafka +} + +func (e *EventRepo) SendEmailForgotPassword(ctx context.Context, email, actionToken string) error { + value := struct { + Email string `json:"email"` + Token string `json:"token"` + }{ + Email: email, + Token: actionToken, + } + valueBytes, err := json.Marshal(value) + if err != nil { + return err + } + + return e.kafka.SendMessage(ctx, "email_forgot_password", valueBytes) +} diff --git a/src/core/services/user_service.go b/src/core/services/user_service.go index 2142fde..a718613 100644 --- a/src/core/services/user_service.go +++ b/src/core/services/user_service.go @@ -41,7 +41,7 @@ type UserServiceDeps struct { UserRepo repos.UserRepo UserCache cache.Cache[string, models.UserDTO] JwtCache cache.Cache[string, string] - EmailRepo repos.EmailRepo + EventRepo repos.EventRepo ActionTokenRepo repos.ActionTokenRepo } @@ -132,8 +132,7 @@ func (u *userService) HelpPasswordForgot(ctx context.Context, userId string) err return err } - u.deps.EmailRepo.SendEmailForgotPassword(user.Email, actionToken.Value) - return nil + return u.deps.EventRepo.SendEmailForgotPassword(ctx, user.Email, actionToken.Value) } func (u *userService) ChangePasswordForgot(ctx context.Context, userId, newPassword, accessCode string) error { diff --git a/src/integrations/kafka.go b/src/integrations/kafka.go index 7c937b9..ebd0b7c 100644 --- a/src/integrations/kafka.go +++ b/src/integrations/kafka.go @@ -2,36 +2,36 @@ package integrations import ( "context" + "time" "github.com/segmentio/kafka-go" ) type Kafka struct { - conn *kafka.Conn + writer *kafka.Writer } -func (k *Kafka) Connect(ctx context.Context) error { - conn, err := kafka.DialContext(ctx, "", "") - if err != nil { - return err +func NewKafka(addr, topic string) *Kafka { + w := &kafka.Writer{ + Addr: kafka.TCP(addr), + Topic: topic, + Balancer: &kafka.RoundRobin{}, + AllowAutoTopicCreation: true, + BatchSize: 100, + BatchTimeout: 100 * time.Millisecond, } - defer conn.Close() - // w := &kafka.Writer{ - // Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), - // Topic: "topic-A", - // Balancer: &kafka.LeastBytes{}, - // } - - return nil -} - -func (k *Kafka) SendMessage() { - k.conn.WriteMessages() - - msg := kafka.Message{ - Topic: "event", - Key: []byte("send_email"), - Value: []byte("value"), + return &Kafka{ + writer: w, } } + +func (k *Kafka) SendMessage(ctx context.Context, key string, value []byte) error { + return k.writer.WriteMessages( + context.Background(), + kafka.Message{ + Key: []byte(key), + Value: value, + }, + ) +}