From eb07bc1ac7e61948f4b9f706a24c313c6afdadff Mon Sep 17 00:00:00 2001 From: Aine Date: Mon, 14 Nov 2022 20:02:13 +0200 Subject: [PATCH] mail queue --- bot/bot.go | 4 +- bot/command.go | 34 +++++++--- bot/email.go | 120 +++++++++++++++++++--------------- bot/mutex.go | 18 +++--- bot/queue.go | 154 ++++++++++++++++++++++++++++++++++++++++++++ bot/settings_bot.go | 12 ++++ cmd/cmd.go | 12 ++++ go.mod | 1 + go.sum | 2 + utils/utils.go | 19 ++++++ 10 files changed, 302 insertions(+), 74 deletions(-) create mode 100644 bot/queue.go diff --git a/bot/bot.go b/bot/bot.go index 5d62e86..e098b25 100644 --- a/bot/bot.go +++ b/bot/bot.go @@ -25,7 +25,7 @@ type Bot struct { sendmail func(string, string, string) error log *logger.Logger lp *linkpearl.Linkpearl - mu map[id.RoomID]*sync.Mutex + mu map[string]*sync.Mutex handledMembershipEvents sync.Map } @@ -43,7 +43,7 @@ func New( rooms: sync.Map{}, log: log, lp: lp, - mu: map[id.RoomID]*sync.Mutex{}, + mu: map[string]*sync.Mutex{}, } users, err := b.initBotUsers() if err != nil { diff --git a/bot/command.go b/bot/command.go index d9f2a13..b3de58f 100644 --- a/bot/command.go +++ b/bot/command.go @@ -13,14 +13,16 @@ import ( ) const ( - commandHelp = "help" - commandStop = "stop" - commandSend = "send" - commandDKIM = "dkim" - commandCatchAll = botOptionCatchAll - commandUsers = botOptionUsers - commandDelete = "delete" - commandMailboxes = "mailboxes" + commandHelp = "help" + commandStop = "stop" + commandSend = "send" + commandDKIM = "dkim" + commandCatchAll = botOptionCatchAll + commandUsers = botOptionUsers + commandQueueItems = botOptionQueueItems + commandQueueTries = botOptionQueueTries + commandDelete = "delete" + commandMailboxes = "mailboxes" ) type ( @@ -181,6 +183,18 @@ func (b *Bot) initCommands() commandList { description: "Get or set catch-all mailbox", allowed: b.allowAdmin, }, + { + key: commandQueueItems, + description: "max amount of emails to process on each queue check", + sanitizer: utils.SanitizeIntString, + allowed: b.allowAdmin, + }, + { + key: commandQueueTries, + description: "max amount of tries per email in queue before removal", + sanitizer: utils.SanitizeIntString, + allowed: b.allowAdmin, + }, { key: commandMailboxes, description: "Show the list of all mailboxes", @@ -359,8 +373,8 @@ func (b *Bot) runSend(ctx context.Context) { } } - b.lock(evt.RoomID) - defer b.unlock(evt.RoomID) + b.lock(evt.RoomID.String()) + defer b.unlock(evt.RoomID.String()) from := mailbox + "@" + b.domains[0] ID := utils.MessageID(evt.ID, b.domains[0]) diff --git a/bot/email.go b/bot/email.go index 77977e0..2f6c080 100644 --- a/bot/email.go +++ b/bot/email.go @@ -15,6 +15,7 @@ import ( // account data keys const ( + acQueueKey = "cc.etke.postmoogle.mailqueue" acMessagePrefix = "cc.etke.postmoogle.message" acLastEventPrefix = "cc.etke.postmoogle.last" ) @@ -34,6 +35,21 @@ func (b *Bot) SetSendmail(sendmail func(string, string, string) error) { b.sendmail = sendmail } +// Sendmail tries to send email immediately, but if it gets 4xx error (greylisting), +// the email will be added to the queue and retried several times after that +func (b *Bot) Sendmail(eventID id.EventID, from, to, data string) error { + err := b.sendmail(from, to, data) + if err != nil { + if strings.HasPrefix(err.Error(), "45") { + b.log.Debug("email %s (from=%s to=%s) was added to the queue: %v", eventID, from, to, err) + return b.enqueueEmail(eventID.String(), from, to, data) + } + return err + } + + return nil +} + // GetDKIMprivkey returns DKIM private key func (b *Bot) GetDKIMprivkey() string { return b.getBotSettings().DKIMPrivateKey() @@ -89,8 +105,8 @@ func (b *Bot) IncomingEmail(ctx context.Context, email *utils.Email) error { b.Error(ctx, roomID, "cannot get settings: %v", err) } - b.lock(roomID) - defer b.unlock(roomID) + b.lock(roomID.String()) + defer b.unlock(roomID.String()) var threadID id.EventID if email.InReplyTo != "" || email.References != "" { @@ -119,6 +135,56 @@ func (b *Bot) IncomingEmail(ctx context.Context, email *utils.Email) error { return nil } +// SendEmailReply sends replies from matrix thread to email thread +func (b *Bot) SendEmailReply(ctx context.Context) { + evt := eventFromContext(ctx) + cfg, err := b.getRoomSettings(evt.RoomID) + if err != nil { + b.Error(ctx, evt.RoomID, "cannot retrieve room settings: %v", err) + return + } + mailbox := cfg.Mailbox() + if mailbox == "" { + b.Error(ctx, evt.RoomID, "mailbox is not configured, kupo") + return + } + + b.lock(evt.RoomID.String()) + defer b.unlock(evt.RoomID.String()) + + fromMailbox := mailbox + "@" + b.domains[0] + meta := b.getParentEmail(evt) + // when email was sent from matrix and reply was sent from matrix again + if fromMailbox != meta.From { + meta.To = meta.From + } + + if meta.To == "" { + b.Error(ctx, evt.RoomID, "cannot find parent email and continue the thread. Please, start a new email thread") + return + } + + meta.ThreadID = b.getThreadID(evt.RoomID, meta.InReplyTo, meta.References) + content := evt.Content.AsMessage() + if meta.Subject == "" { + meta.Subject = strings.SplitN(content.Body, "\n", 1)[0] + } + body := content.Body + + ID := utils.MessageID(evt.ID, b.domains[0]) + meta.References = meta.References + " " + ID + b.log.Debug("send email reply ID=%s meta=%+v", ID, meta) + email := utils.NewEmail(ID, meta.InReplyTo, meta.References, meta.Subject, fromMailbox, meta.To, body, "", nil) + data := email.Compose(b.getBotSettings().DKIMPrivateKey()) + + err = b.Sendmail(evt.ID, meta.From, meta.To, data) + if err != nil { + b.Error(ctx, evt.RoomID, "cannot send email: %v", err) + return + } + b.saveSentMetadata(ctx, meta.ThreadID, email, &cfg) +} + type parentEmail struct { MessageID string ThreadID id.EventID @@ -196,56 +262,6 @@ func (b *Bot) getParentEmail(evt *event.Event) parentEmail { return parent } -// SendEmailReply sends replies from matrix thread to email thread -func (b *Bot) SendEmailReply(ctx context.Context) { - evt := eventFromContext(ctx) - cfg, err := b.getRoomSettings(evt.RoomID) - if err != nil { - b.Error(ctx, evt.RoomID, "cannot retrieve room settings: %v", err) - return - } - mailbox := cfg.Mailbox() - if mailbox == "" { - b.Error(ctx, evt.RoomID, "mailbox is not configured, kupo") - return - } - - b.lock(evt.RoomID) - defer b.unlock(evt.RoomID) - - fromMailbox := mailbox + "@" + b.domains[0] - meta := b.getParentEmail(evt) - // when email was sent from matrix and reply was sent from matrix again - if fromMailbox != meta.From { - meta.To = meta.From - } - - if meta.To == "" { - b.Error(ctx, evt.RoomID, "cannot find parent email and continue the thread. Please, start a new email thread") - return - } - - meta.ThreadID = b.getThreadID(evt.RoomID, meta.InReplyTo, meta.References) - content := evt.Content.AsMessage() - if meta.Subject == "" { - meta.Subject = strings.SplitN(content.Body, "\n", 1)[0] - } - body := content.Body - - ID := utils.MessageID(evt.ID, b.domains[0]) - meta.References = meta.References + " " + ID - b.log.Debug("send email reply ID=%s meta=%+v", ID, meta) - email := utils.NewEmail(ID, meta.InReplyTo, meta.References, meta.Subject, fromMailbox, meta.To, body, "", nil) - data := email.Compose(b.getBotSettings().DKIMPrivateKey()) - - err = b.sendmail(meta.From, meta.To, data) - if err != nil { - b.Error(ctx, evt.RoomID, "cannot send email: %v", err) - return - } - b.saveSentMetadata(ctx, meta.ThreadID, email, &cfg) -} - // saveSentMetadata used to save metadata from !pm sent and thread reply events to a separate notice message // because that metadata is needed to determine email thread relations func (b *Bot) saveSentMetadata(ctx context.Context, threadID id.EventID, email *utils.Email, cfg *roomSettings) { diff --git a/bot/mutex.go b/bot/mutex.go index 8bc091d..5fc0041 100644 --- a/bot/mutex.go +++ b/bot/mutex.go @@ -2,25 +2,23 @@ package bot import ( "sync" - - "maunium.net/go/mautrix/id" ) -func (b *Bot) lock(roomID id.RoomID) { - _, ok := b.mu[roomID] +func (b *Bot) lock(key string) { + _, ok := b.mu[key] if !ok { - b.mu[roomID] = &sync.Mutex{} + b.mu[key] = &sync.Mutex{} } - b.mu[roomID].Lock() + b.mu[key].Lock() } -func (b *Bot) unlock(roomID id.RoomID) { - _, ok := b.mu[roomID] +func (b *Bot) unlock(key string) { + _, ok := b.mu[key] if !ok { return } - b.mu[roomID].Unlock() - delete(b.mu, roomID) + b.mu[key].Unlock() + delete(b.mu, key) } diff --git a/bot/queue.go b/bot/queue.go new file mode 100644 index 0000000..6615c09 --- /dev/null +++ b/bot/queue.go @@ -0,0 +1,154 @@ +package bot + +import ( + "strconv" + "time" +) + +const ( + defaultMaxQueueItems = 1 + defaultMaxQueueTries = 100 +) + +// ProcessQueue starts queue processing +func (b *Bot) ProcessQueue() { + b.log.Debug("staring queue processing...") + cfg := b.getBotSettings() + + maxItems := cfg.QueueItems() + if maxItems == 0 { + maxItems = defaultMaxQueueItems + } + + maxTries := cfg.QueueTries() + if maxTries == 0 { + maxTries = defaultMaxQueueTries + } + + b.popqueue(maxItems, maxTries) + b.log.Debug("ended queue processing") +} + +// popqueue gets emails from queue and tries to send them, +// if an email was sent successfully - it will be removed from queue +func (b *Bot) popqueue(maxItems, maxTries int) { + b.lock(acQueueKey) + defer b.unlock(acQueueKey) + index, err := b.lp.GetAccountData(acQueueKey) + if err != nil { + b.log.Error("cannot get queue index: %v", err) + } + + i := 0 + for id, itemkey := range index { + if i > maxItems { + b.log.Debug("finished re-deliveries from queue") + return + } + if dequeue := b.processQueueItem(itemkey, maxTries); dequeue { + b.log.Debug("email %s has been delivered", id) + err = b.dequeueEmail(id) + if err != nil { + b.log.Error("cannot dequeue email %s: %v", id, err) + } + } + i++ + } +} + +// processQueueItem tries to process an item from queue +// returns should the item be dequeued or not +func (b *Bot) processQueueItem(itemkey string, maxRetries int) bool { + b.lock(itemkey) + defer b.unlock(itemkey) + + item, err := b.lp.GetAccountData(itemkey) + if err != nil { + b.log.Error("cannot retrieve a queue item %s: %v", itemkey, err) + return false + } + attempts, err := strconv.Atoi(item["attempts"]) + if err != nil { + b.log.Error("cannot parse attempts of %s: %v", itemkey, err) + return false + } + if attempts > maxRetries { + return true + } + + err = b.sendmail(item["from"], item["to"], item["data"]) + if err == nil { + b.log.Debug("email %s from queue was delivered") + return true + } + + b.log.Debug("attempted to deliver email id=%s, retry=%s, but it's not ready yet: %v", item["id"], item["attempts"], err) + attempts++ + item["attempts"] = strconv.Itoa(attempts) + err = b.lp.SetAccountData(itemkey, item) + if err != nil { + b.log.Error("cannot update attempt count on email %s: %v", itemkey, err) + } + + return false +} + +// enqueueEmail adds an email to the queue +func (b *Bot) enqueueEmail(id, from, to, data string) error { + itemkey := acQueueKey + "." + id + item := map[string]string{ + "attemptedAt": time.Now().UTC().Format(time.RFC1123Z), + "attempts": "0", + "data": data, + "from": from, + "to": to, + "id": id, + } + + b.lock(itemkey) + defer b.unlock(itemkey) + err := b.lp.SetAccountData(itemkey, item) + if err != nil { + b.log.Error("cannot enqueue email id=%s: %v", id, err) + return err + } + + b.lock(acQueueKey) + defer b.unlock(acQueueKey) + queueIndex, err := b.lp.GetAccountData(acQueueKey) + if err != nil { + b.log.Error("cannot get queue index: %v", err) + return err + } + queueIndex[id] = itemkey + err = b.lp.SetAccountData(acQueueKey, queueIndex) + if err != nil { + b.log.Error("cannot save queue index: %v", err) + return err + } + + return nil +} + +// dequeueEmail removes an email from the queue +func (b *Bot) dequeueEmail(id string) error { + index, err := b.lp.GetAccountData(acQueueKey) + if err != nil { + b.log.Error("cannot get queue index: %v", err) + return err + } + itemkey := index[id] + if itemkey == "" { + itemkey = acQueueKey + "." + id + } + delete(index, id) + err = b.lp.SetAccountData(acQueueKey, index) + if err != nil { + b.log.Error("cannot update queue index: %v", err) + return err + } + + b.lock(itemkey) + defer b.unlock(itemkey) + return b.lp.SetAccountData(itemkey, nil) +} diff --git a/bot/settings_bot.go b/bot/settings_bot.go index 5811949..c17dc64 100644 --- a/bot/settings_bot.go +++ b/bot/settings_bot.go @@ -15,6 +15,8 @@ const ( botOptionCatchAll = "catch-all" botOptionDKIMSignature = "dkim.pub" botOptionDKIMPrivateKey = "dkim.pem" + botOptionQueueItems = "queue:items" + botOptionQueueTries = "queue:tries" ) type botSettings map[string]string @@ -58,6 +60,16 @@ func (s botSettings) DKIMPrivateKey() string { return s.Get(botOptionDKIMPrivateKey) } +// QueueItems option +func (s botSettings) QueueItems() int { + return utils.Int(s.Get(botOptionQueueItems)) +} + +// QueueTries option +func (s botSettings) QueueTries() int { + return utils.Int(s.Get(botOptionQueueTries)) +} + func (b *Bot) initBotUsers() ([]string, error) { config := b.getBotSettings() cfgUsers := config.Users() diff --git a/cmd/cmd.go b/cmd/cmd.go index 1da71f1..ee80381 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -10,6 +10,7 @@ import ( "github.com/getsentry/sentry-go" _ "github.com/lib/pq" _ "github.com/mattn/go-sqlite3" + "github.com/mileusna/crontab" "gitlab.com/etke.cc/go/logger" "gitlab.com/etke.cc/linkpearl" lpcfg "gitlab.com/etke.cc/linkpearl/config" @@ -22,6 +23,7 @@ import ( var ( mxb *bot.Bot + cron *crontab.Crontab smtpm *smtp.Manager log *logger.Logger ) @@ -42,6 +44,7 @@ func main() { initSentry(cfg) initBot(cfg) initSMTP(cfg) + initCron() initShutdown(quit) defer recovery() @@ -111,6 +114,15 @@ func initSMTP(cfg *config.Config) { }) } +func initCron() { + cron = crontab.New() + + err := cron.AddJob("* * * * *", mxb.ProcessQueue) + if err != nil { + log.Error("cannot start ProcessQueue cronjob: %v", err) + } +} + func initShutdown(quit chan struct{}) { listener := make(chan os.Signal, 1) signal.Notify(listener, os.Interrupt, syscall.SIGABRT, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM) diff --git a/go.mod b/go.mod index e9997ba..de8668d 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/jhillyerd/enmime v0.10.0 github.com/lib/pq v1.10.7 github.com/mattn/go-sqlite3 v1.14.15 + github.com/mileusna/crontab v1.2.0 github.com/raja/argon2pw v1.0.2-0.20210910183755-a391af63bd39 gitlab.com/etke.cc/go/env v1.0.0 gitlab.com/etke.cc/go/logger v1.1.0 diff --git a/go.sum b/go.sum index 22f8e92..be97ec3 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOj github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mikesmitty/edkey v0.0.0-20170222072505-3356ea4e686a h1:eU8j/ClY2Ty3qdHnn0TyW3ivFoPC/0F1gQZz8yTxbbE= github.com/mikesmitty/edkey v0.0.0-20170222072505-3356ea4e686a/go.mod h1:v8eSC2SMp9/7FTKUncp7fH9IwPfw+ysMObcEz5FWheQ= +github.com/mileusna/crontab v1.2.0 h1:x9ZmE2A4p6CDqMEGQ+GbqsNtnmbdmWMQYShdQu8LvrU= +github.com/mileusna/crontab v1.2.0/go.mod h1:dbns64w/u3tUnGZGf8pAa76ZqOfeBX4olW4U1ZwExmc= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= diff --git a/utils/utils.go b/utils/utils.go index 3dbf321..19cf31c 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -59,6 +59,25 @@ func SanitizeBoolString(str string) string { return strconv.FormatBool(Bool(str)) } +// Int converts string to integer +func Int(str string) int { + if str == "" { + return 0 + } + + i, err := strconv.Atoi(str) + if err != nil { + return 0 + } + + return i +} + +// SanitizeBoolString converts string to integer and back to string +func SanitizeIntString(str string) string { + return strconv.Itoa(Int(str)) +} + // StringSlice converts comma-separated string to slice func StringSlice(str string) []string { if str == "" {