diff --git a/tools/bin/hawqsync-extract b/tools/bin/hawqsync-extract
new file mode 100755
index 0000000000..20823865df
--- /dev/null
+++ b/tools/bin/hawqsync-extract
@@ -0,0 +1,296 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from optparse import Option, OptionParser
+from subprocess import Popen, PIPE
+from hashlib import md5
+from json import loads
+from time import strftime, sleep, time
+
+def parseargs():
+    # -h is reused below for the HAWQ master URI, so disable the built-in
+    # -h/--help option to avoid an optparse OptionConflictError at startup
+    parser = OptionParser(usage="HAWQ extract options.", add_help_option=False)
+    parser.add_option('-v', '--verbose', action='store_true', default=False)
+    parser.add_option("-a", "--prompt", action="store_false",
+                      dest="prompt", default=True,
+                      help="Execute without prompt.")
+    parser.add_option("-l", "--logdir", dest="logDir",
+                      help="Sets the directory for log files")
+    parser.add_option('-t', '--testMode', action='store_true', default=False,
+                      dest='testMode', help="Execute in test mode")
+    parser.add_option('-d', '--destinationOnHdfs', dest='hdfsDir',
+                      default="/hawq_default",
+                      help="The HDFS directory to copy the resulting tarball to")
+    parser.add_option('-s', '--schemas', dest='schemas', default="",
+                      help="A comma-separated list of the schemas containing "
+                           "the tables to extract metadata for")
+    parser.add_option('-h', dest='hawqUri', default="localhost:5432/gpadmin",
+                      help="The HAWQ master URI, e.g. localhost:5432/gpadmin")
+
+    (options, args) = parser.parse_args()
+    return (options, args)
+
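+# A typical invocation might look like the following (host and schema names
+# are illustrative):
+#
+#   hawqsync-extract -s model,development \
+#       -h mdw.example.com:5432/gpadmin -d /hawq_default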
+
+def getSchemaObjectList(hawqUri, schemaList=[], isTesting=False):
+ """Utility function to generate a list of table objects for a list of schemas
+
+    Args:
+        hawqUri (str): the HAWQ master URI to use for connecting
+        schemaList (List): a list of schemas to find tables within
+        isTesting (bool): NOOP mode that bypasses the psql call, for testing
+
+    Returns:
+        retVal (int): Zero for success, nonzero otherwise
+        objectList (list) or message (str): a list of "schema.table" names on
+            success, or stderr output on failure
+ """
+
+ retVal = -1
+
+ hawqHost = hawqUri.split(":")[0]
+ hawqPort, hawqDatabase = hawqUri.split(":")[-1].split("/")
+
+ objectListSql = """
+ SELECT n.nspname || '.' || c.relname
+ FROM pg_catalog.pg_class c
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+ WHERE c.relkind IN ('r','s','')
+ AND c.relstorage IN ('h', 'a', 'c', 'p','')
+ AND n.nspname !~ '^pg_toast'
+ AND n.nspname ~ '^({schemas})$'
+ ORDER BY n.nspname, c.relname;
+ \q
+ """.format(schemas="|".join(schemaList))
+
+ psqlCommand = "psql -h {h} -p {p} -At {d}".format(h=hawqHost,
+ p=hawqPort,
+ d=hawqDatabase)
+
+
+ print psqlCommand
+
+ objectList = None
+ stderr = None
+ if not isTesting:
+ psqlProcess = Popen(psqlCommand.split(), stdin=PIPE,
+ stdout=PIPE, stderr=PIPE)
+
+ (objectList, stderr) = psqlProcess.communicate(objectListSql)
+
+ retVal = psqlProcess.returncode
+
+ if retVal != 0:
+ return retVal, stderr
+
+ # Sample output to follow
+ else:
+ objectList = """\
+ model.rsqauredCube
+ model.mungingScratchspace
+ model.postMunging
+ model.fancyAnalytics
+ development.testtable1
+ development.randomtable2
+ development.someOtherSweetTable
+ """;
+
+ retVal = 0
+
+    # psql -At output is one schema-qualified table name per line
+ return retVal, objectList.split("\n")
+
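+# For example (illustrative), getSchemaObjectList("mdw:5432/gpadmin", ["model"])
+# would return something like (0, ["model.fancyAnalytics", ..., ""]): one
+# "schema.table" entry per line of psql -At output, plus a trailing empty
+# string that saveMetadataForSchemaObjects() skips.
+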
+def saveMetadataForSchemaObjects(baseDirectory="/tmp", objectList=[], isTesting=False):
+ """Utility function to export table metadata in a schema to a set of files
+ (one per table)
+
+    Args:
+        baseDirectory (str): the base directory in which to create the
+            hawqExtract output directory
+        objectList (List): a list of objects to run hawq extract on
+        isTesting (bool): NOOP mode that bypasses the external commands, for testing
+
+    Returns:
+        retVal (int): Zero for success, non-zero otherwise
+ """
+
+ retVal = -1
+
+ mkdirCommand = "mkdir -p {d}/hawqExtract".format(d=baseDirectory)
+
+ hawqExtractCommand = "hawq extract {{o}} -o {d}/hawqExtract/{{o}}.yml".format(d=baseDirectory)
+
+ stdout = None
+ stderr = None
+
+ if isTesting:
+ return 0
+
+ mkdirProcess = Popen(mkdirCommand.split(), stdout=PIPE, stderr=PIPE)
+
+ (stdout, stderr) = mkdirProcess.communicate()
+
+ retVal = mkdirProcess.returncode
+
+    if retVal != 0:
+        print stderr
+        return retVal
+
+ for obj in objectList:
+
+ if len(obj) == 0:
+ continue
+
+ thisObjectCommand = hawqExtractCommand.format(o=obj)
+
+ print thisObjectCommand
+
+ hawqExtractProcess = Popen(thisObjectCommand.split(),
+ stdout=PIPE, stderr=PIPE)
+
+ (stdout, stderr) = hawqExtractProcess.communicate()
+
+ rv = hawqExtractProcess.returncode
+
+ retVal |= rv
+
+ if rv != 0:
+ print rv, stderr
+
+ return retVal
+
+def createTarball(baseDirectory="/tmp", targetTarball="/tmp/hawqExtract-{t}.tar.bz2", isTesting=False):
+ """Utility function to create a tarball of the extracted metadata
+ Args:
+ baseDirectory (str): the base directory to create a tarball of
+ targetTarball (str): the target directory and filename of the tarball
+        isTesting (bool): NOOP mode that bypasses the tar and md5sum calls, for testing
+
+    Returns:
+        retVal (int): Zero for success, negative one otherwise
+        message (str): the tarball path on success, or an error message
+        checksum (str): the MD5 checksum of the created tarball on success,
+            negative one otherwise
+ """
+
+ checksum = None
+
+ # Example invocation
+ # tar cjf /tmp/test.tar.bz2 -C /tmp hawqExtract
+
+ theTime = strftime("%Y-%m-%d-%H%M")
+
+ tarCommand = "tar -cjf {t} -C {c} hawqExtract".format(t=targetTarball.format(t=theTime),
+ c=baseDirectory)
+
+ stdout = None
+ stderr = None
+ if isTesting:
+ return 0, "TESTING BRANCH", -1
+
+ try:
+ tarProcess = Popen(tarCommand.split(), stdout=PIPE, stderr=PIPE)
+
+ (stdout, stderr) = tarProcess.communicate()
+
+ except OSError as e:
+ return -1, str(e), -1
+
+
+ if tarProcess.returncode != 0:
+ print "Tarball creation failed : " + stderr
+ return -1, stderr, -1
+
+ md5Command = "md5sum {f}".format(f=targetTarball.format(t=theTime))
+
+ try:
+ md5Process = Popen(md5Command.split(), stdout=PIPE, stderr=PIPE)
+
+ (stdout2, stderr2) = md5Process.communicate()
+
+ checksum = stdout2.split()[0].strip()
+
+ if md5Process.returncode != 0:
+ return -1, "md5 checksum creation failed : " + stderr2, -1
+ else:
+ return 0, targetTarball.format(t=theTime), checksum
+
+ except OSError as e:
+ return -1, str(e), -1
+
+def copyToHdfs(source, dest, isTesting=False):
+ """Utility function to copy a source file
+ to the destination HDFS directory/file
+ Args:
+ source (str): the source file on the local FS
+ dest (str): the target HDFS directory and filename
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int) or message (str): Zero for success, negative one otherwise
+ message (str): message contains status string
+ """
+
+ retVal = -1
+
+ hdfsCommand = "/usr/bin/hdfs dfs -copyFromLocal {s} {d}".format(s=source,
+ d=dest)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+ # Force HDFS commands to run as gpadmin user
+ env = os.environ.copy()
+ env['HADOOP_USER_NAME'] = 'gpadmin'
+ hdfsProcess = Popen(hdfsCommand.split(), env=env, stdout=PIPE, stderr=PIPE)
+
+ (stdout, stderr) = hdfsProcess.communicate()
+
+ return hdfsProcess.returncode, stderr
+
+ else:
+ return 0, "TESTING"
+
+if __name__ == '__main__':
+ options, args = parseargs()
+
+ #if options.verbose:
+ # enable_verbose_logging()
+
+ # TODO - switch prints to this once using gppylibs
+ #logger, log_filename = setup_hawq_tool_logging('hawq_sync',getLocalHostname(),getUserName(), options.logDir)
+
+ if options.prompt:
+ # TODO - switch to this once using gppylibs
+ #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
+
+ # TODO - switch to gppylib-based logging
+ print "---------------------------------------------------------------"
+ print ""
+ print "This tool will extract metadata for every table in the schema list."
+ print ""
+ print "You must be gpadmin and have /usr/local/hawq/bin/psql in your PATH"
+ print ""
+ print " Would you like to continue?"
+ print "---------------------------------------------------------------"
+ answer = raw_input("y or n: ")
+ if "y" not in answer and "Y" not in answer:
+ print "Exiting."
+ sys.exit(1)
+
+    retVal, objectList = getSchemaObjectList(options.hawqUri,
+                                             schemaList=options.schemas.split(","),
+                                             isTesting=options.testMode)
+    if retVal != 0:
+        print "Failed to list schema objects : " + str(objectList)
+        sys.exit(1)
+
+    retVal = saveMetadataForSchemaObjects(objectList=objectList,
+                                          isTesting=options.testMode)
+
+    retVal, filenameOrStderr, checksum = createTarball(isTesting=options.testMode)
+
+    retVal, stderr = copyToHdfs(filenameOrStderr, options.hdfsDir,
+                                isTesting=options.testMode)
+    if retVal != 0:
+        print "Failed to copy " + filenameOrStderr + " to HDFS : " + str(stderr)
+        sys.exit(1)
diff --git a/tools/bin/hawqsync-falcon b/tools/bin/hawqsync-falcon
new file mode 100755
index 0000000000..8e613e6488
--- /dev/null
+++ b/tools/bin/hawqsync-falcon
@@ -0,0 +1,1319 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
+from optparse import OptionParser
+from subprocess import Popen, PIPE
+from hashlib import md5
+from json import loads
+from time import strftime, sleep, time
+from collections import defaultdict
+# TODO - make use of these common HAWQ libs instead of print
+#from gppylib.gplog import setup_hawq_tool_logging, enable_verbose_logging
+#from gppylib.commands.unix import getLocalHostname, getUserName
+try:
+ from xml.etree import cElementTree as ElementTree
+except ImportError, e:
+ from xml.etree import ElementTree
+
+def parseargs():
+ parser = OptionParser(usage="HAWQ sync options.")
+ parser.add_option('-v', '--verbose', action='store_true',
+ default=False)
+ parser.add_option("-a", "--prompt", action="store_false",
+ dest="prompt", default=True,
+ help="Execute without prompt.")
+ parser.add_option("-l", "--logdir", dest="logDir",
+ help="Sets the directory for log files")
+ parser.add_option('-d', '--dryRun', action='store_true',
+ default=False,
+ dest='testMode', help="Execute in test mode")
+ parser.add_option('-u', '--user', dest='userName', default="gpadmin",
+ help="The user to own Falcon ACLs and run job as")
+ parser.add_option('--maxMaps', dest='distcpMaxMaps',
+ default="10",
+ help="The maximum number of map jobs to allow")
+ parser.add_option('--mapBandwidth', dest='distcpMaxMBpsPerMap',
+ default="100",
+ help="The maximum allowable bandwidth for each map job, in MB/s")
+ parser.add_option('-s', '--sourceNamenode', dest='sourceNamenode',
+ default="",
+ help="The IP or FQDN of the source HDFS namenode")
+ parser.add_option('-S', '--sourceEntity', dest='sourceClusterEntityName',
+ default="source",
+ help="The Falcon cluster entity name of the source")
+ parser.add_option('-m', '--sourceHawqMaster', dest='sourceHawqMaster',
+ default="",
+ help="The IP or FQDN of the source HAWQ master")
+ parser.add_option('-M', '--targetHawqMaster', dest='targetHawqMaster',
+ default="",
+ help="The IP or FQDN of the target HAWQ master")
+ parser.add_option('-f', '--falconUri', dest='falconUri',
+ default="http://localhost:15000",
+ help="The URI to use for issuing Falcon REST calls")
+ parser.add_option('-t', '--targetNamenode', dest='targetNamenode',
+ default="",
+ help="The IP or FQDN of the source HDFS namenode")
+ parser.add_option('-T', '--targetEntity', dest='targetClusterEntityName',
+ default="target",
+ help="The Falcon cluster entity name of the target")
+ parser.add_option('-e', '--executionEntity',
+ dest='executionClusterEntityName',
+ default="source",
+ help="The Falcon cluster entity name specifying where to execute the job")
+ parser.add_option('-w', '--workflowHdfsFilename', dest='workflowFilename',
+ default="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
+ help="The HDFS location of the underlying Oozie workflow to use for sync job")
+ parser.add_option('-p', '--pathToSync', dest='pathToSync',
+ default="/tmp/syncTest",
+ help="The root directory to be syncronized")
+ parser.add_option('-j', '--jobName', dest='jobName', default="drSync",
+ help="The Falcon job entity name to be executed")
+
+ (options, args) = parser.parse_args()
+ return (options, args)
+
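+# A typical invocation might look like the following (host names and the
+# Falcon endpoint are illustrative):
+#
+#   hawqsync-falcon -s nn1.example.com -t nn2.example.com \
+#       -m hawqmaster1.example.com -M hawqmaster2.example.com \
+#       -f http://falcon.example.com:15000 -p /hawq_default -j drSync
+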
+def extractFilenameAndSize(line, hdfsPort):
+ """Utility function to extract filename and file
+ size from a line of output from `hdfs dfs -ls -R`
+
+ """
+
+ tokens = line.split()
+ return tokens[-1].split(":" + hdfsPort)[-1], tokens[4]
+
+def flattenFilelist(data, hdfsPort):
+ """Utility function to convert a list of output
+ lines from `hdfs dfs -ls -R` into a single, sorted,
+    delimited string to be used as a synchronization
+ fingerprint
+
+ """
+
+ # Ensure record contains expected number of fields
+ isValid = lambda r: len(r.strip().split()) == 8
+
+ # Subset the records to only filename and size fields
+ filenameAndSize = [extractFilenameAndSize(line, hdfsPort) for line in data.split("\n") if isValid(line)]
+
+ # Reverse sort the list by filename column
+ sortedFilenameAndSize = sorted(filenameAndSize, key=lambda r: r[0], reverse=True)
+
+ # Flatten a single line into a delimited string
+ mergeLines = lambda l: "-".join(l)
+
+ # Perform the flatten for every line and join lines into a string
+ return "\n".join(map(mergeLines, sortedFilenameAndSize))
+
+def computeMd5(data):
+ """Utility function to compute MD5 checksum
+
+ """
+ hasher = md5()
+ hasher.update(data)
+
+ return hasher.hexdigest()
+
+def getHdfsFingerprint(hdfsUri="", hdfsDir="/hawq_default", isTesting=False):
+ """Utility function to compute an MD5
+ hash from the output of a recursive HDFS
+ directory listing
+
+ """
+
+ retVal = -1
+
+ hdfsPort = hdfsUri.split(":")[-1]
+
+ hdfsCommand = "hdfs dfs -ls -R {u}{d}".format(u=hdfsUri, d=hdfsDir)
+ #print hdfsCommand
+
+ filelist = None
+ stderr = None
+ if not isTesting:
+ # Force HDFS commands to run as gpadmin user
+ env = os.environ.copy()
+ env['HADOOP_USER_NAME'] = 'gpadmin'
+ hdfsProcess = Popen(hdfsCommand.split(), env=env,
+ stdout=PIPE, stderr=PIPE)
+
+ (filelist, stderr) = hdfsProcess.communicate()
+
+ retVal = hdfsProcess.returncode
+
+ if retVal != 0:
+ return retVal, stderr
+
+ # Sample output to follow
+ else:
+ filelist = """
+ drwx------ - gpadmin gpadmin 0 2016-07-14 12:32 hdfs://sandbox:8020/hawq_default/16385
+ drwx------ - gpadmin gpadmin 0 2016-08-04 18:58 hdfs://sandbox:8020/hawq_default/16385/16387
+ drwx------ - gpadmin gpadmin 0 2016-07-14 12:14 hdfs://sandbox:8020/hawq_default/16385/16387/18947
+ """;
+
+ retVal = 0
+
+ data = flattenFilelist(filelist, hdfsPort)
+
+ # sample yields: 342f414e7519f8c6a9eacce94777ba08
+ return retVal, computeMd5(data)
+
+def etree_to_dict(t):
+ """Utility function to turn an XML
+ element tree into a dictionary
+
+ """
+
+ d = {t.tag: {} if t.attrib else None}
+ children = list(t)
+ if children:
+ dd = defaultdict(list)
+ for dc in map(etree_to_dict, children):
+ for k, v in dc.iteritems():
+ dd[k].append(v)
+
+        d[t.tag] = dict((k, v[0] if len(v) == 1 else v) for k, v in dd.iteritems())
+ if t.attrib:
+ d[t.tag].update(('@' + k, v) for k, v in t.attrib.iteritems())
+ if t.text:
+ text = t.text.strip()
+ if children or t.attrib:
+ if text:
+ d[t.tag]['#text'] = text
+ else:
+ d[t.tag] = text
+ return d
+
+def xmlToDict(data):
+ """Wrapper function to convert
+ XML string into a Python dictionary
+
+ """
+
+ e = ElementTree.XML(data)
+ return etree_to_dict(e)
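+
+# For instance, a (hypothetical) Falcon reply of
+#   <result><status>SUCCEEDED</status><message>ok</message></result>
+# becomes {'result': {'status': 'SUCCEEDED', 'message': 'ok'}}, which is why
+# callers below read xmlToDict(stdout)['result']['status'].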
+
+def getFalconStatus(falconUri="http://localhost:15000", entity="drSyncTest",
+ user="gpadmin", onlyRuntime=False, isTesting=False,
+ doDebug=False):
+ """Get the current status of an existing Falcon process/job entity
+
+ Args:
+ falconUri (str): the URI for the Falcon server (e.g. http://host:port)
+ entity (str): the Falcon process entity name to get status for
+ user (str): the username used for authorization
+ onlyRuntime (bool): only query for process runtime (e.g. post-completion)
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+ doDebug (bool): debugging mode for additional verbosity
+
+ Returns:
+        A two-tuple: (logFile, status) on success, (startTime, endTime) when
+        onlyRuntime is True, or (-1, errorMessage) on failure
+
+ """
+
+ retVal = -1
+
+ # Example REST call:
+ # GET http://localhost:15000/api/entities/status/process/drSyncTest?user.name=falcon&fields=status,clusters,tags
+
+ endpoint = "/api/instance/status/process/{e}?user.name={u}&fields=status".format(u=user, e=entity)
+
+ curlCommand = "curl -X GET {0}".format(falconUri + endpoint)
+
+ if doDebug:
+ print curlCommand
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+ curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
+
+ (stdout, stderr) = curlProcess.communicate()
+
+ retVal = curlProcess.returncode
+
+ if doDebug:
+ print "stdout:", stdout, "stderr:", stderr
+
+ try:
+ result = loads(stdout)
+ if 'instances' not in result:
+ print "No instance was started, try deleting the job and re-running"
+ return -1, "stdout: {0}, stderr: {1}".format(stdout, stderr)
+ if onlyRuntime:
+ # Parse the start/end times in JSON result from cURL
+ resultPayload = result['instances'][0]
+ return resultPayload['startTime'], resultPayload['endTime']
+ else:
+ # Parse the logfile/status in JSON result from cURL
+ resultPayload = result['instances'][0] #['actions'][0]
+ return resultPayload['logFile'], resultPayload['status']
+ except KeyError as e:
+ print "KeyError in getFalconStatus()", str(e), "\n", stdout
+ return -1, str(e)
+ except ValueError as e:
+ print "ValueError in getFalconStatus()", str(e), "\n", stdout
+ print "Is Falcon running at : {} ?".format(falconUri)
+ return -1, str(e)
+
+ # Example output follows:
+ else:
+ stdout = """{
+ "status":"SUCCEEDED",
+ "message":"default/STATUS\n",
+ "requestId":"default/1436392466@qtp-1730704097-152 - 8dd8f7fa-2024-4bdb-a048-c188759c2f47\n",
+ "instances":[{
+ "instance":"2016-08-17T13:22Z",
+ "status":"SUSPENDED",
+ "logFile":"http://sandbox2.hortonworks.com:11000/oozie?job=0000014-160805194358788-oozie-oozi-W",
+ "cluster":"secondaryIDP",
+ "startTime":"2016-08-17T12:25:23-07:00",
+ "details":"",
+ "actions":[{
+ "action":"user-action",
+ "status":"RUNNING"
+ }, {
+ "action":"failed-post-processing",
+ "status":"RUNNING",
+ "logFile":"http://sandbox2.hortonworks.com:8088/proxy/application_1470437437449_0002/"
+ }]
+ }]
+ }""".replace("\n", "");
+
+ return 0, loads(stdout)['instances'][0]['actions'][0]
+
+def doFalconSchedule(falconUri="http://localhost:15000", jobName="drSyncTest",
+ userName="gpadmin", isTesting=False):
+ """Schedule an existing Falcon process/job entity for execution
+
+ Args:
+ falconUri (str): the URI for the Falcon server (e.g. http://host:port)
+ jobName (str): the Falcon process entity name to get status for
+ userName (str): the username used for authorization
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int): Zero for success, negative one otherwise
+ message (str): message status string (e.g. SUCCEEDED)
+
+ """
+
+ retVal = -1
+
+ # Example REST call:
+ # "Content-Type:text/xml"
+ # POST http://localhost:15000/api/entities/schedule/process/SampleProcess?skipDryRun=false
+
+ endpoint = "/api/entities/schedule/process/{n}?user.name={u}&skipDryRun=true".format(n=jobName, u=userName)
+
+ curlCommand = "curl -H Content-Type:text/xml -X POST {0}".format(falconUri + endpoint)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+ curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
+
+ (stdout, stderr) = curlProcess.communicate()
+
+ retVal = curlProcess.returncode
+
+ try:
+ # Parse the XML result from cURL into a dictionary
+ resultPayload = xmlToDict(stdout)['result']
+ return resultPayload['status'], resultPayload['message']
+        except KeyError:
+            print "Parse error in doFalconSchedule()", stdout
+            return -1, stdout
+        except ElementTree.ParseError:
+            print "Parse error in doFalconSchedule()", stdout
+            return -1, stdout
+
+ # Example output follows:
+ else:
+ stdout = """
+
+ SUCCEEDED
+ default/drSyncTest(process) scheduled successfully
+ default/2028387903@qtp-1730704097-6 - 89554f01-91cf-4bbd-97c2-ee175711b2ba
+
+ """.replace("\n", "");
+
+ return 0, xmlToDict(stdout)['result']['status']
+
+# Falcon process entity template used to create/update job attributes
+drSyncTemplate="""
+
+ _falcon_mirroring_type=HDFS
+
+
+
+
+
+ 1
+ LAST_ONLY
+ days(7)
+ GMT{gmtOffset}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+def doFalconSubmit(falconUri="http://localhost:15000", jobParameters=None,
+ isTesting=False):
+ """Submit/create a Falcon process/job entity
+
+ Args:
+ falconUri (str): the URI for the Falcon server (e.g. http://host:port)
+ jobParameters (dict): a dictionary containing process entity configuration
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int): Zero for success, negative one otherwise
+ message (str): message status string (e.g. SUCCEEDED)
+
+ """
+
+ retVal = -1
+
+ if jobParameters is None:
+ return retVal, "You must provide a job parameters dictionary"
+
+ # Example REST call:
+ # "Content-Type:text/xml"
+ # POST http://localhost:15000/api/entities/submit/process?user.name=falcon
+
+ endpoint = "/api/entities/submit/process?user.name={u}".format(u=jobParameters['userName'])
+
+ thisMinute = int(strftime("%M"))
+ thisYear = int(strftime("%Y"))
+ gmtOffset = strftime("%z")
+
+ oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
+ oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
+
+ # TODO long term would be to encapsulate Falcon functions in a class structure
+ # -- (i.e. full Python Falcon abstraction/API)
+ # -- Targeting S3 or Azure Blob will require a different template
+ payload = drSyncTemplate.format(startTime=oneMinuteLater,
+ endTime=oneYearLater,
+ gmtOffset=gmtOffset,
+ distcpMaxMaps=jobParameters['distcpMaxMaps'],
+ distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
+ sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
+ sourceHdfsUri=jobParameters['sourceHdfsUri'],
+ targetClusterEntityName=jobParameters['targetClusterEntityName'],
+ targetHdfsUri=jobParameters['targetHdfsUri'],
+ executionClusterEntityName=jobParameters['executionClusterEntityName'],
+ workflowFilename=jobParameters['workflowFilename'],
+ pathToSync=jobParameters['pathToSync'],
+ name=jobParameters['jobName'],
+ userName=jobParameters['userName'])
+
+ curlCommand = "curl -X GET {0}".format(falconUri + endpoint)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+ curlProcess = Popen(curlCommand.split(), stdout=PIPE, stderr=PIPE)
+
+ (stdout, stderr) = curlProcess.communicate()
+
+ retVal = curlProcess.returncode
+
+ try:
+ # Parse the XML result from cURL into a dictionary
+ resultPayload = xmlToDict(stdout)['result']
+ return resultPayload['status'], resultPayload['message']
+ except ElementTree.ParseError:
+ print "Parse error in getFalconSchedule()", stdout
+ return -1, stdout
+
+ # Example output follows:
+ else:
+ stdout = """
+
+ SUCCEEDED
+ falcon/default/Submit successful (process) drSyncTest
+ falcon/default/2028387903@qtp-1730704097-6 - 7ddba052-527b-462f-823f-e7dd0a1a08fa
+
+ """.replace("\n", "");
+
+ return 0, xmlToDict(stdout)['result']['status']
+
+def doFalconSoar(falconUri="http://localhost:15000", jobParameters=None,
+ isTesting=False):
+ """Update, schedule, and monitor a Falcon process/job entity
+
+ Args:
+ falconUri (str): the URI for the Falcon server (e.g. http://host:port)
+ jobParameters (dict): a dictionary containing process entity configuration
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int): Zero for success, negative one otherwise
+ message (str): message status string (e.g. SUCCEEDED)
+
+ """
+
+ retVal = -1
+
+ if jobParameters is None:
+ return retVal, "You must provide a job parameters dictionary"
+
+ # Example REST call:
+ # "Content-Type:text/xml"
+ # POST http://localhost:15000/api/entities/update/process/drSyncTest?user.name=falcon
+
+ endpoint = "/api/entities/update/process/{n}?user.name={u}".format(n=jobParameters['jobName'],
+ u=jobParameters['userName'])
+
+ thisMinute = int(strftime("%M"))
+ thisYear = int(strftime("%Y"))
+ gmtOffset = strftime("%z")[:3] + ":00"
+
+ oneMinuteLater = strftime("%Y-%m-%dT%H:{0:02d}Z".format(thisMinute + 1))
+ oneYearLater = strftime("{0}-%m-%dT%H:%MZ".format(thisYear + 1))
+
+ print "Scheduling for", oneMinuteLater
+ print "Ending on", oneYearLater
+
+ # TODO encapsulate everything in a class structure
+ # -- (i.e. full Python Falcon abstraction/API)
+ # -- Targeting AWS S3 or Azure Blob will require a different template
+ payload = drSyncTemplate.format(startTime=oneMinuteLater,
+ endTime=oneYearLater,
+ gmtOffset=gmtOffset,
+ distcpMaxMaps=jobParameters['distcpMaxMaps'],
+ distcpMaxMBpsPerMap=jobParameters['distcpMaxMBpsPerMap'],
+ sourceClusterEntityName=jobParameters['sourceClusterEntityName'],
+ sourceHdfsUri=jobParameters['sourceHdfsUri'],
+ targetClusterEntityName=jobParameters['targetClusterEntityName'],
+ targetHdfsUri=jobParameters['targetHdfsUri'],
+ executionClusterEntityName=jobParameters['executionClusterEntityName'],
+ workflowFilename=jobParameters['workflowFilename'],
+ pathToSync=jobParameters['pathToSync'],
+ name=jobParameters['jobName'],
+ userName=jobParameters['userName'])
+
+ # TODO output for debug level
+ #from pprint import pprint
+ #pprint (payload)
+
+ curlCommand = "curl -H Content-Type:text/xml -X POST {uri} -d ".format(uri=falconUri + endpoint)
+
+ stdout = None
+ stderr = None
+ if isTesting:
+ # Example output follows:
+ stdout = """
+
+ SUCCEEDED
+ falcon/update/default/Updated successfully
+ falcon/update/default/868391317@qtp-1730704097-47 - b2391bd7-3ae0-468e-b39c-5d002099a446
+
+ """.replace("\n", "");
+
+ # Parse the XML result from cURL into a dictionary
+
+ return 0, xmlToDict(stdout)['result']['status']
+
+    # Note, needed to separate out the payload as it can't be split on spaces
+ curlProcess = Popen(curlCommand.split() + [payload],
+ stdout=PIPE, stderr=PIPE)
+
+ (stdout, stderr) = curlProcess.communicate()
+
+ retVal = curlProcess.returncode
+
+ # Curl process did not complete successfully
+ if retVal != 0:
+ print "Curl command failed"
+ return retVal, stderr
+
+ try:
+ # Parse the XML result from cURL into a dictionary
+ result = xmlToDict(stdout)['result']
+ stderr = result['message']
+
+ # Falcon REST update operation NOT successful
+ if "SUCCEEDED" not in result['status']:
+ print "Falcon REST operation not successful"
+ return result['status'], stderr
+
+        # Schedule the updated process entity
+        status, message = doFalconSchedule(falconUri=falconUri,
+                                           jobName=jobParameters['jobName'],
+                                           isTesting=isTesting)
+
+ # Scheduling failed
+ if "SUCCEEDED" not in status:
+ return -1, message
+
+ # Reset retVal to catch error between scheduled and running states
+ retVal = -1
+ sleep(5)
+
+ message, status = getFalconStatus(falconUri=falconUri,
+ entity=jobParameters['jobName'])
+
+ # Continuously poll for hdfs-mirroring status
+ while "RUNNING" in status:
+ message, status = getFalconStatus(falconUri=falconUri,
+ entity=jobParameters['jobName'])
+ print status
+
+ # flag RUNNING state reached using retVal
+ retVal = 0
+ sleep(10)
+
+ if status == "KILLED":
+ return -1, message
+
+ # Poll one last time for runtimes
+ start, finish = getFalconStatus(falconUri=falconUri,
+ entity=jobParameters['jobName'],
+ onlyRuntime=True)
+
+ return retVal, "logfile: {0} started: {1} finished: {2}".format(message, start, finish)
+
+ except KeyError:
+ print "Are you using the correct Falcon server URI?", falconUri
+ return -1, stdout
+
+
+def hdfsFingerprintsMatch(reference, comparison):
+ """Helper function to compare two fingerprints / md5 hashes
+
+ Args:
+ reference (str): the reference MD5 checksum string
+ comparison (str): the comparison MD5 checksum string
+
+ Returns:
+ isEqual (bool): True for success, False otherwise
+
+ """
+
+ return reference == comparison
+
+
+def stopHawq(masterHost=None, isTesting=False):
+ """Optionally connect to a remote HAWQ master
+ and do a quick stop of the HAWQ master process
+
+ Args:
+ masterHost (str): the remote host to SSH to first, otherwise localhost
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int) or message (str): Zero for success, negative one otherwise
+ message (str): message contains a stderr string
+
+ """
+
+ retVal = -1
+
+ hawqStopCommand = "hawq stop master -a -M fast"
+
+ if masterHost is not None:
+ hawqStopCommand = "ssh {h} -- '{c}'".format(h=masterHost,
+ c=hawqStopCommand)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+ hawqStopProcess = Popen(hawqStopCommand,
+ stdout=PIPE, stderr=PIPE, shell=True)
+
+ (stdout, stderr) = hawqStopProcess.communicate()
+
+ return hawqStopProcess.returncode, stderr
+
+ else:
+ return 0, "TEST MODE"
+
+def startHawq(masterHost=None, isTesting=False):
+ """Optionally connect to a remote HAWQ master
+ and do a start of the HAWQ master process
+
+ Args:
+ masterHost (str): the remote host to SSH to first, otherwise localhost
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int) or message (str): Zero for success, negative one otherwise
+ message (str): message contains a stderr string
+
+ """
+
+ retVal = -1
+
+ hawqStartCommand = "hawq start master -a"
+
+ if masterHost is not None:
+ hawqStartCommand = "ssh {h} -- '{c}'".format(h=masterHost,
+ c=hawqStartCommand)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+ hawqStartProcess = Popen(hawqStartCommand,
+ stdout=PIPE, stderr=PIPE, shell=True)
+
+ (stdout, stderr) = hawqStartProcess.communicate()
+
+ return hawqStartProcess.returncode, stderr
+
+ else:
+ return 0, "TEST MODE"
+
+
+def copyToHdfs(source, dest, isTesting=False):
+ """Utility function to copy a source file
+ to the destination HDFS directory/file
+
+ Args:
+ source (str): the source file on the local FS
+ dest (str): the target HDFS directory and filename
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int) or message (str): Zero for success, negative one otherwise
+ message (str): message contains status string
+
+ """
+
+ retVal = -1
+
+ hdfsCommand = "/usr/bin/hdfs dfs -copyFromLocal {s} {d}".format(s=source,
+ d=dest)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+        # Force HDFS commands to run as the gpadmin user
+ env = os.environ.copy()
+ env['HADOOP_USER_NAME'] = 'gpadmin'
+ hdfsProcess = Popen(hdfsCommand.split(), env=env,
+ stdout=PIPE, stderr=PIPE)
+
+ (stdout, stderr) = hdfsProcess.communicate()
+
+ return hdfsProcess.returncode, stderr
+
+ else:
+ return 0, "TESTING"
+
+def checkHdfsSafemode(namenodeHost=None, isTesting=False):
+ """Utility function to query HDFS for
+ safemode enabled or disabled
+
+ Args:
+ namenodeHost (str): the remote host to SSH to first, otherwise localhost
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+        retVal (int) or message (str): Zero for success, negative one otherwise
+        offOrOn (bool) or message (str): True if HDFS safemode is ON, False if
+            it is OFF, or an error message on failure
+
+ """
+
+ retVal = -1
+
+ hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode get"
+
+ if namenodeHost is not None:
+ hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
+ c=hdfsCommand)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+ # Force HDFS commands to run as hdfs user
+ env = os.environ.copy()
+ env['HADOOP_USER_NAME'] = 'hdfs'
+ hdfsProcess = Popen(hdfsCommand, env=env,
+ stdout=PIPE, stderr=PIPE, shell=True)
+
+ (stdout, stderr) = hdfsProcess.communicate()
+
+ try:
+ offOrOn = True if "ON" in stdout.split()[-1] else False
+ except IndexError as e:
+ return -1, str(e)
+
+ return hdfsProcess.returncode, offOrOn
+
+ else:
+ return 0, "TESTING"
+
+def enableHdfsSafemode(namenodeHost=None, isTesting=False):
+ """Utility function to enable safemode
+
+ Args:
+ namenodeHost (str): the remote host to SSH to first, otherwise localhost
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int) or message (str): Zero for success, negative one otherwise
+ message (str): message contains status string
+
+ """
+
+ retVal = -1
+
+ hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode enter"
+
+ if namenodeHost is not None:
+ hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
+ c=hdfsCommand)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+ # Force HDFS commands to run as hdfs user
+ env = os.environ.copy()
+ env['HADOOP_USER_NAME'] = 'hdfs'
+ hdfsProcess = Popen(hdfsCommand, env=env,
+ stdout=PIPE, stderr=PIPE, shell=True)
+
+ (stdout, stderr) = hdfsProcess.communicate()
+
+ return hdfsProcess.returncode, stderr
+
+ else:
+ return 0, "TESTING"
+
+def disableHdfsSafemode(namenodeHost=None, isTesting=False):
+ """Utility function to disable safemode
+
+ Args:
+ namenodeHost (str): the remote host to SSH to first, otherwise localhost
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int) or message (str): Zero for success, negative one otherwise
+ message (str): message contains status string
+
+ """
+
+ retVal = -1
+
+ hdfsCommand = "/usr/bin/hdfs dfsadmin -safemode leave"
+
+ if namenodeHost is not None:
+ hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
+ c=hdfsCommand)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+ # Force HDFS commands to run as hdfs user
+ env = os.environ.copy()
+ env['HADOOP_USER_NAME'] = 'hdfs'
+ hdfsProcess = Popen(hdfsCommand, env=env,
+ stdout=PIPE, stderr=PIPE, shell=True)
+
+ (stdout, stderr) = hdfsProcess.communicate()
+
+ return hdfsProcess.returncode, stderr
+
+ else:
+ return 0, "TESTING"
+
+def forceHdfsCheckpoint(namenodeHost=None, isTesting=False):
+ """Utility function to force an HDFS checkpoint
+
+ Args:
+ namenodeHost (str): the remote host to SSH to first, otherwise localhost
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int) or message (str): Zero for success, negative one otherwise
+ message (str): message contains status string
+
+ """
+
+ retVal = -1
+
+ hdfsCommand = "/usr/bin/hdfs dfsadmin -saveNamespace"
+
+ if namenodeHost is not None:
+ hdfsCommand = "ssh {h} -- 'env HADOOP_USER_NAME=hdfs {c}'".format(h=namenodeHost,
+ c=hdfsCommand)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+ # Force HDFS commands to run as hdfs user
+ env = os.environ.copy()
+ env['HADOOP_USER_NAME'] = 'hdfs'
+ hdfsProcess = Popen(hdfsCommand, env=env,
+ stdout=PIPE, stderr=PIPE, shell=True)
+
+ (stdout, stderr) = hdfsProcess.communicate()
+
+ return hdfsProcess.returncode, stderr
+
+ else:
+ return 0, "TESTING"
+
+
+def createTarball(masterDataBase="/data/hawq/",
+ targetTarball="/tmp/hawqMdd-{t}.tar", isTesting=False):
+ """Utility function to create a tarball of the HAWQ MASTER_DATA_DIRECTORY
+
+ Args:
+ masterDataBase (str): the base directory containing the MASTER_DATA_DIRECTORY
+ targetTarball (str): the target directory and filename of the tarball
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int) or message (str): Zero for success, negative one otherwise
+ message (str): message contains status string
+
+ """
+
+ retVal = -1
+ checksum = None
+
+ # Example invocation (note: excluding most of pg_log contents)
+ # tar cpf /tmp/test.tar.bz2 --exclude=csv -C /data/hawq master
+
+ theTime = strftime("%Y-%m-%d-%H%M")
+
+ tarCommand = "tar -cpf {t} --exclude=csv -C {c} master".format(t=targetTarball.format(t=theTime),
+ c=masterDataBase)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+
+ try:
+ tarProcess = Popen(tarCommand.split(), stdout=PIPE, stderr=PIPE)
+
+ (stdout, stderr) = tarProcess.communicate()
+
+ except OSError as e:
+ return -1, str(e), -1
+
+ if tarProcess.returncode != 0:
+ print "Tarball creation failed : " + stderr
+ return -1, stderr, -1
+
+ md5Command = "md5sum {f}".format(f=targetTarball.format(t=theTime))
+
+ try:
+ md5Process = Popen(md5Command.split(),
+ stdout=PIPE, stderr=PIPE)
+
+ (stdout2, stderr2) = md5Process.communicate()
+
+ checksum = stdout2.split()[0].strip()
+
+ if md5Process.returncode != 0:
+ return -1, "md5 checksum creation failed : " + stderr2, -1
+ else:
+ return 0, targetTarball.format(t=theTime), checksum
+
+ except OSError as e:
+ return -1, str(e), -1
+
+
+ else:
+ return 0, "TEST BRANCH", -1
+
+def cleanupTarball(filename, isTesting=False):
+ """Utility function to delete a tarball of the HAWQ MASTER_DATA_DIRECTORY
+
+ Args:
+ filename (str): the target directory and filename of the tarball to clean up
+ isTesting (bool): NOOP mode bypassing actual REST calls for testing
+
+ Returns:
+ retVal (int) or message (str): Zero for success, negative one otherwise
+ message (str): message contains status string
+
+ """
+
+ retVal = -1
+ checksum = None
+
+ # Example invocation (note: excluding most of pg_log contents)
+ # rm -f /tmp/test.tar
+
+ rmCommand = "rm -f {f}".format(f=filename)
+
+ stdout = None
+ stderr = None
+ if not isTesting:
+
+ try:
+ rmProcess = Popen(rmCommand.split(), stdout=PIPE, stderr=PIPE)
+
+ (stdout, stderr) = rmProcess.communicate()
+
+ retVal = rmProcess.returncode
+
+ return retVal, stderr
+
+ except OSError as e:
+ return -1, str(e)
+ else:
+ return 0, "TEST BRANCH"
+
+if __name__ == '__main__':
+ options, args = parseargs()
+
+ #if options.verbose:
+ # enable_verbose_logging()
+
+ # TODO - switch prints to this once using gppylibs
+ #logger, log_filename = setup_hawq_tool_logging('hawq_sync',getLocalHostname(),getUserName(), options.logDir)
+
+
+ # ### HAWQ Extract every non-system table (source)
+    # Note: the assumption is this has been done in
+ # advance of executing this tool.
+ if options.prompt:
+ # TODO - switch to this once using gppylibs
+ #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
+
+ # TODO - switch to gppylib-based logging
+ print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
+ print ""
+ print "Please confirm you've performed hawq_extract() on all critical data tables"
+ print "and saved this information outside of the cluster (e.g. version control)"
+ print "or are using Falcon with an atomic option (i.e. in HDP-2.5: snapshot-based replication)"
+ print ""
+ print "This is critical for data recovery if a sync operation partially completes!"
+ print ""
+ print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
+ answer = raw_input("y or n: ")
+ if "y" not in answer and "Y" not in answer:
+ print "Exiting."
+ sys.exit(1)
+
+ # ### Falcon cluster entities
+ # Note: the assumption is both source and target
+    # cluster entities have already been created in Falcon
+ # TODO add a confirmation step, later a REST call to check
+ if options.prompt:
+ # TODO - switch to this once using gppylibs
+ #if not userinput.ask_yesno(None, "\nContinue with HAWQ standby master activate", 'N'):
+
+ # TODO - switch to gppylib-based logging
+ print ""
+ print "Please confirm you've created both source and target Falcon cluster entities:"
+ print ""
+ answer = raw_input("y or n: ")
+ if "y" not in answer and "Y" not in answer:
+ print "Exiting."
+ sys.exit(1)
+
+ # ### Stop HAWQ
+ #
+ # TODO?: switch to Ambari REST, followed by pkill -5 <>
+
+ # Source
+ earlier = int(time())
+ print "Stopping source HAWQ" if options.verbose else None;
+ retVal, stderr = stopHawq(isTesting=options.testMode)
+ print retVal, stderr if options.verbose else None;
+
+ if retVal != 0:
+ print "Failed to stop source HAWQ master"
+ print "Error message was " + stderr
+ print "Exiting"
+ sys.exit(1)
+
+ # Target
+ # TODO - either use Ambari REST call or use SSH
+ print "Stopping target HAWQ" if options.verbose else None;
+ retVal, stderr = stopHawq(masterHost=options.targetHawqMaster,
+ isTesting=options.testMode)
+ print retVal, stderr if options.verbose else None;
+
+ if retVal != 0:
+ print "Failed to stop target HAWQ master"
+ print "Error message was " + stderr
+ print "Restarting source HAWQ" if options.verbose else None;
+ retVal, stderr = startHawq(isTesting=options.testMode)
+ later = int(time())
+
+ if retVal == 0:
+ print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
+ else:
+ print "Source HAWQ failed to restart after pre-sync failure."
+ print "Exiting"
+ sys.exit(1)
+
+ # ### Create HAWQ Master Data Directory archive
+ print "Creating MDD tarball" if options.verbose else None;
+ retVal, filenameOrStderr, md5sum = createTarball(masterDataBase="/data/hawq/",
+ isTesting=options.testMode)
+ print retVal, filenameOrStderr, md5sum if options.verbose else None;
+
+ if retVal != 0:
+ print "Failed to create archive of source HAWQ MASTER_DATA_DIRECTORY"
+ print "Error message was : " + filenameOrStderr
+ print "Cleaning up MDD tarball on local FS"
+ print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
+ retVal, stderr = startHawq(isTesting=options.testMode)
+ later = int(time())
+ if retVal == 0:
+ print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
+ else:
+ print "Source HAWQ failed to restart after pre-sync failure."
+ print "Exiting."
+ sys.exit(1)
+
+ # ### Start HAWQ
+ print "Starting source HAWQ" if options.verbose else None;
+ retVal, stderr = startHawq(isTesting=options.testMode)
+ later = int(time())
+
+ if retVal != 0:
+ print "Failed to start source HAWQ master"
+ print "Error message was " + stderr
+ print "Cleaning up MDD tarball on local FS"
+ print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
+ print "Exiting"
+ sys.exit(1)
+
+ print "HAWQ was down for {0} seconds".format(later-earlier) if options.verbose else None;
+
+ # TODO add a CLI flag to force source into read-write
+ if checkHdfsSafemode()[1] == True:
+ print "Source cluster HDFS is read-only, cannot proceed"
+ print "Cleaning up MDD tarball on local FS"
+ print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
+ sys.exit(1)
+
+ # ### Copy MDD archive to HDFS
+ print "Copying MDD tarball to HDFS" if options.verbose else None;
+ retVal, stderr = copyToHdfs(source=filenameOrStderr,
+ dest=options.pathToSync,
+ isTesting=options.testMode)
+
+ if retVal != 0:
+ print "Failed to copy MDD tarball to HDFS"
+ print "Error message was " + stderr
+ print "Cleaning up MDD tarball on local FS"
+ print cleanupTarball(filenameOrStderr, isTesting=options.testMode)
+ print "Exiting"
+ sys.exit(1)
+
+ # ### Cleanup MDD archive from /tmp
+ print "Cleaning up MDD tarball on local FS" if options.verbose else None;
+ retVal, stderr = cleanupTarball(filenameOrStderr,
+ isTesting=options.testMode)
+
+ if retVal != 0:
+ print "Failed to clean up MDD tarball"
+ print "Error message was " + stderr
+ print ""
+ print "You will need to manually remove the following file"
+ print filenameOrStderr
+ print ""
+ print "Exiting"
+ sys.exit(1)
+
+ """
+ # ### Force HDFS checkpoint and enable safemode on source
+ print "Enabling HDFS safemode on source cluster" if options.verbose else None;
+ retVal, stderr = enableHdfsSafemode(isTesting=options.testMode)
+
+ if retVal != 0:
+ print "Failed to enable HDFS safemode on source cluster"
+ print "Error message was " + stderr
+ print "Exiting"
+ sys.exit(1)
+
+ print "Forcing HDFS checkpoint on source cluster" if options.verbose else None;
+ retVal, stderr = forceHdfsCheckpoint(isTesting=options.testMode)
+
+ if retVal != 0:
+ print "Failed to force HDFS checkpoint on source cluster"
+ print "Error message was " + stderr
+ print "Exiting"
+ sys.exit(1)
+ """;
+
+ # ### Leave safemode on target HDFS
+ print "Disabling HDFS safemode on target" if options.verbose else None;
+ retVal, stderr = disableHdfsSafemode(namenodeHost=options.targetNamenode,
+ isTesting=options.testMode)
+
+ if retVal != 0:
+ print "Failed to leave HDFS safemode on target cluster"
+ print "Error message was " + stderr
+ print "Exiting"
+ sys.exit(1)
+
+ # Note, the entity names refer to Falcon
+ # entities that have been created prior
+ # to execution of this tool
+ """
+ jobParameters = dict(userName="gpadmin",
+ distcpMaxMaps="100",
+ distcpMaxMBpsPerMap="1000",
+ sourceClusterEntityName="sourceCluster",
+ sourceHdfsUri="hdfs://{0}:8020".format(sourceNamenode),
+ targetClusterEntityName="targetCluster",
+                         targetHdfsUri="hdfs://{0}:8020".format(targetNamenode),
+ executionClusterEntityName="sourceCluster",
+ workflowFilename="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
+ pathToSync="/tmp/syncTest",
+ jobName="drSync")
+ """;
+
+ jobParameters = dict(userName=options.userName,
+ distcpMaxMaps=options.distcpMaxMaps,
+ distcpMaxMBpsPerMap=options.distcpMaxMBpsPerMap,
+ sourceClusterEntityName=options.sourceClusterEntityName,
+ sourceHdfsUri="hdfs://{0}:8020".format(options.sourceNamenode),
+ targetClusterEntityName=options.targetClusterEntityName,
+ targetHdfsUri="hdfs://{0}:8020".format(options.targetNamenode),
+ executionClusterEntityName=options.executionClusterEntityName,
+ workflowFilename=options.workflowFilename,
+ pathToSync=options.pathToSync,
+ jobName=options.jobName)
+
+    if options.verbose: print jobParameters
+
+ # ### Update and Schedule Job - monitor until completion
+ print "Falcon Soar" if options.verbose else None;
+ retVal, stderr = doFalconSoar(falconUri=options.falconUri,
+ jobParameters=jobParameters,
+ isTesting=options.testMode)
+ falconOutput = stderr
+
+ if retVal != 0:
+ print "Falcon replication job failed"
+ print "Error message was " + stderr
+ print "Source cluster will be left in safemode for remediation"
+ print "Exiting"
+ sys.exit(1)
+
+ # ### Leave safemode on source HDFS
+ print "Disable HDFS safemode on source cluster" if options.verbose else None;
+ retVal, stderr = disableHdfsSafemode(isTesting=options.testMode)
+
+ if retVal != 0:
+ print "Failed to leave HDFS safemode on source cluster"
+ print "Error message was " + stderr
+ print "Exiting"
+ sys.exit(1)
+
+ # ### Force HDFS checkpoint and enable safemode on target
+ print "Enabling HDFS safemode on target cluster" if options.verbose else None
+ retVal, stderr = enableHdfsSafemode(namenodeHost=options.targetNamenode, isTesting=options.testMode)
+
+ if retVal != 0:
+ print "Failed to enable HDFS safemode on target cluster"
+ print "Error message was " + stderr
+ print "Exiting"
+ sys.exit(1)
+
+ print "Forcing HDFS checkpoint on target cluster" if options.verbose else None
+ retVal, stderr = forceHdfsCheckpoint(namenodeHost=options.targetNamenode, isTesting=options.testMode)
+
+ if retVal != 0:
+ print "Failed to force HDFS checkpoint on target cluster"
+ print "Error message was " + stderr
+ print "Exiting"
+ sys.exit(1)
+
+ # ### HDFS Fingerprint comparison
+
+ print "Validating HDFS fingerprints match between source and target clusters" if options.verbose else None
+
+ retVal, md5OrStderr = getHdfsFingerprint(hdfsUri=jobParameters['sourceHdfsUri'],
+ hdfsDir=jobParameters['pathToSync'],
+ isTesting=options.testMode)
+
+ if retVal != 0:
+ print "Failed to generate HDFS fingerprint on source cluster"
+ print "Error message was " + md5OrStderr
+ print "Exiting"
+ sys.exit(1)
+
+ retVal, md5OrStderr2 = getHdfsFingerprint(hdfsUri=jobParameters['targetHdfsUri'],
+ hdfsDir=jobParameters['pathToSync'],
+ isTesting=options.testMode)
+
+ if retVal != 0:
+ print "Failed to generate HDFS fingerprint on target cluster"
+ print "Error message was " + md5OrStderr2
+ print "Exiting"
+ sys.exit(1)
+
+ isValidSync = hdfsFingerprintsMatch(md5OrStderr, md5OrStderr2)
+
+ if not isValidSync:
+ print "Source and target cluster HDFS fingerprints do not match."
+ print "Source checksum : " + md5OrStderr
+ print "Target checksum : " + md5OrStderr2
+ print "This is bad, please check Falcon sync logs : " + falconOutput
+ sys.exit(1)
+ else:
+ print "Source and target HDFS fingerprints match."
+
+ # Target
+ # TODO - either use Ambari REST call or use SSH
+ print "Starting target HAWQ" if options.verbose else None;
+ retVal, stderr = startHawq(masterHost=options.targetHawqMaster,
+ isTesting=options.testMode)
+ print retVal, stderr if options.verbose else None;
+
+ if retVal != 0:
+ print "Failed to start target HAWQ master"
+ print "Error message was " + stderr
+ print "Exiting"
+ sys.exit(1)
+ else:
+ print "HAWQ sync completed successfully!"
+ print """
+ ## Manual runbook during DR event
+ 1. Copy MDD archive from HDFS to target master (CLI)
+ 2. Restore archive in /data/hawq/ (CLI)
+ 3. Disable HDFS safemode (CLI)
+ 4. Start HAWQ (CLI)
+ 5. Update hawq_segment_configuration (psql)
+ 6. Deploy a HAWQ standby master in Ambari
+ """;
+ sys.exit(0)
+
diff --git a/tools/doc/gpconfigs/hawqsync-falcon-workflow.xml b/tools/doc/gpconfigs/hawqsync-falcon-workflow.xml
new file mode 100644
index 0000000000..084b9b8dab
--- /dev/null
+++ b/tools/doc/gpconfigs/hawqsync-falcon-workflow.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- Example Falcon process entity for the drSync replication job; entity
+     names, host names, paths, and validity dates are illustrative
+     placeholders to be edited per site. -->
+<process name="drSync" xmlns="uri:falcon:process:0.1">
+    <tags>_falcon_mirroring_type=HDFS</tags>
+    <clusters>
+        <cluster name="source">
+            <validity start="2016-08-17T13:22Z" end="2017-08-17T13:22Z"/>
+        </cluster>
+    </clusters>
+    <parallel>1</parallel>
+    <order>LAST_ONLY</order>
+    <frequency>months(1)</frequency>
+    <timezone>GMT-07:00</timezone>
+    <properties>
+        <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
+        <property name="distcpMaxMaps" value="10"/>
+        <property name="distcpMapBandwidth" value="100"/>
+        <property name="drSourceDir" value="/tmp/syncTest"/>
+        <property name="drSourceClusterFS" value="hdfs://source-namenode:8020"/>
+        <property name="drTargetDir" value="/tmp/syncTest"/>
+        <property name="drTargetClusterFS" value="hdfs://target-namenode:8020"/>
+        <property name="targetCluster" value="target"/>
+        <property name="sourceCluster" value="source"/>
+    </properties>
+    <workflow name="drSync-WF" engine="oozie"
+              path="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml" lib=""/>
+    <retry policy="periodic" delay="minutes(30)" attempts="3"/>
+    <ACL owner="gpadmin" group="users" permission="0755"/>
+</process>
diff --git a/tools/doc/gpconfigs/hawqsync-oozie-workflow.xml b/tools/doc/gpconfigs/hawqsync-oozie-workflow.xml
new file mode 100644
index 0000000000..8af4de8531
--- /dev/null
+++ b/tools/doc/gpconfigs/hawqsync-oozie-workflow.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- Oozie DistCp workflow driving the hdfs-replication (drSync) job -->
+<workflow-app xmlns="uri:oozie:workflow:0.3" name="hdfs-replication-workflow">
+    <start to="drReplication"/>
+    <action name="drReplication">
+        <distcp xmlns="uri:oozie:distcp-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+            </configuration>
+            <arg>-update</arg>
+            <arg>-delete</arg>
+            <arg>-m</arg>
+            <arg>${distcpMaxMaps}</arg>
+            <arg>-bandwidth</arg>
+            <arg>${distcpMapBandwidth}</arg>
+            <arg>-strategy</arg>
+            <arg>dynamic</arg>
+            <arg>${drSourceClusterFS}${drSourceDir}</arg>
+            <arg>${drTargetClusterFS}${drTargetDir}</arg>
+        </distcp>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail">
+        <message>Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>