Hermes/cmd/notifyer/main.go

91 lines
1.9 KiB
Go

package main
import (
"backend/pkg/logger"
"context"
"encoding/json"
"fmt"
"io"
"log"
"github.com/segmentio/kafka-go"
)
// SendEmailEvent is the JSON payload that handleEvent decodes from the value
// of a Kafka message.
type SendEmailEvent struct {
	// Email is the recipient address handed to the emailer.
	Email string `json:"email"`
	// Token is passed to the restore-password mail and embedded in the
	// verify-user link; the password-changed mail does not use it.
	Token string `json:"token"`
}
func main() {
ctx := context.Background()
config, err := LoadConfig("config.yaml")
if err != nil {
log.Fatal(err.Error())
}
emailer, err := NewEmailer(config.SMTP)
if err != nil {
log.Fatal(err.Error())
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: config.Kafka.Brokers,
Topic: config.Kafka.Topic,
GroupID: config.Kafka.ConsumerGroupId,
})
logger, err := logger.New(ctx, logger.NewLoggerOpts{
Debug: true,
OutputFile: config.App.LogFile,
})
if err != nil {
log.Fatal(err.Error())
}
logger.Printf("notifyer service started\n")
for {
msg, err := r.FetchMessage(ctx)
if err == io.EOF {
log.Fatal("EOF")
return
}
if err != nil {
log.Fatal(err.Error())
return
}
log.Printf("offset: %d, partition: %d, key: %s, value: %s\n", msg.Offset, msg.Partition, string(msg.Key), string(msg.Value))
if err := r.CommitMessages(ctx, msg); err != nil {
log.Fatalf("failed to commit: %s\n", err.Error())
continue
}
if err := handleEvent(config, emailer, msg); err != nil {
log.Printf("failed to handle event: %s\n", err.Error())
continue
}
}
}
// handleEvent decodes the message value as a SendEmailEvent and sends the
// mail selected by the message key.
//
// The value is decoded before the key is inspected, so a malformed payload
// yields the JSON error even when the key is unknown. Recognised keys are
// "email_forgot_password", "email_password_changed" and "email_verify_user";
// any other key produces an "unknown event type" error. Errors from the
// emailer are returned unchanged.
//
// NOTE(review): the token is interpolated into the verify-user query string
// without URL-escaping — confirm tokens are always URL-safe.
func handleEvent(config Config, emailer *Emailer, msg kafka.Message) error {
	var payload SendEmailEvent
	if decodeErr := json.Unmarshal(msg.Value, &payload); decodeErr != nil {
		return decodeErr
	}

	switch eventType := string(msg.Key); eventType {
	case "email_forgot_password":
		return emailer.SendRestorePassword(payload.Email, payload.Token)

	case "email_password_changed":
		return emailer.SendPasswordChanged(payload.Email)

	case "email_verify_user":
		verifyLink := fmt.Sprintf("%s/verify-user?token=%s", config.App.ServiceUrl, payload.Token)
		return emailer.SendVerifyUser(payload.Email, verifyLink)

	default:
		return fmt.Errorf("unknown event type")
	}
}