91 lines
1.9 KiB
Go
91 lines
1.9 KiB
Go
package main
|
|
|
|
import (
|
|
"backend/pkg/logger"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
|
|
"github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
// SendEmailEvent is the JSON payload decoded from the value of a Kafka
// message by handleEvent.
type SendEmailEvent struct {
	// Email is the recipient address passed to the emailer.
	Email string `json:"email"`

	// Token is the value forwarded to the password-restore mail and embedded
	// in the verify-user link.
	Token string `json:"token"`
}
|
|
|
|
func main() {
|
|
ctx := context.Background()
|
|
|
|
config, err := LoadConfig("config.yaml")
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
}
|
|
|
|
emailer, err := NewEmailer(config.SMTP)
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
}
|
|
|
|
r := kafka.NewReader(kafka.ReaderConfig{
|
|
Brokers: config.Kafka.Brokers,
|
|
Topic: config.Kafka.Topic,
|
|
GroupID: config.Kafka.ConsumerGroupId,
|
|
})
|
|
|
|
logger, err := logger.New(ctx, logger.NewLoggerOpts{
|
|
Debug: true,
|
|
OutputFile: config.App.LogFile,
|
|
})
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
}
|
|
|
|
logger.Printf("coworker service started\n")
|
|
|
|
for {
|
|
msg, err := r.FetchMessage(ctx)
|
|
if err == io.EOF {
|
|
log.Fatal("EOF")
|
|
return
|
|
}
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
return
|
|
}
|
|
|
|
log.Printf("offset: %d, partition: %d, key: %s, value: %s\n", msg.Offset, msg.Partition, string(msg.Key), string(msg.Value))
|
|
|
|
if err := r.CommitMessages(ctx, msg); err != nil {
|
|
log.Fatalf("failed to commit: %s\n", err.Error())
|
|
continue
|
|
}
|
|
|
|
if err := handleEvent(config, emailer, msg); err != nil {
|
|
log.Printf("failed to handle event: %s\n", err.Error())
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
func handleEvent(config Config, emailer *Emailer, msg kafka.Message) error {
|
|
event := SendEmailEvent{}
|
|
if err := json.Unmarshal(msg.Value, &event); err != nil {
|
|
return err
|
|
}
|
|
|
|
switch string(msg.Key) {
|
|
case "email_forgot_password":
|
|
return emailer.SendRestorePassword(event.Email, event.Token)
|
|
case "email_password_changed":
|
|
return emailer.SendPasswordChanged(event.Email)
|
|
case "email_verify_user":
|
|
link := fmt.Sprintf("%s/verify-user?token=%s", config.App.ServiceUrl, event.Token)
|
|
return emailer.SendVerifyUser(event.Email, link)
|
|
}
|
|
|
|
return fmt.Errorf("unknown event type")
|
|
}
|