diff --git a/tests/rsa/test_etcd3_discovery.py b/tests/rsa/test_etcd3_discovery.py index 98b3bd5f..d6353f8e 100644 --- a/tests/rsa/test_etcd3_discovery.py +++ b/tests/rsa/test_etcd3_discovery.py @@ -62,13 +62,14 @@ # ------------------------------------------------------------------------------ -def start_framework_for_advertise(state_queue: Queue): +def start_framework_for_advertise(state_queue: Queue, order_queue: Queue): """ Starts a Pelix framework to advertise (via etcd) a helloimpl_xmlrpc remote service instance. The tests can/will then discover this service advertisement and test the EndpointEventListener notification - :param state_queue: Queue to communicate status and terminate + :param state_queue: Queue to communicate status + :param order_queue: Queue to receive termination order """ try: # Start the framework @@ -120,15 +121,9 @@ def start_framework_for_advertise(state_queue: Queue): try: # Send that we are now ready state_queue.put("ready") - # Loop until ready processed - while True: - if state_queue.empty(): - break - # Loop until we receive done message - while True: - state = state_queue.get(timeout=60) - if state is None: - break + # Wait until we receive the termination order + order = order_queue.get(timeout=60) + assert order == "quit" finally: # stop the framework gracefully framework.stop() @@ -145,6 +140,7 @@ def setUp(self): TestEndpointEventListener """ self.status_queue = Queue() + self.order_queue = Queue() # start a local framework self.framework = create_framework( @@ -178,7 +174,7 @@ def prepareFrameworkProcess(self): """ # start external framework that publishes remote service self.publisher_process = WrappedProcess( - target=start_framework_for_advertise, args=[self.status_queue] + target=start_framework_for_advertise, args=[self.status_queue, self.order_queue] ) self.publisher_process.start() state = self.status_queue.get(timeout=20) @@ -194,6 +190,7 @@ def tearDown(self): self.publisher_process.close() finally: self.status_queue.close() + self.order_queue.close() self.publisher = None # Stop the framework @@ -224,6 +221,9 @@ def test_handler_1(endpoint_event, matched_filter): self.assertTrue(isinstance(interfaces, type([]))) self.assertTrue("org.eclipse.ecf.examples.hello.IHello" in interfaces) + # Tell child process to quit + self.order_queue.put("quit") + # set the test_done_event, so tester thread will continue test_done_event.set() except Exception as e: @@ -245,7 +245,7 @@ def test_handler_2(endpoint_event, matched_filter): try: if endpoint_event.get_type() == EndpointEvent.ADDED: # send shutdown to trigger the removal - self.status_queue.put(None) + self.order_queue.put("quit") elif endpoint_event.get_type() == EndpointEvent.REMOVED: # do tests self.assertTrue(matched_filter, ENDPOINT_LISTENER_SCOPE)