Skip to content
This repository has been archived by the owner on Jan 10, 2019. It is now read-only.

Fix for issues #4 and #5 #6

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
*.pyc
.*.swp
build/
.project
.pydevproject
.settings
21 changes: 15 additions & 6 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ def test_10_basic_job(self):
"""JOB publish, retrieve, finish, get result"""
#publisher
testjob = self.ps.job("testjob")
id = testjob.put(9.0)
id = testjob.put('9.0')

#worker
testjobworker = self.ps.job("testjob")
id_worker, query_worker = testjobworker.get(timeout=3)
id_worker, query_worker, cancelled = testjobworker.get(timeout=3)
result_worker = math.sqrt(float(query_worker))
testjobworker.finish(id_worker, result_worker, True)

Expand All @@ -49,19 +49,28 @@ def test_20_cancel_job(self):
"""Test cancelling a job"""
j = self.ps.job("testjob")
#publisher
id = j.put(9.0)
id = j.put('9.0')
#worker claims
id, query = j.get()
id, query, cancelled = j.get()
self.assertEqual(cancelled, 0)
#publisher or worker cancels
j.cancel(id)
id2, query2 = j.get()
id2, query2, cancelled2 = j.get()
self.assertEqual(cancelled2, 1)
self.assertEqual(id, id2)
#cancel the work again
j.cancel(id)
# check the cancelled increment again
id3, query3, cancelled3 = j.get()
self.assertEqual(cancelled3, 2)
self.assertEqual(id, id3)
#cleanup -- remove the job from the queue
j.retract(id)
self.assertEqual(j.get_ids(), [])


def test_30_no_job(self):
j = self.ps.job("testjob")
self.assertRaises(thoonk.feeds.queue.Empty, j.get, timeout=1)

suite = unittest.TestLoader().loadTestsFromTestCase(TestJob)

181 changes: 181 additions & 0 deletions tests/test_notice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import thoonk
import unittest
import time
from ConfigParser import ConfigParser

class TestNotice(unittest.TestCase):

def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)

conf = ConfigParser()
conf.read('test.cfg')
if conf.sections() == ['Test']:
self.ps = thoonk.Thoonk(host=conf.get('Test', 'host'),
port=conf.getint('Test', 'port'),
db=conf.getint('Test', 'db'),
listen=True)
self.ps.redis.flushdb()
else:
print 'No test configuration found in test.cfg'
exit()


def tearDown(self):
self.ps.close()

"claimed, cancelled, stalled, finished"

def test_05_publish_notice(self):
notice_received = [False]
ids = [None, None]

def received_handler(feed, item, id):
self.assertEqual(feed, "testfeed")
ids[1] = id
notice_received[0] = True

self.ps.register_handler('publish_notice', received_handler)

j = self.ps.feed("testfeed")

self.assertFalse(notice_received[0])

#publisher
ids[0] = j.publish('a')

# block while waiting for notice
i = 0
while not notice_received[0] and i < 3:
i += 1
time.sleep(1)

self.assertEqual(ids[0], ids[1])

self.assertTrue(notice_received[0], "Notice not received")

self.ps.remove_handler('publish_notice', received_handler)

def test_10_job_notices(self):
notices_received = [False]
ids = [None, None]

def publish_handler(feed, item, id):
self.assertEqual(feed, "testjob")
ids[-1] = id
notices_received[-1] = "publish"

def claimed_handler(feed, id):
self.assertEqual(feed, "testjob")
ids[-1] = id
notices_received[-1] = "claimed"

def cancelled_handler(feed, id):
self.assertEqual(feed, "testjob")
ids[-1] = id
notices_received[-1] = "cancelled"

def stalled_handler(feed, id):
self.assertEqual(feed, "testjob")
ids[-1] = id
notices_received[-1] = "stalled"

def retried_handler(feed, id):
self.assertEqual(feed, "testjob")
ids[-1] = id
notices_received[-1] = "retried"

def finished_handler(feed, id, result):
self.assertEqual(feed, "testjob")
ids[-1] = id
notices_received[-1] = "finished"

def do_wait():
i = 0
while not notices_received[-1] and i < 2:
i += 1
time.sleep(1)

self.ps.register_handler('publish_notice', publish_handler)
self.ps.register_handler('claimed_notice', claimed_handler)
self.ps.register_handler('cancelled_notice', cancelled_handler)
self.ps.register_handler('stalled_notice', stalled_handler)
self.ps.register_handler('retried_notice', retried_handler)
self.ps.register_handler('finished_notice', finished_handler)

j = self.ps.job("testjob")

self.assertFalse(notices_received[0])

# create the job
ids[0] = j.put('b')
do_wait()
self.assertEqual(notices_received[0], "publish", "Notice not received")
self.assertEqual(ids[0], ids[-1])

notices_received.append(False); ids.append(None);
# claim the job
id, job, cancelled = j.get()
self.assertEqual(job, 'b')
self.assertEqual(cancelled, 0)
self.assertEqual(ids[0], id)
do_wait()
self.assertEqual(notices_received[-1], "claimed", "Claimed notice not received")
self.assertEqual(ids[0], ids[-1])

notices_received.append(False); ids.append(None);
# cancel the job
j.cancel(id)
do_wait()
self.assertEqual(notices_received[-1], "cancelled", "Cancelled notice not received")
self.assertEqual(ids[0], ids[-1])

notices_received.append(False); ids.append(None);
# get the job again
id, job, cancelled = j.get()
self.assertEqual(job, 'b')
self.assertEqual(cancelled, 1)
self.assertEqual(ids[0], id)
do_wait()
self.assertEqual(notices_received[-1], "claimed", "Claimed notice not received")
self.assertEqual(ids[0], ids[-1])

notices_received.append(False); ids.append(None);
# stall the job
j.stall(id)
do_wait()
self.assertEqual(notices_received[-1], "stalled", "Stalled notice not received")
self.assertEqual(ids[0], ids[-1])

notices_received.append(False); ids.append(None);
# retry the job
j.retry(id)
do_wait()
self.assertEqual(notices_received[-1], "retried", "Retried notice not received")
self.assertEqual(ids[0], ids[-1])

notices_received.append(False); ids.append(None);
# get the job again
id, job, cancelled = j.get()
self.assertEqual(job, 'b')
self.assertEqual(cancelled, 0)
self.assertEqual(ids[0], id)
do_wait()
self.assertEqual(notices_received[-1], "claimed", "Claimed notice not received")
self.assertEqual(ids[0], ids[-1])

notices_received.append(False); ids.append(None);
# finish the job
j.finish(id)
do_wait()
self.assertEqual(notices_received[-1], "finished", "Finished notice not received")
self.assertEqual(ids[0], ids[-1])

self.ps.remove_handler('publish_notice', publish_handler)
self.ps.remove_handler('claimed_notice', claimed_handler)
self.ps.remove_handler('cancelled_notice', cancelled_handler)
self.ps.remove_handler('stalled_notice', stalled_handler)
self.ps.remove_handler('retried_notice', retried_handler)
self.ps.remove_handler('finished_notice', finished_handler)

suite = unittest.TestLoader().loadTestsFromTestCase(TestNotice)
2 changes: 1 addition & 1 deletion thoonk/feeds/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def config(self):
"""
with self.config_lock:
if not self.config_valid:
conf = self.redis.get(self.feed_config)
conf = self.redis.get(self.feed_config) or "{}"
self._config = json.loads(conf)
self.config_valid = True
return self._config
Expand Down
50 changes: 39 additions & 11 deletions thoonk/feeds/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

import time
import uuid
import redis

from thoonk.exceptions import *
from thoonk.feeds import Queue
from thoonk.feeds.queue import Empty


class JobDoesNotExist(Exception):
Expand Down Expand Up @@ -55,7 +57,7 @@ class Job(Queue):
feed.cancelled:[feed] -- A hash table of cancelled jobs.
feed.claimed:[feed] -- A hash table of claimed jobs.
feed.stalled:[feed] -- A hash table of stalled jobs.
feeed.funning:[feed] -- A hash table of running jobs.
feed.running:[feed] -- A hash table of running jobs.
feed.finished:[feed]\x00[id] -- Temporary queue for receiving job
result data.

Expand Down Expand Up @@ -90,24 +92,30 @@ def __init__(self, thoonk, feed, config=None):
"""
Queue.__init__(self, thoonk, feed, config=None)

self.feed_published = 'feed.published:%s' % feed
self.feed_publishes = 'feed.publishes:%s' % feed
self.feed_cancelled = 'feed.cancelled:%s' % feed
self.feed_retried = 'feed.retried:%s' % feed
self.feed_finished = 'feed.finished:%s' % feed
self.feed_job_claimed = 'feed.claimed:%s' % feed
self.feed_job_stalled = 'feed.stalled:%s' % feed
self.feed_job_finished = 'feed.finished:%s\x00%s' % (feed, '%s')
self.feed_job_running = 'feed.running:%s' % feed

def get_channels(self):
return (self.feed_publishes, self.feed_job_claimed, self.feed_job_stalled,
self.feed_finished, self.feed_cancelled, self.feed_retried)

def get_schemas(self):
"""Return the set of Redis keys used exclusively by this feed."""
schema = set((self.feed_job_claimed,
self.feed_job_stalled,
self.feed_job_running,
self.feed_published,
self.feed_publishes,
self.feed_cancelled))

for id in self.get_ids():
schema.add(self.feed_job_finished % id)

return schema.union(Queue.get_schemas(self))

def get_ids(self):
Expand All @@ -127,7 +135,7 @@ def retract(self, id):
pipe = self.redis.pipeline()
pipe.hdel(self.feed_items, id)
pipe.hdel(self.feed_cancelled, id)
pipe.zrem(self.feed_published, id)
pipe.zrem(self.feed_publishes, id)
pipe.srem(self.feed_job_stalled, id)
pipe.zrem(self.feed_job_claimed, id)
pipe.lrem(self.feed_ids, 1, id)
Expand Down Expand Up @@ -164,9 +172,16 @@ def put(self, item, priority=False):
pipe.lpush(self.feed_ids, id)
pipe.incr(self.feed_publishes)
pipe.hset(self.feed_items, id, item)
pipe.zadd(self.feed_published, id, time.time())
pipe.zadd(self.feed_publishes, id, time.time())

results = pipe.execute()

if results[-1]:
# If zadd was successful
self.thoonk._publish(self.feed_publishes, (id, item))
else:
self.thoonk._publish(self.feed_edit, (id, item))

return id

def get(self, timeout=0):
Expand All @@ -178,17 +193,26 @@ def get(self, timeout=0):
Arguments:
timeout -- Optional time in seconds to wait before
raising an exception.

Returns:
id -- The id of the job
job -- The job content
cancelled -- The number of times the job has been cancelled
"""
id = self.redis.brpop(self.feed_ids, timeout)
if id is None:
return # raise exception?
raise Empty
id = id[1]

pipe = self.redis.pipeline()
pipe.zadd(self.feed_job_claimed, id, time.time())
pipe.hget(self.feed_items, id)
pipe.hget(self.feed_cancelled, id)
result = pipe.execute()
return id, result[1]

self.thoonk._publish(self.feed_job_claimed, (id,))

return id, result[1], 0 if result[2] is None else int(result[2])

def finish(self, id, item=None, result=False, timeout=None):
"""
Expand Down Expand Up @@ -219,7 +243,8 @@ def finish(self, id, item=None, result=False, timeout=None):
pipe.expire(self.feed_job_finished % id, timeout)
pipe.hdel(self.feed_items, id)
try:
result = pipe.execute()
pipe.execute()
self.thoonk._publish(self.feed_finished, (id, result if result else ""))
break
except redis.exceptions.WatchError:
pass
Expand Down Expand Up @@ -256,6 +281,7 @@ def cancel(self, id):
pipe.zrem(self.feed_job_claimed, id)
try:
pipe.execute()
self.thoonk._publish(self.feed_cancelled, (id,))
break
except redis.exceptions.WatchError:
pass
Expand All @@ -279,9 +305,10 @@ def stall(self, id):
pipe.zrem(self.feed_job_claimed, id)
pipe.hdel(self.feed_cancelled, id)
pipe.sadd(self.feed_job_stalled, id)
pipe.zrem(self.feed_published, id)
pipe.zrem(self.feed_publishes, id)
try:
pipe.execute()
self.thoonk._publish(self.feed_job_stalled, (id,))
break
except redis.exceptions.WatchError:
pass
Expand All @@ -302,9 +329,10 @@ def retry(self, id):
pipe = self.redis.pipeline()
pipe.srem(self.feed_job_stalled, id)
pipe.lpush(self.feed_ids, id)
pipe.zadd(self.feed_published, time.time(), id)
pipe.zadd(self.feed_publishes, time.time(), id)
try:
results = pipe.execute()
self.thoonk._publish(self.feed_retried, (id,))
if not results[0]:
return # raise exception?
break
Expand Down
Loading