Merge branch 'queue' into 'main'

mail queue

See merge request etke.cc/postmoogle!39
This commit is contained in:
Aine
2022-11-15 08:21:47 +00:00
11 changed files with 326 additions and 79 deletions

View File

@@ -263,6 +263,8 @@ If you want to change them - check available options in the help message (`!pm h
* **!pm dkim** - Get DKIM signature
* **!pm catch-all** - Configure catch-all mailbox
* **!pm queue:batch** - max amount of emails to process on each queue check
* **!pm queue:retries** - max amount of tries per email in queue before removal
* **!pm users** - Get or set allowed users patterns
* **!pm mailboxes** - Show the list of all mailboxes
* **!pm delete** <mailbox> - Delete specific mailbox

View File

@@ -25,7 +25,7 @@ type Bot struct {
sendmail func(string, string, string) error
log *logger.Logger
lp *linkpearl.Linkpearl
mu map[id.RoomID]*sync.Mutex
mu map[string]*sync.Mutex
handledMembershipEvents sync.Map
}
@@ -43,7 +43,7 @@ func New(
rooms: sync.Map{},
log: log,
lp: lp,
mu: map[id.RoomID]*sync.Mutex{},
mu: map[string]*sync.Mutex{},
}
users, err := b.initBotUsers()
if err != nil {

View File

@@ -13,14 +13,16 @@ import (
)
const (
commandHelp = "help"
commandStop = "stop"
commandSend = "send"
commandDKIM = "dkim"
commandCatchAll = botOptionCatchAll
commandUsers = botOptionUsers
commandDelete = "delete"
commandMailboxes = "mailboxes"
commandHelp = "help"
commandStop = "stop"
commandSend = "send"
commandDKIM = "dkim"
commandCatchAll = botOptionCatchAll
commandUsers = botOptionUsers
commandQueueBatch = botOptionQueueBatch
commandQueueRetries = botOptionQueueRetries
commandDelete = "delete"
commandMailboxes = "mailboxes"
)
type (
@@ -181,6 +183,18 @@ func (b *Bot) initCommands() commandList {
description: "Get or set catch-all mailbox",
allowed: b.allowAdmin,
},
{
key: commandQueueBatch,
description: "max amount of emails to process on each queue check",
sanitizer: utils.SanitizeIntString,
allowed: b.allowAdmin,
},
{
key: commandQueueRetries,
description: "max amount of tries per email in queue before removal",
sanitizer: utils.SanitizeIntString,
allowed: b.allowAdmin,
},
{
key: commandMailboxes,
description: "Show the list of all mailboxes",
@@ -359,20 +373,25 @@ func (b *Bot) runSend(ctx context.Context) {
}
}
b.lock(evt.RoomID)
defer b.unlock(evt.RoomID)
b.lock(evt.RoomID.String())
defer b.unlock(evt.RoomID.String())
from := mailbox + "@" + b.domains[0]
ID := utils.MessageID(evt.ID, b.domains[0])
for _, to := range tos {
email := utils.NewEmail(ID, "", " "+ID, subject, from, to, body, "", nil)
data := email.Compose(b.getBotSettings().DKIMPrivateKey())
err = b.sendmail(from, to, data)
queued, err := b.Sendmail(evt.ID, from, to, data)
if queued {
b.log.Error("cannot send email: %v", err)
b.saveSentMetadata(ctx, queued, evt.ID, email, &cfg)
continue
}
if err != nil {
b.Error(ctx, evt.RoomID, "cannot send email to %s: %v", to, err)
} else {
b.saveSentMetadata(ctx, evt.ID, email, &cfg)
continue
}
b.saveSentMetadata(ctx, false, evt.ID, email, &cfg)
}
if len(tos) > 1 {
b.SendNotice(ctx, evt.RoomID, "All emails were sent.")

View File

@@ -15,6 +15,7 @@ import (
// account data keys
const (
acQueueKey = "cc.etke.postmoogle.mailqueue"
acMessagePrefix = "cc.etke.postmoogle.message"
acLastEventPrefix = "cc.etke.postmoogle.last"
)
@@ -34,6 +35,21 @@ func (b *Bot) SetSendmail(sendmail func(string, string, string) error) {
b.sendmail = sendmail
}
// Sendmail tries to send email immediately, but if it gets 4xx error (greylisting),
// the email will be added to the queue and retried several times after that
func (b *Bot) Sendmail(eventID id.EventID, from, to, data string) (bool, error) {
err := b.sendmail(from, to, data)
if err != nil {
if strings.HasPrefix(err.Error(), "45") {
b.log.Debug("email %s (from=%s to=%s) was added to the queue: %v", eventID, from, to, err)
return true, b.enqueueEmail(eventID.String(), from, to, data)
}
return false, err
}
return false, nil
}
// GetDKIMprivkey returns DKIM private key
func (b *Bot) GetDKIMprivkey() string {
return b.getBotSettings().DKIMPrivateKey()
@@ -89,8 +105,8 @@ func (b *Bot) IncomingEmail(ctx context.Context, email *utils.Email) error {
b.Error(ctx, roomID, "cannot get settings: %v", err)
}
b.lock(roomID)
defer b.unlock(roomID)
b.lock(roomID.String())
defer b.unlock(roomID.String())
var threadID id.EventID
if email.InReplyTo != "" || email.References != "" {
@@ -119,6 +135,63 @@ func (b *Bot) IncomingEmail(ctx context.Context, email *utils.Email) error {
return nil
}
// SendEmailReply sends replies from matrix thread to email thread
func (b *Bot) SendEmailReply(ctx context.Context) {
evt := eventFromContext(ctx)
cfg, err := b.getRoomSettings(evt.RoomID)
if err != nil {
b.Error(ctx, evt.RoomID, "cannot retrieve room settings: %v", err)
return
}
mailbox := cfg.Mailbox()
if mailbox == "" {
b.Error(ctx, evt.RoomID, "mailbox is not configured, kupo")
return
}
b.lock(evt.RoomID.String())
defer b.unlock(evt.RoomID.String())
fromMailbox := mailbox + "@" + b.domains[0]
meta := b.getParentEmail(evt)
// when email was sent from matrix and reply was sent from matrix again
if fromMailbox != meta.From {
meta.To = meta.From
}
if meta.To == "" {
b.Error(ctx, evt.RoomID, "cannot find parent email and continue the thread. Please, start a new email thread")
return
}
meta.ThreadID = b.getThreadID(evt.RoomID, meta.InReplyTo, meta.References)
content := evt.Content.AsMessage()
if meta.Subject == "" {
meta.Subject = strings.SplitN(content.Body, "\n", 1)[0]
}
body := content.Body
ID := utils.MessageID(evt.ID, b.domains[0])
meta.References = meta.References + " " + ID
b.log.Debug("send email reply ID=%s meta=%+v", ID, meta)
email := utils.NewEmail(ID, meta.InReplyTo, meta.References, meta.Subject, fromMailbox, meta.To, body, "", nil)
data := email.Compose(b.getBotSettings().DKIMPrivateKey())
queued, err := b.Sendmail(evt.ID, meta.From, meta.To, data)
if queued {
b.log.Error("cannot send email: %v", err)
b.saveSentMetadata(ctx, queued, meta.ThreadID, email, &cfg)
return
}
if err != nil {
b.Error(ctx, evt.RoomID, "cannot send email: %v", err)
return
}
b.saveSentMetadata(ctx, queued, meta.ThreadID, email, &cfg)
}
type parentEmail struct {
MessageID string
ThreadID id.EventID
@@ -196,62 +269,17 @@ func (b *Bot) getParentEmail(evt *event.Event) parentEmail {
return parent
}
// SendEmailReply sends replies from matrix thread to email thread
func (b *Bot) SendEmailReply(ctx context.Context) {
evt := eventFromContext(ctx)
cfg, err := b.getRoomSettings(evt.RoomID)
if err != nil {
b.Error(ctx, evt.RoomID, "cannot retrieve room settings: %v", err)
return
}
mailbox := cfg.Mailbox()
if mailbox == "" {
b.Error(ctx, evt.RoomID, "mailbox is not configured, kupo")
return
}
b.lock(evt.RoomID)
defer b.unlock(evt.RoomID)
fromMailbox := mailbox + "@" + b.domains[0]
meta := b.getParentEmail(evt)
// when email was sent from matrix and reply was sent from matrix again
if fromMailbox != meta.From {
meta.To = meta.From
}
if meta.To == "" {
b.Error(ctx, evt.RoomID, "cannot find parent email and continue the thread. Please, start a new email thread")
return
}
meta.ThreadID = b.getThreadID(evt.RoomID, meta.InReplyTo, meta.References)
content := evt.Content.AsMessage()
if meta.Subject == "" {
meta.Subject = strings.SplitN(content.Body, "\n", 1)[0]
}
body := content.Body
ID := utils.MessageID(evt.ID, b.domains[0])
meta.References = meta.References + " " + ID
b.log.Debug("send email reply ID=%s meta=%+v", ID, meta)
email := utils.NewEmail(ID, meta.InReplyTo, meta.References, meta.Subject, fromMailbox, meta.To, body, "", nil)
data := email.Compose(b.getBotSettings().DKIMPrivateKey())
err = b.sendmail(meta.From, meta.To, data)
if err != nil {
b.Error(ctx, evt.RoomID, "cannot send email: %v", err)
return
}
b.saveSentMetadata(ctx, meta.ThreadID, email, &cfg)
}
// saveSentMetadata used to save metadata from !pm sent and thread reply events to a separate notice message
// because that metadata is needed to determine email thread relations
func (b *Bot) saveSentMetadata(ctx context.Context, threadID id.EventID, email *utils.Email, cfg *roomSettings) {
func (b *Bot) saveSentMetadata(ctx context.Context, queued bool, threadID id.EventID, email *utils.Email, cfg *roomSettings) {
text := "Email has been sent to " + email.To
if queued {
text = "Email to " + email.To + " has been queued"
}
evt := eventFromContext(ctx)
content := email.Content(threadID, cfg.ContentOptions())
notice := format.RenderMarkdown("Email has been sent to "+email.To, true, true)
notice := format.RenderMarkdown(text, true, true)
notice.MsgType = event.MsgNotice
msgContent, ok := content.Parsed.(event.MessageEventContent)
if !ok {

View File

@@ -2,25 +2,23 @@ package bot
import (
"sync"
"maunium.net/go/mautrix/id"
)
func (b *Bot) lock(roomID id.RoomID) {
_, ok := b.mu[roomID]
func (b *Bot) lock(key string) {
_, ok := b.mu[key]
if !ok {
b.mu[roomID] = &sync.Mutex{}
b.mu[key] = &sync.Mutex{}
}
b.mu[roomID].Lock()
b.mu[key].Lock()
}
func (b *Bot) unlock(roomID id.RoomID) {
_, ok := b.mu[roomID]
func (b *Bot) unlock(key string) {
_, ok := b.mu[key]
if !ok {
return
}
b.mu[roomID].Unlock()
delete(b.mu, roomID)
b.mu[key].Unlock()
delete(b.mu, key)
}

153
bot/queue.go Normal file
View File

@@ -0,0 +1,153 @@
package bot
import (
"strconv"
)
const (
defaultQueueBatch = 1
defaultQueueRetries = 3
)
// ProcessQueue starts queue processing
func (b *Bot) ProcessQueue() {
b.log.Debug("staring queue processing...")
cfg := b.getBotSettings()
batchSize := cfg.QueueBatch()
if batchSize == 0 {
batchSize = defaultQueueBatch
}
retries := cfg.QueueRetries()
if retries == 0 {
retries = defaultQueueRetries
}
b.popqueue(batchSize, retries)
b.log.Debug("ended queue processing")
}
// popqueue gets emails from queue and tries to send them,
// if an email was sent successfully - it will be removed from queue
func (b *Bot) popqueue(batchSize, maxTries int) {
b.lock(acQueueKey)
defer b.unlock(acQueueKey)
index, err := b.lp.GetAccountData(acQueueKey)
if err != nil {
b.log.Error("cannot get queue index: %v", err)
}
i := 0
for id, itemkey := range index {
if i > batchSize {
b.log.Debug("finished re-deliveries from queue")
return
}
if dequeue := b.processQueueItem(itemkey, maxTries); dequeue {
b.log.Debug("email %s has been delivered", id)
err = b.dequeueEmail(id)
if err != nil {
b.log.Error("cannot dequeue email %s: %v", id, err)
}
}
i++
}
}
// processQueueItem tries to process an item from queue
// returns should the item be dequeued or not
func (b *Bot) processQueueItem(itemkey string, maxRetries int) bool {
b.lock(itemkey)
defer b.unlock(itemkey)
item, err := b.lp.GetAccountData(itemkey)
if err != nil {
b.log.Error("cannot retrieve a queue item %s: %v", itemkey, err)
return false
}
b.log.Debug("processing queue item %+v", item)
attempts, err := strconv.Atoi(item["attempts"])
if err != nil {
b.log.Error("cannot parse attempts of %s: %v", itemkey, err)
return false
}
if attempts > maxRetries {
return true
}
err = b.sendmail(item["from"], item["to"], item["data"])
if err == nil {
b.log.Debug("email %s from queue was delivered")
return true
}
b.log.Debug("attempted to deliver email id=%s, retry=%s, but it's not ready yet: %v", item["id"], item["attempts"], err)
attempts++
item["attempts"] = strconv.Itoa(attempts)
err = b.lp.SetAccountData(itemkey, item)
if err != nil {
b.log.Error("cannot update attempt count on email %s: %v", itemkey, err)
}
return false
}
// enqueueEmail adds an email to the queue
func (b *Bot) enqueueEmail(id, from, to, data string) error {
itemkey := acQueueKey + "." + id
item := map[string]string{
"attempts": "0",
"data": data,
"from": from,
"to": to,
"id": id,
}
b.lock(itemkey)
defer b.unlock(itemkey)
err := b.lp.SetAccountData(itemkey, item)
if err != nil {
b.log.Error("cannot enqueue email id=%s: %v", id, err)
return err
}
b.lock(acQueueKey)
defer b.unlock(acQueueKey)
queueIndex, err := b.lp.GetAccountData(acQueueKey)
if err != nil {
b.log.Error("cannot get queue index: %v", err)
return err
}
queueIndex[id] = itemkey
err = b.lp.SetAccountData(acQueueKey, queueIndex)
if err != nil {
b.log.Error("cannot save queue index: %v", err)
return err
}
return nil
}
// dequeueEmail removes an email from the queue
func (b *Bot) dequeueEmail(id string) error {
index, err := b.lp.GetAccountData(acQueueKey)
if err != nil {
b.log.Error("cannot get queue index: %v", err)
return err
}
itemkey := index[id]
if itemkey == "" {
itemkey = acQueueKey + "." + id
}
delete(index, id)
err = b.lp.SetAccountData(acQueueKey, index)
if err != nil {
b.log.Error("cannot update queue index: %v", err)
return err
}
b.lock(itemkey)
defer b.unlock(itemkey)
return b.lp.SetAccountData(itemkey, nil)
}

View File

@@ -15,6 +15,8 @@ const (
botOptionCatchAll = "catch-all"
botOptionDKIMSignature = "dkim.pub"
botOptionDKIMPrivateKey = "dkim.pem"
botOptionQueueBatch = "queue:batch"
botOptionQueueRetries = "queue:retries"
)
type botSettings map[string]string
@@ -58,6 +60,16 @@ func (s botSettings) DKIMPrivateKey() string {
return s.Get(botOptionDKIMPrivateKey)
}
// QueueBatch option
func (s botSettings) QueueBatch() int {
return utils.Int(s.Get(botOptionQueueBatch))
}
// QueueRetries option
func (s botSettings) QueueRetries() int {
return utils.Int(s.Get(botOptionQueueRetries))
}
func (b *Bot) initBotUsers() ([]string, error) {
config := b.getBotSettings()
cfgUsers := config.Users()

View File

@@ -10,6 +10,7 @@ import (
"github.com/getsentry/sentry-go"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
"github.com/mileusna/crontab"
"gitlab.com/etke.cc/go/logger"
"gitlab.com/etke.cc/linkpearl"
lpcfg "gitlab.com/etke.cc/linkpearl/config"
@@ -22,6 +23,7 @@ import (
var (
mxb *bot.Bot
cron *crontab.Crontab
smtpm *smtp.Manager
log *logger.Logger
)
@@ -42,6 +44,7 @@ func main() {
initSentry(cfg)
initBot(cfg)
initSMTP(cfg)
initCron()
initShutdown(quit)
defer recovery()
@@ -111,6 +114,15 @@ func initSMTP(cfg *config.Config) {
})
}
func initCron() {
cron = crontab.New()
err := cron.AddJob("* * * * *", mxb.ProcessQueue)
if err != nil {
log.Error("cannot start ProcessQueue cronjob: %v", err)
}
}
func initShutdown(quit chan struct{}) {
listener := make(chan os.Signal, 1)
signal.Notify(listener, os.Interrupt, syscall.SIGABRT, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
@@ -134,6 +146,7 @@ func startBot(statusMsg string) {
func shutdown() {
log.Info("Shutting down...")
cron.Shutdown()
smtpm.Stop()
mxb.Stop()

1
go.mod
View File

@@ -13,6 +13,7 @@ require (
github.com/jhillyerd/enmime v0.10.0
github.com/lib/pq v1.10.7
github.com/mattn/go-sqlite3 v1.14.15
github.com/mileusna/crontab v1.2.0
github.com/raja/argon2pw v1.0.2-0.20210910183755-a391af63bd39
gitlab.com/etke.cc/go/env v1.0.0
gitlab.com/etke.cc/go/logger v1.1.0

2
go.sum
View File

@@ -54,6 +54,8 @@ github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOj
github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mikesmitty/edkey v0.0.0-20170222072505-3356ea4e686a h1:eU8j/ClY2Ty3qdHnn0TyW3ivFoPC/0F1gQZz8yTxbbE=
github.com/mikesmitty/edkey v0.0.0-20170222072505-3356ea4e686a/go.mod h1:v8eSC2SMp9/7FTKUncp7fH9IwPfw+ysMObcEz5FWheQ=
github.com/mileusna/crontab v1.2.0 h1:x9ZmE2A4p6CDqMEGQ+GbqsNtnmbdmWMQYShdQu8LvrU=
github.com/mileusna/crontab v1.2.0/go.mod h1:dbns64w/u3tUnGZGf8pAa76ZqOfeBX4olW4U1ZwExmc=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=

View File

@@ -59,6 +59,25 @@ func SanitizeBoolString(str string) string {
return strconv.FormatBool(Bool(str))
}
// Int converts string to integer
func Int(str string) int {
if str == "" {
return 0
}
i, err := strconv.Atoi(str)
if err != nil {
return 0
}
return i
}
// SanitizeBoolString converts string to integer and back to string
func SanitizeIntString(str string) string {
return strconv.Itoa(Int(str))
}
// StringSlice converts comma-separated string to slice
func StringSlice(str string) []string {
if str == "" {