Skip to content

Commit

Permalink
better cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy Razon committed Apr 18, 2017
1 parent 95b7fa9 commit c0cd4c7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 18 deletions.
41 changes: 36 additions & 5 deletions jumper_logging_agent/agent.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import atexit
import json
import stat
import os
Expand All @@ -12,6 +13,8 @@
import itertools
import keen
import time

import signal
from future import standard_library
# noinspection PyUnresolvedReferences
from future.builtins import *
Expand Down Expand Up @@ -69,7 +72,6 @@ def __init__(
on_listening=None,
):
self.input_filename = input_filename
self.control_filename = input_filename + '.control'
self.flush_priority = flush_priority
self.flush_threshold = flush_threshold
self.flush_interval = flush_interval
Expand Down Expand Up @@ -132,6 +134,9 @@ def on_data_available(data):
break # self.control_file has input - stop

on_data_available(input_file)
except select.error as e:
if e.args[0] == errno.EINTR:
break
except IOError as e:
log.warn('got exception', exc_info=True)
if e.errno not in (errno.EAGAIN, errno.EPIPE):
Expand All @@ -144,6 +149,8 @@ def on_data_available(data):
input_file.close()
if control_file:
control_file.close()
self.cleanup()
print('Agent stopped')

def flush(self):
events = self.pending_events
Expand All @@ -160,9 +167,18 @@ def write_events(self, events):
event_dict = {k: list(v) for k, v in grouped}
self.event_store.add_events(event_dict)

@property
def control_filename(self):
return agent_control_filename(self.input_filename)

def stop(self):
with open(self.control_filename, b'wb') as f:
f.write(b'stop')
stop_agent(self.input_filename)

def cleanup(self):
try:
os.remove(self.control_filename)
except OSError:
pass

def __enter__(self):
return self.start()
Expand All @@ -171,6 +187,15 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()


def agent_control_filename(agent_input_filename):
return agent_input_filename + '.control'


def stop_agent(agent_input_filename):
with open(agent_control_filename(agent_input_filename), b'wb') as f:
f.write(b'stop')


def extract_class(s):
module_name, class_name = s.rsplit('.', 1)
mod = import_module(module_name)
Expand Down Expand Up @@ -208,7 +233,7 @@ def main():
print('Could not load or instantiate event store %s: %s' % (args.event_store, e))
return 2

log_level = logging.DEBUG if args.verbose else logging.WARN
log_level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(format='%(asctime)s %(levelname)8s %(name)10s: %(message)s', level=log_level)

print('Starting agent')
Expand All @@ -225,8 +250,14 @@ def on_listening():
event_store=event_store,
on_listening=on_listening,
)
# atexit.register(lambda: agent.cleanup)

signal.signal(signal.SIGTERM, lambda *a: agent.stop())
signal.signal(signal.SIGINT, lambda *a: agent.stop())

atexit.register(agent.cleanup)

agent.start()
agent.cleanup()
return 0


Expand Down
22 changes: 9 additions & 13 deletions jumper_logging_agent/tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
MAIN_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))


def delete_file(filename):
try:
os.remove(filename)
except OSError:
pass


def random_string(n=5):
return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(n))

Expand All @@ -50,9 +57,6 @@ def open_fifo_readwrite(filename):
if e.errno != errno.EEXIST:
raise

# if not is_fifo(filename):
# raise ValueError('file "%s" is not a named pipe' % (filename,))

fd = os.open(filename, os.O_RDWR | os.O_NONBLOCK)
return os.fdopen(fd, 'wb')

Expand Down Expand Up @@ -112,10 +116,7 @@ def setUp(self):
def tearDown(self):
close_local_agent_file()
self.stop_agent()
if os.path.exists(self.agent_filename):
os.remove(self.agent_filename)
if os.path.exists(self.agent_filename + '.control'):
os.remove(self.agent_filename + '.control')
delete_file(self.agent_filename)

def thread_local_agent_file(self):
agent_file = getattr(local, 'agent_file', None)
Expand Down Expand Up @@ -248,14 +249,12 @@ class AgentProcessTests(_AbstractAgentTestCase):
def setUp(self):
super(AgentProcessTests, self).setUp()
self.mock_event_store_json = '/tmp/mock_event_store_' + self.run_id
self.agent_output_filename = '/tmp/agent_output_' + self.run_id
self.agent_print_stdout_thread = None
self.agent_output = []

def tearDown(self):
super(AgentProcessTests, self).tearDown()
if os.path.exists(self.mock_event_store_json):
os.remove(self.mock_event_store_json )
delete_file(self.mock_event_store_json)

def start_agent(self, **kwargs):
args = ['python', '-u', '%s/agent_main.py' % (MAIN_DIR,)]
Expand All @@ -268,7 +267,6 @@ def start_agent(self, **kwargs):
env = os.environ.copy()
env[mock_event_store.ENV_JUMPER_MOCK_EVENT_STORE_JSON] = self.mock_event_store_json

self.agent_output_file = open(self.agent_output_filename, b'w')
self.agent = subprocess.Popen(args, env=env, stdout=subprocess.PIPE)

def agent_print_stdout():
Expand All @@ -283,8 +281,6 @@ def agent_print_stdout():
def stop_agent(self):
if self.agent:
self.agent.terminate()
if self.agent_output_file:
self.agent_output_file.close()
if self.agent_print_stdout_thread:
self.agent_print_stdout_thread.join()

Expand Down

0 comments on commit c0cd4c7

Please sign in to comment.