Skip to content

Commit

Permalink
Add a mysql wrapper for mysqlsh
Browse files Browse the repository at this point in the history
  • Loading branch information
Arnaud Adant committed Dec 23, 2023
1 parent c35b4d0 commit 2b6adb7
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 14 deletions.
42 changes: 41 additions & 1 deletion sink-connector/python/db/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,46 @@ def get_mysql_connection(mysql_host, mysql_user, mysql_passwd, mysql_port, mysql
return conn


def get_tables_from_regex_sql(conn, no_wc, mysql_database, include_tables_regex, exclude_tables_regex=None, non_partitioned_tables_only=False):
    """
    Build (but do not run) the information_schema.tables query that lists the
    base tables of ``mysql_database`` whose name matches ``include_tables_regex``.

    ``conn`` and ``no_wc`` are accepted for signature symmetry with
    get_tables_from_regex but are not used here.

    :param exclude_tables_regex: when not None, tables matching it are filtered out
    :param non_partitioned_tables_only: when true, keep only tables that have
        exactly one row in information_schema.partitions
    :return: the SQL text; values are interpolated verbatim into the statement
    """
    # Optional filters stay as (possibly empty) fragments so the spacing of the
    # final statement does not depend on which ones are active.
    exclusion_filter = ""
    if exclude_tables_regex is not None:
        exclusion_filter = "and table_name not rlike '{}'".format(exclude_tables_regex)

    single_partition_filter = ""
    if non_partitioned_tables_only:
        single_partition_filter = (
            " and (table_schema, table_name) in ("
            "select table_schema, table_name from information_schema.partitions"
            " where table_schema = '{}'"
            " group by table_schema, table_name having count(*) = 1 )"
        ).format(mysql_database)

    base_query = (
        "select TABLE_SCHEMA as table_schema, TABLE_NAME as table_name"
        " from information_schema.tables"
        " where table_type='BASE TABLE'"
        " and table_schema = '{}'"
        " and table_name rlike '{}'"
    ).format(mysql_database, include_tables_regex)

    return " ".join((base_query, exclusion_filter, single_partition_filter, "order by 1"))


def get_tables_from_regex(conn, no_wc, mysql_database, include_tables_regex, exclude_tables_regex=None, non_partitioned_tables_only=False):
    """
    Return the tables of ``mysql_database`` selected by ``include_tables_regex``.

    :param conn: connection handed to execute_mysql (unused when ``no_wc`` is true)
    :param no_wc: when true, ``include_tables_regex`` is taken as a literal table
        name and returned as ``[[include_tables_regex]]`` without querying MySQL
    :param exclude_tables_regex: optional regex of table names to leave out
    :param non_partitioned_tables_only: forwarded to get_tables_from_regex_sql
    :return: ``[[name]]`` when ``no_wc`` is true, otherwise the rowset returned
        by execute_mysql for the generated query
    """
    if no_wc:
        # The original referenced an undefined `tables_regex` here, which
        # raised NameError as soon as the no-wildcard path was taken.
        return [[include_tables_regex]]

    strCommand = get_tables_from_regex_sql(conn, no_wc, mysql_database, include_tables_regex,
                                           exclude_tables_regex=exclude_tables_regex,
                                           non_partitioned_tables_only=non_partitioned_tables_only)

    rowset, _rowcount = execute_mysql(conn, strCommand)
    return rowset


def get_partitions_from_regex(conn, mysql_database, include_tables_regex, exclude_tables_regex=None, include_partitions_regex=None, non_partitioned_tables_only=False):
    """
    Query information_schema.partitions for the partitions of every table
    selected by get_tables_from_regex_sql.

    :param include_partitions_regex: when not None, only partitions whose name
        matches this regex are returned
    :return: the rowset returned by execute_mysql, with columns table_schema,
        table_name and partition_name, ordered by those three columns

    NOTE(review): partition_name is presumably NULL for non-partitioned tables
    when no partition regex is given - confirm and make sure callers cope.
    """
    # The table selection is embedded as a sub-select, so it is generated with
    # no_wc=False regardless of how the caller lists tables.
    matching_tables_sql = get_tables_from_regex_sql(
        conn, False, mysql_database, include_tables_regex,
        exclude_tables_regex=exclude_tables_regex,
        non_partitioned_tables_only=non_partitioned_tables_only)

    if include_partitions_regex is None:
        partition_filter = ""
    else:
        partition_filter = "and partition_name rlike '{}'".format(include_partitions_regex)

    query = " ".join((
        "select TABLE_SCHEMA as table_schema, TABLE_NAME as table_name, PARTITION_NAME as partition_name"
        " from information_schema.partitions where table_schema = '{}'".format(mysql_database),
        partition_filter,
        "and (table_schema, table_name) IN ({}) order by 1,2,3".format(matching_tables_sql),
    ))

    rowset, _ = execute_mysql(conn, query)
    return rowset


def execute_mysql(conn, strSql):
"""
# -- =======================================================================
Expand Down Expand Up @@ -54,5 +94,5 @@ def resolve_credentials_from_config(config_file):
assert 'client' in config, f"Expected a [client] section in f{config_file}"
mysql_user = config['client']['user']
mysql_password = config['client']['password']
logging.debug(f"mysql_user {mysql_user} mysql_password {mysql_password}")
logging.debug(f"mysql_user {mysql_user} mysql_password ****")
return (mysql_user, mysql_password)
4 changes: 2 additions & 2 deletions sink-connector/python/db_compare/clickhouse_table_checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def select_table_statements(table, query, select_query, order_by, external_colum
return statements


def get_tables_from_regex(conn, strDSN):
def get_tables_from_regex(conn):
if args.no_wc:
return [[args.tables_regex]]

Expand Down Expand Up @@ -367,7 +367,7 @@ def main():
(clickhouse_user, clickhouse_password) = resolve_credentials_from_config(config_file)
try:
conn = get_connection(clickhouse_user, clickhouse_password)
tables = get_tables_from_regex(conn, args.tables_regex)
tables = get_tables_from_regex(conn)
# CH does not print decimal with trailing zero, we need a custom function
execute_sql(conn, create_function_format_decimal)

Expand Down
13 changes: 2 additions & 11 deletions sink-connector/python/db_compare/mysql_table_checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,8 @@ def select_table_statements(table, query, select_query, order_by, external_colum
return statements


def get_tables_from_regex(conn, strDSN):
    """
    List the base tables of args.mysql_database whose name matches
    args.tables_regex, then close ``conn``.

    ``strDSN`` is accepted but not used. When args.no_wc is set the regex is
    returned as-is, wrapped as ``[[args.tables_regex]]``, and neither the query
    nor the close happens.
    """
    if args.no_wc:
        return [[args.tables_regex]]

    query = (
        "select TABLE_NAME as table_name from information_schema.tables"
        " where table_type='BASE TABLE'"
        f" and table_schema = '{args.mysql_database}'"
        f" and table_name rlike '{args.tables_regex}' order by 1"
    )
    rowset, _ = execute_mysql(conn, query)
    conn.close()
    return rowset
def get_tables_from_regex(conn, tables_regexp):
    """
    List the MySQL tables matching ``tables_regexp`` using the shared helper in
    db.mysql, with no_wc and the database name taken from the global ``args``.

    :param conn: MySQL connection forwarded to the shared helper
    :param tables_regexp: table-name regex (or literal name when args.no_wc is set)
    :return: whatever db.mysql.get_tables_from_regex returns
    """
    # This function has the same name as the shared helper, so a bare call
    # resolves to this two-argument function itself and fails with TypeError.
    # Import the helper under an alias to reach the real implementation.
    # NOTE(review): assumes the package is importable as `db.mysql`, as it is
    # from db_dump/mysql_dumper.py - confirm for this module's entry point.
    from db.mysql import get_tables_from_regex as mysql_get_tables_from_regex

    return mysql_get_tables_from_regex(conn, args.no_wc, args.mysql_database, tables_regexp)


def calculate_sql_checksum(conn, table):
Expand Down
3 changes: 3 additions & 0 deletions sink-connector/python/db_dump/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""db_dump"""

__version__ = "0.1"
265 changes: 265 additions & 0 deletions sink-connector/python/db_dump/mysql_dumper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
# -- ============================================================================
"""
# -- ============================================================================
# -- FileName : mysql_dumper.py
# -- Date :
# -- Summary : dumps a MySQL database using mysqlsh
# -- Credits : https://dev.mysql.com/doc/mysql-shell/8.0/en/mysql-shell-utilities-dump-instance-schema.html
# --
"""
import logging
import argparse
import traceback
import sys
import datetime
import os
from db.mysql import *
from subprocess import Popen, PIPE
import subprocess
import time

# Timestamp taken once at import time, formatted as YYYY.MM.DD-HH.MM.SS.
runTime = datetime.datetime.now().strftime("%Y.%m.%d-%H.%M.%S")


# Directory holding this script; its parent directory is appended to sys.path.
# NOTE(review): this append runs after the `from db.mysql import *` statement
# above, so it cannot help resolve that import - presumably PYTHONPATH or the
# working directory already covers it; confirm.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(SCRIPT_DIR))


def check_program_exists(name):
    """
    Tell whether an executable called ``name`` can be found on the PATH.

    :param name: program name to look up, e.g. "mysqlsh"
    :return: True when the program is found, False otherwise
    """
    # Local import keeps this block self-contained. shutil.which avoids
    # spawning a process and does not depend on /usr/bin/which existing
    # (the previous Popen call raised FileNotFoundError when it did not).
    import shutil

    return shutil.which(name) is not None

# hack to add the user to the logger, which needs it apparently
old_factory = logging.getLogRecordFactory()


def record_factory(*args, **kwargs):
    """Build a log record with the previously installed factory and give it a
    fixed ``user`` attribute of "me"."""
    tagged_record = old_factory(*args, **kwargs)
    setattr(tagged_record, "user", "me")
    return tagged_record


# Every record created from now on goes through record_factory.
logging.setLogRecordFactory(record_factory)

def run_command(cmd):
    """
    # -- ======================================================================
    # -- run the shell command passed as cmd, logging each line of its output
    # -- (stderr is merged into stdout) at INFO level
    # -- returns the exit status as a string, "0" meaning success
    # -- ======================================================================
    """
    logging.debug("cmd " + cmd)
    # The context manager closes the pipe once the command is done.
    with subprocess.Popen(cmd,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT,
                          shell=True) as process:
        for line in process.stdout:
            # errors="replace" keeps logging going if the tool emits bytes
            # that are not valid UTF-8.
            logging.info(line.decode(errors="replace").strip())
        # EOF on stdout does not guarantee the child has exited yet; poll()
        # could still return None here, so wait for the real exit status.
        rc = str(process.wait())
    logging.debug("return code = " + rc)
    return rc


def run_quick_command(cmd):
    """
    Run the shell command ``cmd`` to completion and collect its output.

    stderr is merged into stdout. The output is logged at INFO level as text,
    and a failure (non-zero exit status) is logged as an error; the function
    itself does not terminate the program.

    :param cmd: command line, executed through the shell
    :return: tuple ``(rc, stdout)`` where ``rc`` is the exit status as a string
        and ``stdout`` the raw captured output as bytes
    """
    logging.debug("cmd " + cmd)
    process = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               shell=True)
    # stderr is redirected into stdout, so the second element is always None.
    stdout, _ = process.communicate()
    rc = str(process.returncode)
    if stdout:
        # Decode before logging: str() on bytes logs the b'...' repr with
        # escaped newlines instead of the actual text.
        logging.info(stdout.decode(errors="replace").strip())
    logging.debug("return code = " + rc)
    if rc != "0":
        logging.error("command failed : terminating")
    return rc, stdout


def generate_mysqlsh_dump_tables_clause(dump_dir,
                                        dry_run,
                                        database,
                                        tables_to_dump,
                                        data_only,
                                        schema_only,
                                        where,
                                        partition_map,
                                        threads):
    """
    Render the ``util.dumpTables(...)`` statement passed to ``mysqlsh -e``.

    The table list and the options are rendered with their Python repr, e.g.
    util.dumpTables('db',['t1', 't2'], '/dumps/db', {'dryRun': 0, ...} );

    :param dump_dir: output directory given to dumpTables
    :param dry_run, schema_only, data_only: flags rendered as 0/1 under the
        dryRun, ddlOnly and dataOnly options
    :param tables_to_dump: list of table names
    :param where: accepted but not used by this function
    :param partition_map: mapping "schema.table" -> list of partition names;
        added as the "partitions" option only when non-empty
    :param threads: value of the "threads" option
    :return: the statement text, with one leading and one trailing space
    """
    # Insertion order matters: it is the order the options appear in the text.
    options = dict(
        dryRun=int(dry_run),
        ddlOnly=int(schema_only),
        dataOnly=int(data_only),
        threads=threads,
    )
    if partition_map:
        options.update(partitions=partition_map)
    return " util.dumpTables('{}',{}, '{}', {} ); ".format(database, tables_to_dump, dump_dir, options)


def generate_mysqlsh_command(dump_dir,
                             dry_run,
                             mysql_host,
                             mysql_user,
                             mysql_password,
                             mysql_port,
                             defaults_file,
                             database,
                             tables_to_dump,
                             data_only,
                             schema_only,
                             where,
                             partition_map,
                             threads):
    """
    Build the shell command line that runs ``mysqlsh`` with a dumpTables call.

    Options are emitted in this order: --defaults-file, -h, --user,
    --password, --port, -e. The user, password, port and defaults-file options
    are left out when the corresponding argument is None.

    The remaining arguments (dump_dir, dry_run, database, tables_to_dump,
    data_only, schema_only, where, partition_map, threads) are forwarded
    unchanged to generate_mysqlsh_dump_tables_clause.

    :return: a single string meant to be run through the shell; each argument
        is shell-quoted, so passwords, hosts or statements containing spaces,
        quotes, ``$`` or backticks reach mysqlsh verbatim
    """
    # Local import keeps this block self-contained.
    import shlex

    argv = ["mysqlsh"]
    if defaults_file is not None:
        argv.append(f"--defaults-file={defaults_file}")
    argv += ["-h", str(mysql_host)]
    if mysql_user is not None:
        argv += ["--user", str(mysql_user)]
    if mysql_password is not None:
        argv += ["--password", str(mysql_password)]
    if mysql_port is not None:
        argv += ["--port", str(mysql_port)]

    dump_clause = generate_mysqlsh_dump_tables_clause(dump_dir,
                                                      dry_run,
                                                      database,
                                                      tables_to_dump,
                                                      data_only,
                                                      schema_only,
                                                      where,
                                                      partition_map,
                                                      threads)
    argv += ["-e", dump_clause]

    # Quote every argument instead of wrapping some of them in double quotes:
    # inside double quotes the shell still expands $, backticks and \".
    return " ".join(shlex.quote(arg) for arg in argv)


def main():
    """
    Command-line entry point: select tables and partitions of a MySQL database
    by regex and dump them with ``mysqlsh`` (util.dumpTables).

    Steps: parse the arguments, configure logging on stdout, check that
    mysqlsh is on the PATH, resolve credentials (command line or defaults
    file), list the matching tables and partitions, build the mysqlsh command
    and run it.

    Exits with status 0 on success and 1 on any failure, including a
    non-zero mysqlsh return code.
    """
    parser = argparse.ArgumentParser(description='''Wrapper for mysqlsh dump''')
    # Connection settings
    parser.add_argument('--mysql_host', help='MySQL host', required=True)
    parser.add_argument('--mysql_user', help='MySQL user', required=False)
    parser.add_argument('--mysql_password',
                        help='MySQL password, discouraged, please use a config file', required=False)
    parser.add_argument('--defaults_file',
                        help='MySQL config file default is ~/.my.cnf', required=False, default='~/.my.cnf')
    parser.add_argument('--mysql_database',
                        help='MySQL database', required=True)
    parser.add_argument('--mysql_port', help='MySQL port',
                        default=3306, required=False)
    # What to dump and where
    parser.add_argument('--dump_dir', help='Location of dump files', required=True)
    parser.add_argument('--include_tables_regex', help='table regexp', required=False, default=None)
    parser.add_argument('--where', help='where clause', required=False)
    parser.add_argument('--exclude_tables_regex',
                        help='exclude table regexp', required=False)
    parser.add_argument('--include_partitions_regex', help='partitions regex', required=False, default=None)
    parser.add_argument('--threads', type=int,
                        help='number of parallel threads', default=1)
    parser.add_argument('--debug', dest='debug',
                        action='store_true', default=False)
    parser.add_argument('--schema_only', dest='schema_only',
                        action='store_true', default=False)
    parser.add_argument('--data_only', dest='data_only',
                        action='store_true', default=False)
    parser.add_argument('--non_partitioned_tables_only', dest='non_partitioned_tables_only',
                        action='store_true', default=False)
    parser.add_argument('--dry_run', dest='dry_run',
                        action='store_true', default=False)

    global args
    args = parser.parse_args()

    root = logging.getLogger()
    root.setLevel(logging.INFO)

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.INFO)

    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(threadName)s - %(message)s')
    handler.setFormatter(formatter)
    root.addHandler(handler)

    if args.debug:
        root.setLevel(logging.DEBUG)
        handler.setLevel(logging.DEBUG)

    mysql_user = args.mysql_user
    mysql_password = args.mysql_password

    # Explicit checks rather than assert: assertions are stripped when Python
    # runs with -O, which would silently skip this validation.
    if not check_program_exists("mysqlsh"):
        logging.error("mysqlsh should be in the PATH")
        sys.exit(1)

    # check parameters
    if args.mysql_password:
        logging.warning("Using password on the command line is not secure, please specify a config file ")
        if args.mysql_user is None:
            logging.error("--mysql_user must be specified")
            sys.exit(1)
    else:
        config_file = args.defaults_file
        (mysql_user, mysql_password) = resolve_credentials_from_config(config_file)

    try:
        conn = get_mysql_connection(args.mysql_host, mysql_user,
                                    mysql_password, args.mysql_port, args.mysql_database)
        tables = get_tables_from_regex(conn, False,
                                       args.mysql_database,
                                       args.include_tables_regex,
                                       exclude_tables_regex=args.exclude_tables_regex,
                                       non_partitioned_tables_only=args.non_partitioned_tables_only)
        partitions = get_partitions_from_regex(conn,
                                               args.mysql_database,
                                               args.include_tables_regex,
                                               exclude_tables_regex=args.exclude_tables_regex,
                                               include_partitions_regex=args.include_partitions_regex,
                                               non_partitioned_tables_only=args.non_partitioned_tables_only)

        tables_to_dump = []
        for table in tables.fetchall():
            logging.debug(table['table_name'])
            tables_to_dump.append(table['table_name'])

        # Group partition names per "schema.table". A plain dict is required:
        # the map is rendered with its repr inside the mysqlsh statement.
        partition_map = {}
        for partition in partitions.fetchall():
            partition_name = partition['partition_name']
            if partition_name is None:
                # Non-partitioned tables are reported with a NULL partition
                # name; it would be rendered as the Python literal None,
                # which is not valid in the mysqlsh JavaScript statement.
                continue
            key = partition['table_schema'] + "." + partition['table_name']
            partition_map.setdefault(key, []).append(partition_name)
        logging.debug(partition_map)

        # The command-line credentials (possibly None) are passed on purpose:
        # when they are absent mysqlsh is pointed at the defaults file.
        cmd = generate_mysqlsh_command(args.dump_dir,
                                       args.dry_run,
                                       args.mysql_host,
                                       args.mysql_user,
                                       args.mysql_password,
                                       args.mysql_port,
                                       args.defaults_file,
                                       args.mysql_database,
                                       tables_to_dump,
                                       args.data_only,
                                       args.schema_only,
                                       args.where,
                                       partition_map,
                                       args.threads
                                       )
        rc = run_command(cmd)
        if rc != "0":
            # Handled by the generic handler below: logged, then exit(1).
            raise RuntimeError("mysqldumper failed, check the log.")

    except (KeyboardInterrupt, SystemExit):
        logging.info("Received interrupt")
        os._exit(1)
    except Exception as e:
        logging.error("Exception in main thread : " + str(e))
        logging.error(traceback.format_exc())
        sys.exit(1)
    logging.debug("Exiting Main Thread")
    sys.exit(0)


if __name__ == '__main__':
main()


0 comments on commit 2b6adb7

Please sign in to comment.