mail queue
This commit is contained in:
@@ -25,7 +25,7 @@ type Bot struct {
|
||||
sendmail func(string, string, string) error
|
||||
log *logger.Logger
|
||||
lp *linkpearl.Linkpearl
|
||||
mu map[id.RoomID]*sync.Mutex
|
||||
mu map[string]*sync.Mutex
|
||||
handledMembershipEvents sync.Map
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ func New(
|
||||
rooms: sync.Map{},
|
||||
log: log,
|
||||
lp: lp,
|
||||
mu: map[id.RoomID]*sync.Mutex{},
|
||||
mu: map[string]*sync.Mutex{},
|
||||
}
|
||||
users, err := b.initBotUsers()
|
||||
if err != nil {
|
||||
|
||||
@@ -13,14 +13,16 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
commandHelp = "help"
|
||||
commandStop = "stop"
|
||||
commandSend = "send"
|
||||
commandDKIM = "dkim"
|
||||
commandCatchAll = botOptionCatchAll
|
||||
commandUsers = botOptionUsers
|
||||
commandDelete = "delete"
|
||||
commandMailboxes = "mailboxes"
|
||||
commandHelp = "help"
|
||||
commandStop = "stop"
|
||||
commandSend = "send"
|
||||
commandDKIM = "dkim"
|
||||
commandCatchAll = botOptionCatchAll
|
||||
commandUsers = botOptionUsers
|
||||
commandQueueItems = botOptionQueueItems
|
||||
commandQueueTries = botOptionQueueTries
|
||||
commandDelete = "delete"
|
||||
commandMailboxes = "mailboxes"
|
||||
)
|
||||
|
||||
type (
|
||||
@@ -181,6 +183,18 @@ func (b *Bot) initCommands() commandList {
|
||||
description: "Get or set catch-all mailbox",
|
||||
allowed: b.allowAdmin,
|
||||
},
|
||||
{
|
||||
key: commandQueueItems,
|
||||
description: "max amount of emails to process on each queue check",
|
||||
sanitizer: utils.SanitizeIntString,
|
||||
allowed: b.allowAdmin,
|
||||
},
|
||||
{
|
||||
key: commandQueueTries,
|
||||
description: "max amount of tries per email in queue before removal",
|
||||
sanitizer: utils.SanitizeIntString,
|
||||
allowed: b.allowAdmin,
|
||||
},
|
||||
{
|
||||
key: commandMailboxes,
|
||||
description: "Show the list of all mailboxes",
|
||||
@@ -359,8 +373,8 @@ func (b *Bot) runSend(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
b.lock(evt.RoomID)
|
||||
defer b.unlock(evt.RoomID)
|
||||
b.lock(evt.RoomID.String())
|
||||
defer b.unlock(evt.RoomID.String())
|
||||
|
||||
from := mailbox + "@" + b.domains[0]
|
||||
ID := utils.MessageID(evt.ID, b.domains[0])
|
||||
|
||||
120
bot/email.go
120
bot/email.go
@@ -15,6 +15,7 @@ import (
|
||||
|
||||
// account data keys
|
||||
const (
|
||||
acQueueKey = "cc.etke.postmoogle.mailqueue"
|
||||
acMessagePrefix = "cc.etke.postmoogle.message"
|
||||
acLastEventPrefix = "cc.etke.postmoogle.last"
|
||||
)
|
||||
@@ -34,6 +35,21 @@ func (b *Bot) SetSendmail(sendmail func(string, string, string) error) {
|
||||
b.sendmail = sendmail
|
||||
}
|
||||
|
||||
// Sendmail tries to send email immediately, but if it gets 4xx error (greylisting),
|
||||
// the email will be added to the queue and retried several times after that
|
||||
func (b *Bot) Sendmail(eventID id.EventID, from, to, data string) error {
|
||||
err := b.sendmail(from, to, data)
|
||||
if err != nil {
|
||||
if strings.HasPrefix(err.Error(), "45") {
|
||||
b.log.Debug("email %s (from=%s to=%s) was added to the queue: %v", eventID, from, to, err)
|
||||
return b.enqueueEmail(eventID.String(), from, to, data)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDKIMprivkey returns DKIM private key
|
||||
func (b *Bot) GetDKIMprivkey() string {
|
||||
return b.getBotSettings().DKIMPrivateKey()
|
||||
@@ -89,8 +105,8 @@ func (b *Bot) IncomingEmail(ctx context.Context, email *utils.Email) error {
|
||||
b.Error(ctx, roomID, "cannot get settings: %v", err)
|
||||
}
|
||||
|
||||
b.lock(roomID)
|
||||
defer b.unlock(roomID)
|
||||
b.lock(roomID.String())
|
||||
defer b.unlock(roomID.String())
|
||||
|
||||
var threadID id.EventID
|
||||
if email.InReplyTo != "" || email.References != "" {
|
||||
@@ -119,6 +135,56 @@ func (b *Bot) IncomingEmail(ctx context.Context, email *utils.Email) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendEmailReply sends replies from matrix thread to email thread
|
||||
func (b *Bot) SendEmailReply(ctx context.Context) {
|
||||
evt := eventFromContext(ctx)
|
||||
cfg, err := b.getRoomSettings(evt.RoomID)
|
||||
if err != nil {
|
||||
b.Error(ctx, evt.RoomID, "cannot retrieve room settings: %v", err)
|
||||
return
|
||||
}
|
||||
mailbox := cfg.Mailbox()
|
||||
if mailbox == "" {
|
||||
b.Error(ctx, evt.RoomID, "mailbox is not configured, kupo")
|
||||
return
|
||||
}
|
||||
|
||||
b.lock(evt.RoomID.String())
|
||||
defer b.unlock(evt.RoomID.String())
|
||||
|
||||
fromMailbox := mailbox + "@" + b.domains[0]
|
||||
meta := b.getParentEmail(evt)
|
||||
// when email was sent from matrix and reply was sent from matrix again
|
||||
if fromMailbox != meta.From {
|
||||
meta.To = meta.From
|
||||
}
|
||||
|
||||
if meta.To == "" {
|
||||
b.Error(ctx, evt.RoomID, "cannot find parent email and continue the thread. Please, start a new email thread")
|
||||
return
|
||||
}
|
||||
|
||||
meta.ThreadID = b.getThreadID(evt.RoomID, meta.InReplyTo, meta.References)
|
||||
content := evt.Content.AsMessage()
|
||||
if meta.Subject == "" {
|
||||
meta.Subject = strings.SplitN(content.Body, "\n", 1)[0]
|
||||
}
|
||||
body := content.Body
|
||||
|
||||
ID := utils.MessageID(evt.ID, b.domains[0])
|
||||
meta.References = meta.References + " " + ID
|
||||
b.log.Debug("send email reply ID=%s meta=%+v", ID, meta)
|
||||
email := utils.NewEmail(ID, meta.InReplyTo, meta.References, meta.Subject, fromMailbox, meta.To, body, "", nil)
|
||||
data := email.Compose(b.getBotSettings().DKIMPrivateKey())
|
||||
|
||||
err = b.Sendmail(evt.ID, meta.From, meta.To, data)
|
||||
if err != nil {
|
||||
b.Error(ctx, evt.RoomID, "cannot send email: %v", err)
|
||||
return
|
||||
}
|
||||
b.saveSentMetadata(ctx, meta.ThreadID, email, &cfg)
|
||||
}
|
||||
|
||||
type parentEmail struct {
|
||||
MessageID string
|
||||
ThreadID id.EventID
|
||||
@@ -196,56 +262,6 @@ func (b *Bot) getParentEmail(evt *event.Event) parentEmail {
|
||||
return parent
|
||||
}
|
||||
|
||||
// SendEmailReply sends replies from matrix thread to email thread
|
||||
func (b *Bot) SendEmailReply(ctx context.Context) {
|
||||
evt := eventFromContext(ctx)
|
||||
cfg, err := b.getRoomSettings(evt.RoomID)
|
||||
if err != nil {
|
||||
b.Error(ctx, evt.RoomID, "cannot retrieve room settings: %v", err)
|
||||
return
|
||||
}
|
||||
mailbox := cfg.Mailbox()
|
||||
if mailbox == "" {
|
||||
b.Error(ctx, evt.RoomID, "mailbox is not configured, kupo")
|
||||
return
|
||||
}
|
||||
|
||||
b.lock(evt.RoomID)
|
||||
defer b.unlock(evt.RoomID)
|
||||
|
||||
fromMailbox := mailbox + "@" + b.domains[0]
|
||||
meta := b.getParentEmail(evt)
|
||||
// when email was sent from matrix and reply was sent from matrix again
|
||||
if fromMailbox != meta.From {
|
||||
meta.To = meta.From
|
||||
}
|
||||
|
||||
if meta.To == "" {
|
||||
b.Error(ctx, evt.RoomID, "cannot find parent email and continue the thread. Please, start a new email thread")
|
||||
return
|
||||
}
|
||||
|
||||
meta.ThreadID = b.getThreadID(evt.RoomID, meta.InReplyTo, meta.References)
|
||||
content := evt.Content.AsMessage()
|
||||
if meta.Subject == "" {
|
||||
meta.Subject = strings.SplitN(content.Body, "\n", 1)[0]
|
||||
}
|
||||
body := content.Body
|
||||
|
||||
ID := utils.MessageID(evt.ID, b.domains[0])
|
||||
meta.References = meta.References + " " + ID
|
||||
b.log.Debug("send email reply ID=%s meta=%+v", ID, meta)
|
||||
email := utils.NewEmail(ID, meta.InReplyTo, meta.References, meta.Subject, fromMailbox, meta.To, body, "", nil)
|
||||
data := email.Compose(b.getBotSettings().DKIMPrivateKey())
|
||||
|
||||
err = b.sendmail(meta.From, meta.To, data)
|
||||
if err != nil {
|
||||
b.Error(ctx, evt.RoomID, "cannot send email: %v", err)
|
||||
return
|
||||
}
|
||||
b.saveSentMetadata(ctx, meta.ThreadID, email, &cfg)
|
||||
}
|
||||
|
||||
// saveSentMetadata used to save metadata from !pm sent and thread reply events to a separate notice message
|
||||
// because that metadata is needed to determine email thread relations
|
||||
func (b *Bot) saveSentMetadata(ctx context.Context, threadID id.EventID, email *utils.Email, cfg *roomSettings) {
|
||||
|
||||
18
bot/mutex.go
18
bot/mutex.go
@@ -2,25 +2,23 @@ package bot
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"maunium.net/go/mautrix/id"
|
||||
)
|
||||
|
||||
func (b *Bot) lock(roomID id.RoomID) {
|
||||
_, ok := b.mu[roomID]
|
||||
func (b *Bot) lock(key string) {
|
||||
_, ok := b.mu[key]
|
||||
if !ok {
|
||||
b.mu[roomID] = &sync.Mutex{}
|
||||
b.mu[key] = &sync.Mutex{}
|
||||
}
|
||||
|
||||
b.mu[roomID].Lock()
|
||||
b.mu[key].Lock()
|
||||
}
|
||||
|
||||
func (b *Bot) unlock(roomID id.RoomID) {
|
||||
_, ok := b.mu[roomID]
|
||||
func (b *Bot) unlock(key string) {
|
||||
_, ok := b.mu[key]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
b.mu[roomID].Unlock()
|
||||
delete(b.mu, roomID)
|
||||
b.mu[key].Unlock()
|
||||
delete(b.mu, key)
|
||||
}
|
||||
|
||||
154
bot/queue.go
Normal file
154
bot/queue.go
Normal file
@@ -0,0 +1,154 @@
|
||||
package bot
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMaxQueueItems = 1
|
||||
defaultMaxQueueTries = 100
|
||||
)
|
||||
|
||||
// ProcessQueue starts queue processing
|
||||
func (b *Bot) ProcessQueue() {
|
||||
b.log.Debug("staring queue processing...")
|
||||
cfg := b.getBotSettings()
|
||||
|
||||
maxItems := cfg.QueueItems()
|
||||
if maxItems == 0 {
|
||||
maxItems = defaultMaxQueueItems
|
||||
}
|
||||
|
||||
maxTries := cfg.QueueTries()
|
||||
if maxTries == 0 {
|
||||
maxTries = defaultMaxQueueTries
|
||||
}
|
||||
|
||||
b.popqueue(maxItems, maxTries)
|
||||
b.log.Debug("ended queue processing")
|
||||
}
|
||||
|
||||
// popqueue gets emails from queue and tries to send them,
|
||||
// if an email was sent successfully - it will be removed from queue
|
||||
func (b *Bot) popqueue(maxItems, maxTries int) {
|
||||
b.lock(acQueueKey)
|
||||
defer b.unlock(acQueueKey)
|
||||
index, err := b.lp.GetAccountData(acQueueKey)
|
||||
if err != nil {
|
||||
b.log.Error("cannot get queue index: %v", err)
|
||||
}
|
||||
|
||||
i := 0
|
||||
for id, itemkey := range index {
|
||||
if i > maxItems {
|
||||
b.log.Debug("finished re-deliveries from queue")
|
||||
return
|
||||
}
|
||||
if dequeue := b.processQueueItem(itemkey, maxTries); dequeue {
|
||||
b.log.Debug("email %s has been delivered", id)
|
||||
err = b.dequeueEmail(id)
|
||||
if err != nil {
|
||||
b.log.Error("cannot dequeue email %s: %v", id, err)
|
||||
}
|
||||
}
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
// processQueueItem tries to process an item from queue
|
||||
// returns should the item be dequeued or not
|
||||
func (b *Bot) processQueueItem(itemkey string, maxRetries int) bool {
|
||||
b.lock(itemkey)
|
||||
defer b.unlock(itemkey)
|
||||
|
||||
item, err := b.lp.GetAccountData(itemkey)
|
||||
if err != nil {
|
||||
b.log.Error("cannot retrieve a queue item %s: %v", itemkey, err)
|
||||
return false
|
||||
}
|
||||
attempts, err := strconv.Atoi(item["attempts"])
|
||||
if err != nil {
|
||||
b.log.Error("cannot parse attempts of %s: %v", itemkey, err)
|
||||
return false
|
||||
}
|
||||
if attempts > maxRetries {
|
||||
return true
|
||||
}
|
||||
|
||||
err = b.sendmail(item["from"], item["to"], item["data"])
|
||||
if err == nil {
|
||||
b.log.Debug("email %s from queue was delivered")
|
||||
return true
|
||||
}
|
||||
|
||||
b.log.Debug("attempted to deliver email id=%s, retry=%s, but it's not ready yet: %v", item["id"], item["attempts"], err)
|
||||
attempts++
|
||||
item["attempts"] = strconv.Itoa(attempts)
|
||||
err = b.lp.SetAccountData(itemkey, item)
|
||||
if err != nil {
|
||||
b.log.Error("cannot update attempt count on email %s: %v", itemkey, err)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// enqueueEmail adds an email to the queue
|
||||
func (b *Bot) enqueueEmail(id, from, to, data string) error {
|
||||
itemkey := acQueueKey + "." + id
|
||||
item := map[string]string{
|
||||
"attemptedAt": time.Now().UTC().Format(time.RFC1123Z),
|
||||
"attempts": "0",
|
||||
"data": data,
|
||||
"from": from,
|
||||
"to": to,
|
||||
"id": id,
|
||||
}
|
||||
|
||||
b.lock(itemkey)
|
||||
defer b.unlock(itemkey)
|
||||
err := b.lp.SetAccountData(itemkey, item)
|
||||
if err != nil {
|
||||
b.log.Error("cannot enqueue email id=%s: %v", id, err)
|
||||
return err
|
||||
}
|
||||
|
||||
b.lock(acQueueKey)
|
||||
defer b.unlock(acQueueKey)
|
||||
queueIndex, err := b.lp.GetAccountData(acQueueKey)
|
||||
if err != nil {
|
||||
b.log.Error("cannot get queue index: %v", err)
|
||||
return err
|
||||
}
|
||||
queueIndex[id] = itemkey
|
||||
err = b.lp.SetAccountData(acQueueKey, queueIndex)
|
||||
if err != nil {
|
||||
b.log.Error("cannot save queue index: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// dequeueEmail removes an email from the queue
|
||||
func (b *Bot) dequeueEmail(id string) error {
|
||||
index, err := b.lp.GetAccountData(acQueueKey)
|
||||
if err != nil {
|
||||
b.log.Error("cannot get queue index: %v", err)
|
||||
return err
|
||||
}
|
||||
itemkey := index[id]
|
||||
if itemkey == "" {
|
||||
itemkey = acQueueKey + "." + id
|
||||
}
|
||||
delete(index, id)
|
||||
err = b.lp.SetAccountData(acQueueKey, index)
|
||||
if err != nil {
|
||||
b.log.Error("cannot update queue index: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
b.lock(itemkey)
|
||||
defer b.unlock(itemkey)
|
||||
return b.lp.SetAccountData(itemkey, nil)
|
||||
}
|
||||
@@ -15,6 +15,8 @@ const (
|
||||
botOptionCatchAll = "catch-all"
|
||||
botOptionDKIMSignature = "dkim.pub"
|
||||
botOptionDKIMPrivateKey = "dkim.pem"
|
||||
botOptionQueueItems = "queue:items"
|
||||
botOptionQueueTries = "queue:tries"
|
||||
)
|
||||
|
||||
type botSettings map[string]string
|
||||
@@ -58,6 +60,16 @@ func (s botSettings) DKIMPrivateKey() string {
|
||||
return s.Get(botOptionDKIMPrivateKey)
|
||||
}
|
||||
|
||||
// QueueItems option
|
||||
func (s botSettings) QueueItems() int {
|
||||
return utils.Int(s.Get(botOptionQueueItems))
|
||||
}
|
||||
|
||||
// QueueTries option
|
||||
func (s botSettings) QueueTries() int {
|
||||
return utils.Int(s.Get(botOptionQueueTries))
|
||||
}
|
||||
|
||||
func (b *Bot) initBotUsers() ([]string, error) {
|
||||
config := b.getBotSettings()
|
||||
cfgUsers := config.Users()
|
||||
|
||||
Reference in New Issue
Block a user