Skip to content
This repository has been archived by the owner on Jan 10, 2019. It is now read-only.

Update to latest contract and redis versions #10

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
*.pyc
.*.swp
build/
.project
.pydevproject
.settings
28 changes: 16 additions & 12 deletions contract.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@ Delete Feed:
PUBLISH delfeed [feed]\x00[instance uuid]
EXEC // if nil: go back to WATCH

Set Config:
SET feed.config:[feed] //json(config)
Set Config Value:
HSET feed.config:[feed] name value
PUBLISH conffeed [feed]\x00[instance uuid]

Get Config Value:
HGET feed.config:[feed] name

Feed:

Publish:
//id may be provided or generated. An id that has already been published will update that id

max = feed.config:[feed] max_length
WATCH feed.ids:[feed]
delete_ids = ZRANGE feed.ids:[feed] 0 [-max] // eg ZRANGE feed.ids:test 0 -5 if the max is 4
MULTI
Expand Down Expand Up @@ -188,22 +192,22 @@ Job:
LPUSH feed.ids:[feed] [id]
INCR feed.publishes:[feed]
HSET feed.items:[feed] [id] [item]
ZADD feed.published:[feed] [utc epoch] [id]
ZADD feed.published:[feed] [utc epoch milliseconds] [id]
EXEC

High Priority Put:
// id = generated uuid
MULTI
RPUSH feed.ids:[feed] [id]
HSET feed.items:[feed] [id] [item]
ZADD feed.published:[feed] [utc epoch] [id]
ZADD feed.published:[feed] [utc epoch milliseconds] [id]
EXEC

Get:
id = BRPOP feed.ids:[feed] [timeout]
//if error/timeout, abort
MULTI
ZADD feed.claimed:[feed] [utc epoch] [id]
ZADD feed.claimed:[feed] [utc epoch milliseconds] [id]
item = HGET feed:items[feed] [id]
EXEC
//if the id fails to get from feed.ids to feed.claimed, the maintenance will notice eventually
Expand All @@ -214,15 +218,12 @@ Job:
MULTI
ZREM feed.claimed:[feed] [id]
HDEL feed.cancelled:[feed] [id] //just to make sure
INCR feed.finishes:[feed]
//optionally if publishing a result:
LPUSH feed.jobfinished:[feed]\x00[id] [result]
EXPIRE feed.jobfinished:[feed]\x00[id] [timeout]
PUBLISH job.finish:[feed] [id]\x00[result]
HDEL feed.items:[feed] [id]
EXEC // if nil: go back to WATCH and try again

Get Result:
BRPOP feed:jobfinished:[feed]\x00[id] [timeout]

Get Ids:
HKEYS feed.items:[feed]

Expand Down Expand Up @@ -252,7 +253,7 @@ Job:
SREM feed.stalled:[feed] [id]
//if error, abort
LPUSH feed.ids:[feed] [id]
ZADD feed.published:[feed] [utc epoch] [id]
ZADD feed.published:[feed] [utc epoch milliseconds] [id]
EXEC // if nil retry

Retract:
Expand All @@ -267,6 +268,9 @@ Job:
LREM feed.ids:[feed] 1 [id]
EXEC // if fail, retry

getNumOfFailures:
HGET feed.cancelled:[feed] [id]

Maintenance: //maintain job queue -- only ran by one process per jobqueue on occassion -- still a bit hand-wavey
MULTI
keys = HKEYS feed.items:[feed]
Expand All @@ -280,4 +284,4 @@ Job:
LPUSH feed.ids:[feed] [key]

check claimed jobs to see if any have been claimed too long and "Cancel" or "Stall" them
publish stats to a feed
publish stats to a feed
8 changes: 4 additions & 4 deletions test.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# You were warned.
# =====================================================================

#[Test]
#host=localhost
#port=6379
#db=10
[Test]
host=localhost
port=6379
db=10
25 changes: 19 additions & 6 deletions tests/test_feed.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
import thoonk
from thoonk.feeds import Feed
import unittest
from ConfigParser import ConfigParser

import threading

class TestLeaf(unittest.TestCase):

def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)

def setUp(self, *args, **kwargs):
conf = ConfigParser()
conf.read('test.cfg')
if conf.sections() == ['Test']:
self.ps = thoonk.Thoonk(host=conf.get('Test', 'host'),
port=conf.getint('Test', 'port'),
db=conf.getint('Test', 'db'))
db=conf.getint('Test', 'db'),
listen=True)
self.ps.redis.flushdb()
else:
print 'No test configuration found in test.cfg'
exit()


def tearDown(self):
self.ps.close()

def test_05_basic_retract(self):
"""Test adding and retracting an item."""
l = self.ps.feed("testfeed")
self.assertEqual(type(l), Feed)
l.publish('foo', id='1')
r = l.get_ids()
v = l.get_all()
Expand All @@ -46,6 +50,10 @@ def test_10_basic_feed(self):
def test_20_basic_feed_items(self):
"""Test items match completely."""
l = self.ps.feed("testfeed")
l.publish("hi", id='1')
l.publish("bye", id='2')
l.publish("thanks", id='3')
l.publish("you're welcome", id='4')
r = l.get_ids()
self.assertEqual(r, ['1', '2', '3', '4'], "Queue results did not match publish: %s" % r)
c = {}
Expand All @@ -56,6 +64,10 @@ def test_20_basic_feed_items(self):
def test_30_basic_feed_retract(self):
"""Testing item retract items match."""
l = self.ps.feed("testfeed")
l.publish("hi", id='1')
l.publish("bye", id='2')
l.publish("thanks", id='3')
l.publish("you're welcome", id='4')
l.retract('3')
r = l.get_ids()
self.assertEqual(r, ['1', '2','4'], "Queue results did not match publish: %s" % r)
Expand All @@ -68,6 +80,7 @@ def test_40_create_delete(self):
"""Testing feed delete"""
l = self.ps.feed("test2")
l.delete_feed()


def test_50_max_length(self):
"""Test feeds with a max length"""
Expand Down
113 changes: 88 additions & 25 deletions tests/test_job.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import thoonk
import unittest
import math
from ConfigParser import ConfigParser
import threading


class TestJob(unittest.TestCase):

def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)

def setUp(self, *args, **kwargs):
conf = ConfigParser()
conf.read('test.cfg')
if conf.sections() == ['Test']:
Expand All @@ -20,48 +18,113 @@ def __init__(self, *args, **kwargs):
print 'No test configuration found in test.cfg'
exit()


def setUp(self):
self.ps = thoonk.Pubsub(db=10, listen=True)
self.ps.redis.flushdb()

def tearDown(self):
self.ps.close()

def test_10_basic_job(self):
"""JOB publish, retrieve, finish, get result"""
"""Test job publish, retrieve, finish flow"""
#publisher
testjob = self.ps.job("testjob")
id = testjob.put(9.0)

self.assertEqual(testjob.get_ids(), [])

id = testjob.put('9.0')

#worker
testjobworker = self.ps.job("testjob")
id_worker, query_worker = testjobworker.get(timeout=3)
result_worker = math.sqrt(float(query_worker))
testjobworker.finish(id_worker, result_worker, True)

#publisher gets result
query_publisher, result_publisher = testjob.get_result(id, 1)
self.assertEqual(float(result_worker), float(result_publisher), "Job results did not match publish.")
id_worker, job_content = testjob.get(timeout=3)
self.assertEqual(job_content, '9.0')
self.assertEqual(testjob.get_failure_count(id), 0)
self.assertEqual(id_worker, id)
testjob.finish(id_worker)

self.assertEqual(testjob.get_ids(), [])

def test_20_cancel_job(self):
"""Test cancelling a job"""
j = self.ps.job("testjob")
#publisher
id = j.put(9.0)
id = j.put('9.0')
#worker claims
id, query = j.get()
id, job_content = j.get()
self.assertEqual(job_content, '9.0')
self.assertEqual(j.get_failure_count(id), 0)
#publisher or worker cancels
j.cancel(id)
id2, query2 = j.get()
id2, job_content2 = j.get()
self.assertEqual(j.get_failure_count(id), 1)
self.assertEqual(job_content2, '9.0')
self.assertEqual(id, id2)
#cancel the work again
j.cancel(id)
# check the cancelled increment again
id3, job_content3 = j.get()
self.assertEqual(j.get_failure_count(id), 2)
self.assertEqual(job_content3, '9.0')
self.assertEqual(id, id3)
#cleanup -- remove the job from the queue
j.retract(id)
self.assertEqual(j.get_ids(), [])

def test_30_no_job(self):
"""Test exception raise when job.get times out"""
j = self.ps.job("testjob")
self.assertEqual(j.get_ids(), [])
self.assertRaises(thoonk.exceptions.Empty, j.get, timeout=1)

class TestJobResult(unittest.TestCase):

def setUp(self, *args, **kwargs):
conf = ConfigParser()
conf.read('test.cfg')
if conf.sections() == ['Test']:
self.ps = thoonk.Thoonk(host=conf.get('Test', 'host'),
port=conf.getint('Test', 'port'),
db=conf.getint('Test', 'db'),
listen=True)
self.ps.redis.flushdb()
else:
print 'No test configuration found in test.cfg'
exit()

def tearDown(self):
self.ps.close()

def test_10_job_result(self):
"""Test job result published"""

suite = unittest.TestLoader().loadTestsFromTestCase(TestJob)
create_event = threading.Event()
def create_handler(name):
self.assertEqual(name, "testjobresult")
create_event.set()
self.ps.register_handler("create", create_handler)

#publisher
testjob = self.ps.job("testjobresult")
self.assertEqual(testjob.get_ids(), [])

# Wait until the create event has been received by the ThoonkListener
create_event.wait()

id = testjob.put('9.0')

#worker
id_worker, job_content = testjob.get(timeout=3)
self.assertEqual(job_content, '9.0')
self.assertEqual(testjob.get_failure_count(id), 0)
self.assertEqual(id_worker, id)

result_event = threading.Event()
def result_handler(name, id, result):
self.assertEqual(name, "testjobresult")
self.assertEqual(id, id_worker)
self.assertEqual(result, "myresult")
result_event.set()

self.ps.register_handler("finish", result_handler)
testjob.finish(id_worker, "myresult")
result_event.wait(1)
self.assertTrue(result_event.isSet(), "No result received!")
self.assertEqual(testjob.get_ids(), [])
self.ps.remove_handler("result", result_handler)

#suite = unittest.TestLoader().loadTestsFromTestCase(TestJob)

Loading