diff --git a/exec/topology-gocdb-connector.py b/exec/topology-gocdb-connector.py index 212e8813..02ce4e32 100755 --- a/exec/topology-gocdb-connector.py +++ b/exec/topology-gocdb-connector.py @@ -8,6 +8,7 @@ import uvloop from argo_connectors.singleton_config import ConfigClass +from argo_connectors.config import Global, CustomerConf from argo_connectors.exceptions import ConnectorError, ConnectorParseError, ConnectorHttpError from argo_connectors.log import Logger from argo_connectors.tasks.common import write_state @@ -58,17 +59,95 @@ def main(): help='write data for this date', type=str, required=False) args = parser.parse_args() + # logger = Logger(os.path.basename(sys.argv[0])) + + # fixed_date = None + # if args.date and date_check(args.date): + # fixed_date = args.date + + # confpath = args.gloconf[0] if args.gloconf else None + # cglob = Global(sys.argv[0], confpath) + # globopts = cglob.parse() + # pass_extensions = eval(globopts['GeneralPassExtensions'.lower()]) + + # confpath = args.custconf[0] if args.custconf else None + # confcust = CustomerConf(sys.argv[0], confpath) + # confcust.parse() + # confcust.make_dirstruct() + # confcust.make_dirstruct(globopts['InputStateSaveDir'.lower()]) + + # topofeed = confcust.get_topofeed() + # topofeedpaging = confcust.get_topofeedpaging() + # uidservendp = confcust.get_uidserviceendpoints() + # topofetchtype = confcust.get_topofetchtype() + # custname = confcust.get_custname() + # #logger.customer = custname #TODO: VIDIT DAL NAM TREBA + + # auth_custopts = confcust.get_authopts() + # auth_opts = cglob.merge_opts(auth_custopts, 'authentication') + + # auth_complete, missing = cglob.is_complete(auth_opts, 'authentication') + # if not auth_complete: + # logger.error('%s options incomplete, missing %s' % + # ('authentication', ' '.join(missing))) + # raise SystemExit(1) + + # bdii_opts = get_bdii_opts(confcust) + # # print("bdii_opts: ", bdii_opts) # za EGI: {'bdii': 'True', 'bdiihost': 'bdii.egi.cro-ngi.hr', 'bdiiport': '2170', 'bdiiquerybase': 'o=grid', 'bdiiqueryfiltersrm': '(&(objectClass=GlueService)(|(GlueServiceType=srm_v1)(GlueServiceType=srm)))', 'bdiiqueryattributessrm': 'GlueServiceEndpoint', 'bdiiqueryfiltersepath': '(objectClass=GlueSATop)', 'bdiiqueryattributessepath': 'GlueVOInfoAccessControlBaseRule GlueVOInfoPath'} + + # webapi_opts = get_webapi_opts(cglob, confcust) + # # print("webapi_opts: ", webapi_opts) # za EGI: {'webapitoken': '505c3be00e9e30400b72dbfb0c06268aa73f694b', 'webapihost': 'api.devel.argo.grnet.gr'} + + # toposcope = confcust.get_toposcope() + # topofeedendpoints = confcust.get_topofeedendpoints() + # topofeedservicegroups = confcust.get_topofeedservicegroups() + # topofeedsites = confcust.get_topofeedsites() + # notiflag = confcust.get_notif_flag() + + # if toposcope: + # SERVICE_ENDPOINTS_PI = topofeedendpoints + toposcope + # SERVICE_GROUPS_PI = topofeedservicegroups + toposcope + # SITES_PI = topofeedsites + toposcope + + # else: + # SERVICE_ENDPOINTS_PI = topofeedendpoints + # SERVICE_GROUPS_PI = topofeedservicegroups + # SITES_PI = topofeedsites + + # loop = uvloop.new_event_loop() + # asyncio.set_event_loop(loop) + + + ############################################################################################### + loop = uvloop.new_event_loop() asyncio.set_event_loop(loop) - + config = ConfigClass(args) fixed_date = config.get_fixed_date() + ############################################################################################### + + try: - task = TaskGocdbTopology(config, loop) + # task = TaskGocdbTopology( + # loop, logger, sys.argv[0], SERVICE_ENDPOINTS_PI, SERVICE_GROUPS_PI, + # SITES_PI, globopts, auth_opts, webapi_opts, bdii_opts, confcust, + # custname, topofeed, topofetchtype, fixed_date, uidservendp, + # pass_extensions, topofeedpaging, notiflag) + + ############################################################################################### + + + task = TaskGocdbTopology(config, loop) #TODO: OVAKO TREBA IZGLEDATI + + + ############################################################################################### loop.run_until_complete(task.run()) + + except (ConnectorError, ConnectorParseError, ConnectorHttpError, KeyboardInterrupt) as exc: logger.error(repr(exc)) loop.run_until_complete( @@ -81,3 +160,29 @@ def main(): if __name__ == '__main__': main() + + + + + + + +# print("loop: ", loop) # +# print("logger: ", logger) # +# print("sys.argv[0]: ", sys.argv[0]) # /usr/libexec/argo-connectors/topology-gocdb-connector.py +# print("SERVICE_ENDPOINTS_PI: ", SERVICE_ENDPOINTS_PI) # https://goc-sdc.argo.grnet.gr//gocdbpi/private/?method=get_service_endpoint&scope= +# print("SERVICE_GROUPS_PI: ", SERVICE_GROUPS_PI) # https://goc-sdc.argo.grnet.gr//gocdbpi/private/?method=get_service_group&scope= +# print("SITES_PI: ", SITES_PI) # https://goc-sdc.argo.grnet.gr//gocdbpi/private/?method=get_site&scope= +# print("globopts: ", globopts) # {'generalwritejson': 'True', 'generalpublishwebapi': 'False', 'generalpassextensions': 'True', 'generalcompressjson': 'False', 'authenticationhostkey': '/etc/grid-security/hostkey.pem', 'authenticationhostcert': '/etc/grid-security/hostcert.pem', 'authenticationcapath': '/etc/grid-security/certificates', 'authenticationcafile': '/etc/pki/tls/certs/ca-bundle.crt', 'authenticationverifyservercert': 'True', 'authenticationuseplainhttpauth': 'False', 'authenticationhttpuser': 'xxxx', 'authenticationhttppass': 'xxxx', 'connectiontimeout': '180', 'connectionretry': '60', 'connectionsleepretry': '60', 'connectionretryrandom': 'True', 'connectionsleeprandomretrymax': '300', 'inputstatesavedir': '/var/lib/argo-connectors/states/', 'inputstatedays': '3', 'webapihost': 'api.devel.argo.grnet.gr', 'outputtopologygroupofendpoints': 'group_endpoints_DATE.json', 'outputtopologygroupofgroups': 'group_groups_DATE.json'} +# print("auth_opts: ", auth_opts) # {'authenticationhostkey': '/etc/grid-security/hostkey.pem', 'authenticationhostcert': '/etc/grid-security/hostcert.pem', 'authenticationcapath': '/etc/grid-security/certificates', 'authenticationcafile': '/etc/pki/tls/certs/ca-bundle.crt', 'authenticationverifyservercert': 'True', 'authenticationuseplainhttpauth': 'False', 'authenticationhttpuser': 'xxxx', 'authenticationhttppass': 'xxxx'} +# print("webapi_opts: ", webapi_opts) # {'webapitoken': '4473153af6c67a650a74d81d367e9e83f70e2b7b', 'webapihost': 'api.devel.argo.grnet.gr'} +# print("bdii_opts: ", bdii_opts) # None +# print("confcust: ", confcust) # +# print("custname: ", custname) # SDC +# print("topofeed: ", topofeed) # https://goc-sdc.argo.grnet.gr/ +# print("topofetchtype: ", topofetchtype) # ['sites', 'servicegroups'] +# print("fixed_date: ", fixed_date) # None +# print("uidservendp: ", uidservendp) # False +# print("pass_extensions: ", pass_extensions) # True +# print("topofeedpaging: ", topofeedpaging) # False +# print("notiflag: ", notiflag) # True \ No newline at end of file diff --git a/exec/weights-vapor-connector.py b/exec/weights-vapor-connector.py index b5aa728c..bb58dec9 100755 --- a/exec/weights-vapor-connector.py +++ b/exec/weights-vapor-connector.py @@ -7,12 +7,13 @@ import asyncio import uvloop +from argo_connectors.singleton_config import ConfigClass from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError from argo_connectors.tasks.vapor_weights import TaskVaporWeights from argo_connectors.tasks.common import write_weights_metricprofile_state as write_state from argo_connectors.log import Logger -from argo_connectors.config import Global, CustomerConf +#from argo_connectors.config import Global, CustomerConf from argo_connectors.utils import date_check globopts = {} @@ -20,7 +21,7 @@ def main(): - global logger, globopts + #global logger, globopts parser = argparse.ArgumentParser(description="""Fetch weights information from Gstat provider for every job listed in customer.conf""") parser.add_argument('-c', dest='custconf', nargs=1, metavar='customer.conf', @@ -33,22 +34,36 @@ def main(): logger = Logger(os.path.basename(sys.argv[0])) - fixed_date = None - if args.date and date_check(args.date): - fixed_date = args.date + # fixed_date = None + # if args.date and date_check(args.date): + # fixed_date = args.date - confpath = args.gloconf[0] if args.gloconf else None - cglob = Global(sys.argv[0], confpath) - globopts = cglob.parse() + # confpath = args.gloconf[0] if args.gloconf else None + # cglob = Global(sys.argv[0], confpath) + # globopts = cglob.parse() - confpath = args.custconf[0] if args.custconf else None - confcust = CustomerConf(sys.argv[0], confpath) - confcust.parse() - confcust.make_dirstruct() - confcust.make_dirstruct(globopts['InputStateSaveDir'.lower()]) + # confpath = args.custconf[0] if args.custconf else None + # confcust = CustomerConf(sys.argv[0], confpath) + # confcust.parse() + # confcust.make_dirstruct() + # confcust.make_dirstruct(globopts['InputStateSaveDir'.lower()]) + + # VAPORPI = confcust.get_vaporpi() + # feeds = confcust.get_mapfeedjobs(sys.argv[0], deffeed=VAPORPI) + + ##################################################################### + + config = ConfigClass(args) + + fixed_date = config.get_fixed_date() + globopts, pass_extensions, cglob = config.get_globopts_n_pass_ext() + confcust = config.get_confcust(globopts) + VAPORPI = config.vaporrpi_data(confcust) + feeds = config.get_feeds(confcust, VAPORPI) + + + ##################################################################### - VAPORPI = confcust.get_vaporpi() - feeds = confcust.get_mapfeedjobs(sys.argv[0], deffeed=VAPORPI) loop = uvloop.new_event_loop() asyncio.set_event_loop(loop) @@ -64,9 +79,11 @@ def main(): logger.customer = customers try: - task = TaskVaporWeights(loop, logger, sys.argv[0], globopts, - confcust, VAPORPI, jobcust, cglob, - fixed_date) + # task = TaskVaporWeights(loop, logger, sys.argv[0], globopts, + # confcust, VAPORPI, jobcust, cglob, + # fixed_date) + + task = TaskVaporWeights(config, loop, jobcust) loop.run_until_complete(task.run()) except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc: diff --git a/modules/singleton_config.py b/modules/singleton_config.py index 88e44283..0fd9ae2a 100755 --- a/modules/singleton_config.py +++ b/modules/singleton_config.py @@ -70,7 +70,7 @@ def get_globopts_n_pass_ext(self): cglob = Global(sys.argv[0], confpath) globopts = cglob.parse() pass_extensions = eval(globopts['GeneralPassExtensions'.lower()]) - return globopts, pass_extensions + return globopts, pass_extensions, cglob def get_confcust(self, globopts): confpath = self.args.custconf[0] if self.args.custconf else None @@ -146,4 +146,10 @@ def service_data(self, confcust): return SERVICE_ENDPOINTS_PI, SERVICE_GROUPS_PI, SITES_PI - # return loop, logger, sys.argv[0], SERVICE_ENDPOINTS_PI, SERVICE_GROUPS_PI, SITES_PI, globopts, auth_opts, webapi_opts, bdii_opts, confcust, custname, topofeed, topofetchtype, fixed_date, uidservendp, pass_extensions, topofeedpaging, notiflag + def vaporrpi_data(self, confcust): + VAPORPI = confcust.get_vaporpi() + return VAPORPI + + def get_feeds(self, confcust, VAPORPI): + feeds = confcust.get_mapfeedjobs(sys.argv[0], deffeed=VAPORPI) + return feeds diff --git a/modules/tasks/gocdb_topology.py b/modules/tasks/gocdb_topology.py index 5cdb8248..a98f5107 100644 --- a/modules/tasks/gocdb_topology.py +++ b/modules/tasks/gocdb_topology.py @@ -8,6 +8,7 @@ from concurrent.futures import ProcessPoolExecutor from functools import partial +from argo_connectors.singleton_config import ConfigClass from argo_connectors.parse.gocdb_topology import ParseServiceGroups, ParseServiceEndpoints, ParseSites from argo_connectors.parse.gocdb_contacts import ParseServiceEndpointContacts, ParseSitesWithContacts, ParseServiceGroupWithContacts from argo_connectors.exceptions import ConnectorError, ConnectorParseError, ConnectorHttpError @@ -93,7 +94,7 @@ def _parse(self): return count, cursor -class TaskParseTopology(object): +class TaskParseTopology(): def __init__(self, logger, custname, uidservendp, pass_extensions, notiflag): self.logger = logger @@ -156,21 +157,54 @@ def parse_servicegroups(logger, custname, uidservendp, pass_extensions, class TaskParseContacts(object): def __init__(self, logger): self.logger = logger + #print("self.logger: ", self.logger) def parse_siteswith_contacts(self, res): contacts = ParseSitesWithContacts(self.logger, res) + #print("contacts1: ", contacts) return contacts.get_contacts() def parse_servicegroups_contacts(self, res): contacts = ParseServiceGroupWithContacts(self.logger, res) + #print("contacts2: ", contacts) return contacts.get_contacts() def parse_serviceendpoints_contacts(self, res): contacts = ParseServiceEndpointContacts(self.logger, res) + return contacts.get_contacts() class TaskGocdbTopology(TaskParseContacts, TaskParseTopology): + # def __init__(self, loop, logger, connector_name, SERVICE_ENDPOINTS_PI, + # SERVICE_GROUPS_PI, SITES_PI, globopts, auth_opts, webapi_opts, + # bdii_opts, confcust, custname, topofeed, topofetchtype, + # fixed_date, uidservendp, pass_extensions, topofeedpaging, + # notiflag): + # TaskParseTopology.__init__(self, logger, custname, uidservendp, + # pass_extensions, notiflag) + # super(TaskGocdbTopology, self).__init__(logger) + # self.loop = loop + # self.logger = logger + # self.connector_name = connector_name + # self.SERVICE_ENDPOINTS_PI = SERVICE_ENDPOINTS_PI + # self.SERVICE_GROUPS_PI = SERVICE_GROUPS_PI + # self.SITES_PI = SITES_PI + # self.globopts = globopts + # self.auth_opts = auth_opts + # self.webapi_opts = webapi_opts + # self.bdii_opts = bdii_opts + # self.confcust = confcust + # self.custname = custname + # self.topofeed = topofeed + # self.topofetchtype = topofetchtype + # self.fixed_date = fixed_date + # self.uidservendp = uidservendp + # self.pass_extensions = pass_extensions + # self.topofeedpaging = topofeedpaging + # self.notification_flag = notiflag + + def __init__(self, config, loop): self.config = config self.loop = loop @@ -178,7 +212,7 @@ def __init__(self, config, loop): self.logger = self.config.get_logger() self.connector_name = self.config.get_connector_name() self.fixed_date = self.config.get_fixed_date() - self.globopts, self.pass_extensions = self.config.get_globopts_n_pass_ext() + self.globopts, self.pass_extensions, self.cglob = self.config.get_globopts_n_pass_ext() self.confcust = self.config.get_confcust(self.globopts) self.topofeed = self.config.topofeed_data(self.confcust) self.topofeedpaging = self.config.topofeedpaging_data(self.confcust) diff --git a/modules/tasks/vapor_weights.py b/modules/tasks/vapor_weights.py index c4f4986b..1f6380e8 100644 --- a/modules/tasks/vapor_weights.py +++ b/modules/tasks/vapor_weights.py @@ -9,17 +9,32 @@ class TaskVaporWeights(object): - def __init__(self, loop, logger, connector_name, globopts, confcust, feed, - jobcust, cglob, fixed_date): + # def __init__(self, loop, logger, connector_name, globopts, confcust, feed, + # jobcust, cglob, fixed_date): + # self.event_loop = loop + # self.logger = logger + # self.connector_name = connector_name + # self.globopts = globopts + # self.confcust = confcust + # self.feed = feed + # self.jobcust = jobcust + # self.cglob = cglob + # self.fixed_date = fixed_date + + ######################################################## + + def __init__(self, config, loop, jobcust): + self.config = config self.event_loop = loop - self.logger = logger - self.connector_name = connector_name - self.globopts = globopts - self.confcust = confcust - self.feed = feed self.jobcust = jobcust - self.cglob = cglob - self.fixed_date = fixed_date + + self.logger = config.get_logger() + self.connector_name = self.config.get_connector_name() + self.globopts, self.pass_extensions, self.cglob = self.config.get_globopts_n_pass_ext() + self.confcust = self.config.get_confcust(self.globopts) + self.feed = self.config.vaporrpi_data(self.confcust) + self.fixed_date = self.config.get_fixed_date() + async def fetch_data(self): feed_parts = urlparse(self.feed)