Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARGO-3713 Tests for async tasks of each connector #270

Open
wants to merge 2 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions exec/topology-agora-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from argo_connectors.log import Logger
from argo_connectors.config import Global, CustomerConf
from argo_connectors.utils import date_check
from argo_connectors.tasks.agora_topology import TaskProviderTopology
from argo_connectors.tasks.agora_topology import AgoraProviderTopology
from argo_connectors.tasks.common import write_state


Expand Down Expand Up @@ -64,7 +64,7 @@ def main():
asyncio.set_event_loop(loop)

try:
task = TaskProviderTopology(
task = AgoraProviderTopology(
loop, logger, sys.argv[0], globopts, webapi_opts, confcust,
uidservendp, fetchtype, fixed_date
)
Expand Down
62 changes: 33 additions & 29 deletions modules/tasks/agora_topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from argo_connectors.io.webapi import WebAPI
from argo_connectors.parse.agora_topology import ParseAgoraTopo
from argo_connectors.tasks.common import write_topo_json as write_json, write_state
from argo_connectors.exceptions import ConnectorError, ConnectorHttpError
from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError, ConnectorError


def contains_exception(list):
Expand All @@ -16,7 +16,7 @@ def contains_exception(list):
return (False, None)


class TaskProviderTopology(object):
class AgoraProviderTopology(object):
def __init__(self, loop, logger, connector_name, globopts, webapi_opts,
confcust, uidservendp, fetchtype, fixed_date):
self.loop = loop
Expand Down Expand Up @@ -71,39 +71,43 @@ async def fetch_data(self, feed):


async def run(self):
topofeedproviders = self.confcust.get_topofeedservicegroups()
topofeedresources = self.confcust.get_topofeedendpoints()
try:
topofeedproviders = self.confcust.get_topofeedservicegroups()
topofeedresources = self.confcust.get_topofeedendpoints()

coros = [
self.fetch_data(topofeedresources),
self.fetch_data(topofeedproviders),
]
coros = [
self.fetch_data(topofeedresources),
self.fetch_data(topofeedproviders),
]

# fetch topology data concurrently in coroutines
fetched_data = await asyncio.gather(*coros, return_exceptions=True)
# fetch topology data concurrently in coroutines
fetched_data = await asyncio.gather(*coros, return_exceptions=True)

exc_raised, exc = contains_exception(fetched_data)
if exc_raised:
raise ConnectorError(repr(exc))
exc_raised, exc = contains_exception(fetched_data)
if exc_raised:
raise ConnectorError(repr(exc))

fetched_resources, fetched_providers = fetched_data
if fetched_resources and fetched_providers:
group_providers, group_resources = self.parse_source_topo(fetched_resources, fetched_providers)
fetched_resources, fetched_providers = fetched_data
if fetched_resources and fetched_providers:
group_providers, group_resources = self.parse_source_topo(fetched_resources, fetched_providers)

await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, True)
await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, True)

numgg = len(group_providers)
numge = len(group_resources)
numgg = len(group_providers)
numge = len(group_resources)

# send concurrently to WEB-API in coroutines
if eval(self.globopts['GeneralPublishWebAPI'.lower()]):
await asyncio.gather(
self.send_webapi(self.webapi_opts, group_resources, 'endpoints', self.fixed_date),
self.send_webapi(self.webapi_opts, group_providers, 'groups', self.fixed_date),
loop=self.loop
)
# send concurrently to WEB-API in coroutines
if eval(self.globopts['GeneralPublishWebAPI'.lower()]):
await asyncio.gather(
self.send_webapi(self.webapi_opts, group_resources, 'endpoints', self.fixed_date),
self.send_webapi(self.webapi_opts, group_providers, 'groups', self.fixed_date),
)

if eval(self.globopts['GeneralWriteJson'.lower()]):
write_json(self.logger, self.globopts, self.confcust, group_providers, group_resources, self.fixed_date)
if eval(self.globopts['GeneralWriteJson'.lower()]):
write_json(self.logger, self.globopts, self.confcust, group_providers, group_resources, self.fixed_date)

self.logger.info('Customer:' + self.logger.customer + ' Fetched Endpoints:%d' % (numge) + ' Groups(%s):%d' % (self.fetchtype, numgg))
self.logger.info('Customer:' + self.logger.customer + ' Fetched Endpoints:%d' % (numge) + ' Groups(%s):%d' % (self.fetchtype, numgg))

except (ConnectorHttpError, ConnectorParseError, ConnectorError, KeyboardInterrupt) as exc:
self.logger.error(repr(exc))
await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, False)
66 changes: 36 additions & 30 deletions modules/tasks/flat_topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from argo_connectors.io.webapi import WebAPI
from argo_connectors.mesh.contacts import attach_contacts_topodata
from argo_connectors.tasks.common import write_state, write_topo_json as write_json
from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError


class TaskFlatTopology(object):
Expand Down Expand Up @@ -72,33 +73,38 @@ async def send_webapi(self, data, topotype):
await webapi.send(data, topotype)

async def run(self):
if self._is_feed(self.topofeed):
res = await self.fetch_data()
group_groups, group_endpoints = self.parse_source_topo(res)
contacts = ParseContacts(self.logger, res, self.uidservendp, self.is_csv).get_contacts()
attach_contacts_topodata(self.logger, contacts, group_endpoints)

elif not self._is_feed(self.topofeed) and not self.is_csv:
try:
with open(self.topofeed) as fp:
js = json.load(fp)
group_groups, group_endpoints = self.parse_source_topo(js)
except IOError as exc:
self.logger.error('Customer:%s : Problem opening %s - %s' % (self.logger.customer, self.topofeed, repr(exc)))

await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, True)

numge = len(group_endpoints)
numgg = len(group_groups)

# send concurrently to WEB-API in coroutines
if eval(self.globopts['GeneralPublishWebAPI'.lower()]):
await asyncio.gather(
self.send_webapi(group_groups, 'groups'),
self.send_webapi(group_endpoints,'endpoints')
)

if eval(self.globopts['GeneralWriteJson'.lower()]):
write_json(self.logger, self.globopts, self.confcust, group_groups, group_endpoints, self.fixed_date)

self.logger.info('Customer:' + self.custname + ' Fetched Endpoints:%d' % (numge) + ' Groups(%s):%d' % (self.fetchtype, numgg))
try:
if self._is_feed(self.topofeed):
res = await self.fetch_data()
group_groups, group_endpoints = self.parse_source_topo(res)
contacts = ParseContacts(self.logger, res, self.uidservendp, self.is_csv).get_contacts()
attach_contacts_topodata(self.logger, contacts, group_endpoints)

elif not self._is_feed(self.topofeed) and not self.is_csv:
try:
with open(self.topofeed) as fp:
js = json.load(fp)
group_groups, group_endpoints = self.parse_source_topo(js)
except IOError as exc:
self.logger.error('Customer:%s : Problem opening %s - %s' % (self.logger.customer, self.topofeed, repr(exc)))

await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, True)

numge = len(group_endpoints)
numgg = len(group_groups)

# send concurrently to WEB-API in coroutines
if eval(self.globopts['GeneralPublishWebAPI'.lower()]):
await asyncio.gather(
self.send_webapi(group_groups, 'groups'),
self.send_webapi(group_endpoints,'endpoints')
)

if eval(self.globopts['GeneralWriteJson'.lower()]):
write_json(self.logger, self.globopts, self.confcust, group_groups, group_endpoints, self.fixed_date)

self.logger.info('Customer:' + self.custname + ' Fetched Endpoints:%d' % (numge) + ' Groups(%s):%d' % (self.fetchtype, numgg))

except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc:
self.logger.error(repr(exc))
await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, False)
41 changes: 23 additions & 18 deletions modules/tasks/gocdb_downtimes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from argo_connectors.parse.gocdb_downtimes import ParseDowntimes
from argo_connectors.io.webapi import WebAPI
from argo_connectors.tasks.common import write_state, write_downtimes_json as write_json

from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError

class TaskGocdbDowntimes(object):
def __init__(self, loop, logger, connector_name, globopts, auth_opts,
Expand Down Expand Up @@ -70,24 +70,29 @@ async def send_webapi(self, dts):
await webapi.send(dts, downtimes_component=True)

async def run(self):
# we don't have multiple tenant definitions in one
# customer file so we can safely assume one tenant/customer
write_empty = self.confcust.send_empty(self.connector_name)
if not write_empty:
res = await self.fetch_data()
dts = self.parse_source(res)
else:
dts = []
try:
# we don't have multiple tenant definitions in one
# customer file so we can safely assume one tenant/customer
write_empty = self.confcust.send_empty(self.connector_name)
if not write_empty:
res = await self.fetch_data()
dts = self.parse_source(res)
else:
dts = []

await write_state(self.connector_name, self.globopts, self.confcust, self.timestamp, True)
await write_state(self.connector_name, self.globopts, self.confcust, self.timestamp, True)

if eval(self.globopts['GeneralPublishWebAPI'.lower()]):
await self.send_webapi(dts)
if eval(self.globopts['GeneralPublishWebAPI'.lower()]):
await self.send_webapi(dts)

if dts or write_empty:
cust = list(self.confcust.get_customers())[0]
self.logger.info('Customer:%s Fetched Date:%s Endpoints:%d' %
(self.confcust.get_custname(cust), self.targetdate, len(dts)))
if dts or write_empty:
cust = list(self.confcust.get_customers())[0]
self.logger.info('Customer:%s Fetched Date:%s Endpoints:%d' %
(self.confcust.get_custname(cust), self.targetdate, len(dts)))

if eval(self.globopts['GeneralWriteJson'.lower()]):
write_json(self.logger, self.globopts, self.confcust, dts, self.timestamp)
if eval(self.globopts['GeneralWriteJson'.lower()]):
write_json(self.logger, self.globopts, self.confcust, dts, self.timestamp)

except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc:
self.logger.error(repr(exc))
await write_state(self.connector_name, self.globopts, self.confcust, self.timestamp, False)
71 changes: 38 additions & 33 deletions modules/tasks/vapor_weights.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from argo_connectors.io.webapi import WebAPI
from argo_connectors.parse.vapor import ParseWeights
from argo_connectors.tasks.common import write_weights_metricprofile_state as write_state, write_weights_json as write_json

from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError

class TaskVaporWeights(object):
def __init__(self, loop, logger, connector_name, globopts, confcust, feed,
Expand Down Expand Up @@ -57,35 +57,40 @@ async def send_webapi(self, weights, webapi_opts, job):
await webapi.send(weights)

async def run(self):
for job, cust in self.jobcust:
self.logger.customer = self.confcust.get_custname(cust)
self.logger.job = job

write_empty = self.confcust.send_empty(self.connector_name, cust)

if write_empty:
weights = []
else:
res = await self.fetch_data()
weights = self.parse_source(res)

webapi_opts = self.get_webapi_opts(cust, job)

if eval(self.globopts['GeneralPublishWebAPI'.lower()]):
await self.send_webapi(weights, webapi_opts, job)

if eval(self.globopts['GeneralWriteJson'.lower()]):
write_json(self.logger, self.globopts, cust, job,
self.confcust, self.fixed_date, weights)

await write_state(self.connector_name, self.globopts, cust, job, self.confcust, self.fixed_date, True)

if weights or write_empty:
custs = set([cust for job, cust in self.jobcust])
for cust in custs:
jobs = [job for job, lcust in self.jobcust if cust == lcust]
self.logger.info('Customer:%s Jobs:%s Sites:%d' %
(self.confcust.get_custname(cust), jobs[0]
if len(jobs) == 1 else
'({0})'.format(','.join(jobs)),
len(weights)))
try:
for job, cust in self.jobcust:
self.logger.customer = self.confcust.get_custname(cust)
self.logger.job = job

write_empty = self.confcust.send_empty(self.connector_name, cust)

if write_empty:
weights = []
else:
res = await self.fetch_data()
weights = self.parse_source(res)

webapi_opts = self.get_webapi_opts(cust, job)

if eval(self.globopts['GeneralPublishWebAPI'.lower()]):
await self.send_webapi(weights, webapi_opts, job)

if eval(self.globopts['GeneralWriteJson'.lower()]):
write_json(self.logger, self.globopts, cust, job,
self.confcust, self.fixed_date, weights)

await write_state(self.connector_name, self.globopts, cust, job, self.confcust, self.fixed_date, True)

if weights or write_empty:
custs = set([cust for job, cust in self.jobcust])
for cust in custs:
jobs = [job for job, lcust in self.jobcust if cust == lcust]
self.logger.info('Customer:%s Jobs:%s Sites:%d' %
(self.confcust.get_custname(cust), jobs[0]
if len(jobs) == 1 else
'({0})'.format(','.join(jobs)),
len(weights)))

except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc:
self.logger.error(repr(exc))
await write_state(self.connector_name, self.globopts, cust, job, self.confcust, self.fixed_date, False)
28 changes: 14 additions & 14 deletions modules/tasks/webapi_metricprofile.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ def parse_source(self, res, profiles):
return metric_profiles

async def run(self):
for job in self.confcust.get_jobs(self.cust):
self.logger.customer = self.confcust.get_custname(self.cust)
self.logger.job = job
try:
for job in self.confcust.get_jobs(self.cust):
self.logger.customer = self.confcust.get_custname(self.cust)
self.logger.job = job

profiles = self.confcust.get_profiles(job)
webapi_custopts = self.confcust.get_webapiopts(self.cust)
webapi_opts = self.cglob.merge_opts(webapi_custopts, 'webapi')
webapi_complete, missopt = self.cglob.is_complete(webapi_opts, 'webapi')
profiles = self.confcust.get_profiles(job)
webapi_custopts = self.confcust.get_webapiopts(self.cust)
webapi_opts = self.cglob.merge_opts(webapi_custopts, 'webapi')
webapi_complete, missopt = self.cglob.is_complete(webapi_opts, 'webapi')

if not webapi_complete:
self.logger.error('Customer:%s Job:%s %s options incomplete, missing %s' % (self.logger.customer, self.logger.job, 'webapi', ' '.join(missopt)))
continue
if not webapi_complete:
self.logger.error('Customer:%s Job:%s %s options incomplete, missing %s' % (self.logger.customer, self.logger.job, 'webapi', ' '.join(missopt)))
continue

try:
res = await self.fetch_data(webapi_opts['webapihost'], webapi_opts['webapitoken'])

fetched_profiles = self.parse_source(res, profiles)
Expand All @@ -57,6 +57,6 @@ async def run(self):

self.logger.info('Customer:' + self.logger.customer + ' Job:' + job + ' Profiles:%s Tuples:%d' % (', '.join(profiles), len(fetched_profiles)))

except (ConnectorHttpError, KeyboardInterrupt, ConnectorParseError) as exc:
self.logger.error(repr(exc))
await write_state(self.connector_name, self.globopts, self.cust, job, self.confcust, self.fixed_date, False)
except (ConnectorHttpError, KeyboardInterrupt, ConnectorParseError) as exc:
self.logger.error(repr(exc))
await write_state(self.connector_name, self.globopts, self.cust, job, self.confcust, self.fixed_date, False)
Loading