Source code for salmon.queue

"""
Simpler queue management than the regular mailbox.Maildir stuff.  You
do get a lot more features from the Python library, so if you need
to do some serious surgery go use that.  This works as a good
API for the 90% case of "put mail in, get mail out" queues.
"""
import errno
import hashlib
import logging
import mailbox
import os
import socket
import time

from salmon import mail

# MD5 hex digest of this machine's hostname, computed once at import time
# since the hostname shouldn't change for every email we put in a queue.
# SafeMaildir uses it as the host component of the file names it creates,
# so the real hostname does not appear in the queue directory.
HASHED_HOSTNAME = hashlib.md5(socket.gethostname().encode("utf-8")).hexdigest()


class SafeMaildir(mailbox.Maildir):
    """
    A Maildir whose temporary file names end in HASHED_HOSTNAME instead
    of a host name taken directly from the machine.
    """

    def _create_tmp(self):
        """
        Create a uniquely named file under ``tmp/`` and return it opened.

        The name is built from the current time (whole seconds and
        microseconds), the process id, the shared Maildir counter and
        HASHED_HOSTNAME.

        Raises mailbox.ExternalClashError when a file with that name
        already exists; any other OSError is propagated unchanged.
        """
        timestamp = time.time()
        seconds = int(timestamp)
        microseconds = int(timestamp % 1 * 1e6)
        uniq = "%s.M%sP%sQ%s.%s" % (seconds, microseconds, os.getpid(),
                                    mailbox.Maildir._count, HASHED_HOSTNAME)
        path = os.path.join(self._path, 'tmp', uniq)

        try:
            os.stat(path)
        except OSError as stat_error:
            # Only "no such file" means the name is free; everything else
            # is a genuine failure.
            if stat_error.errno != errno.ENOENT:
                raise

            mailbox.Maildir._count += 1
            try:
                return mailbox._create_carefully(path)
            except OSError as create_error:
                # Losing a creation race shows up as EEXIST and is reported
                # as a name clash below.
                if create_error.errno != errno.EEXIST:
                    raise

        # Fall through to here if stat succeeded or open raised EEXIST.
        raise mailbox.ExternalClashError('Name clash prevented file creation: %s' % path)
class QueueError(Exception):
    """
    Error raised for queue failures.

    Carries the human readable message (also kept on ``_message``) and
    the ``data`` that was being handled when the error happened.
    """

    def __init__(self, msg, data):
        super().__init__(msg)
        self.data = data
        self._message = msg
class Queue:
    """
    Provides a simplified API for dealing with 'queues' in Salmon.
    It currently just supports Maildir queues since those are the
    most robust, but could implement others later.
    """

    def __init__(self, queue_dir, safe=False, pop_limit=0, oversize_dir=None):
        """
        This gives the Maildir queue directory to use, and whether you want
        this Queue to use the SafeMaildir variant which hashes the hostname
        so you can expose it publicly.

        The pop_limit and oversize_dir both set an upper limit on the mail
        you pop out of the queue.  The size is checked before any Salmon
        processing is done and is based on the size of the file on disk.
        The purpose is to prevent people from sending 10MB attachments.  If
        a message is over the pop_limit then it is placed into the
        oversize_dir (which should be a Maildir).  If no oversize_dir is
        given, oversize messages are deleted when popped.

        The oversize protection only works on pop messages off, not
        putting them in, get, or any other call.  If you use get you can
        use self.oversize to also check if it's oversize manually.
        """
        self.dir = queue_dir

        if safe:
            self.mbox = SafeMaildir(queue_dir)
        else:
            self.mbox = mailbox.Maildir(queue_dir)

        self.pop_limit = pop_limit

        if oversize_dir:
            # Let mailbox build the full Maildir layout when it is missing.
            if not os.path.exists(oversize_dir):
                mailbox.Maildir(oversize_dir)

            # Oversize messages are dropped straight into "new".
            self.oversize_dir = os.path.join(oversize_dir, "new")

            if not os.path.exists(self.oversize_dir):
                os.mkdir(self.oversize_dir)
        else:
            self.oversize_dir = None

    def push(self, message):
        """
        Pushes the message onto the queue.  Remember the order is probably
        not maintained.  It returns the key that gets created.
        """
        if not isinstance(message, (str, bytes)):
            # bytes is ok, but anything else needs to be turned into str
            message = str(message)
        return self.mbox.add(message)

    def pop(self):
        """
        Pops a message off the queue, order is not really maintained
        like a stack.

        It returns a (key, message) tuple for that item, or (None, None)
        when there is nothing left to pop.  Messages over pop_limit are
        moved to oversize_dir (or deleted if it is not set) and skipped.
        The popped message is removed from the queue even if reading it
        raises.
        """
        for key in self.mbox.iterkeys():
            over, over_name = self.oversize(key)

            if over:
                if self.oversize_dir:
                    logging.info("Message key %s over size limit %d, moving to %s.",
                                 key, self.pop_limit, self.oversize_dir)
                    os.rename(over_name, os.path.join(self.oversize_dir, key))
                else:
                    logging.info("Message key %s over size limit %d, DELETING (set oversize_dir).",
                                 key, self.pop_limit)
                    os.unlink(over_name)
                continue

            try:
                msg = self.get(key)
            finally:
                # Always drop the message so a bad one cannot wedge the queue.
                self.remove(key)
            return key, msg

        return None, None

    def get(self, key):
        """
        Get the specific message referenced by the key.  The message is NOT
        removed from the queue.

        Returns a mail.MailRequest built from the raw message bytes, or
        None if the message could not be turned into one (the failure is
        logged).
        """
        msg_file = self.mbox.get_file(key)

        if not msg_file:
            return None

        # Close the handle as soon as the bytes are read so file
        # descriptors are not leaked while draining large queues.
        try:
            msg_data = msg_file.read()
        finally:
            msg_file.close()

        try:
            return mail.MailRequest(self.dir, None, None, msg_data)
        except Exception as exc:
            logging.exception("Failed to decode message: %s; msg_data: %r", exc, msg_data)
            return None

    def remove(self, key):
        """Removes the message from the queue without returning it."""
        self.mbox.remove(key)

    def __len__(self):
        """Returns the number of messages in the queue."""
        return len(self.mbox)

    # synonym of __len__ for backwards compatibility
    count = __len__

    def clear(self):
        """
        Clears out the contents of the entire queue.

        Warning: This could be horribly inefficient since it pops messages
        until the queue is empty. It could also cause an infinite loop if
        another process is writing messages to the Queue faster than we
        can pop.
        """
        # man this is probably a really bad idea
        while len(self) > 0:
            self.pop()

    def keys(self):
        """
        Returns the keys in the queue.
        """
        return self.mbox.keys()

    def oversize(self, key):
        """
        Checks the on-disk size of the message against pop_limit.

        Returns an (is_oversize, file_name) tuple.  When pop_limit is not
        set the result is always (False, None) and the disk is not touched.
        """
        if not self.pop_limit:
            return False, None

        file_name = self._message_path(key)
        return os.path.getsize(file_name) > self.pop_limit, file_name

    def _message_path(self, key):
        """Returns the path of the file holding key, looking in new/ then cur/."""
        new_path = os.path.join(self.dir, "new", key)
        if os.path.exists(new_path):
            return new_path

        # Messages that a reader has already seen live in cur/, usually with
        # an info suffix appended after the Maildir colon separator.
        cur_dir = os.path.join(self.dir, "cur")
        flagged_prefix = key + self.mbox.colon
        try:
            candidates = os.listdir(cur_dir)
        except OSError:
            return new_path

        for name in candidates:
            if name == key or name.startswith(flagged_prefix):
                return os.path.join(cur_dir, name)

        # Not found anywhere: hand back the default location so the caller
        # gets the same "file not found" error it always did.
        return new_path