big refactoring
This commit is contained in:
79
bot/queue/manager.go
Normal file
79
bot/queue/manager.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"gitlab.com/etke.cc/go/logger"
|
||||
"gitlab.com/etke.cc/linkpearl"
|
||||
|
||||
"gitlab.com/etke.cc/postmoogle/bot/config"
|
||||
"gitlab.com/etke.cc/postmoogle/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
acQueueKey = "cc.etke.postmoogle.mailqueue"
|
||||
defaultQueueBatch = 1
|
||||
defaultQueueRetries = 3
|
||||
)
|
||||
|
||||
// Queue manager
|
||||
type Queue struct {
|
||||
mu utils.Mutex
|
||||
lp *linkpearl.Linkpearl
|
||||
cfg *config.Manager
|
||||
log *logger.Logger
|
||||
sendmail func(string, string, string) error
|
||||
}
|
||||
|
||||
// New queue
|
||||
func New(lp *linkpearl.Linkpearl, cfg *config.Manager, log *logger.Logger) *Queue {
|
||||
return &Queue{
|
||||
mu: utils.Mutex{},
|
||||
lp: lp,
|
||||
cfg: cfg,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
// SetSendmail func
|
||||
func (q *Queue) SetSendmail(function func(string, string, string) error) {
|
||||
q.sendmail = function
|
||||
}
|
||||
|
||||
// Process queue
|
||||
func (q *Queue) Process() {
|
||||
q.log.Debug("staring queue processing...")
|
||||
cfg := q.cfg.GetBot()
|
||||
|
||||
batchSize := cfg.QueueBatch()
|
||||
if batchSize == 0 {
|
||||
batchSize = defaultQueueBatch
|
||||
}
|
||||
|
||||
maxRetries := cfg.QueueRetries()
|
||||
if maxRetries == 0 {
|
||||
maxRetries = defaultQueueRetries
|
||||
}
|
||||
|
||||
q.mu.Lock(acQueueKey)
|
||||
defer q.mu.Unlock(acQueueKey)
|
||||
index, err := q.lp.GetAccountData(acQueueKey)
|
||||
if err != nil {
|
||||
q.log.Error("cannot get queue index: %v", err)
|
||||
}
|
||||
|
||||
i := 0
|
||||
for id, itemkey := range index {
|
||||
if i > batchSize {
|
||||
q.log.Debug("finished re-deliveries from queue")
|
||||
return
|
||||
}
|
||||
if dequeue := q.try(itemkey, maxRetries); dequeue {
|
||||
q.log.Debug("email %q has been delivered", id)
|
||||
err = q.Remove(id)
|
||||
if err != nil {
|
||||
q.log.Error("cannot dequeue email %q: %v", id, err)
|
||||
}
|
||||
}
|
||||
i++
|
||||
}
|
||||
q.log.Debug("ended queue processing")
|
||||
}
|
||||
101
bot/queue/queue.go
Normal file
101
bot/queue/queue.go
Normal file
@@ -0,0 +1,101 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// Add to queue
|
||||
func (q *Queue) Add(id, from, to, data string) error {
|
||||
itemkey := acQueueKey + "." + id
|
||||
item := map[string]string{
|
||||
"attempts": "0",
|
||||
"data": data,
|
||||
"from": from,
|
||||
"to": to,
|
||||
"id": id,
|
||||
}
|
||||
|
||||
q.mu.Lock(itemkey)
|
||||
defer q.mu.Unlock(itemkey)
|
||||
err := q.lp.SetAccountData(itemkey, item)
|
||||
if err != nil {
|
||||
q.log.Error("cannot enqueue email id=%q: %v", id, err)
|
||||
return err
|
||||
}
|
||||
|
||||
q.mu.Lock(acQueueKey)
|
||||
defer q.mu.Unlock(acQueueKey)
|
||||
queueIndex, err := q.lp.GetAccountData(acQueueKey)
|
||||
if err != nil {
|
||||
q.log.Error("cannot get queue index: %v", err)
|
||||
return err
|
||||
}
|
||||
queueIndex[id] = itemkey
|
||||
err = q.lp.SetAccountData(acQueueKey, queueIndex)
|
||||
if err != nil {
|
||||
q.log.Error("cannot save queue index: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove from queue
|
||||
func (q *Queue) Remove(id string) error {
|
||||
index, err := q.lp.GetAccountData(acQueueKey)
|
||||
if err != nil {
|
||||
q.log.Error("cannot get queue index: %v", err)
|
||||
return err
|
||||
}
|
||||
itemkey := index[id]
|
||||
if itemkey == "" {
|
||||
itemkey = acQueueKey + "." + id
|
||||
}
|
||||
delete(index, id)
|
||||
err = q.lp.SetAccountData(acQueueKey, index)
|
||||
if err != nil {
|
||||
q.log.Error("cannot update queue index: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
q.mu.Lock(itemkey)
|
||||
defer q.mu.Unlock(itemkey)
|
||||
return q.lp.SetAccountData(itemkey, nil)
|
||||
}
|
||||
|
||||
// try to send email
|
||||
func (q *Queue) try(itemkey string, maxRetries int) bool {
|
||||
q.mu.Lock(itemkey)
|
||||
defer q.mu.Unlock(itemkey)
|
||||
|
||||
item, err := q.lp.GetAccountData(itemkey)
|
||||
if err != nil {
|
||||
q.log.Error("cannot retrieve a queue item %q: %v", itemkey, err)
|
||||
return false
|
||||
}
|
||||
q.log.Debug("processing queue item %+v", item)
|
||||
attempts, err := strconv.Atoi(item["attempts"])
|
||||
if err != nil {
|
||||
q.log.Error("cannot parse attempts of %q: %v", itemkey, err)
|
||||
return false
|
||||
}
|
||||
if attempts > maxRetries {
|
||||
return true
|
||||
}
|
||||
|
||||
err = q.sendmail(item["from"], item["to"], item["data"])
|
||||
if err == nil {
|
||||
q.log.Debug("email %q from queue was delivered")
|
||||
return true
|
||||
}
|
||||
|
||||
q.log.Debug("attempted to deliver email id=%q, retry=%q, but it's not ready yet: %v", item["id"], item["attempts"], err)
|
||||
attempts++
|
||||
item["attempts"] = strconv.Itoa(attempts)
|
||||
err = q.lp.SetAccountData(itemkey, item)
|
||||
if err != nil {
|
||||
q.log.Error("cannot update attempt count on email %q: %v", itemkey, err)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
Reference in New Issue
Block a user