Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 97 additions & 10 deletions mongoqueue/mongoqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

DEFAULT_INSERT = {
"priority": 0,
"time": None,
"period": 0,
"attempts": 0,
"locked_by": None,
"locked_at": None,
Expand Down Expand Up @@ -70,27 +72,84 @@ def repair(self):
"$inc": {"attempts": 1}}
)

def put(self, payload, priority=0):
def put(self, payload, priority=0, time=None, period=None):
"""Place a job into the queue
"""
job = dict(DEFAULT_INSERT)
job['priority'] = priority
job['time'] = time
# Store period as an integer representing the number of seconds
# because BSON format doesn't support timedelta
if period and type(period) == timedelta:
job['period'] = period.total_seconds()
job['payload'] = payload
if self.is_dupe(job):
return
return self.collection.insert(job)

def is_dupe(self, job):
jobs = self.collection.find({
'payload': job['payload'],
'time': job['time'],
'period': job['period'],
'attempts': 0},
limit=1
)
for job in jobs:
if job:
return True
return False

def next(self):
return self._wrap_one(self.collection.find_and_modify(
query={"locked_by": None,
"locked_at": None,
"attempts": {"$lt": self.max_attempts}},
update={"$set": {"attempts": 1,
"locked_by": self.consumer_id,
"locked_at": datetime.now()}},
sort=[('priority', pymongo.DESCENDING)],
new=1,
scheduled_job = self.next_scheduled_job()
free_job = self.next_free_job()
next_job = None

if scheduled_job and scheduled_job['time'] < datetime.utcnow():
next_job = scheduled_job
else:
next_job = free_job

return self._wrap_one(self.collection.find_and_modify({
"_id": next_job["_id"]
},
update={
"$set": {
"locked_by": self.consumer_id,
"locked_at": datetime.now()
}},
new=True,
limit=1
))

def next_scheduled_job(self):
jobs = self.collection.find({
"locked_by": None,
"locked_at": None,
"time": {"$ne": None},
"attempts": {"$lt": self.max_attempts}
},
sort=[('time', pymongo.ASCENDING)],
limit=1
)
for job in jobs:
return job
return None

def next_free_job(self):
jobs = self.collection.find({
"locked_by": None,
"locked_at": None,
"time": None,
"attempts": {"$lt": self.max_attempts}
},
sort=[('priority', pymongo.DESCENDING)],
limit=1
)
for job in jobs:
return job
return None

def _jobs(self):
return self.collection.find(
query={"locked_by": None,
Expand Down Expand Up @@ -153,6 +212,17 @@ def job_id(self):
def complete(self):
"""Job has been completed.
"""
if self._data['period']:
updated_time = self._data['time'] + timedelta(seconds=self._data['period'])

return self._queue.collection.find_and_modify(
{"_id": self.job_id, "locked_by": self._queue.consumer_id},
update={"$set":{
"locked_by": None,
"locked_at": None,
"time": updated_time
}})

return self._queue.collection.find_and_modify(
{"_id": self.job_id, "locked_by": self._queue.consumer_id},
remove=True)
Expand All @@ -166,6 +236,15 @@ def error(self, message=None):
"locked_by": None, "locked_at": None, "last_error": message},
"$inc": {"attempts": 1}})

if self._data['attempts'] == self._queue.max_attempts - 1 and self._data['period']:
updated_time = self._data['time'] + timedelta(seconds=self._data['period'])

self._queue.put(self._data['payload'],
priority=self._data['priority'],
time=updated_time,
period=timedelta(seconds=self._data['period']))


def progress(self, count=0):
"""Note progress on a long running task.
"""
Expand All @@ -181,6 +260,13 @@ def release(self):
update={"$set": {"locked_by": None, "locked_at": None},
"$inc": {"attempts": 1}})

def abort(self):
"""Intentionally terminate execution of a job, and remove it from the queue
"""
return self._queue.collection.find_and_modify(
{"_id": self.job_id, "locked_by": self._queue.consumer_id},
remove=True)

## Context Manager support

def __enter__(self):
Expand All @@ -192,3 +278,4 @@ def __exit__(self, type, value, tb):
else:
error = traceback.format_exc()
self.error(error)
return True
14 changes: 14 additions & 0 deletions mongoqueue/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,20 @@ def test_release(self):
job = self.queue.next()
self.assert_job_equal(job, data)

def test_max_attempts(self):
data = {"context_id": "alpha",
"ts": time.time()}
self.queue.put(dict(data))
attempts = 0
for i in xrange(0, self.queue.max_attempts):
job = self.queue.next()
if not job:
break
with job:
attempts += 1
raise Exception()
self.assertEqual(attempts, self.queue.max_attempts)

def test_error(self):
pass

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages

setup(name='mongoqueue',
version="0.7.2",
version="0.7.3",
classifiers=[
'Intended Audience :: Developers',
'Programming Language :: Python',
Expand Down