This repository has been archived by the owner on Nov 9, 2017. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathtests.py
149 lines (119 loc) · 4.53 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import os, sys, signal
import unittest, time, socket
from gearman import GearmanClient, GearmanWorker
from gearman.connection import GearmanConnection
from gearman.manager import GearmanManager
from gearman.server import GearmanServer
from gearman.task import Task
# Addresses of the Gearman job servers used by the tests. The whole list is
# handed to GearmanWorker/GearmanClient; job_servers[0] is the address used for
# direct GearmanConnection and GearmanManager connections.
job_servers = ["127.0.0.1"]
class FailedError(Exception):
    """Raised by the ``fail`` worker function to simulate a failing job."""
    pass
def echo(job):
    """Worker function: return the job's argument unchanged."""
    return job.arg
def fail(job):
    """Worker function: always raise ``FailedError``, ignoring ``job``."""
    raise FailedError()
def sleep(job):
    """Worker function: block for ``float(job.arg)`` seconds.

    Returns ``job.arg`` exactly as received (not converted to float).
    """
    time.sleep(float(job.arg))
    return job.arg
class ObjectWorker(object):
    """Worker exposing ``echo`` as an instance method.

    An instance of this class is passed to ``register_class`` in
    ``TestGearman.setUp``.
    """

    def echo(self, job):
        """Return the job's argument unchanged."""
        return job.arg
class ClassWorker(object):
    """Worker exposing ``echo`` as a static method (no instance state needed)."""

    @staticmethod
    def echo(job):
        """Return the job's argument unchanged."""
        return job.arg
class GearmanTestCase(unittest.TestCase):
    """Base class for tests that need a Gearman server running locally.

    Provides helpers that fork a child process running ``GearmanServer`` and
    terminate it again; subclasses call them from ``setUp``/``tearDown``.
    """

    def start_server(self):
        """Fork a server process and wait for it to accept connections.

        The child's pid is stored in ``self.server_pid``. The parent then
        tries to connect to ``job_servers[0]`` up to 10 times, sleeping 0.5
        seconds after each ``GearmanConnection.ConnectionError``, and closes
        the probe connection afterwards.
        """
        self.server_pid = os.fork()
        if not self.server_pid:
            # Child process. It must never return into the test runner it
            # inherited from the parent: sys.exit() raises SystemExit, which
            # unittest would record as a test error and then carry on running
            # the rest of the suite in the child. os._exit() terminates
            # immediately without unwinding.
            exit_status = 1
            try:
                server = GearmanServer()
                server.start()
                exit_status = 0
            except Exception:
                import traceback
                traceback.print_exc()
                sys.stderr.flush()  # os._exit() does not flush stdio buffers
            finally:
                os._exit(exit_status)

        # Parent process: poll until the server is reachable.
        connection = GearmanConnection(job_servers[0])
        for _ in range(10):
            try:
                connection.connect()
            except GearmanConnection.ConnectionError:
                time.sleep(0.5)
            else:
                break
        connection.close()

    def stop_server(self):
        """Send SIGTERM to the forked server process and wait for it to exit."""
        os.kill(self.server_pid, signal.SIGTERM)
        os.waitpid(self.server_pid, 0)
class TestConnection(GearmanTestCase):
    """Tests that send raw commands over a ``GearmanConnection``."""

    def setUp(self):
        """Start a server and open a connection to ``job_servers[0]``."""
        self.start_server()
        try:
            self.connection = GearmanConnection(job_servers[0])
            self.connection.connect()
        except Exception:
            # tearDown() is not run when setUp() fails, so stop the forked
            # server here instead of leaving it running.
            self.stop_server()
            raise

    def tearDown(self):
        """Close the connection opened in ``setUp`` and stop the server."""
        try:
            self.connection.close()
        finally:
            self.stop_server()

    def testNoArgs(self):
        """An ``echo_req`` sent without arguments is answered with ``echo_res``."""
        self.connection.send_command_blocking("echo_req")
        cmd = self.connection.recv_blocking()
        self.assertEqual(cmd[0], "echo_res")

    def testWithArgs(self):
        """A ``submit_job`` command with arguments is answered with ``job_created``."""
        self.connection.send_command_blocking("submit_job", dict(func="echo", uniq="", arg="tea"))
        cmd = self.connection.recv_blocking()
        self.assertEqual(cmd[0], 'job_created')
class TestGearman(GearmanTestCase):
    """End-to-end tests: a client submits tasks that a worker thread executes."""

    def setUp(self):
        """Start a server, a worker running in a background thread, and a client.

        The worker registers the module-level ``echo``, ``fail`` and ``sleep``
        functions (``sleep`` with ``timeout=1``) plus the ``ObjectWorker`` and
        ``ClassWorker`` classes.
        """
        self.start_server()
        try:
            self.last_exception = (None, None)
            self.worker = GearmanWorker(job_servers)
            self.worker.register_function("echo", echo)
            self.worker.register_function("fail", fail)
            self.worker.register_function("sleep", sleep, timeout=1)
            self.worker.register_class(ObjectWorker())
            self.worker.register_class(ClassWorker())

            test_case = self

            class Hooks(object):
                """Worker callbacks; only ``fail`` records anything."""

                @staticmethod
                def start(job):
                    pass

                @staticmethod
                def complete(job, res):
                    pass

                @staticmethod
                def fail(job, exc):
                    # Remember which function failed and with what exception.
                    test_case.last_exception = (job.func, exc)

            # `threading` is available on both Python 2 and 3, unlike the
            # Python 2-only `thread` module. The thread is a daemon so it
            # cannot keep the interpreter alive.
            # TODO: Shouldn't use threads.. but we do for now (also, the
            # thread is never terminated)
            import threading
            self.worker_thread = threading.Thread(
                target=self.worker.work, kwargs=dict(hooks=Hooks))
            self.worker_thread.daemon = True
            self.worker_thread.start()

            self.client = GearmanClient(job_servers)
        except Exception:
            # tearDown() is not run when setUp() fails, so stop the forked
            # server here instead of leaving it running.
            self.stop_server()
            raise

    def tearDown(self):
        """Drop the worker and client references, then stop the server."""
        try:
            del self.worker
            del self.client
        finally:
            self.stop_server()

    def testComplete(self):
        """An ``echo`` task returns its argument."""
        self.assertEqual(self.client.do_task(Task("echo", "bar")), 'bar')

    def testFail(self):
        """A task whose worker function raises surfaces as ``TaskFailed``."""
        self.assertRaises(self.client.TaskFailed, self.client.do_task, Task("fail", "bar"))
        # NOTE(review): the check below was already disabled in the original.
        # self.failUnlessEqual(self.last_exception[0], "fail")

    def testCompleteAfterFail(self):
        """A successful task still works after a previous task failed."""
        self.assertRaises(self.client.TaskFailed, self.client.do_task, Task("fail", "bar"))
        self.assertEqual(self.client.do_task(Task("echo", "bar")), 'bar')

    def testTimeout(self):
        """``sleep`` succeeds for 0.1s but fails for 1.5s (registered with timeout=1)."""
        self.assertEqual(self.client.do_task(Task("sleep", "0.1")), '0.1')
        self.assertRaises(self.client.TaskFailed, self.client.do_task, Task("sleep", "1.5"))

    def testCall(self):
        """Calling the client object directly runs the named task."""
        self.assertEqual(self.client("echo", "bar"), 'bar')

    def testObjectWorker(self):
        """A method of a registered instance is reachable as ``ObjectWorker.echo``."""
        self.assertEqual(self.client("ObjectWorker.echo", "foo"), "foo")

    def testClassWorker(self):
        """A static method of a registered instance is reachable as ``ClassWorker.echo``."""
        self.assertEqual(self.client("ClassWorker.echo", "foo"), "foo")
class TestManager(GearmanTestCase):
    """Tests for the ``GearmanManager`` status/version/workers queries."""

    def setUp(self):
        """Start a server and create a manager for ``job_servers[0]``."""
        self.start_server()
        try:
            self.manager = GearmanManager(job_servers[0])
        except Exception:
            # tearDown() is not run when setUp() fails, so stop the forked
            # server here instead of leaving it running.
            self.stop_server()
            raise

    def tearDown(self):
        """Stop the server started in ``setUp``."""
        self.stop_server()

    def testStatus(self):
        """``status()`` returns a dict."""
        status = self.manager.status()
        self.assertTrue(isinstance(status, dict))

    def testVersion(self):
        """``version()`` returns something containing a dot."""
        version = self.manager.version()
        self.assertTrue('.' in version)

    def testWorkers(self):
        """``workers()`` returns a list."""
        workers = self.manager.workers()
        self.assertTrue(isinstance(workers, list))
if __name__ == '__main__':
    # Run all test cases in this module when executed as a script.
    unittest.main()