Skip to content

Commit

Permalink
Fixed etcd3 test
Browse files Browse the repository at this point in the history
  • Loading branch information
tcalmant committed Aug 23, 2024
1 parent 85f663f commit 6656175
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions tests/rsa/test_etcd3_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@
# ------------------------------------------------------------------------------


def start_framework_for_advertise(state_queue: Queue):
def start_framework_for_advertise(state_queue: Queue, order_queue: Queue):
"""
Starts a Pelix framework to advertise (via etcd) a helloimpl_xmlrpc
remote service instance. The tests can/will then discover this
service advertisement and test the EndpointEventListener notification
:param state_queue: Queue to communicate status and terminate
:param state_queue: Queue to communicate status
:param order_queue: Queue to receive termination order
"""
try:
# Start the framework
Expand Down Expand Up @@ -120,15 +121,9 @@ def start_framework_for_advertise(state_queue: Queue):
try:
# Send that we are now ready
state_queue.put("ready")
# Loop until ready processed
while True:
if state_queue.empty():
break
# Loop until we receive done message
while True:
state = state_queue.get(timeout=60)
if state is None:
break
# Wait until we receive the termination order
order = order_queue.get(timeout=60)
assert order == "quit"
finally:
# stop the framework gracefully
framework.stop()
Expand All @@ -145,6 +140,7 @@ def setUp(self):
TestEndpointEventListener
"""
self.status_queue = Queue()
self.order_queue = Queue()

# start a local framework
self.framework = create_framework(
Expand Down Expand Up @@ -178,7 +174,7 @@ def prepareFrameworkProcess(self):
"""
# start external framework that publishes remote service
self.publisher_process = WrappedProcess(
target=start_framework_for_advertise, args=[self.status_queue]
target=start_framework_for_advertise, args=[self.status_queue, self.order_queue]
)
self.publisher_process.start()
state = self.status_queue.get(timeout=20)
Expand All @@ -194,6 +190,7 @@ def tearDown(self):
self.publisher_process.close()
finally:
self.status_queue.close()
self.order_queue.close()
self.publisher = None

# Stop the framework
Expand Down Expand Up @@ -224,6 +221,9 @@ def test_handler_1(endpoint_event, matched_filter):
self.assertTrue(isinstance(interfaces, type([])))
self.assertTrue("org.eclipse.ecf.examples.hello.IHello" in interfaces)

# Tell child process to quit
self.order_queue.put("quit")

# set the test_done_event, so tester thread will continue
test_done_event.set()
except Exception as e:
Expand All @@ -245,7 +245,7 @@ def test_handler_2(endpoint_event, matched_filter):
try:
if endpoint_event.get_type() == EndpointEvent.ADDED:
# send shutdown to trigger the removal
self.status_queue.put(None)
self.order_queue.put("quit")
elif endpoint_event.get_type() == EndpointEvent.REMOVED:
# do tests
self.assertTrue(matched_filter, ENDPOINT_LISTENER_SCOPE)
Expand Down

0 comments on commit 6656175

Please sign in to comment.