diff --git a/tools/bin/hawqsync-extract b/tools/bin/hawqsync-extract
new file mode 100755
index 0000000000..20823865df
--- /dev/null
+++ b/tools/bin/hawqsync-extract
@@ -0,0 +1,296 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from optparse import OptionParser
+from subprocess import Popen, PIPE
+from time import strftime
+
+def parseargs():
+    parser = OptionParser(usage="HAWQ extract options.")
+    parser.add_option('-v', '--verbose', action='store_true', default=False)
+    parser.add_option("-a", "--prompt", action="store_false",
+                      dest="prompt", default=True,
+                      help="Execute without prompt.")
+    parser.add_option("-l", "--logdir", dest="logDir",
+                      help="Sets the directory for log files")
+    parser.add_option('-t', '--testMode', action='store_true', default=False,
+                      dest='testMode', help="Execute in test mode")
+    parser.add_option('-d', '--destinationOnHdfs', dest='hdfsDir',
+                      default="/hawq_default",
+                      help="HDFS directory to copy resulting tarball to")
+    parser.add_option('-s', '--schemas', dest='schemas', default="",
+                      help="A comma separated list of the schemas containing "+\
+                           "the tables to extract metadata for")
+    # Note: -h is reserved by optparse for --help, so the HAWQ URI uses -u
+    parser.add_option('-u', '--hawqUri', dest='hawqUri',
+                      default="localhost:5432/gpadmin",
+                      help="The HAWQ master URI. e.g. localhost:5432/gpadmin")
+
+    (options, args) = parser.parse_args()
+    return (options, args)
+
+
+def getSchemaObjectList(hawqUri, schemaList=[], isTesting=False):
+    """Utility function to generate a list of table objects for a list of schemas
+    Args:
+        hawqUri (str): the HAWQ master URI to use for connecting
+        schemaList (List): a list of schemas to find tables within
+        isTesting (bool): NOOP mode bypassing the actual psql call for testing
+
+    Returns:
+        retVal (int): Zero for success, nonzero otherwise
+        objectList (List) or message (str): either a list of objects or stderr output
+    """
+
+    retVal = -1
+
+    hawqHost = hawqUri.split(":")[0]
+    hawqPort, hawqDatabase = hawqUri.split(":")[-1].split("/")
+
+    objectListSql = """
+    SELECT n.nspname || '.'
|| c.relname + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE c.relkind IN ('r','s','') + AND c.relstorage IN ('h', 'a', 'c', 'p','') + AND n.nspname !~ '^pg_toast' + AND n.nspname ~ '^({schemas})$' + ORDER BY n.nspname, c.relname; + \q + """.format(schemas="|".join(schemaList)) + + psqlCommand = "psql -h {h} -p {p} -At {d}".format(h=hawqHost, + p=hawqPort, + d=hawqDatabase) + + + print psqlCommand + + objectList = None + stderr = None + if not isTesting: + psqlProcess = Popen(psqlCommand.split(), stdin=PIPE, + stdout=PIPE, stderr=PIPE) + + (objectList, stderr) = psqlProcess.communicate(objectListSql) + + retVal = psqlProcess.returncode + + if retVal != 0: + return retVal, stderr + + # Sample output to follow + else: + objectList = """\ + model.rsqauredCube + model.mungingScratchspace + model.postMunging + model.fancyAnalytics + development.testtable1 + development.randomtable2 + development.someOtherSweetTable + """; + + retVal = 0 + + # sample yields: 342f414e7519f8c6a9eacce94777ba08 + return retVal, objectList.split("\n") + +def saveMetadataForSchemaObjects(baseDirectory="/tmp", objectList=[], isTesting=False): + """Utility function to export table metadata in a schema to a set of files + (one per table) + Args: + baseDirectory (str): the base directory to create a tarball of + objectList (List): a list of objects to invoke hawq extract with + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int) or message (str): Zero for success, non-zero otherwise + """ + + retVal = -1 + + mkdirCommand = "mkdir -p {d}/hawqExtract".format(d=baseDirectory) + + hawqExtractCommand = "hawq extract {{o}} -o {d}/hawqExtract/{{o}}.yml".format(d=baseDirectory) + + stdout = None + stderr = None + + if isTesting: + return 0 + + mkdirProcess = Popen(mkdirCommand.split(), stdout=PIPE, stderr=PIPE) + + (stdout, stderr) = mkdirProcess.communicate() + + retVal = mkdirProcess.returncode + + if retVal != 0: + return retVal, stderr + + for obj in objectList: + + if len(obj) == 0: + continue + + thisObjectCommand = hawqExtractCommand.format(o=obj) + + print thisObjectCommand + + hawqExtractProcess = Popen(thisObjectCommand.split(), + stdout=PIPE, stderr=PIPE) + + (stdout, stderr) = hawqExtractProcess.communicate() + + rv = hawqExtractProcess.returncode + + retVal |= rv + + if rv != 0: + print rv, stderr + + return retVal + +def createTarball(baseDirectory="/tmp", targetTarball="/tmp/hawqExtract-{t}.tar.bz2", isTesting=False): + """Utility function to create a tarball of the extracted metadata + Args: + baseDirectory (str): the base directory to create a tarball of + targetTarball (str): the target directory and filename of the tarball + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int) or message (str): Zero for success, negative one otherwise + message (str): message contains status string + checksum (str): the MD5 checksum of the created tarball, if successful, + negative one otherwise + """ + + checksum = None + + # Example invocation + # tar cjf /tmp/test.tar.bz2 -C /tmp hawqExtract + + theTime = strftime("%Y-%m-%d-%H%M") + + tarCommand = "tar -cjf {t} -C {c} hawqExtract".format(t=targetTarball.format(t=theTime), + c=baseDirectory) + + stdout = None + stderr = None + if isTesting: + return 0, "TESTING BRANCH", -1 + + try: + tarProcess = Popen(tarCommand.split(), stdout=PIPE, stderr=PIPE) + + (stdout, stderr) = tarProcess.communicate() + + except OSError as e: + return -1, 
str(e), -1
+
+
+    if tarProcess.returncode != 0:
+        print "Tarball creation failed : " + stderr
+        return -1, stderr, -1
+
+    md5Command = "md5sum {f}".format(f=targetTarball.format(t=theTime))
+
+    try:
+        md5Process = Popen(md5Command.split(), stdout=PIPE, stderr=PIPE)
+
+        (stdout2, stderr2) = md5Process.communicate()
+
+        checksum = stdout2.split()[0].strip()
+
+        if md5Process.returncode != 0:
+            return -1, "md5 checksum creation failed : " + stderr2, -1
+        else:
+            return 0, targetTarball.format(t=theTime), checksum
+
+    except OSError as e:
+        return -1, str(e), -1
+
+def copyToHdfs(source, dest, isTesting=False):
+    """Utility function to copy a source file
+    to the destination HDFS directory/file
+    Args:
+        source (str): the source file on the local FS
+        dest (str): the target HDFS directory and filename
+        isTesting (bool): NOOP mode bypassing the actual HDFS copy for testing
+
+    Returns:
+        retVal (int): Zero for success, nonzero otherwise
+        message (str): stderr output from the copy
+    """
+
+    retVal = -1
+
+    hdfsCommand = "/usr/bin/hdfs dfs -copyFromLocal {s} {d}".format(s=source,
+                                                                    d=dest)
+
+    stdout = None
+    stderr = None
+    if not isTesting:
+        # Force HDFS commands to run as gpadmin user
+        env = os.environ.copy()
+        env['HADOOP_USER_NAME'] = 'gpadmin'
+        hdfsProcess = Popen(hdfsCommand.split(), env=env, stdout=PIPE, stderr=PIPE)
+
+        (stdout, stderr) = hdfsProcess.communicate()
+
+        return hdfsProcess.returncode, stderr
+
+    else:
+        return 0, "TESTING"
+
+if __name__ == '__main__':
+    options, args = parseargs()
+
+    #if options.verbose:
+    #    enable_verbose_logging()
+
+    # TODO - switch prints to this once using gppylibs
+    #logger, log_filename = setup_hawq_tool_logging('hawq_sync',getLocalHostname(),getUserName(), options.logDir)
+
+    if options.prompt:
+        # TODO - switch to this once using gppylibs
+        #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
+
+        # TODO - switch to gppylib-based logging
+        print "---------------------------------------------------------------"
+        print ""
+        print "This tool will extract metadata for every table in the schema list."
+        print ""
+        print "You must be gpadmin and have /usr/local/hawq/bin/psql in your PATH"
+        print ""
+        print " Would you like to continue?"
+        print "---------------------------------------------------------------"
+        answer = raw_input("y or n: ")
+        if "y" not in answer and "Y" not in answer:
+            print "Exiting."
+            sys.exit(1)
+
+    retVal, objectList = getSchemaObjectList(options.hawqUri,
+                                             schemaList=options.schemas.split(","),
+                                             isTesting=options.testMode)
+
+    if retVal != 0:
+        print "Failed to list tables for the requested schemas"
+        print "Error message was " + str(objectList)
+        sys.exit(1)
+
+    retVal = saveMetadataForSchemaObjects(objectList=objectList,
+                                          isTesting=options.testMode)
+
+    if retVal != 0:
+        print "hawq extract reported errors for one or more tables, see output above"
+        sys.exit(1)
+
+    retVal, filenameOrStderr, checksum = createTarball(isTesting=options.testMode)
+
+    if retVal != 0:
+        print "Failed to create metadata tarball"
+        print "Error message was " + filenameOrStderr
+        sys.exit(1)
+
+    retVal, stderr = copyToHdfs(filenameOrStderr, options.hdfsDir,
+                                isTesting=options.testMode)
+
+    if retVal != 0:
+        print "Failed to copy metadata tarball to HDFS"
+        print "Error message was " + stderr
+        sys.exit(1)
diff --git a/tools/bin/hawqsync-falcon b/tools/bin/hawqsync-falcon
new file mode 100755
index 0000000000..8e613e6488
--- /dev/null
+++ b/tools/bin/hawqsync-falcon
@@ -0,0 +1,1319 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +import sys +from optparse import OptionParser +from subprocess import Popen, PIPE +from hashlib import md5 +from json import loads +from time import strftime, sleep, time +from collections import defaultdict +# TODO - make use of these common HAWQ libs instead of print +#from gppylib.gplog import setup_hawq_tool_logging, enable_verbose_logging +#from gppylib.commands.unix import getLocalHostname, getUserName +try: + from xml.etree import cElementTree as ElementTree +except ImportError, e: + from xml.etree import ElementTree + +def parseargs(): + parser = OptionParser(usage="HAWQ sync options.") + parser.add_option('-v', '--verbose', action='store_true', + default=False) + parser.add_option("-a", "--prompt", action="store_false", + dest="prompt", default=True, + help="Execute without prompt.") + parser.add_option("-l", "--logdir", dest="logDir", + help="Sets the directory for log files") + parser.add_option('-d', '--dryRun', action='store_true', + default=False, + dest='testMode', help="Execute in test mode") + parser.add_option('-u', '--user', dest='userName', default="gpadmin", + help="The user to own Falcon ACLs and run job as") + parser.add_option('--maxMaps', dest='distcpMaxMaps', + default="10", + help="The maximum number of map jobs to allow") + parser.add_option('--mapBandwidth', dest='distcpMaxMBpsPerMap', + default="100", + help="The maximum allowable bandwidth for each map job, in MB/s") + parser.add_option('-s', '--sourceNamenode', dest='sourceNamenode', + default="", + help="The IP or FQDN of the source HDFS namenode") + parser.add_option('-S', '--sourceEntity', dest='sourceClusterEntityName', + default="source", + help="The Falcon cluster entity name of the source") + parser.add_option('-m', '--sourceHawqMaster', dest='sourceHawqMaster', + default="", + help="The IP or FQDN of the source HAWQ master") + parser.add_option('-M', '--targetHawqMaster', dest='targetHawqMaster', + default="", + help="The IP or FQDN of the target HAWQ master") + parser.add_option('-f', '--falconUri', dest='falconUri', + default="http://localhost:15000", + help="The URI to use for issuing Falcon REST calls") + parser.add_option('-t', '--targetNamenode', dest='targetNamenode', + default="", + help="The IP or FQDN of the source HDFS namenode") + parser.add_option('-T', '--targetEntity', dest='targetClusterEntityName', + default="target", + help="The Falcon cluster entity name of the target") + parser.add_option('-e', '--executionEntity', + dest='executionClusterEntityName', + default="source", + help="The Falcon cluster entity name specifying where to execute the job") + parser.add_option('-w', '--workflowHdfsFilename', dest='workflowFilename', + default="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml", + help="The HDFS location of the underlying Oozie workflow to use for sync job") + parser.add_option('-p', '--pathToSync', dest='pathToSync', + default="/tmp/syncTest", + help="The root directory to be syncronized") + parser.add_option('-j', '--jobName', dest='jobName', default="drSync", + help="The Falcon job entity name to be executed") + + (options, args) = 
parser.parse_args() + return (options, args) + +def extractFilenameAndSize(line, hdfsPort): + """Utility function to extract filename and file + size from a line of output from `hdfs dfs -ls -R` + + """ + + tokens = line.split() + return tokens[-1].split(":" + hdfsPort)[-1], tokens[4] + +def flattenFilelist(data, hdfsPort): + """Utility function to convert a list of output + lines from `hdfs dfs -ls -R` into a single, sorted, + delimited string to be used as a syncronization + fingerprint + + """ + + # Ensure record contains expected number of fields + isValid = lambda r: len(r.strip().split()) == 8 + + # Subset the records to only filename and size fields + filenameAndSize = [extractFilenameAndSize(line, hdfsPort) for line in data.split("\n") if isValid(line)] + + # Reverse sort the list by filename column + sortedFilenameAndSize = sorted(filenameAndSize, key=lambda r: r[0], reverse=True) + + # Flatten a single line into a delimited string + mergeLines = lambda l: "-".join(l) + + # Perform the flatten for every line and join lines into a string + return "\n".join(map(mergeLines, sortedFilenameAndSize)) + +def computeMd5(data): + """Utility function to compute MD5 checksum + + """ + hasher = md5() + hasher.update(data) + + return hasher.hexdigest() + +def getHdfsFingerprint(hdfsUri="", hdfsDir="/hawq_default", isTesting=False): + """Utility function to compute an MD5 + hash from the output of a recursive HDFS + directory listing + + """ + + retVal = -1 + + hdfsPort = hdfsUri.split(":")[-1] + + hdfsCommand = "hdfs dfs -ls -R {u}{d}".format(u=hdfsUri, d=hdfsDir) + #print hdfsCommand + + filelist = None + stderr = None + if not isTesting: + # Force HDFS commands to run as gpadmin user + env = os.environ.copy() + env['HADOOP_USER_NAME'] = 'gpadmin' + hdfsProcess = Popen(hdfsCommand.split(), env=env, + stdout=PIPE, stderr=PIPE) + + (filelist, stderr) = hdfsProcess.communicate() + + retVal = hdfsProcess.returncode + + if retVal != 0: + return retVal, stderr + + # Sample output to follow + else: + filelist = """ + drwx------ - gpadmin gpadmin 0 2016-07-14 12:32 hdfs://sandbox:8020/hawq_default/16385 + drwx------ - gpadmin gpadmin 0 2016-08-04 18:58 hdfs://sandbox:8020/hawq_default/16385/16387 + drwx------ - gpadmin gpadmin 0 2016-07-14 12:14 hdfs://sandbox:8020/hawq_default/16385/16387/18947 + """; + + retVal = 0 + + data = flattenFilelist(filelist, hdfsPort) + + # sample yields: 342f414e7519f8c6a9eacce94777ba08 + return retVal, computeMd5(data) + +def etree_to_dict(t): + """Utility function to turn an XML + element tree into a dictionary + + """ + + d = {t.tag: {} if t.attrib else None} + children = list(t) + if children: + dd = defaultdict(list) + for dc in map(etree_to_dict, children): + for k, v in dc.iteritems(): + dd[k].append(v) + + d[t.tag] = dict((k, v[0]) if len(v) == 1 else dict((k, v)) for k, v in dd.iteritems()) + if t.attrib: + d[t.tag].update(('@' + k, v) for k, v in t.attrib.iteritems()) + if t.text: + text = t.text.strip() + if children or t.attrib: + if text: + d[t.tag]['#text'] = text + else: + d[t.tag] = text + return d + +def xmlToDict(data): + """Wrapper function to convert + XML string into a Python dictionary + + """ + + e = ElementTree.XML(data) + return etree_to_dict(e) + +def getFalconStatus(falconUri="http://localhost:15000", entity="drSyncTest", + user="gpadmin", onlyRuntime=False, isTesting=False, + doDebug=False): + """Get the current status of an existing Falcon process/job entity + + Args: + falconUri (str): the URI for the Falcon server (e.g. 
http://host:port) + entity (str): the Falcon process entity name to get status for + user (str): the username used for authorization + onlyRuntime (bool): only query for process runtime (e.g. post-completion) + isTesting (bool): NOOP mode bypassing actual REST calls for testing + doDebug (bool): debugging mode for additional verbosity + + Returns: + retVal (int) or message (str): Zero for success, negative one otherwise. message can contain process start time or logfile + message (str): message can contain process end time or status string + + """ + + retVal = -1 + + # Example REST call: + # GET http://localhost:15000/api/entities/status/process/drSyncTest?user.name=falcon&fields=status,clusters,tags + + endpoint = "/api/instance/status/process/{e}?user.name={u}&fields=status".format(u=user, e=entity) + + curlCommand = "curl -X GET {0}".format(falconUri + endpoint) + + if doDebug: + print curlCommand + + stdout = None + stderr = None + if not isTesting: + curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE) + + (stdout, stderr) = curlProcess.communicate() + + retVal = curlProcess.returncode + + if doDebug: + print "stdout:", stdout, "stderr:", stderr + + try: + result = loads(stdout) + if 'instances' not in result: + print "No instance was started, try deleting the job and re-running" + return -1, "stdout: {0}, stderr: {1}".format(stdout, stderr) + if onlyRuntime: + # Parse the start/end times in JSON result from cURL + resultPayload = result['instances'][0] + return resultPayload['startTime'], resultPayload['endTime'] + else: + # Parse the logfile/status in JSON result from cURL + resultPayload = result['instances'][0] #['actions'][0] + return resultPayload['logFile'], resultPayload['status'] + except KeyError as e: + print "KeyError in getFalconStatus()", str(e), "\n", stdout + return -1, str(e) + except ValueError as e: + print "ValueError in getFalconStatus()", str(e), "\n", stdout + print "Is Falcon running at : {} ?".format(falconUri) + return -1, str(e) + + # Example output follows: + else: + stdout = """{ + "status":"SUCCEEDED", + "message":"default/STATUS\n", + "requestId":"default/1436392466@qtp-1730704097-152 - 8dd8f7fa-2024-4bdb-a048-c188759c2f47\n", + "instances":[{ + "instance":"2016-08-17T13:22Z", + "status":"SUSPENDED", + "logFile":"http://sandbox2.hortonworks.com:11000/oozie?job=0000014-160805194358788-oozie-oozi-W", + "cluster":"secondaryIDP", + "startTime":"2016-08-17T12:25:23-07:00", + "details":"", + "actions":[{ + "action":"user-action", + "status":"RUNNING" + }, { + "action":"failed-post-processing", + "status":"RUNNING", + "logFile":"http://sandbox2.hortonworks.com:8088/proxy/application_1470437437449_0002/" + }] + }] + }""".replace("\n", ""); + + return 0, loads(stdout)['instances'][0]['actions'][0] + +def doFalconSchedule(falconUri="http://localhost:15000", jobName="drSyncTest", + userName="gpadmin", isTesting=False): + """Schedule an existing Falcon process/job entity for execution + + Args: + falconUri (str): the URI for the Falcon server (e.g. http://host:port) + jobName (str): the Falcon process entity name to get status for + userName (str): the username used for authorization + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int): Zero for success, negative one otherwise + message (str): message status string (e.g. 
SUCCEEDED) + + """ + + retVal = -1 + + # Example REST call: + # "Content-Type:text/xml" + # POST http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryRun=false + + endpoint = "/api/entities/schedule/process/{n}?user.name={u}&skipDryRun=true".format(n=jobName, u=userName) + + curlCommand = "curl -H Content-Type:text/xml -X POST {0}".format(falconUri + endpoint) + + stdout = None + stderr = None + if not isTesting: + curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE) + + (stdout, stderr) = curlProcess.communicate() + + retVal = curlProcess.returncode + + try: + # Parse the XML result from cURL into a dictionary + resultPayload = xmlToDict(stdout)['result'] + return resultPayload['status'], resultPayload['message'] + except KeyError: + print "Parse error in getFalconSchedule()", stdout + return -1, stdout + except: + print "Parse error in getFalconSchedule()", stdout + return -1, stdout + + # Example output follows: + else: + stdout = """ + + SUCCEEDED + default/drSyncTest(process) scheduled successfully + default/2028387903@qtp-1730704097-6 - 89554f01-91cf-4bbd-97c2-ee175711b2ba + + """.replace("\n", ""); + + return 0, xmlToDict(stdout)['result']['status'] + +# Falcon process entity template used to create/update job attributes +drSyncTemplate=""" + + _falcon_mirroring_type=HDFS + + + + + + 1 + LAST_ONLY + days(7) + GMT{gmtOffset} + + + + + + + + + + + + + + + + + + +""" + +def doFalconSubmit(falconUri="http://localhost:15000", jobParameters=None, + isTesting=False): + """Submit/create a Falcon process/job entity + + Args: + falconUri (str): the URI for the Falcon server (e.g. http://host:port) + jobParameters (dict): a dictionary containing process entity configuration + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int): Zero for success, negative one otherwise + message (str): message status string (e.g. SUCCEEDED) + + """ + + retVal = -1 + + if jobParameters is None: + return retVal, "You must provide a job parameters dictionary" + + # Example REST call: + # "Content-Type:text/xml" + # POST http://localhost:15000/api/entities/submit/process?user.name=falcon + + endpoint = "/api/entities/submit/process?user.name={u}".format(u=jobParameters['userName']) + + thisMinute = int(strftime("%M")) + thisYear = int(strftime("%Y")) + gmtOffset = strftime("%z") + + oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1)) + oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1)) + + # TODO long term would be to encapsulate Falcon functions in a class structure + # -- (i.e. 
full Python Falcon abstraction/API) + # -- Targeting S3 or Azure Blob will require a different template + payload = drSyncTemplate.format(startTime=oneMinuteLater, + endTime=oneYearLater, + gmtOffset=gmtOffset, + distcpMaxMaps=jobParameters['distcpMaxMaps'], + distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'], + sourceClusterEntityName=jobParameters['sourceClusterEntityName'], + sourceHdfsUri=jobParameters['sourceHdfsUri'], + targetClusterEntityName=jobParameters['targetClusterEntityName'], + targetHdfsUri=jobParameters['targetHdfsUri'], + executionClusterEntityName=jobParameters['executionClusterEntityName'], + workflowFilename=jobParameters['workflowFilename'], + pathToSync=jobParameters['pathToSync'], + name=jobParameters['jobName'], + userName=jobParameters['userName']) + + curlCommand = "curl -X GET {0}".format(falconUri + endpoint) + + stdout = None + stderr = None + if not isTesting: + curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE) + + (stdout, stderr) = curlProcess.communicate() + + retVal = curlProcess.returncode + + try: + # Parse the XML result from cURL into a dictionary + resultPayload = xmlToDict(stdout)['result'] + return resultPayload['status'], resultPayload['message'] + except ElementTree.ParseError: + print "Parse error in getFalconSchedule()", stdout + return -1, stdout + + # Example output follows: + else: + stdout = """ + + SUCCEEDED + falcon/default/Submit successful (process) drSyncTest + falcon/default/2028387903@qtp-1730704097-6 - 7ddba052-527b-462f-823f-e7dd0a1a08fa + + """.replace("\n", ""); + + return 0, xmlToDict(stdout)['result']['status'] + +def doFalconSoar(falconUri="http://localhost:15000", jobParameters=None, + isTesting=False): + """Update, schedule, and monitor a Falcon process/job entity + + Args: + falconUri (str): the URI for the Falcon server (e.g. http://host:port) + jobParameters (dict): a dictionary containing process entity configuration + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int): Zero for success, negative one otherwise + message (str): message status string (e.g. SUCCEEDED) + + """ + + retVal = -1 + + if jobParameters is None: + return retVal, "You must provide a job parameters dictionary" + + # Example REST call: + # "Content-Type:text/xml" + # POST http://localhost:15000/api/entities/update/process/drSyncTest?user.name=falcon + + endpoint = "/api/entities/update/process/{n}?user.name={u}".format(n=jobParameters['jobName'], + u=jobParameters['userName']) + + thisMinute = int(strftime("%M")) + thisYear = int(strftime("%Y")) + gmtOffset = strftime("%z")[:3] + ":00" + + oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1)) + oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1)) + + print "Scheduling for", oneMinuteLater + print "Ending on", oneYearLater + + # TODO encapsulate everything in a class structure + # -- (i.e. 
full Python Falcon abstraction/API) + # -- Targeting AWS S3 or Azure Blob will require a different template + payload = drSyncTemplate.format(startTime=oneMinuteLater, + endTime=oneYearLater, + gmtOffset=gmtOffset, + distcpMaxMaps=jobParameters['distcpMaxMaps'], + distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'], + sourceClusterEntityName=jobParameters['sourceClusterEntityName'], + sourceHdfsUri=jobParameters['sourceHdfsUri'], + targetClusterEntityName=jobParameters['targetClusterEntityName'], + targetHdfsUri=jobParameters['targetHdfsUri'], + executionClusterEntityName=jobParameters['executionClusterEntityName'], + workflowFilename=jobParameters['workflowFilename'], + pathToSync=jobParameters['pathToSync'], + name=jobParameters['jobName'], + userName=jobParameters['userName']) + + # TODO output for debug level + #from pprint import pprint + #pprint (payload) + + curlCommand = "curl -H Content-Type:text/xml -X POST {uri} -d ".format(uri=falconUri + endpoint) + + stdout = None + stderr = None + if isTesting: + # Example output follows: + stdout = """ + + SUCCEEDED + falcon/update/default/Updated successfully + falcon/update/default/868391317@qtp-1730704097-47 - b2391bd7-3ae0-468e-b39c-5d002099a446 + + """.replace("\n", ""); + + # Parse the XML result from cURL into a dictionary + + return 0, xmlToDict(stdout)['result']['status'] + + # Note, needed to seperate out the payload as it can't be split on spaces + curlProcess = Popen(curlCommand.split() + [payload], + stdout=PIPE, stderr=PIPE) + + (stdout, stderr) = curlProcess.communicate() + + retVal = curlProcess.returncode + + # Curl process did not complete successfully + if retVal != 0: + print "Curl command failed" + return retVal, stderr + + try: + # Parse the XML result from cURL into a dictionary + result = xmlToDict(stdout)['result'] + stderr = result['message'] + + # Falcon REST update operation NOT successful + if "SUCCEEDED" not in result['status']: + print "Falcon REST operation not successful" + return result['status'], stderr + + # We should doFalconSchedule() here + status, message = doFalconSchedule(falconUri=falconUri, + jobName=jobParameters['jobName'], + isTesting=False) + + # Scheduling failed + if "SUCCEEDED" not in status: + return -1, message + + # Reset retVal to catch error between scheduled and running states + retVal = -1 + sleep(5) + + message, status = getFalconStatus(falconUri=falconUri, + entity=jobParameters['jobName']) + + # Continuously poll for hdfs-mirroring status + while "RUNNING" in status: + message, status = getFalconStatus(falconUri=falconUri, + entity=jobParameters['jobName']) + print status + + # flag RUNNING state reached using retVal + retVal = 0 + sleep(10) + + if status == "KILLED": + return -1, message + + # Poll one last time for runtimes + start, finish = getFalconStatus(falconUri=falconUri, + entity=jobParameters['jobName'], + onlyRuntime=True) + + return retVal, "logfile: {0} started: {1} finished: {2}".format(message, start, finish) + + except KeyError: + print "Are you using the correct Falcon server URI?", falconUri + return -1, stdout + + +def hdfsFingerprintsMatch(reference, comparison): + """Helper function to compare two fingerprints / md5 hashes + + Args: + reference (str): the reference MD5 checksum string + comparison (str): the comparison MD5 checksum string + + Returns: + isEqual (bool): True for success, False otherwise + + """ + + return reference == comparison + + +def stopHawq(masterHost=None, isTesting=False): + """Optionally connect to a remote HAWQ master + and do a 
quick stop of the HAWQ master process + + Args: + masterHost (str): the remote host to SSH to first, otherwise localhost + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int) or message (str): Zero for success, negative one otherwise + message (str): message contains a stderr string + + """ + + retVal = -1 + + hawqStopCommand = "hawq stop master -a -M fast" + + if masterHost is not None: + hawqStopCommand = "ssh {h} -- '{c}'".format(h=masterHost, + c=hawqStopCommand) + + stdout = None + stderr = None + if not isTesting: + hawqStopProcess = Popen(hawqStopCommand, + stdout=PIPE, stderr=PIPE, shell=True) + + (stdout, stderr) = hawqStopProcess.communicate() + + return hawqStopProcess.returncode, stderr + + else: + return 0, "TEST MODE" + +def startHawq(masterHost=None, isTesting=False): + """Optionally connect to a remote HAWQ master + and do a start of the HAWQ master process + + Args: + masterHost (str): the remote host to SSH to first, otherwise localhost + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int) or message (str): Zero for success, negative one otherwise + message (str): message contains a stderr string + + """ + + retVal = -1 + + hawqStartCommand = "hawq start master -a" + + if masterHost is not None: + hawqStartCommand = "ssh {h} -- '{c}'".format(h=masterHost, + c=hawqStartCommand) + + stdout = None + stderr = None + if not isTesting: + hawqStartProcess = Popen(hawqStartCommand, + stdout=PIPE, stderr=PIPE, shell=True) + + (stdout, stderr) = hawqStartProcess.communicate() + + return hawqStartProcess.returncode, stderr + + else: + return 0, "TEST MODE" + + +def copyToHdfs(source, dest, isTesting=False): + """Utility function to copy a source file + to the destination HDFS directory/file + + Args: + source (str): the source file on the local FS + dest (str): the target HDFS directory and filename + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int) or message (str): Zero for success, negative one otherwise + message (str): message contains status string + + """ + + retVal = -1 + + hdfsCommand = "/usr/bin/hdfs dfs -copyFromLocal {s} {d}".format(s=source, + d=dest) + + stdout = None + stderr = None + if not isTesting: + # Force HDFS commands to run as hdfs user + env = os.environ.copy() + env['HADOOP_USER_NAME'] = 'gpadmin' + hdfsProcess = Popen(hdfsCommand.split(), env=env, + stdout=PIPE, stderr=PIPE) + + (stdout, stderr) = hdfsProcess.communicate() + + return hdfsProcess.returncode, stderr + + else: + return 0, "TESTING" + +def checkHdfsSafemode(namenodeHost=None, isTesting=False): + """Utility function to query HDFS for + safemode enabled or disabled + + Args: + namenodeHost (str): the remote host to SSH to first, otherwise localhost + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int) or message (str): Zero for success, negative one otherwise + message (str): message contains status string + + """ + + retVal = -1 + + hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode get" + + if namenodeHost is not None: + hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost, + c=hdfsCommand) + + stdout = None + stderr = None + if not isTesting: + # Force HDFS commands to run as hdfs user + env = os.environ.copy() + env['HADOOP_USER_NAME'] = 'hdfs' + hdfsProcess = Popen(hdfsCommand, env=env, + stdout=PIPE, stderr=PIPE, shell=True) + + (stdout, stderr) = hdfsProcess.communicate() + + try: + 
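+            # `hdfs dfsadmin -safemode get` normally prints "Safe mode is ON"
+            # or "Safe mode is OFF", so the last whitespace-separated token is
+            # the state; offOrOn is True when safemode is ON (i.e. read-only).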
offOrOn = True if "ON" in stdout.split()[-1] else False + except IndexError as e: + return -1, str(e) + + return hdfsProcess.returncode, offOrOn + + else: + return 0, "TESTING" + +def enableHdfsSafemode(namenodeHost=None, isTesting=False): + """Utility function to enable safemode + + Args: + namenodeHost (str): the remote host to SSH to first, otherwise localhost + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int) or message (str): Zero for success, negative one otherwise + message (str): message contains status string + + """ + + retVal = -1 + + hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode enter" + + if namenodeHost is not None: + hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost, + c=hdfsCommand) + + stdout = None + stderr = None + if not isTesting: + # Force HDFS commands to run as hdfs user + env = os.environ.copy() + env['HADOOP_USER_NAME'] = 'hdfs' + hdfsProcess = Popen(hdfsCommand, env=env, + stdout=PIPE, stderr=PIPE, shell=True) + + (stdout, stderr) = hdfsProcess.communicate() + + return hdfsProcess.returncode, stderr + + else: + return 0, "TESTING" + +def disableHdfsSafemode(namenodeHost=None, isTesting=False): + """Utility function to disable safemode + + Args: + namenodeHost (str): the remote host to SSH to first, otherwise localhost + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int) or message (str): Zero for success, negative one otherwise + message (str): message contains status string + + """ + + retVal = -1 + + hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode leave" + + if namenodeHost is not None: + hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost, + c=hdfsCommand) + + stdout = None + stderr = None + if not isTesting: + # Force HDFS commands to run as hdfs user + env = os.environ.copy() + env['HADOOP_USER_NAME'] = 'hdfs' + hdfsProcess = Popen(hdfsCommand, env=env, + stdout=PIPE, stderr=PIPE, shell=True) + + (stdout, stderr) = hdfsProcess.communicate() + + return hdfsProcess.returncode, stderr + + else: + return 0, "TESTING" + +def forceHdfsCheckpoint(namenodeHost=None, isTesting=False): + """Utility function to force an HDFS checkpoint + + Args: + namenodeHost (str): the remote host to SSH to first, otherwise localhost + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int) or message (str): Zero for success, negative one otherwise + message (str): message contains status string + + """ + + retVal = -1 + + hdfsCommand = "/usr/bin/hdfs dfsadmin -saveNamespace" + + if namenodeHost is not None: + hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost, + c=hdfsCommand) + + stdout = None + stderr = None + if not isTesting: + # Force HDFS commands to run as hdfs user + env = os.environ.copy() + env['HADOOP_USER_NAME'] = 'hdfs' + hdfsProcess = Popen(hdfsCommand, env=env, + stdout=PIPE, stderr=PIPE, shell=True) + + (stdout, stderr) = hdfsProcess.communicate() + + return hdfsProcess.returncode, stderr + + else: + return 0, "TESTING" + + +def createTarball(masterDataBase="/data/hawq/", + targetTarball="/tmp/hawqMdd-{t}.tar", isTesting=False): + """Utility function to create a tarball of the HAWQ MASTER_DATA_DIRECTORY + + Args: + masterDataBase (str): the base directory containing the MASTER_DATA_DIRECTORY + targetTarball (str): the target directory and filename of the tarball + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + 
retVal (int) or message (str): Zero for success, negative one otherwise + message (str): message contains status string + + """ + + retVal = -1 + checksum = None + + # Example invocation (note: excluding most of pg_log contents) + # tar cpf /tmp/test.tar.bz2 --exclude=csv -C /data/hawq master + + theTime = strftime("%Y-%m-%d-%H%M") + + tarCommand = "tar -cpf {t} --exclude=csv -C {c} master".format(t=targetTarball.format(t=theTime), + c=masterDataBase) + + stdout = None + stderr = None + if not isTesting: + + try: + tarProcess = Popen(tarCommand.split(), stdout=PIPE, stderr=PIPE) + + (stdout, stderr) = tarProcess.communicate() + + except OSError as e: + return -1, str(e), -1 + + if tarProcess.returncode != 0: + print "Tarball creation failed : " + stderr + return -1, stderr, -1 + + md5Command = "md5sum {f}".format(f=targetTarball.format(t=theTime)) + + try: + md5Process = Popen(md5Command.split(), + stdout=PIPE, stderr=PIPE) + + (stdout2, stderr2) = md5Process.communicate() + + checksum = stdout2.split()[0].strip() + + if md5Process.returncode != 0: + return -1, "md5 checksum creation failed : " + stderr2, -1 + else: + return 0, targetTarball.format(t=theTime), checksum + + except OSError as e: + return -1, str(e), -1 + + + else: + return 0, "TEST BRANCH", -1 + +def cleanupTarball(filename, isTesting=False): + """Utility function to delete a tarball of the HAWQ MASTER_DATA_DIRECTORY + + Args: + filename (str): the target directory and filename of the tarball to clean up + isTesting (bool): NOOP mode bypassing actual REST calls for testing + + Returns: + retVal (int) or message (str): Zero for success, negative one otherwise + message (str): message contains status string + + """ + + retVal = -1 + checksum = None + + # Example invocation (note: excluding most of pg_log contents) + # rm -f /tmp/test.tar + + rmCommand = "rm -f {f}".format(f=filename) + + stdout = None + stderr = None + if not isTesting: + + try: + rmProcess = Popen(rmCommand.split(), stdout=PIPE, stderr=PIPE) + + (stdout, stderr) = rmProcess.communicate() + + retVal = rmProcess.returncode + + return retVal, stderr + + except OSError as e: + return -1, str(e) + else: + return 0, "TEST BRANCH" + +if __name__ == '__main__': + options, args = parseargs() + + #if options.verbose: + # enable_verbose_logging() + + # TODO - switch prints to this once using gppylibs + #logger, log_filename = setup_hawq_tool_logging('hawq_sync',getLocalHostname(),getUserName(), options.logDir) + + + # ### HAWQ Extract every non-system table (source) + # Note: the asusmption is this has been done in + # advance of executing this tool. + if options.prompt: + # TODO - switch to this once using gppylibs + #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'): + + # TODO - switch to gppylib-based logging + print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + print "" + print "Please confirm you've performed hawq_extract() on all critical data tables" + print "and saved this information outside of the cluster (e.g. version control)" + print "or are using Falcon with an atomic option (i.e. in HDP-2.5: snapshot-based replication)" + print "" + print "This is critical for data recovery if a sync operation partially completes!" + print "" + print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + answer = raw_input("y or n: ") + if "y" not in answer and "Y" not in answer: + print "Exiting." 
+ sys.exit(1) + + # ### Falcon cluster entities + # Note: the assumption is both source and target + # cluster entities have alredy been created in Falcon + # TODO add a confirmation step, later a REST call to check + if options.prompt: + # TODO - switch to this once using gppylibs + #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'): + + # TODO - switch to gppylib-based logging + print "" + print "Please confirm you've created both source and target Falcon cluster entities:" + print "" + answer = raw_input("y or n: ") + if "y" not in answer and "Y" not in answer: + print "Exiting." + sys.exit(1) + + # ### Stop HAWQ + # + # TODO?: switch to Ambari REST, followed by pkill -5 <> + + # Source + earlier = int(time()) + print "Stopping source HAWQ" if options.verbose else None; + retVal, stderr = stopHawq(isTesting=options.testMode) + print retVal, stderr if options.verbose else None; + + if retVal != 0: + print "Failed to stop source HAWQ master" + print "Error message was " + stderr + print "Exiting" + sys.exit(1) + + # Target + # TODO - either use Ambari REST call or use SSH + print "Stopping target HAWQ" if options.verbose else None; + retVal, stderr = stopHawq(masterHost=options.targetHawqMaster, + isTesting=options.testMode) + print retVal, stderr if options.verbose else None; + + if retVal != 0: + print "Failed to stop target HAWQ master" + print "Error message was " + stderr + print "Restarting source HAWQ" if options.verbose else None; + retVal, stderr = startHawq(isTesting=options.testMode) + later = int(time()) + + if retVal == 0: + print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None; + else: + print "Source HAWQ failed to restart after pre-sync failure." + print "Exiting" + sys.exit(1) + + # ### Create HAWQ Master Data Directory archive + print "Creating MDD tarball" if options.verbose else None; + retVal, filenameOrStderr, md5sum = createTarball(masterDataBase="/data/hawq/", + isTesting=options.testMode) + print retVal, filenameOrStderr, md5sum if options.verbose else None; + + if retVal != 0: + print "Failed to create archive of source HAWQ MASTER_DATA_DIRECTORY" + print "Error message was : " + filenameOrStderr + print "Cleaning up MDD tarball on local FS" + print cleanupTarball(filenameOrStderr, isTesting=options.testMode) + retVal, stderr = startHawq(isTesting=options.testMode) + later = int(time()) + if retVal == 0: + print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None; + else: + print "Source HAWQ failed to restart after pre-sync failure." + print "Exiting." 
+ sys.exit(1) + + # ### Start HAWQ + print "Starting source HAWQ" if options.verbose else None; + retVal, stderr = startHawq(isTesting=options.testMode) + later = int(time()) + + if retVal != 0: + print "Failed to start source HAWQ master" + print "Error message was " + stderr + print "Cleaning up MDD tarball on local FS" + print cleanupTarball(filenameOrStderr, isTesting=options.testMode) + print "Exiting" + sys.exit(1) + + print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None; + + # TODO add a CLI flag to force source into read-write + if checkHdfsSafemode()[1] == True: + print "Source cluster HDFS is read-only, cannot proceed" + print "Cleaning up MDD tarball on local FS" + print cleanupTarball(filenameOrStderr, isTesting=options.testMode) + sys.exit(1) + + # ### Copy MDD archive to HDFS + print "Copying MDD tarball to HDFS" if options.verbose else None; + retVal, stderr = copyToHdfs(source=filenameOrStderr, + dest=options.pathToSync, + isTesting=options.testMode) + + if retVal != 0: + print "Failed to copy MDD tarball to HDFS" + print "Error message was " + stderr + print "Cleaning up MDD tarball on local FS" + print cleanupTarball(filenameOrStderr, isTesting=options.testMode) + print "Exiting" + sys.exit(1) + + # ### Cleanup MDD archive from /tmp + print "Cleaning up MDD tarball on local FS" if options.verbose else None; + retVal, stderr = cleanupTarball(filenameOrStderr, + isTesting=options.testMode) + + if retVal != 0: + print "Failed to clean up MDD tarball" + print "Error message was " + stderr + print "" + print "You will need to manually remove the following file" + print filenameOrStderr + print "" + print "Exiting" + sys.exit(1) + + """ + # ### Force HDFS checkpoint and enable safemode on source + print "Enabling HDFS safemode on source cluster" if options.verbose else None; + retVal, stderr = enableHdfsSafemode(isTesting=options.testMode) + + if retVal != 0: + print "Failed to enable HDFS safemode on source cluster" + print "Error message was " + stderr + print "Exiting" + sys.exit(1) + + print "Forcing HDFS checkpoint on source cluster" if options.verbose else None; + retVal, stderr = forceHdfsCheckpoint(isTesting=options.testMode) + + if retVal != 0: + print "Failed to force HDFS checkpoint on source cluster" + print "Error message was " + stderr + print "Exiting" + sys.exit(1) + """; + + # ### Leave safemode on target HDFS + print "Disabling HDFS safemode on target" if options.verbose else None; + retVal, stderr = disableHdfsSafemode(namenodeHost=options.targetNamenode, + isTesting=options.testMode) + + if retVal != 0: + print "Failed to leave HDFS safemode on target cluster" + print "Error message was " + stderr + print "Exiting" + sys.exit(1) + + # Note, the entity names refer to Falcon + # entities that have been created prior + # to execution of this tool + """ + jobParameters = dict(userName="gpadmin", + distcpMaxMaps="100", + distcpMaxMBpsPerMap="1000", + sourceClusterEntityName="sourceCluster", + sourceHdfsUri="hdfs://{0}:8020".format(sourceNamenode), + targetClusterEntityName="targetCluster", + txMapsargetHdfsUri="hdfs://{0}:8020".format(targetNamenode), + executionClusterEntityName="sourceCluster", + workflowFilename="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml", + pathToSync="/tmp/syncTest", + jobName="drSync") + """; + + jobParameters = dict(userName=options.userName, + distcpMaxMaps=options.distcpMaxMaps, + distcpMaxMBpsPerMap=options.distcpMaxMBpsPerMap, + 
sourceClusterEntityName=options.sourceClusterEntityName, + sourceHdfsUri="hdfs://{0}:8020".format(options.sourceNamenode), + targetClusterEntityName=options.targetClusterEntityName, + targetHdfsUri="hdfs://{0}:8020".format(options.targetNamenode), + executionClusterEntityName=options.executionClusterEntityName, + workflowFilename=options.workflowFilename, + pathToSync=options.pathToSync, + jobName=options.jobName) + + print jobParameters if options.verbose else None; + + # ### Update and Schedule Job - monitor until completion + print "Falcon Soar" if options.verbose else None; + retVal, stderr = doFalconSoar(falconUri=options.falconUri, + jobParameters=jobParameters, + isTesting=options.testMode) + falconOutput = stderr + + if retVal != 0: + print "Falcon replication job failed" + print "Error message was " + stderr + print "Source cluster will be left in safemode for remediation" + print "Exiting" + sys.exit(1) + + # ### Leave safemode on source HDFS + print "Disable HDFS safemode on source cluster" if options.verbose else None; + retVal, stderr = disableHdfsSafemode(isTesting=options.testMode) + + if retVal != 0: + print "Failed to leave HDFS safemode on source cluster" + print "Error message was " + stderr + print "Exiting" + sys.exit(1) + + # ### Force HDFS checkpoint and enable safemode on target + print "Enabling HDFS safemode on target cluster" if options.verbose else None + retVal, stderr = enableHdfsSafemode(namenodeHost=options.targetNamenode, isTesting=options.testMode) + + if retVal != 0: + print "Failed to enable HDFS safemode on target cluster" + print "Error message was " + stderr + print "Exiting" + sys.exit(1) + + print "Forcing HDFS checkpoint on target cluster" if options.verbose else None + retVal, stderr = forceHdfsCheckpoint(namenodeHost=options.targetNamenode, isTesting=options.testMode) + + if retVal != 0: + print "Failed to force HDFS checkpoint on target cluster" + print "Error message was " + stderr + print "Exiting" + sys.exit(1) + + # ### HDFS Fingerprint comparison + + print "Validating HDFS fingerprints match between source and target clusters" if options.verbose else None + + retVal, md5OrStderr = getHdfsFingerprint(hdfsUri=jobParameters['sourceHdfsUri'], + hdfsDir=jobParameters['pathToSync'], + isTesting=options.testMode) + + if retVal != 0: + print "Failed to generate HDFS fingerprint on source cluster" + print "Error message was " + md5OrStderr + print "Exiting" + sys.exit(1) + + retVal, md5OrStderr2 = getHdfsFingerprint(hdfsUri=jobParameters['targetHdfsUri'], + hdfsDir=jobParameters['pathToSync'], + isTesting=options.testMode) + + if retVal != 0: + print "Failed to generate HDFS fingerprint on target cluster" + print "Error message was " + md5OrStderr2 + print "Exiting" + sys.exit(1) + + isValidSync = hdfsFingerprintsMatch(md5OrStderr, md5OrStderr2) + + if not isValidSync: + print "Source and target cluster HDFS fingerprints do not match." + print "Source checksum : " + md5OrStderr + print "Target checksum : " + md5OrStderr2 + print "This is bad, please check Falcon sync logs : " + falconOutput + sys.exit(1) + else: + print "Source and target HDFS fingerprints match." 
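+        # The fingerprint is an MD5 over the reverse-sorted (path, size) pairs
+        # from a recursive listing of pathToSync (see flattenFilelist and
+        # computeMd5), so a match means both clusters hold the same file names
+        # and byte counts, not a byte-for-byte content comparison.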
+ + # Target + # TODO - either use Ambari REST call or use SSH + print "Starting target HAWQ" if options.verbose else None; + retVal, stderr = startHawq(masterHost=options.targetHawqMaster, + isTesting=options.testMode) + print retVal, stderr if options.verbose else None; + + if retVal != 0: + print "Failed to start target HAWQ master" + print "Error message was " + stderr + print "Exiting" + sys.exit(1) + else: + print "HAWQ sync completed successfully!" + print """ + ## Manual runbook during DR event + 1. Copy MDD archive from HDFS to target master (CLI) + 2. Restore archive in /data/hawq/ (CLI) + 3. Disable HDFS safemode (CLI) + 4. Start HAWQ (CLI) + 5. Update hawq_segment_configuration (psql) + 6. Deploy a HAWQ standby master in Ambari + """; + sys.exit(0) + diff --git a/tools/doc/gpconfigs/hawqsync-falcon-workflow.xml b/tools/doc/gpconfigs/hawqsync-falcon-workflow.xml new file mode 100644 index 0000000000..084b9b8dab --- /dev/null +++ b/tools/doc/gpconfigs/hawqsync-falcon-workflow.xml @@ -0,0 +1,45 @@ + + + + _falcon_mirroring_type=HDFS + + + + + + 1 + LAST_ONLY + months(1) + GMT-07:00 + + + + + + + + + + + + + + + + diff --git a/tools/doc/gpconfigs/hawqsync-oozie-workflow.xml b/tools/doc/gpconfigs/hawqsync-oozie-workflow.xml new file mode 100644 index 0000000000..8af4de8531 --- /dev/null +++ b/tools/doc/gpconfigs/hawqsync-oozie-workflow.xml @@ -0,0 +1,55 @@ + + + + + + + ${jobTracker} + ${nameNode} + + + mapred.job.priority + ${jobPriority} + + + mapred.job.queue.name + ${queueName} + + + -update + -delete + -m + ${distcpMaxMaps} + -bandwidth + ${distcpMapBandwidth} + -strategy + dynamic + ${drSourceClusterFS}${drSourceDir} + ${drTargetClusterFS}${drTargetDir} + + + + + + + Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + +
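For reference, the fingerprint check at the end of hawqsync-falcon reduces a recursive `hdfs dfs -ls -R` listing to reverse-sorted (path, size) pairs and hashes them, so a matching digest only proves that both clusters expose the same file names and sizes. A minimal, self-contained sketch of that logic (Python 2, with fabricated listing lines; the real tool builds the listing in getHdfsFingerprint and the hash in flattenFilelist/computeMd5):

from hashlib import md5

# Two fabricated records in `hdfs dfs -ls -R` format (8 whitespace-separated fields)
sampleListing = """\
drwx------   - gpadmin gpadmin       0 2016-07-14 12:32 hdfs://sandbox:8020/hawq_default/16385
-rw-------   3 gpadmin gpadmin 1048576 2016-07-14 12:35 hdfs://sandbox:8020/hawq_default/16385/16387/18947/1
"""

def fingerprint(listing, hdfsPort="8020"):
    # Keep only well-formed 8-field records
    records = [line.split() for line in listing.splitlines() if len(line.split()) == 8]
    # (path stripped of scheme/host/port, size), reverse-sorted by path
    pairs = sorted(((r[-1].split(":" + hdfsPort)[-1], r[4]) for r in records),
                   key=lambda p: p[0], reverse=True)
    # One "path-size" line per file, newline-joined, then MD5'd
    return md5("\n".join("-".join(p) for p in pairs)).hexdigest()

print fingerprint(sampleListing)  # identical listings on both clusters yield the same digest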