From fa9476928f0c86df61dd104468636014b8de3b3d Mon Sep 17 00:00:00 2001 From: Dean Hudek Date: Fri, 26 May 2023 10:58:53 +0200 Subject: [PATCH 1/2] added async test for gocdb_downtimes, vapor_weights, webapi_metricprofile --- modules/tasks/gocdb_downtimes.py | 41 ++--- modules/tasks/vapor_weights.py | 71 ++++----- modules/tasks/webapi_metricprofile.py | 28 ++-- tests/test_asynctasks.py | 206 +++++++++++++++++++++++++- 4 files changed, 280 insertions(+), 66 deletions(-) diff --git a/modules/tasks/gocdb_downtimes.py b/modules/tasks/gocdb_downtimes.py index 273ad029..5b4d8cd0 100644 --- a/modules/tasks/gocdb_downtimes.py +++ b/modules/tasks/gocdb_downtimes.py @@ -6,7 +6,7 @@ from argo_connectors.parse.gocdb_downtimes import ParseDowntimes from argo_connectors.io.webapi import WebAPI from argo_connectors.tasks.common import write_state, write_downtimes_json as write_json - +from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError class TaskGocdbDowntimes(object): def __init__(self, loop, logger, connector_name, globopts, auth_opts, @@ -70,24 +70,29 @@ async def send_webapi(self, dts): await webapi.send(dts, downtimes_component=True) async def run(self): - # we don't have multiple tenant definitions in one - # customer file so we can safely assume one tenant/customer - write_empty = self.confcust.send_empty(self.connector_name) - if not write_empty: - res = await self.fetch_data() - dts = self.parse_source(res) - else: - dts = [] + try: + # we don't have multiple tenant definitions in one + # customer file so we can safely assume one tenant/customer + write_empty = self.confcust.send_empty(self.connector_name) + if not write_empty: + res = await self.fetch_data() + dts = self.parse_source(res) + else: + dts = [] - await write_state(self.connector_name, self.globopts, self.confcust, self.timestamp, True) + await write_state(self.connector_name, self.globopts, self.confcust, self.timestamp, True) - if eval(self.globopts['GeneralPublishWebAPI'.lower()]): - await self.send_webapi(dts) + if eval(self.globopts['GeneralPublishWebAPI'.lower()]): + await self.send_webapi(dts) - if dts or write_empty: - cust = list(self.confcust.get_customers())[0] - self.logger.info('Customer:%s Fetched Date:%s Endpoints:%d' % - (self.confcust.get_custname(cust), self.targetdate, len(dts))) + if dts or write_empty: + cust = list(self.confcust.get_customers())[0] + self.logger.info('Customer:%s Fetched Date:%s Endpoints:%d' % + (self.confcust.get_custname(cust), self.targetdate, len(dts))) - if eval(self.globopts['GeneralWriteJson'.lower()]): - write_json(self.logger, self.globopts, self.confcust, dts, self.timestamp) + if eval(self.globopts['GeneralWriteJson'.lower()]): + write_json(self.logger, self.globopts, self.confcust, dts, self.timestamp) + + except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc: + self.logger.error(repr(exc)) + await write_state(self.connector_name, self.globopts, self.confcust, self.timestamp, False) diff --git a/modules/tasks/vapor_weights.py b/modules/tasks/vapor_weights.py index c4f4986b..1025cecb 100644 --- a/modules/tasks/vapor_weights.py +++ b/modules/tasks/vapor_weights.py @@ -6,7 +6,7 @@ from argo_connectors.io.webapi import WebAPI from argo_connectors.parse.vapor import ParseWeights from argo_connectors.tasks.common import write_weights_metricprofile_state as write_state, write_weights_json as write_json - +from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError class TaskVaporWeights(object): def __init__(self, loop, logger, connector_name, globopts, confcust, feed, @@ -57,35 +57,40 @@ async def send_webapi(self, weights, webapi_opts, job): await webapi.send(weights) async def run(self): - for job, cust in self.jobcust: - self.logger.customer = self.confcust.get_custname(cust) - self.logger.job = job - - write_empty = self.confcust.send_empty(self.connector_name, cust) - - if write_empty: - weights = [] - else: - res = await self.fetch_data() - weights = self.parse_source(res) - - webapi_opts = self.get_webapi_opts(cust, job) - - if eval(self.globopts['GeneralPublishWebAPI'.lower()]): - await self.send_webapi(weights, webapi_opts, job) - - if eval(self.globopts['GeneralWriteJson'.lower()]): - write_json(self.logger, self.globopts, cust, job, - self.confcust, self.fixed_date, weights) - - await write_state(self.connector_name, self.globopts, cust, job, self.confcust, self.fixed_date, True) - - if weights or write_empty: - custs = set([cust for job, cust in self.jobcust]) - for cust in custs: - jobs = [job for job, lcust in self.jobcust if cust == lcust] - self.logger.info('Customer:%s Jobs:%s Sites:%d' % - (self.confcust.get_custname(cust), jobs[0] - if len(jobs) == 1 else - '({0})'.format(','.join(jobs)), - len(weights))) + try: + for job, cust in self.jobcust: + self.logger.customer = self.confcust.get_custname(cust) + self.logger.job = job + + write_empty = self.confcust.send_empty(self.connector_name, cust) + + if write_empty: + weights = [] + else: + res = await self.fetch_data() + weights = self.parse_source(res) + + webapi_opts = self.get_webapi_opts(cust, job) + + if eval(self.globopts['GeneralPublishWebAPI'.lower()]): + await self.send_webapi(weights, webapi_opts, job) + + if eval(self.globopts['GeneralWriteJson'.lower()]): + write_json(self.logger, self.globopts, cust, job, + self.confcust, self.fixed_date, weights) + + await write_state(self.connector_name, self.globopts, cust, job, self.confcust, self.fixed_date, True) + + if weights or write_empty: + custs = set([cust for job, cust in self.jobcust]) + for cust in custs: + jobs = [job for job, lcust in self.jobcust if cust == lcust] + self.logger.info('Customer:%s Jobs:%s Sites:%d' % + (self.confcust.get_custname(cust), jobs[0] + if len(jobs) == 1 else + '({0})'.format(','.join(jobs)), + len(weights))) + + except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc: + self.logger.error(repr(exc)) + await write_state(self.connector_name, self.globopts, cust, job, self.confcust, self.fixed_date, False) \ No newline at end of file diff --git a/modules/tasks/webapi_metricprofile.py b/modules/tasks/webapi_metricprofile.py index d90da9d4..0186a0b9 100644 --- a/modules/tasks/webapi_metricprofile.py +++ b/modules/tasks/webapi_metricprofile.py @@ -32,20 +32,20 @@ def parse_source(self, res, profiles): return metric_profiles async def run(self): - for job in self.confcust.get_jobs(self.cust): - self.logger.customer = self.confcust.get_custname(self.cust) - self.logger.job = job + try: + for job in self.confcust.get_jobs(self.cust): + self.logger.customer = self.confcust.get_custname(self.cust) + self.logger.job = job - profiles = self.confcust.get_profiles(job) - webapi_custopts = self.confcust.get_webapiopts(self.cust) - webapi_opts = self.cglob.merge_opts(webapi_custopts, 'webapi') - webapi_complete, missopt = self.cglob.is_complete(webapi_opts, 'webapi') + profiles = self.confcust.get_profiles(job) + webapi_custopts = self.confcust.get_webapiopts(self.cust) + webapi_opts = self.cglob.merge_opts(webapi_custopts, 'webapi') + webapi_complete, missopt = self.cglob.is_complete(webapi_opts, 'webapi') - if not webapi_complete: - self.logger.error('Customer:%s Job:%s %s options incomplete, missing %s' % (self.logger.customer, self.logger.job, 'webapi', ' '.join(missopt))) - continue + if not webapi_complete: + self.logger.error('Customer:%s Job:%s %s options incomplete, missing %s' % (self.logger.customer, self.logger.job, 'webapi', ' '.join(missopt))) + continue - try: res = await self.fetch_data(webapi_opts['webapihost'], webapi_opts['webapitoken']) fetched_profiles = self.parse_source(res, profiles) @@ -57,6 +57,6 @@ async def run(self): self.logger.info('Customer:' + self.logger.customer + ' Job:' + job + ' Profiles:%s Tuples:%d' % (', '.join(profiles), len(fetched_profiles))) - except (ConnectorHttpError, KeyboardInterrupt, ConnectorParseError) as exc: - self.logger.error(repr(exc)) - await write_state(self.connector_name, self.globopts, self.cust, job, self.confcust, self.fixed_date, False) + except (ConnectorHttpError, KeyboardInterrupt, ConnectorParseError) as exc: + self.logger.error(repr(exc)) + await write_state(self.connector_name, self.globopts, self.cust, job, self.confcust, self.fixed_date, False) diff --git a/tests/test_asynctasks.py b/tests/test_asynctasks.py index 47beb6ac..9baad4c9 100644 --- a/tests/test_asynctasks.py +++ b/tests/test_asynctasks.py @@ -12,6 +12,10 @@ from argo_connectors.tasks.gocdb_servicetypes import TaskGocdbServiceTypes from argo_connectors.tasks.gocdb_topology import TaskGocdbTopology, find_next_paging_cursor_count from argo_connectors.tasks.provider_topology import TaskProviderTopology +from argo_connectors.tasks.gocdb_downtimes import TaskGocdbDowntimes +from argo_connectors.tasks.vapor_weights import TaskVaporWeights +from argo_connectors.tasks.webapi_metricprofile import TaskWebApiMetricProfile + from argo_connectors.parse.base import ParseHelpers @@ -97,7 +101,7 @@ async def test_failedNextCursor(self, mock_httpget, mock_fetchldap, class TestFindNextPagingCursorCount(unittest.TestCase): def setUp(self): self.logger = mock.MagicMock() - with open('tests/sample-topofeedpaging.xml') as tf: + with open('../tests/sample-topofeedpaging.xml') as tf: self.res = tf.read() def test_count_n_cursor(self): @@ -504,3 +508,203 @@ async def test_StepsFailedRun(self, mock_writestate): self.assertTrue(self.downtimes_flat.logger.error.call_args[0][0], repr( ConnectorHttpError('fetch_data failed'))) self.assertFalse(self.downtimes_flat.send_webapi.called) + +class GocdbDowntimes(unittest.TestCase): + def setUp(self): + self.loop = asyncio.get_event_loop() + logger = mock.Mock() + logger.customer = CUSTOMER_NAME + mocked_globopts = dict(generalpublishwebapi='True', + generalwritejson='True', + outputdowntimes='downtimes_DATE.json', + inputstatesavedir='/some/mock/path/', + inputstatedays=3 + ) + globopts = mocked_globopts + authopts = mock.Mock() + webapiopts = mock.Mock() + confcust = mock.Mock() + confcust.send_empty.return_value = False + confcust.get_customers.return_value = ['CUSTOMERFOO', 'CUSTOMERBAR'] + custname = CUSTOMER_NAME + downtime_feed = 'https://gocdb-downtimes.com/api/fetch' + now = datetime.datetime.now().strftime('%Y-%m-%d') + start = datetime.datetime.strptime(now, '%Y-%m-%d') + end = datetime.datetime.strptime(now, '%Y-%m-%d') + timestamp = start.strftime('%Y_%m_%d') + start = start.replace(hour=0, minute=0, second=0) + end = end.replace(hour=23, minute=59, second=59) + + self.gocdb_downtimes = TaskGocdbDowntimes(self.loop, logger, 'test_asynctasks_gocdbdowntimes', globopts, + authopts, webapiopts, confcust, + custname, downtime_feed, start, + end, False, timestamp, timestamp) + + @mock.patch('argo_connectors.tasks.gocdb_downtimes.write_json') + @mock.patch('argo_connectors.tasks.gocdb_downtimes.write_state') + @async_test + async def test_StepsSuccessRun(self, mock_writestate, mock_writejson): + self.gocdb_downtimes.fetch_data = mock.AsyncMock() + self.gocdb_downtimes.fetch_data.side_effect = ['downtimes-ok'] + self.gocdb_downtimes.send_webapi = mock.AsyncMock() + self.gocdb_downtimes.parse_source = mock.MagicMock() + await self.gocdb_downtimes.run() + self.assertTrue(self.gocdb_downtimes.fetch_data.called) + self.assertTrue(self.gocdb_downtimes.parse_source.called) + self.gocdb_downtimes.parse_source.assert_called_with('downtimes-ok') + self.assertEqual( + mock_writestate.call_args[0][0], 'test_asynctasks_gocdbdowntimes') + self.assertEqual( + mock_writestate.call_args[0][3], self.gocdb_downtimes.timestamp) + self.assertTrue(mock_writestate.call_args[0][4]) + self.assertTrue(mock_writejson.called, True) + self.assertEqual( + mock_writejson.call_args[0][4], datetime.datetime.now().strftime('%Y_%m_%d')) + self.assertTrue(self.gocdb_downtimes.send_webapi.called) + self.assertTrue(self.gocdb_downtimes.logger.info.called) + + @mock.patch('argo_connectors.tasks.gocdb_downtimes.write_state') + @async_test + async def test_StepsFailedRun(self, mock_writestate): + self.gocdb_downtimes.fetch_data = mock.AsyncMock() + self.gocdb_downtimes.fetch_data.side_effect = [ + ConnectorHttpError('fetch_data failed')] + self.gocdb_downtimes.send_webapi = mock.AsyncMock() + self.gocdb_downtimes.parse_source = mock.MagicMock() + await self.gocdb_downtimes.run() + self.assertTrue(self.gocdb_downtimes.fetch_data.called) + self.assertFalse(self.gocdb_downtimes.parse_source.called) + self.assertEqual( + mock_writestate.call_args[0][0], 'test_asynctasks_gocdbdowntimes') + self.assertEqual( + mock_writestate.call_args[0][3], self.gocdb_downtimes.timestamp) + self.assertFalse(mock_writestate.call_args[0][4]) + self.assertTrue(self.gocdb_downtimes.logger.error.called) + self.assertTrue(self.gocdb_downtimes.logger.error.call_args[0][0], repr( + ConnectorHttpError('fetch_data failed'))) + self.assertFalse(self.gocdb_downtimes.send_webapi.called) + + +class WaporWeights(unittest.TestCase): + def setUp(self): + self.loop = asyncio.get_event_loop() + logger = mock.Mock() + logger.customer = CUSTOMER_NAME + mocked_globopts = dict(generalpublishwebapi='True', + generalwritejson='True', + outputdowntimes='downtimes_DATE.json', + inputstatesavedir='/some/mock/path/', + inputstatedays=3 + ) + globopts = mocked_globopts + confcust = mock.Mock() + confcust.send_empty.return_value = False + confcust.get_customers.return_value = ['CUSTOMERFOO', 'CUSTOMERBAR'] + VAPORPI = 'https://foo-portal.eu/' + jobcust = [('Critical_foo', 'CUSTOMER_FOO')] + cglob = mock.Mock() + + self.vapor_weights = TaskVaporWeights(self.loop, logger, 'test_asynctasks_weights', globopts, + confcust, VAPORPI, jobcust, cglob, + fixed_date=None) + + @mock.patch('argo_connectors.tasks.vapor_weights.write_json') + @mock.patch('argo_connectors.tasks.vapor_weights.write_state') + @async_test + async def test_StepsSuccessRun(self, mock_writestate, mock_writejson): + self.vapor_weights.fetch_data = mock.AsyncMock() + self.vapor_weights.fetch_data.side_effect = ['weights-ok'] + self.vapor_weights.get_webapi_opts = mock.MagicMock() + self.vapor_weights.parse_source = mock.MagicMock() + self.vapor_weights.send_webapi = mock.AsyncMock() + await self.vapor_weights.run() + self.assertTrue(self.vapor_weights.fetch_data.called) + self.assertTrue(self.vapor_weights.parse_source.called) + self.vapor_weights.parse_source.assert_called_with('weights-ok') + self.assertEqual( + mock_writestate.call_args[0][0], 'test_asynctasks_weights') + self.assertEqual( + mock_writestate.call_args[0][3], 'Critical_foo') + self.assertTrue(mock_writestate.call_args[0][4]) + self.assertTrue(mock_writejson.called, True) + self.assertTrue(self.vapor_weights.send_webapi.called) + self.assertTrue(self.vapor_weights.logger.info.called) + + @mock.patch('argo_connectors.tasks.vapor_weights.write_state') + @async_test + async def test_StepsFailedRun(self, mock_writestate): + self.vapor_weights.fetch_data = mock.AsyncMock() + self.vapor_weights.fetch_data.side_effect = [ + ConnectorHttpError('fetch_data failed')] + self.vapor_weights.get_webapi_opts = mock.MagicMock() + self.vapor_weights.parse_source = mock.MagicMock() + self.vapor_weights.send_webapi = mock.AsyncMock() + await self.vapor_weights.run() + self.assertTrue(self.vapor_weights.fetch_data.called) + self.assertFalse(self.vapor_weights.parse_source.called) + self.assertEqual( + mock_writestate.call_args[0][0], 'test_asynctasks_weights') + self.assertTrue(self.vapor_weights.logger.error.called) + self.assertTrue(self.vapor_weights.logger.error.call_args[0][0], repr( + ConnectorHttpError('fetch_data failed'))) + self.assertFalse(self.vapor_weights.send_webapi.called) + + +class MetricprofileWebapi(unittest.TestCase): + def setUp(self): + self.loop = asyncio.get_event_loop() + logger = mock.Mock() + mocked_globopts = dict(generalpublishwebapi='True', + generalwritejson='True', + outputdowntimes='downtimes_DATE.json', + inputstatesavedir='/some/mock/path/', + inputstatedays=3 + ) + globopts = mocked_globopts + confcust = mock.Mock() + confcust.send_empty.return_value = False + confcust.get_jobs.return_value = ['job1', 'job2'] + confcust.get_custname.return_value = 'CUSTOMER_NAME' + confcust.get_profiles.return_value = ['FOO_CUSTOMER_CRITICAL'] + cglob = mock.Mock() + cglob.is_complete.return_value = True, None + cglob.merge_opts.return_value = dict( + webapitoken='foo_token', webapihost='foo.mock.com') + + self.webapi_metricprofile = TaskWebApiMetricProfile( + self.loop, logger, 'test_asynctasks_metricprofile', globopts, cglob, confcust, cust='CUSTOMER_FOO', fixed_date=None + ) + + @mock.patch('argo_connectors.tasks.webapi_metricprofile.write_json') + @mock.patch('argo_connectors.tasks.webapi_metricprofile.write_state') + @async_test + async def test_StepsSuccessRun(self, mock_writestate, mock_writejson): + self.webapi_metricprofile.fetch_data = mock.AsyncMock() + self.webapi_metricprofile.parse_source = mock.MagicMock() + await self.webapi_metricprofile.run() + self.assertTrue(self.webapi_metricprofile.fetch_data.called) + self.assertTrue(self.webapi_metricprofile.parse_source.called) + self.assertEqual( + mock_writestate.call_args[0][0], 'test_asynctasks_metricprofile') + self.assertEqual( + mock_writestate.call_args[0][3], 'job2') + self.assertTrue(mock_writestate.call_args[0][4]) + self.assertTrue(mock_writejson.called, True) + self.assertTrue(self.webapi_metricprofile.logger.info.called) + + @mock.patch('argo_connectors.tasks.webapi_metricprofile.write_state') + @async_test + async def test_StepsFailedRun(self, mock_writestate): + self.webapi_metricprofile.fetch_data = mock.AsyncMock() + self.webapi_metricprofile.fetch_data.side_effect = [ + ConnectorHttpError('fetch_data failed')] + self.webapi_metricprofile.parse_source = mock.MagicMock() + await self.webapi_metricprofile.run() + self.assertTrue(self.webapi_metricprofile.fetch_data.called) + self.assertFalse(self.webapi_metricprofile.parse_source.called) + self.assertEqual( + mock_writestate.call_args[0][0], 'test_asynctasks_metricprofile') + self.assertFalse(mock_writestate.call_args[0][6]) + self.assertTrue(self.webapi_metricprofile.logger.error.called) + self.assertTrue(self.webapi_metricprofile.logger.error.call_args[0][0], repr( + ConnectorHttpError('fetch_data failed'))) \ No newline at end of file From 1741f49f16da5cfe0860bcbccdbd7db93c98adb8 Mon Sep 17 00:00:00 2001 From: Dean Hudek Date: Fri, 26 May 2023 11:09:06 +0200 Subject: [PATCH 2/2] added async tests for agora_topology and flat_topology, added csv sample for flat_topology --- exec/topology-agora-connector.py | 4 +- modules/tasks/agora_topology.py | 62 +++++++------- modules/tasks/flat_topology.py | 66 ++++++++------- tests/sample-csv-topology.csv | 42 ++++++++++ tests/test_asynctasks.py | 134 ++++++++++++++++++++++++++++++- 5 files changed, 246 insertions(+), 62 deletions(-) create mode 100644 tests/sample-csv-topology.csv diff --git a/exec/topology-agora-connector.py b/exec/topology-agora-connector.py index 53f04ac7..f1a4923f 100755 --- a/exec/topology-agora-connector.py +++ b/exec/topology-agora-connector.py @@ -11,7 +11,7 @@ from argo_connectors.log import Logger from argo_connectors.config import Global, CustomerConf from argo_connectors.utils import date_check -from argo_connectors.tasks.agora_topology import TaskProviderTopology +from argo_connectors.tasks.agora_topology import AgoraProviderTopology from argo_connectors.tasks.common import write_state @@ -64,7 +64,7 @@ def main(): asyncio.set_event_loop(loop) try: - task = TaskProviderTopology( + task = AgoraProviderTopology( loop, logger, sys.argv[0], globopts, webapi_opts, confcust, uidservendp, fetchtype, fixed_date ) diff --git a/modules/tasks/agora_topology.py b/modules/tasks/agora_topology.py index f2363899..773e8505 100644 --- a/modules/tasks/agora_topology.py +++ b/modules/tasks/agora_topology.py @@ -5,7 +5,7 @@ from argo_connectors.io.webapi import WebAPI from argo_connectors.parse.agora_topology import ParseAgoraTopo from argo_connectors.tasks.common import write_topo_json as write_json, write_state -from argo_connectors.exceptions import ConnectorError, ConnectorHttpError +from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError, ConnectorError def contains_exception(list): @@ -16,7 +16,7 @@ def contains_exception(list): return (False, None) -class TaskProviderTopology(object): +class AgoraProviderTopology(object): def __init__(self, loop, logger, connector_name, globopts, webapi_opts, confcust, uidservendp, fetchtype, fixed_date): self.loop = loop @@ -71,39 +71,43 @@ async def fetch_data(self, feed): async def run(self): - topofeedproviders = self.confcust.get_topofeedservicegroups() - topofeedresources = self.confcust.get_topofeedendpoints() + try: + topofeedproviders = self.confcust.get_topofeedservicegroups() + topofeedresources = self.confcust.get_topofeedendpoints() - coros = [ - self.fetch_data(topofeedresources), - self.fetch_data(topofeedproviders), - ] + coros = [ + self.fetch_data(topofeedresources), + self.fetch_data(topofeedproviders), + ] - # fetch topology data concurrently in coroutines - fetched_data = await asyncio.gather(*coros, return_exceptions=True) + # fetch topology data concurrently in coroutines + fetched_data = await asyncio.gather(*coros, return_exceptions=True) - exc_raised, exc = contains_exception(fetched_data) - if exc_raised: - raise ConnectorError(repr(exc)) + exc_raised, exc = contains_exception(fetched_data) + if exc_raised: + raise ConnectorError(repr(exc)) - fetched_resources, fetched_providers = fetched_data - if fetched_resources and fetched_providers: - group_providers, group_resources = self.parse_source_topo(fetched_resources, fetched_providers) + fetched_resources, fetched_providers = fetched_data + if fetched_resources and fetched_providers: + group_providers, group_resources = self.parse_source_topo(fetched_resources, fetched_providers) - await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, True) + await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, True) - numgg = len(group_providers) - numge = len(group_resources) + numgg = len(group_providers) + numge = len(group_resources) - # send concurrently to WEB-API in coroutines - if eval(self.globopts['GeneralPublishWebAPI'.lower()]): - await asyncio.gather( - self.send_webapi(self.webapi_opts, group_resources, 'endpoints', self.fixed_date), - self.send_webapi(self.webapi_opts, group_providers, 'groups', self.fixed_date), - loop=self.loop - ) + # send concurrently to WEB-API in coroutines + if eval(self.globopts['GeneralPublishWebAPI'.lower()]): + await asyncio.gather( + self.send_webapi(self.webapi_opts, group_resources, 'endpoints', self.fixed_date), + self.send_webapi(self.webapi_opts, group_providers, 'groups', self.fixed_date), + ) - if eval(self.globopts['GeneralWriteJson'.lower()]): - write_json(self.logger, self.globopts, self.confcust, group_providers, group_resources, self.fixed_date) + if eval(self.globopts['GeneralWriteJson'.lower()]): + write_json(self.logger, self.globopts, self.confcust, group_providers, group_resources, self.fixed_date) - self.logger.info('Customer:' + self.logger.customer + ' Fetched Endpoints:%d' % (numge) + ' Groups(%s):%d' % (self.fetchtype, numgg)) + self.logger.info('Customer:' + self.logger.customer + ' Fetched Endpoints:%d' % (numge) + ' Groups(%s):%d' % (self.fetchtype, numgg)) + + except (ConnectorHttpError, ConnectorParseError, ConnectorError, KeyboardInterrupt) as exc: + self.logger.error(repr(exc)) + await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, False) diff --git a/modules/tasks/flat_topology.py b/modules/tasks/flat_topology.py index 965e2d68..411a9fda 100644 --- a/modules/tasks/flat_topology.py +++ b/modules/tasks/flat_topology.py @@ -9,6 +9,7 @@ from argo_connectors.io.webapi import WebAPI from argo_connectors.mesh.contacts import attach_contacts_topodata from argo_connectors.tasks.common import write_state, write_topo_json as write_json +from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError class TaskFlatTopology(object): @@ -72,33 +73,38 @@ async def send_webapi(self, data, topotype): await webapi.send(data, topotype) async def run(self): - if self._is_feed(self.topofeed): - res = await self.fetch_data() - group_groups, group_endpoints = self.parse_source_topo(res) - contacts = ParseContacts(self.logger, res, self.uidservendp, self.is_csv).get_contacts() - attach_contacts_topodata(self.logger, contacts, group_endpoints) - - elif not self._is_feed(self.topofeed) and not self.is_csv: - try: - with open(self.topofeed) as fp: - js = json.load(fp) - group_groups, group_endpoints = self.parse_source_topo(js) - except IOError as exc: - self.logger.error('Customer:%s : Problem opening %s - %s' % (self.logger.customer, self.topofeed, repr(exc))) - - await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, True) - - numge = len(group_endpoints) - numgg = len(group_groups) - - # send concurrently to WEB-API in coroutines - if eval(self.globopts['GeneralPublishWebAPI'.lower()]): - await asyncio.gather( - self.send_webapi(group_groups, 'groups'), - self.send_webapi(group_endpoints,'endpoints') - ) - - if eval(self.globopts['GeneralWriteJson'.lower()]): - write_json(self.logger, self.globopts, self.confcust, group_groups, group_endpoints, self.fixed_date) - - self.logger.info('Customer:' + self.custname + ' Fetched Endpoints:%d' % (numge) + ' Groups(%s):%d' % (self.fetchtype, numgg)) + try: + if self._is_feed(self.topofeed): + res = await self.fetch_data() + group_groups, group_endpoints = self.parse_source_topo(res) + contacts = ParseContacts(self.logger, res, self.uidservendp, self.is_csv).get_contacts() + attach_contacts_topodata(self.logger, contacts, group_endpoints) + + elif not self._is_feed(self.topofeed) and not self.is_csv: + try: + with open(self.topofeed) as fp: + js = json.load(fp) + group_groups, group_endpoints = self.parse_source_topo(js) + except IOError as exc: + self.logger.error('Customer:%s : Problem opening %s - %s' % (self.logger.customer, self.topofeed, repr(exc))) + + await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, True) + + numge = len(group_endpoints) + numgg = len(group_groups) + + # send concurrently to WEB-API in coroutines + if eval(self.globopts['GeneralPublishWebAPI'.lower()]): + await asyncio.gather( + self.send_webapi(group_groups, 'groups'), + self.send_webapi(group_endpoints,'endpoints') + ) + + if eval(self.globopts['GeneralWriteJson'.lower()]): + write_json(self.logger, self.globopts, self.confcust, group_groups, group_endpoints, self.fixed_date) + + self.logger.info('Customer:' + self.custname + ' Fetched Endpoints:%d' % (numge) + ' Groups(%s):%d' % (self.fetchtype, numgg)) + + except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc: + self.logger.error(repr(exc)) + await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, False) diff --git a/tests/sample-csv-topology.csv b/tests/sample-csv-topology.csv new file mode 100644 index 00000000..bfa4af1c --- /dev/null +++ b/tests/sample-csv-topology.csv @@ -0,0 +1,42 @@ +Service Unique ID,URL,start_time,end_time,SERVICE_TYPE,Severity,CONTACT_EMAIL +neanias_5,https://atmo-flud.neanias.eu/,2/21/2022 8:00,2/22/2022 19:00,Webservice,OUTAGE,some-foo@email.com +neanias_7,https://atmo-stress.neanias.eu/,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_9,https://caesar.neanias.eu:443,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_10,http://caesar-api.neanias.eu/,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_14,https://bathyprocessing.neanias.eu/hub/login,2/21/2022 8:00,2/22/2022 19:00,JupyterHub,OUTAGE,some-foo@email.com +neanias_15,https://uw-map.neanias.eu/,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_16,https://uw-map.neanias.eu/tos,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_17,https://uw-map.neanias.eu/privacy,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_18,https://uw-mos.neanias.eu/api/try,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_20,https://ai-gateway.neanias.eu/hub/login,2/21/2022 8:00,2/22/2022 19:00,JupyterHub,OUTAGE,some-foo@email.com +neanias_21,https://accounting.neanias.eu/api/api/accounting-service/version-info/current,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_25,https://logging.neanias.eu/api/status,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_26,https://minio.ml-models.neanias.eu/minio/health/live,2/21/2022 8:00,2/22/2022 19:00,MinIO,OUTAGE,some-foo@email.com +neanias_27,https://notification.neanias.eu/api/notification/version-info/current,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_28,https://pms.neanias.eu/,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_30,https://vis-gateway.neanias.eu/hub/login,2/21/2022 8:00,2/22/2022 19:00,WebService,OUTAGE,some-foo@email.com +,,,,,, +neanias_5,https://atmo-flud.neanias.eu/,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_7,https://atmo-stress.neanias.eu/,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_9,https://caesar.neanias.eu:443,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_10,http://caesar-api.neanias.eu/,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_14,https://bathyprocessing.neanias.eu/hub/login,3/1/2022 8:00,3/4/2022 19:00,JupyterHub,OUTAGE,some-foo@email.com +neanias_15,https://uw-map.neanias.eu/,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_16,https://uw-map.neanias.eu/tos,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_17,https://uw-map.neanias.eu/privacy,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_18,https://uw-mos.neanias.eu/api/try,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_20,https://ai-gateway.neanias.eu/hub/login,3/1/2022 8:00,3/4/2022 19:00,JupyterHub,OUTAGE,some-foo@email.com +neanias_21,https://accounting.neanias.eu/api/api/accounting-service/version-info/current,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_25,https://logging.neanias.eu/api/status,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_26,https://minio.ml-models.neanias.eu/minio/health/live,3/1/2022 8:00,3/4/2022 19:00,MinIO,OUTAGE,some-foo@email.com +neanias_27,https://notification.neanias.eu/api/notification/version-info/current,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_28,https://pms.neanias.eu/,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_30,https://vis-gateway.neanias.eu/hub/login,3/1/2022 8:00,3/4/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_2,https://files.neanias.eu,2/28/2022 13:00,2/28/2022 15:00,nextcloud,OUTAGE,some-foo@email.com +neanias_1,https://files.dev.neanias.eu,2/28/2022 9:30,2/28/2022 11:00,nextcloud,OUTAGE,some-foo@email.com +neanias_1,https://files.dev.neanias.eu,2/7/2022 8:00,2/17/2022 19:00,nextcloud,OUTAGE,some-foo@email.com +neanias_2,https://files.neanias.eu,2/7/2022 8:00,2/17/2022 19:00,nextcloud,OUTAGE,some-foo@email.com +neanias_4,https://atmo-4cast.neanias.eu/api/forecaststatus/1,2/7/2022 8:00,2/17/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_6,http://atmo-seism.neanias.eu/hub/login,2/7/2022 8:00,2/17/2022 19:00,JupyterHub,OUTAGE,some-foo@email.com +neanias_11,http://vlkb.neanias.eu:8080/vlkb/tap/availability,2/7/2022 8:00,2/17/2022 19:00,WebService,OUTAGE,some-foo@email.com +neanias_29,https://vd-maps.neanias.eu,2/7/2022 8:00,2/17/2022 19:00,WebService,OUTAGE,some-foo@email.com \ No newline at end of file diff --git a/tests/test_asynctasks.py b/tests/test_asynctasks.py index 9baad4c9..e51d7567 100644 --- a/tests/test_asynctasks.py +++ b/tests/test_asynctasks.py @@ -15,6 +15,8 @@ from argo_connectors.tasks.gocdb_downtimes import TaskGocdbDowntimes from argo_connectors.tasks.vapor_weights import TaskVaporWeights from argo_connectors.tasks.webapi_metricprofile import TaskWebApiMetricProfile +from argo_connectors.tasks.flat_topology import TaskFlatTopology +from argo_connectors.tasks.agora_topology import AgoraProviderTopology from argo_connectors.parse.base import ParseHelpers @@ -101,7 +103,7 @@ async def test_failedNextCursor(self, mock_httpget, mock_fetchldap, class TestFindNextPagingCursorCount(unittest.TestCase): def setUp(self): self.logger = mock.MagicMock() - with open('../tests/sample-topofeedpaging.xml') as tf: + with open('tests/sample-topofeedpaging.xml') as tf: self.res = tf.read() def test_count_n_cursor(self): @@ -707,4 +709,134 @@ async def test_StepsFailedRun(self, mock_writestate): self.assertFalse(mock_writestate.call_args[0][6]) self.assertTrue(self.webapi_metricprofile.logger.error.called) self.assertTrue(self.webapi_metricprofile.logger.error.call_args[0][0], repr( + ConnectorHttpError('fetch_data failed'))) + +class TopologyCsv(unittest.TestCase): + def setUp(self): + self.loop = asyncio.get_event_loop() + with open('tests/sample-csv-topology.csv') as tf: + self.res = tf.read() + + logger = mock.Mock() + mocked_globopts = dict(generalpublishwebapi='True', + generalwritejson='True', + outputdowntimes='downtimes_DATE.json', + inputstatesavedir='/some/mock/path/', + inputstatedays=3 + ) + globopts = mocked_globopts + webapi_opts = mock.Mock() + confcust = mock.Mock() + confcust.send_empty.return_value = False + confcust.get_customers.return_value = ['CUSTOMERFOO', 'CUSTOMERBAR'] + custname = CUSTOMER_NAME + topofeed = 'https://topo-csv-foo.com/api/fetch' + fetchtype = 'foo-groups' + + self.flat_topology = TaskFlatTopology(self.loop, logger, 'test_asynctasks_topology_csv', globopts, webapi_opts, + confcust, custname, topofeed, fetchtype, fixed_date=None, + uidservendp=True, is_csv=True) + + @mock.patch('argo_connectors.tasks.flat_topology.write_json') + @mock.patch('argo_connectors.tasks.flat_topology.write_state') + @async_test + async def test_StepsSuccessRun(self, mock_writestate, mock_writejson): + self.flat_topology.fetch_data = mock.AsyncMock() + self.flat_topology.fetch_data.return_value = self.res + self.flat_topology.parse_source_topo = mock.MagicMock() + self.flat_topology.parse_source_topo.return_value = {'type': 'FOO_PROJECT'}, { + 'type': 'FOO_SERVICEGROUPS', 'group': 'mock_group'} + self.flat_topology.send_webapi = mock.AsyncMock() + await self.flat_topology.run() + self.assertTrue(self.flat_topology.fetch_data.called) + self.assertTrue(self.flat_topology.parse_source_topo.called) + self.assertEqual( + mock_writestate.call_args[0][0], 'test_asynctasks_topology_csv') + self.assertTrue(mock_writestate.call_args[0][4]) + self.assertTrue(mock_writejson.called, True) + self.assertTrue(self.flat_topology.send_webapi.called) + self.assertTrue(self.flat_topology.logger.info.called) + + @mock.patch('argo_connectors.tasks.flat_topology.write_state') + @async_test + async def test_FailedSuccessRun(self, mock_writestate): + self.flat_topology.fetch_data = mock.AsyncMock() + self.flat_topology.fetch_data.side_effect = [ + ConnectorHttpError('fetch_data failed')] + self.flat_topology.parse_source_topo = mock.MagicMock() + self.flat_topology.send_webapi = mock.AsyncMock() + await self.flat_topology.run() + self.assertTrue(self.flat_topology.fetch_data.called) + self.assertFalse(self.flat_topology.parse_source_topo.called) + self.assertEqual( + mock_writestate.call_args[0][0], 'test_asynctasks_topology_csv') + self.assertFalse(mock_writestate.call_args[0][4]) + self.assertTrue(self.flat_topology.logger.error.called) + self.assertTrue(self.flat_topology.logger.error.call_args[0][0], repr( + ConnectorHttpError('fetch_data failed'))) + + +class TopologyAgora(unittest.TestCase): + def setUp(self): + self.loop = asyncio.get_event_loop() + logger = mock.Mock() + logger.customer = CUSTOMER_NAME + mocked_globopts = dict(generalpublishwebapi='True', + generalwritejson='True', + outputdowntimes='downtimes_DATE.json', + inputstatesavedir='/some/mock/path/', + inputstatedays=3 + ) + globopts = mocked_globopts + webapi_opts = mock.Mock() + confcust = mock.Mock() + confcust.send_empty.return_value = False + confcust.get_customers.return_value = ['CUSTOMERFOO', 'CUSTOMERBAR'] + confcust.get_topofeedservicegroups.return_value = 'https://agora.ni4os.eu/api/v2/public/providers/' + confcust.get_topofeedendpoints.return_value = 'https://agora.ni4os.eu/api/v2/public/resources/' + + uidservendp = False + fetchtype = 'foo-servicegroups' + + self.agora_topology = AgoraProviderTopology(self.loop, logger, 'test_asynctasks_agora_topology', globopts, webapi_opts, + confcust, uidservendp, fetchtype, fixed_date=None) + + @mock.patch('argo_connectors.tasks.agora_topology.contains_exception') + @mock.patch('argo_connectors.tasks.agora_topology.write_json') + @mock.patch('argo_connectors.tasks.agora_topology.write_state') + @async_test + async def test_StepsSuccessRun(self, mock_writestate, mock_writejson, mock_contains_exception): + self.agora_topology.fetch_data = mock.AsyncMock() + self.agora_topology.fetch_data.side_effect = ['topology-ok'] + mock_contains_exception.return_value = False, None + self.agora_topology.parse_source_topo = mock.MagicMock() + self.agora_topology.parse_source_topo.return_value = [{'group': 'Foo_Providers', 'type': 'FOO_PROVIDERS', 'subgroup': 'foo_subgroup', 'tags': {'info_ext_catalog_id': 'foo-id'}}], [ + {'group': 'some_group', 'type': 'SERVICEGROUPS', 'service': 'catalog.foo.some', 'hostname': 'agora.foo.eu_mock', 'tags': {'hostname': 'agora.foo.com'}}] + self.agora_topology.send_webapi = mock.AsyncMock() + await self.agora_topology.run() + self.assertTrue(self.agora_topology.fetch_data.called) + self.assertTrue(self.agora_topology.parse_source_topo.called) + self.assertEqual( + mock_writestate.call_args[0][0], 'test_asynctasks_agora_topology') + self.assertTrue(mock_writestate.call_args[0][4]) + self.assertTrue(mock_writejson.called, True) + self.assertTrue(self.agora_topology.send_webapi.called) + self.assertTrue(self.agora_topology.logger.info.called) + + @mock.patch('argo_connectors.tasks.agora_topology.write_state') + @async_test + async def test_FailedSuccessRun(self, mock_writestate): + self.agora_topology.fetch_data = mock.AsyncMock() + self.agora_topology.fetch_data.side_effect = [ + ConnectorHttpError('fetch_data failed')] + self.agora_topology.parse_source_topo = mock.MagicMock() + self.agora_topology.send_webapi = mock.AsyncMock() + await self.agora_topology.run() + self.assertTrue(self.agora_topology.fetch_data.called) + self.assertFalse(self.agora_topology.parse_source_topo.called) + self.assertEqual( + mock_writestate.call_args[0][0], 'test_asynctasks_agora_topology') + self.assertFalse(mock_writestate.call_args[0][4]) + self.assertTrue(self.agora_topology.logger.error.called) + self.assertTrue(self.agora_topology.logger.error.call_args[0][0], repr( ConnectorHttpError('fetch_data failed'))) \ No newline at end of file