From eb07bc1ac7e61948f4b9f706a24c313c6afdadff Mon Sep 17 00:00:00 2001 From: Aine Date: Mon, 14 Nov 2022 20:02:13 +0200 Subject: [PATCH 1/4] mail queue --- bot/bot.go | 4 +- bot/command.go | 34 +++++++--- bot/email.go | 120 +++++++++++++++++++--------------- bot/mutex.go | 18 +++--- bot/queue.go | 154 ++++++++++++++++++++++++++++++++++++++++++++ bot/settings_bot.go | 12 ++++ cmd/cmd.go | 12 ++++ go.mod | 1 + go.sum | 2 + utils/utils.go | 19 ++++++ 10 files changed, 302 insertions(+), 74 deletions(-) create mode 100644 bot/queue.go diff --git a/bot/bot.go b/bot/bot.go index 5d62e86..e098b25 100644 --- a/bot/bot.go +++ b/bot/bot.go @@ -25,7 +25,7 @@ type Bot struct { sendmail func(string, string, string) error log *logger.Logger lp *linkpearl.Linkpearl - mu map[id.RoomID]*sync.Mutex + mu map[string]*sync.Mutex handledMembershipEvents sync.Map } @@ -43,7 +43,7 @@ func New( rooms: sync.Map{}, log: log, lp: lp, - mu: map[id.RoomID]*sync.Mutex{}, + mu: map[string]*sync.Mutex{}, } users, err := b.initBotUsers() if err != nil { diff --git a/bot/command.go b/bot/command.go index d9f2a13..b3de58f 100644 --- a/bot/command.go +++ b/bot/command.go @@ -13,14 +13,16 @@ import ( ) const ( - commandHelp = "help" - commandStop = "stop" - commandSend = "send" - commandDKIM = "dkim" - commandCatchAll = botOptionCatchAll - commandUsers = botOptionUsers - commandDelete = "delete" - commandMailboxes = "mailboxes" + commandHelp = "help" + commandStop = "stop" + commandSend = "send" + commandDKIM = "dkim" + commandCatchAll = botOptionCatchAll + commandUsers = botOptionUsers + commandQueueItems = botOptionQueueItems + commandQueueTries = botOptionQueueTries + commandDelete = "delete" + commandMailboxes = "mailboxes" ) type ( @@ -181,6 +183,18 @@ func (b *Bot) initCommands() commandList { description: "Get or set catch-all mailbox", allowed: b.allowAdmin, }, + { + key: commandQueueItems, + description: "max amount of emails to process on each queue check", + sanitizer: utils.SanitizeIntString, + allowed: b.allowAdmin, + }, + { + key: commandQueueTries, + description: "max amount of tries per email in queue before removal", + sanitizer: utils.SanitizeIntString, + allowed: b.allowAdmin, + }, { key: commandMailboxes, description: "Show the list of all mailboxes", @@ -359,8 +373,8 @@ func (b *Bot) runSend(ctx context.Context) { } } - b.lock(evt.RoomID) - defer b.unlock(evt.RoomID) + b.lock(evt.RoomID.String()) + defer b.unlock(evt.RoomID.String()) from := mailbox + "@" + b.domains[0] ID := utils.MessageID(evt.ID, b.domains[0]) diff --git a/bot/email.go b/bot/email.go index 77977e0..2f6c080 100644 --- a/bot/email.go +++ b/bot/email.go @@ -15,6 +15,7 @@ import ( // account data keys const ( + acQueueKey = "cc.etke.postmoogle.mailqueue" acMessagePrefix = "cc.etke.postmoogle.message" acLastEventPrefix = "cc.etke.postmoogle.last" ) @@ -34,6 +35,21 @@ func (b *Bot) SetSendmail(sendmail func(string, string, string) error) { b.sendmail = sendmail } +// Sendmail tries to send email immediately, but if it gets 4xx error (greylisting), +// the email will be added to the queue and retried several times after that +func (b *Bot) Sendmail(eventID id.EventID, from, to, data string) error { + err := b.sendmail(from, to, data) + if err != nil { + if strings.HasPrefix(err.Error(), "45") { + b.log.Debug("email %s (from=%s to=%s) was added to the queue: %v", eventID, from, to, err) + return b.enqueueEmail(eventID.String(), from, to, data) + } + return err + } + + return nil +} + // GetDKIMprivkey returns DKIM private key func (b *Bot) GetDKIMprivkey() string { return b.getBotSettings().DKIMPrivateKey() @@ -89,8 +105,8 @@ func (b *Bot) IncomingEmail(ctx context.Context, email *utils.Email) error { b.Error(ctx, roomID, "cannot get settings: %v", err) } - b.lock(roomID) - defer b.unlock(roomID) + b.lock(roomID.String()) + defer b.unlock(roomID.String()) var threadID id.EventID if email.InReplyTo != "" || email.References != "" { @@ -119,6 +135,56 @@ func (b *Bot) IncomingEmail(ctx context.Context, email *utils.Email) error { return nil } +// SendEmailReply sends replies from matrix thread to email thread +func (b *Bot) SendEmailReply(ctx context.Context) { + evt := eventFromContext(ctx) + cfg, err := b.getRoomSettings(evt.RoomID) + if err != nil { + b.Error(ctx, evt.RoomID, "cannot retrieve room settings: %v", err) + return + } + mailbox := cfg.Mailbox() + if mailbox == "" { + b.Error(ctx, evt.RoomID, "mailbox is not configured, kupo") + return + } + + b.lock(evt.RoomID.String()) + defer b.unlock(evt.RoomID.String()) + + fromMailbox := mailbox + "@" + b.domains[0] + meta := b.getParentEmail(evt) + // when email was sent from matrix and reply was sent from matrix again + if fromMailbox != meta.From { + meta.To = meta.From + } + + if meta.To == "" { + b.Error(ctx, evt.RoomID, "cannot find parent email and continue the thread. Please, start a new email thread") + return + } + + meta.ThreadID = b.getThreadID(evt.RoomID, meta.InReplyTo, meta.References) + content := evt.Content.AsMessage() + if meta.Subject == "" { + meta.Subject = strings.SplitN(content.Body, "\n", 1)[0] + } + body := content.Body + + ID := utils.MessageID(evt.ID, b.domains[0]) + meta.References = meta.References + " " + ID + b.log.Debug("send email reply ID=%s meta=%+v", ID, meta) + email := utils.NewEmail(ID, meta.InReplyTo, meta.References, meta.Subject, fromMailbox, meta.To, body, "", nil) + data := email.Compose(b.getBotSettings().DKIMPrivateKey()) + + err = b.Sendmail(evt.ID, meta.From, meta.To, data) + if err != nil { + b.Error(ctx, evt.RoomID, "cannot send email: %v", err) + return + } + b.saveSentMetadata(ctx, meta.ThreadID, email, &cfg) +} + type parentEmail struct { MessageID string ThreadID id.EventID @@ -196,56 +262,6 @@ func (b *Bot) getParentEmail(evt *event.Event) parentEmail { return parent } -// SendEmailReply sends replies from matrix thread to email thread -func (b *Bot) SendEmailReply(ctx context.Context) { - evt := eventFromContext(ctx) - cfg, err := b.getRoomSettings(evt.RoomID) - if err != nil { - b.Error(ctx, evt.RoomID, "cannot retrieve room settings: %v", err) - return - } - mailbox := cfg.Mailbox() - if mailbox == "" { - b.Error(ctx, evt.RoomID, "mailbox is not configured, kupo") - return - } - - b.lock(evt.RoomID) - defer b.unlock(evt.RoomID) - - fromMailbox := mailbox + "@" + b.domains[0] - meta := b.getParentEmail(evt) - // when email was sent from matrix and reply was sent from matrix again - if fromMailbox != meta.From { - meta.To = meta.From - } - - if meta.To == "" { - b.Error(ctx, evt.RoomID, "cannot find parent email and continue the thread. Please, start a new email thread") - return - } - - meta.ThreadID = b.getThreadID(evt.RoomID, meta.InReplyTo, meta.References) - content := evt.Content.AsMessage() - if meta.Subject == "" { - meta.Subject = strings.SplitN(content.Body, "\n", 1)[0] - } - body := content.Body - - ID := utils.MessageID(evt.ID, b.domains[0]) - meta.References = meta.References + " " + ID - b.log.Debug("send email reply ID=%s meta=%+v", ID, meta) - email := utils.NewEmail(ID, meta.InReplyTo, meta.References, meta.Subject, fromMailbox, meta.To, body, "", nil) - data := email.Compose(b.getBotSettings().DKIMPrivateKey()) - - err = b.sendmail(meta.From, meta.To, data) - if err != nil { - b.Error(ctx, evt.RoomID, "cannot send email: %v", err) - return - } - b.saveSentMetadata(ctx, meta.ThreadID, email, &cfg) -} - // saveSentMetadata used to save metadata from !pm sent and thread reply events to a separate notice message // because that metadata is needed to determine email thread relations func (b *Bot) saveSentMetadata(ctx context.Context, threadID id.EventID, email *utils.Email, cfg *roomSettings) { diff --git a/bot/mutex.go b/bot/mutex.go index 8bc091d..5fc0041 100644 --- a/bot/mutex.go +++ b/bot/mutex.go @@ -2,25 +2,23 @@ package bot import ( "sync" - - "maunium.net/go/mautrix/id" ) -func (b *Bot) lock(roomID id.RoomID) { - _, ok := b.mu[roomID] +func (b *Bot) lock(key string) { + _, ok := b.mu[key] if !ok { - b.mu[roomID] = &sync.Mutex{} + b.mu[key] = &sync.Mutex{} } - b.mu[roomID].Lock() + b.mu[key].Lock() } -func (b *Bot) unlock(roomID id.RoomID) { - _, ok := b.mu[roomID] +func (b *Bot) unlock(key string) { + _, ok := b.mu[key] if !ok { return } - b.mu[roomID].Unlock() - delete(b.mu, roomID) + b.mu[key].Unlock() + delete(b.mu, key) } diff --git a/bot/queue.go b/bot/queue.go new file mode 100644 index 0000000..6615c09 --- /dev/null +++ b/bot/queue.go @@ -0,0 +1,154 @@ +package bot + +import ( + "strconv" + "time" +) + +const ( + defaultMaxQueueItems = 1 + defaultMaxQueueTries = 100 +) + +// ProcessQueue starts queue processing +func (b *Bot) ProcessQueue() { + b.log.Debug("staring queue processing...") + cfg := b.getBotSettings() + + maxItems := cfg.QueueItems() + if maxItems == 0 { + maxItems = defaultMaxQueueItems + } + + maxTries := cfg.QueueTries() + if maxTries == 0 { + maxTries = defaultMaxQueueTries + } + + b.popqueue(maxItems, maxTries) + b.log.Debug("ended queue processing") +} + +// popqueue gets emails from queue and tries to send them, +// if an email was sent successfully - it will be removed from queue +func (b *Bot) popqueue(maxItems, maxTries int) { + b.lock(acQueueKey) + defer b.unlock(acQueueKey) + index, err := b.lp.GetAccountData(acQueueKey) + if err != nil { + b.log.Error("cannot get queue index: %v", err) + } + + i := 0 + for id, itemkey := range index { + if i > maxItems { + b.log.Debug("finished re-deliveries from queue") + return + } + if dequeue := b.processQueueItem(itemkey, maxTries); dequeue { + b.log.Debug("email %s has been delivered", id) + err = b.dequeueEmail(id) + if err != nil { + b.log.Error("cannot dequeue email %s: %v", id, err) + } + } + i++ + } +} + +// processQueueItem tries to process an item from queue +// returns should the item be dequeued or not +func (b *Bot) processQueueItem(itemkey string, maxRetries int) bool { + b.lock(itemkey) + defer b.unlock(itemkey) + + item, err := b.lp.GetAccountData(itemkey) + if err != nil { + b.log.Error("cannot retrieve a queue item %s: %v", itemkey, err) + return false + } + attempts, err := strconv.Atoi(item["attempts"]) + if err != nil { + b.log.Error("cannot parse attempts of %s: %v", itemkey, err) + return false + } + if attempts > maxRetries { + return true + } + + err = b.sendmail(item["from"], item["to"], item["data"]) + if err == nil { + b.log.Debug("email %s from queue was delivered") + return true + } + + b.log.Debug("attempted to deliver email id=%s, retry=%s, but it's not ready yet: %v", item["id"], item["attempts"], err) + attempts++ + item["attempts"] = strconv.Itoa(attempts) + err = b.lp.SetAccountData(itemkey, item) + if err != nil { + b.log.Error("cannot update attempt count on email %s: %v", itemkey, err) + } + + return false +} + +// enqueueEmail adds an email to the queue +func (b *Bot) enqueueEmail(id, from, to, data string) error { + itemkey := acQueueKey + "." + id + item := map[string]string{ + "attemptedAt": time.Now().UTC().Format(time.RFC1123Z), + "attempts": "0", + "data": data, + "from": from, + "to": to, + "id": id, + } + + b.lock(itemkey) + defer b.unlock(itemkey) + err := b.lp.SetAccountData(itemkey, item) + if err != nil { + b.log.Error("cannot enqueue email id=%s: %v", id, err) + return err + } + + b.lock(acQueueKey) + defer b.unlock(acQueueKey) + queueIndex, err := b.lp.GetAccountData(acQueueKey) + if err != nil { + b.log.Error("cannot get queue index: %v", err) + return err + } + queueIndex[id] = itemkey + err = b.lp.SetAccountData(acQueueKey, queueIndex) + if err != nil { + b.log.Error("cannot save queue index: %v", err) + return err + } + + return nil +} + +// dequeueEmail removes an email from the queue +func (b *Bot) dequeueEmail(id string) error { + index, err := b.lp.GetAccountData(acQueueKey) + if err != nil { + b.log.Error("cannot get queue index: %v", err) + return err + } + itemkey := index[id] + if itemkey == "" { + itemkey = acQueueKey + "." + id + } + delete(index, id) + err = b.lp.SetAccountData(acQueueKey, index) + if err != nil { + b.log.Error("cannot update queue index: %v", err) + return err + } + + b.lock(itemkey) + defer b.unlock(itemkey) + return b.lp.SetAccountData(itemkey, nil) +} diff --git a/bot/settings_bot.go b/bot/settings_bot.go index 5811949..c17dc64 100644 --- a/bot/settings_bot.go +++ b/bot/settings_bot.go @@ -15,6 +15,8 @@ const ( botOptionCatchAll = "catch-all" botOptionDKIMSignature = "dkim.pub" botOptionDKIMPrivateKey = "dkim.pem" + botOptionQueueItems = "queue:items" + botOptionQueueTries = "queue:tries" ) type botSettings map[string]string @@ -58,6 +60,16 @@ func (s botSettings) DKIMPrivateKey() string { return s.Get(botOptionDKIMPrivateKey) } +// QueueItems option +func (s botSettings) QueueItems() int { + return utils.Int(s.Get(botOptionQueueItems)) +} + +// QueueTries option +func (s botSettings) QueueTries() int { + return utils.Int(s.Get(botOptionQueueTries)) +} + func (b *Bot) initBotUsers() ([]string, error) { config := b.getBotSettings() cfgUsers := config.Users() diff --git a/cmd/cmd.go b/cmd/cmd.go index 1da71f1..ee80381 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -10,6 +10,7 @@ import ( "github.com/getsentry/sentry-go" _ "github.com/lib/pq" _ "github.com/mattn/go-sqlite3" + "github.com/mileusna/crontab" "gitlab.com/etke.cc/go/logger" "gitlab.com/etke.cc/linkpearl" lpcfg "gitlab.com/etke.cc/linkpearl/config" @@ -22,6 +23,7 @@ import ( var ( mxb *bot.Bot + cron *crontab.Crontab smtpm *smtp.Manager log *logger.Logger ) @@ -42,6 +44,7 @@ func main() { initSentry(cfg) initBot(cfg) initSMTP(cfg) + initCron() initShutdown(quit) defer recovery() @@ -111,6 +114,15 @@ func initSMTP(cfg *config.Config) { }) } +func initCron() { + cron = crontab.New() + + err := cron.AddJob("* * * * *", mxb.ProcessQueue) + if err != nil { + log.Error("cannot start ProcessQueue cronjob: %v", err) + } +} + func initShutdown(quit chan struct{}) { listener := make(chan os.Signal, 1) signal.Notify(listener, os.Interrupt, syscall.SIGABRT, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM) diff --git a/go.mod b/go.mod index e9997ba..de8668d 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/jhillyerd/enmime v0.10.0 github.com/lib/pq v1.10.7 github.com/mattn/go-sqlite3 v1.14.15 + github.com/mileusna/crontab v1.2.0 github.com/raja/argon2pw v1.0.2-0.20210910183755-a391af63bd39 gitlab.com/etke.cc/go/env v1.0.0 gitlab.com/etke.cc/go/logger v1.1.0 diff --git a/go.sum b/go.sum index 22f8e92..be97ec3 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOj github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mikesmitty/edkey v0.0.0-20170222072505-3356ea4e686a h1:eU8j/ClY2Ty3qdHnn0TyW3ivFoPC/0F1gQZz8yTxbbE= github.com/mikesmitty/edkey v0.0.0-20170222072505-3356ea4e686a/go.mod h1:v8eSC2SMp9/7FTKUncp7fH9IwPfw+ysMObcEz5FWheQ= +github.com/mileusna/crontab v1.2.0 h1:x9ZmE2A4p6CDqMEGQ+GbqsNtnmbdmWMQYShdQu8LvrU= +github.com/mileusna/crontab v1.2.0/go.mod h1:dbns64w/u3tUnGZGf8pAa76ZqOfeBX4olW4U1ZwExmc= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= diff --git a/utils/utils.go b/utils/utils.go index 3dbf321..19cf31c 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -59,6 +59,25 @@ func SanitizeBoolString(str string) string { return strconv.FormatBool(Bool(str)) } +// Int converts string to integer +func Int(str string) int { + if str == "" { + return 0 + } + + i, err := strconv.Atoi(str) + if err != nil { + return 0 + } + + return i +} + +// SanitizeBoolString converts string to integer and back to string +func SanitizeIntString(str string) string { + return strconv.Itoa(Int(str)) +} + // StringSlice converts comma-separated string to slice func StringSlice(str string) []string { if str == "" { From a8780a32c16ed61ed3ff5d3cdbf9af0d81f1ef2f Mon Sep 17 00:00:00 2001 From: Aine Date: Tue, 15 Nov 2022 09:42:07 +0200 Subject: [PATCH 2/4] explicitly tell about enqueued email --- bot/command.go | 11 ++++++++--- bot/email.go | 28 ++++++++++++++++++++-------- bot/queue.go | 15 +++++++-------- cmd/cmd.go | 1 + 4 files changed, 36 insertions(+), 19 deletions(-) diff --git a/bot/command.go b/bot/command.go index b3de58f..9c7c88c 100644 --- a/bot/command.go +++ b/bot/command.go @@ -381,12 +381,17 @@ func (b *Bot) runSend(ctx context.Context) { for _, to := range tos { email := utils.NewEmail(ID, "", " "+ID, subject, from, to, body, "", nil) data := email.Compose(b.getBotSettings().DKIMPrivateKey()) - err = b.sendmail(from, to, data) + queued, err := b.Sendmail(evt.ID, from, to, data) + if queued { + b.log.Error("cannot send email: %v", err) + b.saveSentMetadata(ctx, queued, evt.ID, email, &cfg) + continue + } if err != nil { b.Error(ctx, evt.RoomID, "cannot send email to %s: %v", to, err) - } else { - b.saveSentMetadata(ctx, evt.ID, email, &cfg) + continue } + b.saveSentMetadata(ctx, false, evt.ID, email, &cfg) } if len(tos) > 1 { b.SendNotice(ctx, evt.RoomID, "All emails were sent.") diff --git a/bot/email.go b/bot/email.go index 2f6c080..9004f96 100644 --- a/bot/email.go +++ b/bot/email.go @@ -37,17 +37,17 @@ func (b *Bot) SetSendmail(sendmail func(string, string, string) error) { // Sendmail tries to send email immediately, but if it gets 4xx error (greylisting), // the email will be added to the queue and retried several times after that -func (b *Bot) Sendmail(eventID id.EventID, from, to, data string) error { +func (b *Bot) Sendmail(eventID id.EventID, from, to, data string) (bool, error) { err := b.sendmail(from, to, data) if err != nil { if strings.HasPrefix(err.Error(), "45") { b.log.Debug("email %s (from=%s to=%s) was added to the queue: %v", eventID, from, to, err) - return b.enqueueEmail(eventID.String(), from, to, data) + return true, b.enqueueEmail(eventID.String(), from, to, data) } - return err + return false, err } - return nil + return false, nil } // GetDKIMprivkey returns DKIM private key @@ -177,12 +177,19 @@ func (b *Bot) SendEmailReply(ctx context.Context) { email := utils.NewEmail(ID, meta.InReplyTo, meta.References, meta.Subject, fromMailbox, meta.To, body, "", nil) data := email.Compose(b.getBotSettings().DKIMPrivateKey()) - err = b.Sendmail(evt.ID, meta.From, meta.To, data) + queued, err := b.Sendmail(evt.ID, meta.From, meta.To, data) + if queued { + b.log.Error("cannot send email: %v", err) + b.saveSentMetadata(ctx, queued, meta.ThreadID, email, &cfg) + return + } + if err != nil { b.Error(ctx, evt.RoomID, "cannot send email: %v", err) return } - b.saveSentMetadata(ctx, meta.ThreadID, email, &cfg) + + b.saveSentMetadata(ctx, queued, meta.ThreadID, email, &cfg) } type parentEmail struct { @@ -264,10 +271,15 @@ func (b *Bot) getParentEmail(evt *event.Event) parentEmail { // saveSentMetadata used to save metadata from !pm sent and thread reply events to a separate notice message // because that metadata is needed to determine email thread relations -func (b *Bot) saveSentMetadata(ctx context.Context, threadID id.EventID, email *utils.Email, cfg *roomSettings) { +func (b *Bot) saveSentMetadata(ctx context.Context, queued bool, threadID id.EventID, email *utils.Email, cfg *roomSettings) { + text := "Email has been sent to " + email.To + if queued { + text = "Email to " + email.To + " has been queued" + } + evt := eventFromContext(ctx) content := email.Content(threadID, cfg.ContentOptions()) - notice := format.RenderMarkdown("Email has been sent to "+email.To, true, true) + notice := format.RenderMarkdown(text, true, true) notice.MsgType = event.MsgNotice msgContent, ok := content.Parsed.(event.MessageEventContent) if !ok { diff --git a/bot/queue.go b/bot/queue.go index 6615c09..b35b33a 100644 --- a/bot/queue.go +++ b/bot/queue.go @@ -2,12 +2,11 @@ package bot import ( "strconv" - "time" ) const ( defaultMaxQueueItems = 1 - defaultMaxQueueTries = 100 + defaultMaxQueueTries = 3 ) // ProcessQueue starts queue processing @@ -67,6 +66,7 @@ func (b *Bot) processQueueItem(itemkey string, maxRetries int) bool { b.log.Error("cannot retrieve a queue item %s: %v", itemkey, err) return false } + b.log.Debug("processing queue item %+v", item) attempts, err := strconv.Atoi(item["attempts"]) if err != nil { b.log.Error("cannot parse attempts of %s: %v", itemkey, err) @@ -97,12 +97,11 @@ func (b *Bot) processQueueItem(itemkey string, maxRetries int) bool { func (b *Bot) enqueueEmail(id, from, to, data string) error { itemkey := acQueueKey + "." + id item := map[string]string{ - "attemptedAt": time.Now().UTC().Format(time.RFC1123Z), - "attempts": "0", - "data": data, - "from": from, - "to": to, - "id": id, + "attempts": "0", + "data": data, + "from": from, + "to": to, + "id": id, } b.lock(itemkey) diff --git a/cmd/cmd.go b/cmd/cmd.go index ee80381..f856433 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -146,6 +146,7 @@ func startBot(statusMsg string) { func shutdown() { log.Info("Shutting down...") + cron.Shutdown() smtpm.Stop() mxb.Stop() From 4ef139f875301786211052f59af451962b4de339 Mon Sep 17 00:00:00 2001 From: Aine Date: Tue, 15 Nov 2022 09:45:43 +0200 Subject: [PATCH 3/4] rename queue config options --- bot/command.go | 24 ++++++++++++------------ bot/queue.go | 22 +++++++++++----------- bot/settings_bot.go | 16 ++++++++-------- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/bot/command.go b/bot/command.go index 9c7c88c..40a6035 100644 --- a/bot/command.go +++ b/bot/command.go @@ -13,16 +13,16 @@ import ( ) const ( - commandHelp = "help" - commandStop = "stop" - commandSend = "send" - commandDKIM = "dkim" - commandCatchAll = botOptionCatchAll - commandUsers = botOptionUsers - commandQueueItems = botOptionQueueItems - commandQueueTries = botOptionQueueTries - commandDelete = "delete" - commandMailboxes = "mailboxes" + commandHelp = "help" + commandStop = "stop" + commandSend = "send" + commandDKIM = "dkim" + commandCatchAll = botOptionCatchAll + commandUsers = botOptionUsers + commandQueueBatch = botOptionQueueBatch + commandQueueRetries = botOptionQueueRetries + commandDelete = "delete" + commandMailboxes = "mailboxes" ) type ( @@ -184,13 +184,13 @@ func (b *Bot) initCommands() commandList { allowed: b.allowAdmin, }, { - key: commandQueueItems, + key: commandQueueBatch, description: "max amount of emails to process on each queue check", sanitizer: utils.SanitizeIntString, allowed: b.allowAdmin, }, { - key: commandQueueTries, + key: commandQueueRetries, description: "max amount of tries per email in queue before removal", sanitizer: utils.SanitizeIntString, allowed: b.allowAdmin, diff --git a/bot/queue.go b/bot/queue.go index b35b33a..865c097 100644 --- a/bot/queue.go +++ b/bot/queue.go @@ -5,8 +5,8 @@ import ( ) const ( - defaultMaxQueueItems = 1 - defaultMaxQueueTries = 3 + defaultQueueBatch = 1 + defaultQueueRetries = 3 ) // ProcessQueue starts queue processing @@ -14,23 +14,23 @@ func (b *Bot) ProcessQueue() { b.log.Debug("staring queue processing...") cfg := b.getBotSettings() - maxItems := cfg.QueueItems() - if maxItems == 0 { - maxItems = defaultMaxQueueItems + batchSize := cfg.QueueBatch() + if batchSize == 0 { + batchSize = defaultQueueBatch } - maxTries := cfg.QueueTries() - if maxTries == 0 { - maxTries = defaultMaxQueueTries + retries := cfg.QueueRetries() + if retries == 0 { + retries = defaultQueueRetries } - b.popqueue(maxItems, maxTries) + b.popqueue(batchSize, retries) b.log.Debug("ended queue processing") } // popqueue gets emails from queue and tries to send them, // if an email was sent successfully - it will be removed from queue -func (b *Bot) popqueue(maxItems, maxTries int) { +func (b *Bot) popqueue(batchSize, maxTries int) { b.lock(acQueueKey) defer b.unlock(acQueueKey) index, err := b.lp.GetAccountData(acQueueKey) @@ -40,7 +40,7 @@ func (b *Bot) popqueue(maxItems, maxTries int) { i := 0 for id, itemkey := range index { - if i > maxItems { + if i > batchSize { b.log.Debug("finished re-deliveries from queue") return } diff --git a/bot/settings_bot.go b/bot/settings_bot.go index c17dc64..b54d783 100644 --- a/bot/settings_bot.go +++ b/bot/settings_bot.go @@ -15,8 +15,8 @@ const ( botOptionCatchAll = "catch-all" botOptionDKIMSignature = "dkim.pub" botOptionDKIMPrivateKey = "dkim.pem" - botOptionQueueItems = "queue:items" - botOptionQueueTries = "queue:tries" + botOptionQueueBatch = "queue:batch" + botOptionQueueRetries = "queue:retries" ) type botSettings map[string]string @@ -60,14 +60,14 @@ func (s botSettings) DKIMPrivateKey() string { return s.Get(botOptionDKIMPrivateKey) } -// QueueItems option -func (s botSettings) QueueItems() int { - return utils.Int(s.Get(botOptionQueueItems)) +// QueueBatch option +func (s botSettings) QueueBatch() int { + return utils.Int(s.Get(botOptionQueueBatch)) } -// QueueTries option -func (s botSettings) QueueTries() int { - return utils.Int(s.Get(botOptionQueueTries)) +// QueueRetries option +func (s botSettings) QueueRetries() int { + return utils.Int(s.Get(botOptionQueueRetries)) } func (b *Bot) initBotUsers() ([]string, error) { From e68d419da407c24ecb0f6d9504c19d635890802e Mon Sep 17 00:00:00 2001 From: Aine Date: Tue, 15 Nov 2022 09:46:35 +0200 Subject: [PATCH 4/4] update readme --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 1dfede3..fbaa9f1 100644 --- a/README.md +++ b/README.md @@ -263,6 +263,8 @@ If you want to change them - check available options in the help message (`!pm h * **!pm dkim** - Get DKIM signature * **!pm catch-all** - Configure catch-all mailbox +* **!pm queue:batch** - max amount of emails to process on each queue check +* **!pm queue:retries** - max amount of tries per email in queue before removal * **!pm users** - Get or set allowed users patterns * **!pm mailboxes** - Show the list of all mailboxes * **!pm delete** <mailbox> - Delete specific mailbox