From 41bf522b678a92aed6ba6141afb69ce92a05d6a0 Mon Sep 17 00:00:00 2001 From: aaronfriedman Date: Fri, 15 Nov 2024 14:31:06 -0700 Subject: [PATCH 1/7] Update database clients --- CHANGELOG.md | 6 + README.md | 5 +- pyproject.toml | 7 +- src/nypl_py_utils/classes/mysql_client.py | 66 ++++++--- .../classes/postgresql_client.py | 94 +++++++----- .../classes/postgresql_pool_client.py | 137 ------------------ src/nypl_py_utils/classes/redshift_client.py | 66 ++++++--- tests/test_mysql_client.py | 19 ++- tests/test_postgresql_client.py | 37 ++++- tests/test_postgresql_pool_client.py | 121 ---------------- tests/test_redshift_client.py | 21 ++- 11 files changed, 228 insertions(+), 351 deletions(-) delete mode 100644 src/nypl_py_utils/classes/postgresql_pool_client.py delete mode 100644 tests/test_postgresql_pool_client.py diff --git a/CHANGELOG.md b/CHANGELOG.md index bdc66c8..27a04d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,10 @@ # Changelog +## v1.5.0 11/15/24 +- Use executemany instead of execute when appropriate in PostgreSQLClient +- Add capability to retry connecting to a database to the MySQL, PostgreSQL, and Redshift clients +- Automatically close database connection upon error in the MySQL, PostgreSQL, and Redshift clients +- Delete old PostgreSQLPoolClient, which was not production ready + ## v1.4.0 9/23/24 - Added SFTP client diff --git a/README.md b/README.md index f695d8b..deb9698 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,6 @@ This package contains common Python utility classes and functions. * Downloading files from a remote SSH SFTP server * Connecting to and querying a MySQL database * Connecting to and querying a PostgreSQL database -* Connecting to and querying a PostgreSQL database using a connection pool * Connecting to and querying Redshift * Making requests to the Oauth2 authenticated APIs such as NYPL Platform API and Sierra @@ -37,7 +36,7 @@ kinesis_client = KinesisClient(...) # Do not use any version below 1.0.0 # All available optional dependencies can be found in pyproject.toml. # See the "Managing dependencies" section below for more details. -nypl-py-utils[kinesis-client,config-helper]==1.4.0 +nypl-py-utils[kinesis-client,config-helper]==1.5.0 ``` ## Developing locally @@ -63,7 +62,7 @@ The optional dependency sets also give the developer the option to manually list ### Using PostgreSQLClient in an AWS Lambda Because `psycopg` requires a statically linked version of the `libpq` library, the `PostgreSQLClient` cannot be installed as-is in an AWS Lambda function. 
Instead, it must be packaged as follows: ```bash -pip install --target ./package nypl-py-utils[postgresql-client]==1.4.0 +pip install --target ./package nypl-py-utils[postgresql-client]==1.5.0 pip install \ --platform manylinux2014_x86_64 \ diff --git a/pyproject.toml b/pyproject.toml index ed151cd..fbd2558 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "nypl_py_utils" -version = "1.4.0" +version = "1.5.0" authors = [ { name="Aaron Friedman", email="aaronfriedman@nypl.org" }, ] @@ -45,9 +45,6 @@ oauth2-api-client = [ postgresql-client = [ "psycopg[binary]>=3.1.6" ] -postgresql-pool-client = [ - "psycopg[binary,pool]>=3.1.6" -] redshift-client = [ "botocore>=1.29.5", "redshift-connector>=2.0.909" @@ -74,7 +71,7 @@ research-catalog-identifier-helper = [ "requests>=2.28.1" ] development = [ - "nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,postgresql-pool-client,redshift-client,s3-client,secrets-manager-client,sftp-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]", + "nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,redshift-client,s3-client,secrets-manager-client,sftp-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]", "flake8>=6.0.0", "freezegun>=1.2.2", "mock>=4.0.3", diff --git a/src/nypl_py_utils/classes/mysql_client.py b/src/nypl_py_utils/classes/mysql_client.py index 94bb3c7..a755d5b 100644 --- a/src/nypl_py_utils/classes/mysql_client.py +++ b/src/nypl_py_utils/classes/mysql_client.py @@ -1,4 +1,5 @@ import mysql.connector +import time from nypl_py_utils.functions.log_helper import create_log @@ -15,35 +16,50 @@ def __init__(self, host, port, database, user, password): self.user = user self.password = password - def connect(self, **kwargs): + def connect(self, retry_count=0, backoff_factor=5, **kwargs): """ Connects to a MySQL database using the given credentials. - Keyword args can be passed into the connection to set certain options. - All possible arguments can be found here: - https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html. - - Common arguments include: - autocommit: bool - Whether to automatically commit each query rather than running - them as part of a transaction. By default False. + Parameters + ---------- + retry_count: int, optional + The number of times to retry connecting before throwing an error. + By default no retry occurs. + backoff_factor: int, optional + The backoff factor when retrying. The amount of time to wait before + retrying is backoff_factor ** number_of_retries_made. 
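+            For example, with the default backoff_factor of 5, the waits
+            before successive retries are 5**0 = 1, 5**1 = 5, and
+            5**2 = 25 seconds.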
+ kwargs: + All possible arguments can be found here: + https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html """ self.logger.info('Connecting to {} database'.format(self.database)) - try: - self.conn = mysql.connector.connect( - host=self.host, - port=self.port, - database=self.database, - user=self.user, - password=self.password, - **kwargs) - except mysql.connector.Error as e: - self.logger.error( - 'Error connecting to {name} database: {error}'.format( - name=self.database, error=e)) - raise MySQLClientError( - 'Error connecting to {name} database: {error}'.format( - name=self.database, error=e)) from None + attempt_count = 0 + while attempt_count <= retry_count: + try: + try: + self.conn = mysql.connector.connect( + host=self.host, + port=self.port, + database=self.database, + user=self.user, + password=self.password, + **kwargs) + except (mysql.connector.Error): + if attempt_count < retry_count: + self.logger.info('Failed to connect -- retrying') + time.sleep(backoff_factor ** attempt_count) + attempt_count += 1 + else: + raise + else: + break + except Exception as e: + self.logger.error( + 'Error connecting to {name} database: {error}'.format( + name=self.database, error=e)) + raise MySQLClientError( + 'Error connecting to {name} database: {error}'.format( + name=self.database, error=e)) from None def execute_query(self, query, query_params=None, **kwargs): """ @@ -83,6 +99,8 @@ def execute_query(self, query, query_params=None, **kwargs): return cursor.fetchall() except Exception as e: self.conn.rollback() + cursor.close() + self.close_connection() self.logger.error( ('Error executing {name} database query \'{query}\': {error}') .format(name=self.database, query=query, error=e)) diff --git a/src/nypl_py_utils/classes/postgresql_client.py b/src/nypl_py_utils/classes/postgresql_client.py index 05c7a97..82c6b9e 100644 --- a/src/nypl_py_utils/classes/postgresql_client.py +++ b/src/nypl_py_utils/classes/postgresql_client.py @@ -1,4 +1,5 @@ import psycopg +import time from nypl_py_utils.functions.log_helper import create_log @@ -6,43 +7,54 @@ class PostgreSQLClient: """Client for managing individual connections to a PostgreSQL database""" - def __init__(self, host, port, db_name, user, password): + def __init__(self, host, port, database, user, password): self.logger = create_log('postgresql_client') self.conn = None self.conn_info = ('postgresql://{user}:{password}@{host}:{port}/' - '{db_name}').format(user=user, password=password, - host=host, port=port, - db_name=db_name) + '{database}').format(user=user, password=password, + host=host, port=port, + database=database) + self.database = database - self.db_name = db_name - - def connect(self, **kwargs): + def connect(self, retry_count=0, backoff_factor=5, **kwargs): """ Connects to a PostgreSQL database using the given credentials. - Keyword args can be passed into the connection to set certain options. - All possible arguments can be found here: - https://www.psycopg.org/psycopg3/docs/api/connections.html#psycopg.Connection.connect. - - Common arguments include: - autocommit: bool - Whether to automatically commit each query rather than running - them as part of a transaction. By default False. - row_factory: RowFactory - A psycopg RowFactory that determines how the data will be - returned. Defaults to tuple_row, which returns the rows as a - list of tuples. + Parameters + ---------- + retry_count: int, optional + The number of times to retry connecting before throwing an error. + By default no retry occurs. 
+ backoff_factor: int, optional + The backoff factor when retrying. The amount of time to wait before + retrying is backoff_factor ** number_of_retries_made. + kwargs: + All possible arguments (such as the row_factory) can be found here: + https://www.psycopg.org/psycopg3/docs/api/connections.html#psycopg.Connection.connect """ - self.logger.info('Connecting to {} database'.format(self.db_name)) - try: - self.conn = psycopg.connect(self.conn_info, **kwargs) - except psycopg.Error as e: - self.logger.error( - 'Error connecting to {name} database: {error}'.format( - name=self.db_name, error=e)) - raise PostgreSQLClientError( - 'Error connecting to {name} database: {error}'.format( - name=self.db_name, error=e)) from None + self.logger.info('Connecting to {} database'.format(self.database)) + attempt_count = 0 + while attempt_count <= retry_count: + try: + try: + self.conn = psycopg.connect(self.conn_info, **kwargs) + except (psycopg.OperationalError, + psycopg.errors.ConnectionTimeout): + if attempt_count < retry_count: + self.logger.info('Failed to connect -- retrying') + time.sleep(backoff_factor ** attempt_count) + attempt_count += 1 + else: + raise + else: + break + except Exception as e: + self.logger.error( + 'Error connecting to {name} database: {error}'.format( + name=self.database, error=e)) + raise PostgreSQLClientError( + 'Error connecting to {name} database: {error}'.format( + name=self.database, error=e)) from None def execute_query(self, query, query_params=None, **kwargs): """ @@ -53,7 +65,11 @@ def execute_query(self, query, query_params=None, **kwargs): query: str The query to execute query_params: sequence, optional - The values to be used in a parameterized query + The values to be used in a parameterized query. The values can be + for a single insert query -- e.g. execute_query( + "INSERT INTO x VALUES (%s, %s)", (1, "a")) + or for multiple -- e.g execute_transaction( + "INSERT INTO x VALUES (%s, %s)", [(1, "a"), (2, "b")]) kwargs: All possible arguments can be found here: https://www.psycopg.org/psycopg3/docs/api/cursors.html#psycopg.Cursor.execute @@ -65,30 +81,38 @@ def execute_query(self, query, query_params=None, **kwargs): based on the connection's row_factory if there's something to return (even if the result set is empty). 
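+
+        Note that when every element of query_params is itself a tuple or
+        list, the query is executed once per element via executemany;
+        otherwise it is executed a single time via execute.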
""" - self.logger.info('Querying {} database'.format(self.db_name)) + self.logger.info('Querying {} database'.format(self.database)) self.logger.debug('Executing query {}'.format(query)) try: cursor = self.conn.cursor() - cursor.execute(query, query_params, **kwargs) + if query_params is not None and all( + isinstance(param, tuple) or isinstance(param, list) + for param in query_params + ): + cursor.executemany(query, query_params, **kwargs) + else: + cursor.execute(query, query_params, **kwargs) self.conn.commit() return None if cursor.description is None else cursor.fetchall() except Exception as e: self.conn.rollback() + cursor.close() + self.close_connection() self.logger.error( ('Error executing {name} database query \'{query}\': ' '{error}').format( - name=self.db_name, query=query, error=e)) + name=self.database, query=query, error=e)) raise PostgreSQLClientError( ('Error executing {name} database query \'{query}\': ' '{error}').format( - name=self.db_name, query=query, error=e)) from None + name=self.database, query=query, error=e)) from None finally: cursor.close() def close_connection(self): """Closes the database connection""" self.logger.debug('Closing {} database connection'.format( - self.db_name)) + self.database)) self.conn.close() diff --git a/src/nypl_py_utils/classes/postgresql_pool_client.py b/src/nypl_py_utils/classes/postgresql_pool_client.py deleted file mode 100644 index beaf589..0000000 --- a/src/nypl_py_utils/classes/postgresql_pool_client.py +++ /dev/null @@ -1,137 +0,0 @@ -import psycopg - -from nypl_py_utils.functions.log_helper import create_log -from psycopg.rows import tuple_row -from psycopg_pool import ConnectionPool - - -class PostgreSQLPoolClient: - """Client for managing a connection pool to a PostgreSQL database""" - - def __init__(self, host, port, db_name, user, password, conn_timeout=300.0, - **kwargs): - """ - Creates (but does not open) a connection pool. - - Parameters - ---------- - host, port, db_name, user, password: str - Required connection information - kwargs: dict, optional - Keyword args to be passed into the ConnectionPool. All possible - arguments can be found here: - https://www.psycopg.org/psycopg3/docs/api/pool.html#psycopg_pool.ConnectionPool. - - Common arguments include: - min_size/max_size: The minimum and maximum size of the pool, by - default 0 and 1 - max_idle: Half the number of seconds a connection can stay idle - before being automatically closed, by default 90.0, which - corresponds to 3 minutes of idle time. Note that if - min_size is greater than 0, this won't apply to the first - min_size connections, which will stay open until manually - closed. 
- """ - self.logger = create_log('postgresql_client') - self.conn_info = ('postgresql://{user}:{password}@{host}:{port}/' - '{db_name}').format(user=user, password=password, - host=host, port=port, - db_name=db_name) - - self.db_name = db_name - self.kwargs = kwargs - self.kwargs['min_size'] = kwargs.get('min_size', 0) - self.kwargs['max_size'] = kwargs.get('max_size', 1) - self.kwargs['max_idle'] = kwargs.get('max_idle', 90.0) - - if self.kwargs['max_idle'] > 150.0: - self.logger.error(( - 'max_idle is too high -- values over 150 seconds are unsafe ' - 'and may lead to connection leakages in ECS')) - raise PostgreSQLPoolClientError(( - 'max_idle is too high -- values over 150 seconds are unsafe ' - 'and may lead to connection leakages in ECS')) from None - - self.pool = ConnectionPool(self.conn_info, open=False, **self.kwargs) - - def connect(self, timeout=300.0): - """ - Opens the connection pool and connects to the given PostgreSQL database - min_size number of times - - Parameters - ---------- - conn_timeout: float, optional - The number of seconds to try connecting before throwing an error. - Defaults to 300 seconds. - """ - self.logger.info('Connecting to {} database'.format(self.db_name)) - try: - if self.pool is None: - self.pool = ConnectionPool( - self.conn_info, open=False, **self.kwargs) - self.pool.open(wait=True, timeout=timeout) - except psycopg.Error as e: - self.logger.error( - 'Error connecting to {name} database: {error}'.format( - name=self.db_name, error=e)) - raise PostgreSQLPoolClientError( - 'Error connecting to {name} database: {error}'.format( - name=self.db_name, error=e)) from None - - def execute_query(self, query, query_params=None, row_factory=tuple_row, - **kwargs): - """ - Requests a connection from the pool and uses it to execute an arbitrary - query. After the query is complete, either commits it or rolls it back, - and then returns the connection to the pool. - - Parameters - ---------- - query: str - The query to execute - query_params: sequence, optional - The values to be used in a parameterized query - row_factory: RowFactory, optional - A psycopg RowFactory that determines how the data will be returned. - Defaults to tuple_row, which returns the rows as a list of tuples. - kwargs: - All possible arguments can be found here: - https://www.psycopg.org/psycopg3/docs/api/cursors.html#psycopg.Cursor.execute - - Returns - ------- - None or sequence - None if the cursor has nothing to return. Some type of sequence - based on the row_factory input if there's something to return - (even if the result set is empty). 
- """ - self.logger.info('Querying {} database'.format(self.db_name)) - self.logger.debug('Executing query {}'.format(query)) - with self.pool.connection() as conn: - try: - conn.row_factory = row_factory - cursor = conn.execute(query, query_params, **kwargs) - return (None if cursor.description is None - else cursor.fetchall()) - except Exception as e: - self.logger.error( - ('Error executing {name} database query \'{query}\': ' - '{error}').format( - name=self.db_name, query=query, error=e)) - raise PostgreSQLPoolClientError( - ('Error executing {name} database query \'{query}\': ' - '{error}').format( - name=self.db_name, query=query, error=e)) from None - - def close_pool(self): - """Closes the connection pool""" - self.logger.debug('Closing {} database connection pool'.format( - self.db_name)) - self.pool.close() - self.pool = None - - -class PostgreSQLPoolClientError(Exception): - def __init__(self, message=None): - self.message = message diff --git a/src/nypl_py_utils/classes/redshift_client.py b/src/nypl_py_utils/classes/redshift_client.py index 17c4558..9f594ef 100644 --- a/src/nypl_py_utils/classes/redshift_client.py +++ b/src/nypl_py_utils/classes/redshift_client.py @@ -1,6 +1,6 @@ import redshift_connector +import time -from botocore.exceptions import ClientError from nypl_py_utils.functions.log_helper import create_log @@ -15,23 +15,46 @@ def __init__(self, host, database, user, password): self.user = user self.password = password - def connect(self): - """Connects to a Redshift database using the given credentials""" + def connect(self, retry_count=0, backoff_factor=5): + """ + Connects to a Redshift database using the given credentials. + + Parameters + ---------- + retry_count: int, optional + The number of times to retry connecting before throwing an error. + By default no retry occurs. + backoff_factor: int, optional + The backoff factor when retrying. The amount of time to wait before + retrying is backoff_factor ** number_of_retries_made. 
+ """ self.logger.info('Connecting to {} database'.format(self.database)) - try: - self.conn = redshift_connector.connect( - host=self.host, - database=self.database, - user=self.user, - password=self.password, - sslmode='verify-full') - except ClientError as e: - self.logger.error( - 'Error connecting to {name} database: {error}'.format( - name=self.database, error=e)) - raise RedshiftClientError( - 'Error connecting to {name} database: {error}'.format( - name=self.database, error=e)) from None + attempt_count = 0 + while attempt_count <= retry_count: + try: + try: + self.conn = redshift_connector.connect( + host=self.host, + database=self.database, + user=self.user, + password=self.password, + sslmode='verify-full') + except (redshift_connector.InterfaceError): + if attempt_count < retry_count: + self.logger.info('Failed to connect -- retrying') + time.sleep(backoff_factor ** attempt_count) + attempt_count += 1 + else: + raise + else: + break + except Exception as e: + self.logger.error( + 'Error connecting to {name} database: {error}'.format( + name=self.database, error=e)) + raise RedshiftClientError( + 'Error connecting to {name} database: {error}'.format( + name=self.database, error=e)) from None def execute_query(self, query, dataframe=False): """ @@ -62,6 +85,8 @@ def execute_query(self, query, dataframe=False): return cursor.fetchall() except Exception as e: self.conn.rollback() + cursor.close() + self.close_connection() self.logger.error( ('Error executing {name} database query \'{query}\': {error}') .format(name=self.database, query=query, error=e)) @@ -83,10 +108,9 @@ def execute_transaction(self, queries): A list of tuples containing a query and the values to be used if the query is parameterized (or None if it's not). The values can be for a single insert query -- e.g. 
execute_transaction( - "INSERT INTO x VALUES (%s, %s)", (1, "a")) + [("INSERT INTO x VALUES (%s, %s)", (1, "a"))]) or for multiple -- e.g execute_transaction( - "INSERT INTO x VALUES (%s, %s)", [(1, "a"), (2, "b")]) - + [("INSERT INTO x VALUES (%s, %s)", [(1, "a"), (2, "b")])]) """ self.logger.info('Executing transaction against {} database'.format( self.database)) @@ -106,6 +130,8 @@ def execute_transaction(self, queries): self.conn.commit() except Exception as e: self.conn.rollback() + cursor.close() + self.close_connection() self.logger.error( ('Error executing {name} database transaction: {error}') .format(name=self.database, error=e)) diff --git a/tests/test_mysql_client.py b/tests/test_mysql_client.py index a1f8a87..39508f2 100644 --- a/tests/test_mysql_client.py +++ b/tests/test_mysql_client.py @@ -1,3 +1,4 @@ +import mysql.connector import pytest from nypl_py_utils.classes.mysql_client import MySQLClient, MySQLClientError @@ -22,6 +23,21 @@ def test_connect(self, mock_mysql_conn, test_instance): user='test_user', password='test_password') + def test_connect_retry_success(self, mock_mysql_conn, test_instance, + mocker): + mock_mysql_conn.side_effect = [mysql.connector.Error, + mocker.MagicMock()] + test_instance.connect(retry_count=2, backoff_factor=0) + assert mock_mysql_conn.call_count == 2 + + def test_connect_retry_fail(self, mock_mysql_conn, test_instance): + mock_mysql_conn.side_effect = mysql.connector.Error + + with pytest.raises(MySQLClientError): + test_instance.connect(retry_count=2, backoff_factor=0) + + assert mock_mysql_conn.call_count == 3 + def test_execute_read_query(self, mock_mysql_conn, test_instance, mocker): test_instance.connect() @@ -75,7 +91,8 @@ def test_execute_query_with_exception( test_instance.execute_query('test query') test_instance.conn.rollback.assert_called_once() - mock_cursor.close.assert_called_once() + mock_cursor.close.assert_called() + test_instance.conn.close.assert_called_once() def test_close_connection(self, mock_mysql_conn, test_instance): test_instance.connect() diff --git a/tests/test_postgresql_client.py b/tests/test_postgresql_client.py index 99e5042..af93625 100644 --- a/tests/test_postgresql_client.py +++ b/tests/test_postgresql_client.py @@ -2,6 +2,7 @@ from nypl_py_utils.classes.postgresql_client import ( PostgreSQLClient, PostgreSQLClientError) +from psycopg import OperationalError class TestPostgreSQLClient: @@ -12,14 +13,27 @@ def mock_pg_conn(self, mocker): @pytest.fixture def test_instance(self): - return PostgreSQLClient('test_host', 'test_port', 'test_db_name', + return PostgreSQLClient('test_host', 'test_port', 'test_database', 'test_user', 'test_password') def test_connect(self, mock_pg_conn, test_instance): test_instance.connect() mock_pg_conn.assert_called_once_with( 'postgresql://test_user:test_password@test_host:test_port/' + - 'test_db_name') + 'test_database') + + def test_connect_retry_success(self, mock_pg_conn, test_instance, mocker): + mock_pg_conn.side_effect = [OperationalError(), mocker.MagicMock()] + test_instance.connect(retry_count=2, backoff_factor=0) + assert mock_pg_conn.call_count == 2 + + def test_connect_retry_fail(self, mock_pg_conn, test_instance): + mock_pg_conn.side_effect = OperationalError() + + with pytest.raises(PostgreSQLClientError): + test_instance.connect(retry_count=2, backoff_factor=0) + + assert mock_pg_conn.call_count == 3 def test_execute_read_query(self, mock_pg_conn, test_instance, mocker): test_instance.connect() @@ -63,6 +77,22 @@ def test_execute_write_query_with_params(self, 
mock_pg_conn, test_instance, test_instance.conn.commit.assert_called_once() mock_cursor.close.assert_called_once() + def test_execute_write_query_with_many_params( + self, mock_pg_conn, test_instance, mocker): + test_instance.connect() + + mock_cursor = mocker.MagicMock() + mock_cursor.description = None + test_instance.conn.cursor.return_value = mock_cursor + + assert test_instance.execute_query( + 'test query %s %s', query_params=[('a', 1), ('b', None), (None, 2)] + ) is None + mock_cursor.executemany.assert_called_once_with( + 'test query %s %s', [('a', 1), ('b', None), (None, 2)]) + test_instance.conn.commit.assert_called_once() + mock_cursor.close.assert_called_once() + def test_execute_query_with_exception( self, mock_pg_conn, test_instance, mocker): test_instance.connect() @@ -75,7 +105,8 @@ def test_execute_query_with_exception( test_instance.execute_query('test query') test_instance.conn.rollback.assert_called_once() - mock_cursor.close.assert_called_once() + mock_cursor.close.assert_called() + test_instance.conn.close.assert_called_once() def test_close_connection(self, mock_pg_conn, test_instance): test_instance.connect() diff --git a/tests/test_postgresql_pool_client.py b/tests/test_postgresql_pool_client.py deleted file mode 100644 index 82f22b6..0000000 --- a/tests/test_postgresql_pool_client.py +++ /dev/null @@ -1,121 +0,0 @@ -import pytest - -from nypl_py_utils.classes.postgresql_pool_client import ( - PostgreSQLPoolClient, PostgreSQLPoolClientError) -from psycopg import Error - - -class TestPostgreSQLPoolClient: - - @pytest.fixture - def test_instance(self, mocker): - mocker.patch('psycopg_pool.ConnectionPool.open') - mocker.patch('psycopg_pool.ConnectionPool.close') - return PostgreSQLPoolClient('test_host', 'test_port', 'test_db_name', - 'test_user', 'test_password') - - def test_init(self, test_instance): - assert test_instance.pool.conninfo == ( - 'postgresql://test_user:test_password@test_host:test_port/' + - 'test_db_name') - assert test_instance.pool._opened is False - assert test_instance.pool.min_size == 0 - assert test_instance.pool.max_size == 1 - - def test_init_with_long_max_idle(self): - with pytest.raises(PostgreSQLPoolClientError): - PostgreSQLPoolClient( - 'test_host', 'test_port', 'test_db_name', 'test_user', - 'test_password', max_idle=300.0) - - def test_connect(self, test_instance): - test_instance.connect() - test_instance.pool.open.assert_called_once_with(wait=True, - timeout=300.0) - - def test_connect_with_exception(self, mocker): - mocker.patch('psycopg_pool.ConnectionPool.open', - side_effect=Error()) - - test_instance = PostgreSQLPoolClient( - 'test_host', 'test_port', 'test_db_name', 'test_user', - 'test_password') - - with pytest.raises(PostgreSQLPoolClientError): - test_instance.connect(timeout=1.0) - - def test_execute_read_query(self, test_instance, mocker): - test_instance.connect() - - mock_cursor = mocker.MagicMock() - mock_cursor.description = [('description', None, None)] - mock_cursor.fetchall.return_value = [(1, 2, 3), ('a', 'b', 'c')] - mock_conn = mocker.MagicMock() - mock_conn.execute.return_value = mock_cursor - mock_conn_context = mocker.MagicMock() - mock_conn_context.__enter__.return_value = mock_conn - mocker.patch('psycopg_pool.ConnectionPool.connection', - return_value=mock_conn_context) - - assert test_instance.execute_query( - 'test query') == [(1, 2, 3), ('a', 'b', 'c')] - mock_conn.execute.assert_called_once_with('test query', None) - mock_cursor.fetchall.assert_called_once() - - def test_execute_write_query(self, 
test_instance, mocker): - test_instance.connect() - - mock_cursor = mocker.MagicMock() - mock_cursor.description = None - mock_conn = mocker.MagicMock() - mock_conn.execute.return_value = mock_cursor - mock_conn_context = mocker.MagicMock() - mock_conn_context.__enter__.return_value = mock_conn - mocker.patch('psycopg_pool.ConnectionPool.connection', - return_value=mock_conn_context) - - assert test_instance.execute_query('test query') is None - mock_conn.execute.assert_called_once_with('test query', None) - - def test_execute_write_query_with_params(self, test_instance, mocker): - test_instance.connect() - - mock_cursor = mocker.MagicMock() - mock_cursor.description = None - mock_conn = mocker.MagicMock() - mock_conn.execute.return_value = mock_cursor - mock_conn_context = mocker.MagicMock() - mock_conn_context.__enter__.return_value = mock_conn - mocker.patch('psycopg_pool.ConnectionPool.connection', - return_value=mock_conn_context) - - assert test_instance.execute_query( - 'test query %s %s', query_params=('a', 1)) is None - mock_conn.execute.assert_called_once_with('test query %s %s', - ('a', 1)) - - def test_execute_query_with_exception(self, test_instance, mocker): - test_instance.connect() - - mock_conn = mocker.MagicMock() - mock_conn.execute.side_effect = Exception() - mock_conn_context = mocker.MagicMock() - mock_conn_context.__enter__.return_value = mock_conn - mocker.patch('psycopg_pool.ConnectionPool.connection', - return_value=mock_conn_context) - - with pytest.raises(PostgreSQLPoolClientError): - test_instance.execute_query('test query') - - def test_close_pool(self, test_instance): - test_instance.connect() - test_instance.close_pool() - assert test_instance.pool is None - - def test_reopen_pool(self, test_instance, mocker): - test_instance.connect() - test_instance.close_pool() - test_instance.connect() - test_instance.pool.open.assert_has_calls([ - mocker.call(wait=True, timeout=300), - mocker.call(wait=True, timeout=300)]) diff --git a/tests/test_redshift_client.py b/tests/test_redshift_client.py index 7d6219d..e33024e 100644 --- a/tests/test_redshift_client.py +++ b/tests/test_redshift_client.py @@ -2,6 +2,7 @@ from nypl_py_utils.classes.redshift_client import ( RedshiftClient, RedshiftClientError) +from redshift_connector import InterfaceError class TestRedshiftClient: @@ -23,6 +24,20 @@ def test_connect(self, mock_redshift_conn, test_instance): password='test_password', sslmode='verify-full') + def test_connect_retry_success(self, mock_redshift_conn, test_instance, + mocker): + mock_redshift_conn.side_effect = [InterfaceError(), mocker.MagicMock()] + test_instance.connect(retry_count=2, backoff_factor=0) + assert mock_redshift_conn.call_count == 2 + + def test_connect_retry_fail(self, mock_redshift_conn, test_instance): + mock_redshift_conn.side_effect = InterfaceError() + + with pytest.raises(RedshiftClientError): + test_instance.connect(retry_count=2, backoff_factor=0) + + assert mock_redshift_conn.call_count == 3 + def test_execute_query(self, mock_redshift_conn, test_instance, mocker): test_instance.connect() @@ -60,7 +75,8 @@ def test_execute_query_with_exception( test_instance.execute_query('test query') test_instance.conn.rollback.assert_called_once() - mock_cursor.close.assert_called_once() + mock_cursor.close.assert_called() + test_instance.conn.close.assert_called_once() def test_execute_transaction(self, mock_redshift_conn, test_instance, mocker): @@ -119,7 +135,8 @@ def test_execute_transaction_with_exception( mocker.call('query 2', None)]) 
test_instance.conn.commit.assert_not_called() test_instance.conn.rollback.assert_called_once() - mock_cursor.close.assert_called_once() + mock_cursor.close.assert_called() + test_instance.conn.close.assert_called_once() def test_close_connection(self, mock_redshift_conn, test_instance): test_instance.connect() From efc3ec475ab7295e3051af980ebc3c4257e99425 Mon Sep 17 00:00:00 2001 From: aaronfriedman Date: Wed, 20 Nov 2024 14:51:11 -0700 Subject: [PATCH 2/7] Add patron data helper --- Makefile | 2 +- pyproject.toml | 6 +- .../functions/patron_data_helper.py | 238 ++++++++++++++ tests/test_patron_data_helper.py | 293 ++++++++++++++++++ 4 files changed, 537 insertions(+), 2 deletions(-) create mode 100644 src/nypl_py_utils/functions/patron_data_helper.py create mode 100644 tests/test_patron_data_helper.py diff --git a/Makefile b/Makefile index 44bc47e..ba5a846 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ help: @echo " lint project files using the flake8 linter" test: - pytest + pytest -W ignore::FutureWarning lint: flake8 --exclude *env \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index fbd2558..2a9385e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,11 +67,15 @@ config-helper = [ obfuscation-helper = [ "bcrypt>=4.0.1" ] +patron-data-helper = [ + "nypl_py_utils[postgresql-client,redshift-client]>=1.5.0", + "pandas>=2.2.2" +] research-catalog-identifier-helper = [ "requests>=2.28.1" ] development = [ - "nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,redshift-client,s3-client,secrets-manager-client,sftp-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]", + "nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,redshift-client,s3-client,secrets-manager-client,sftp-client,config-helper,obfuscation-helper,patron-data-helper,research-catalog-identifier-helper]", "flake8>=6.0.0", "freezegun>=1.2.2", "mock>=4.0.3", diff --git a/src/nypl_py_utils/functions/patron_data_helper.py b/src/nypl_py_utils/functions/patron_data_helper.py new file mode 100644 index 0000000..b97b2a1 --- /dev/null +++ b/src/nypl_py_utils/functions/patron_data_helper.py @@ -0,0 +1,238 @@ +import pandas as pd + +from nypl_py_utils.functions.log_helper import create_log + +logger = create_log("patron_data_helpers") + +_REDSHIFT_QUERY = """ + SELECT patron_id, postal_code, geoid + FROM {table} + WHERE patron_id IN ({ids});""" + +_SIERRA_BARCODES_TO_IDS_QUERY = """ + SELECT index_tag || index_entry, record_id + FROM sierra_view.phrase_entry + WHERE index_tag || index_entry IN ({});""" + +_SIERRA_PATRON_DATA_QUERY = """ + SELECT id, barcode, ptype_code, pcode3, + CASE WHEN LENGTH(TRIM(home_library_code)) = 0 + OR TRIM(home_library_code) = 'none' THEN NULL + ELSE TRIM(home_library_code) END + FROM sierra_view.patron_view + WHERE id IN ({});""" + + +def barcodes_to_patron_ids(sierra_client, barcodes, isolate_connection=True, + remove_duplicates=True): + """ + Converts barcodes into Sierra patron ids + + Parameters + ---------- + sierra_client: PostgreSQLClient + The client with which to query Sierra + barcodes: sequence of strings + The sequence of barcodes to be mapped. Must be iterable and without + 'None' entries. Each barcode is expected to be a string without a + prepending 'b' character. 
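+        The 'b' index tag is prepended to each barcode internally when
+        querying sierra_view.phrase_entry.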
+ isolate_connection: bool, optional + Whether the database connection should be opened and closed within this + method or whether it will be handled by the user + remove_duplicates: bool, optional + Whether barcodes that map to multiple patron ids should be removed + + Returns + ------- + DataFrame + A pandas DataFrame with 'barcode' and 'patron_id' columns. The + 'patron_id' column is set to be a string. + """ + unique_barcodes = set(barcodes) + if unique_barcodes: + logger.info(f"Mapping ({len(unique_barcodes)}) barcodes to patron ids") + barcodes_str = "'b" + "','b".join(unique_barcodes) + "'" + if isolate_connection: + sierra_client.connect() + raw_data = sierra_client.execute_query( + _SIERRA_BARCODES_TO_IDS_QUERY.format(barcodes_str)) + if isolate_connection: + sierra_client.close_connection() + else: + logger.info("No barcodes given with which to query Sierra") + raw_data = [] + + df = pd.DataFrame(raw_data, columns=["barcode", "patron_id"]) + df = df[pd.notnull(df[["barcode", "patron_id"]]).all(axis=1)] + df["barcode"] = df["barcode"].str.lstrip("b") + df["patron_id"] = df["patron_id"].astype("Int64").astype("string") + df = df.drop_duplicates() + if remove_duplicates: + return df.drop_duplicates("barcode", keep=False) + else: + return df + + +def get_sierra_patron_data_from_ids(sierra_client, patron_ids, + isolate_connection=True, + remove_duplicates=False): + """ + Given Sierra patron ids, returns standard patron fields from Sierra + + Parameters + ---------- + sierra_client: PostgreSQLClient + The client with which to query Sierra + patron_ids: sequence of strings + The sequence of patron ids to be fetched. Must be iterable and without + 'None' entries. Each patron id is expected to be a string. + isolate_connection: bool, optional + Whether the database connection should be opened and closed within this + method or whether it will be handled by the user + remove_duplicates: bool, optional + Whether patron ids that map to multiple rows with different values + should be removed + + Returns + ------- + DataFrame + A pandas DataFrame with standard patron columns. The 'patron_id' column + is set to be a string. + """ + unique_patron_ids = set(patron_ids) + if unique_patron_ids: + logger.info( + f"Fetching Sierra patron data for ({len(unique_patron_ids)}) " + "patrons") + patron_ids_str = ",".join(unique_patron_ids) + if isolate_connection: + sierra_client.connect() + raw_data = sierra_client.execute_query( + _SIERRA_PATRON_DATA_QUERY.format(patron_ids_str)) + if isolate_connection: + sierra_client.close_connection() + else: + logger.info("No patron ids given with which to query Sierra") + raw_data = [] + + df = pd.DataFrame(raw_data, columns=[ + "patron_id", "barcode", "ptype_code", "pcode3", + "patron_home_library_code"]) + df = df[pd.notnull(df["patron_id"])] + df["patron_id"] = df["patron_id"].astype("Int64").astype("string") + if remove_duplicates: + # If one patron id maps to two rows that are identical except for the + # barcode, arbitrarily delete one of the rows + df = df.drop_duplicates( + ["patron_id", "ptype_code", "pcode3", "patron_home_library_code"]) + return df.drop_duplicates("patron_id", keep=False) + else: + return df.drop_duplicates() + + +def get_sierra_patron_data_from_barcodes(sierra_client, barcodes, + isolate_connection=True): + """ + Given barcodes, returns standard patron fields from Sierra. One row per + barcode is returned for all barcodes found in Sierra. 
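+    Matches where both the barcode and the patron id agree are preferred;
+    if a barcode still matches more than one distinct set of patron
+    fields, those fields are returned as NULL.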
+ + Parameters + ---------- + sierra_client: PostgreSQLClient + The client with which to query Sierra + barcodes: sequence of strings + The sequence of barcodes to be mapped. Must be iterable and without + 'None' entries. Each barcode is expected to be a string without a + prepending 'b' character. + isolate_connection: bool, optional + Whether the database connection should be opened and closed within this + method or whether it will be handled by the user + + Returns + ------- + DataFrame + A pandas DataFrame with barcodes plus the standard patron columns. The + 'patron_id' column is set to be a string. + """ + if isolate_connection: + sierra_client.connect() + barcode_patron_id_df = barcodes_to_patron_ids( + sierra_client, barcodes, False, True) + patron_data_df = get_sierra_patron_data_from_ids( + sierra_client, barcode_patron_id_df["patron_id"], False, False) + if isolate_connection: + sierra_client.close_connection() + + # If one patron id maps to two rows that are identical except for the + # barcode, arbitrarily delete one of the rows + patron_data_df = patron_data_df.drop_duplicates( + ["patron_id", "ptype_code", "pcode3", "patron_home_library_code"]) + + # Prefer matches where both the barcode and the patron id match. Otherwise, + # accept matches where only the patron id matches. If more than one match + # is found, use none of them and NULL out the patron fields. + df = barcode_patron_id_df.merge( + patron_data_df, how="left", on=["patron_id", "barcode"], + indicator=True + ) + perfect_match_df = df[df["_merge"] == "both"].drop(columns=["_merge"]) + imperfect_match_df = df[["barcode", "patron_id"]].drop( + perfect_match_df.index).merge(patron_data_df.drop(columns=["barcode"]), + how="left", on="patron_id") + df = pd.concat([perfect_match_df, imperfect_match_df], ignore_index=True) + df.loc[df.duplicated("barcode", keep=False), [ + "ptype_code", "pcode3", "patron_home_library_code"]] = None + return df.drop_duplicates("barcode") + + +def get_redshift_patron_data(redshift_client, obfuscated_patron_ids, + isolate_connection=True): + """ + Given obfuscated patron ids, returns postal code and geoid from Redshift. + One row per patron id is returned for all patron ids found in Redshift. + + Parameters + ---------- + redshift_client: RedshiftClient + The client with which to query Redshift + obfuscated_patron_ids: sequence of strings + The sequence of patron ids to be mapped. Must be iterable and without + 'None' entries. Each patron id is expected to have been obfuscated. 
+ isolate_connection: bool, optional + Whether the database connection should be opened and closed within this + method or whether it will be handled by the user + + Returns + ------- + DataFrame + A pandas DataFrame with 'patron_id', 'postal_code', and 'geoid' columns + """ + unique_patron_ids = set(obfuscated_patron_ids) + if unique_patron_ids: + logger.info(f"Querying Redshift for ({len(unique_patron_ids)}) " + "patrons") + redshift_table = "patron_info" + if redshift_client.database != "production": + redshift_table += "_" + redshift_client.database + patron_ids_str = "'" + "','".join(unique_patron_ids) + "'" + + if isolate_connection: + redshift_client.connect() + raw_data = redshift_client.execute_query( + _REDSHIFT_QUERY.format(table=redshift_table, ids=patron_ids_str)) + if isolate_connection: + redshift_client.close_connection() + else: + logger.info("No patron ids given with which to query Redshift") + raw_data = [] + + df = pd.DataFrame(raw_data, columns=["patron_id", "postal_code", "geoid"]) + df = df[pd.notnull(df["patron_id"])] + if not df["patron_id"].is_unique: + duplicates = df.loc[df.duplicated("patron_id"), "patron_id"] + logger.warning( + "More than one Redshift row found for the following patron ids: " + f"{', '.join(duplicates)}") + return df.drop_duplicates("patron_id", keep=False) + else: + return df diff --git a/tests/test_patron_data_helper.py b/tests/test_patron_data_helper.py new file mode 100644 index 0000000..78287a5 --- /dev/null +++ b/tests/test_patron_data_helper.py @@ -0,0 +1,293 @@ +import logging +import pandas as pd + +from nypl_py_utils.functions.patron_data_helper import ( + barcodes_to_patron_ids, + get_redshift_patron_data, + get_sierra_patron_data_from_barcodes, + get_sierra_patron_data_from_ids) +from pandas.testing import assert_frame_equal, assert_series_equal + +_TEST_REDSHIFT_RESPONSE = [ + ["obf1", "1"*5, "1"*11], ["obf2", None, "2"*11], ["obf3", "3"*5, None], + [None, "4"*5, "4"*11], +] + +_TEST_SIERRA_BARCODE_RESPONSE = [ + ("b1", 1), ("b2", 2), ("b3", 3), ("b33", 3), ("b4", 4), ("b4", 44), + ("b5", None), (None, 5), +] + +_TEST_SIERRA_IDS_RESPONSE = [ + (1, "b1", 11, 12, "aa"), (2, "b2", 21, 22, "bb"), + (3, "b3", 31, 32, "cc"), (33, "b3", 331, 332, "ccc"), + (4, None, None, None, None), (5, "b5", 51, 52, "dd"), + (6, "b6", 61, 62, "ee"), (6, "b66", 61, 62, "ee"), + (7, "b7", 71, 72, "ff"), (7, "b77", 771, 772, "ffff"), + (None, "b4", None, None, None), (5, "b5", 51, 52, "dd"), +] + +_TEST_BARCODE_DF = pd.DataFrame( + [[f"b{i}", str(i)] for i in range(1, 10)], + columns=["barcode", "patron_id"]) +_TEST_BARCODE_DF["patron_id"] = _TEST_BARCODE_DF["patron_id"].astype("string") + +_TEST_ID_DF = pd.DataFrame( + [["1", "b1", 11, 12, "aa"], # one perfect match + ["2", "b5", 21, 22, "bb"], # different id and barcode matches + # no match for patron id 3 + ["4", "b4", 41, 42, "dd"], # two matches -- one perfect, one imperfect + ["4", "b444", 43, 44, "dddd"], + ["5", "b555", 51, 52, "eeee"], # two matches -- both imperfect + ["5", "b556", 53, 54, "eeef"], + ["6", "b6", 61, 62, "ffff"], # two matches -- both perfect + ["6", "b6", 63, 64, "fffg"], + ["7", "b777", 71, 72, "gg"], # two matches -- same except barcode + ["7", "b778", 71, 72, "gg"], + ["8", None, 81, 82, "hh"], # one imperfect match/no barcode + ["9", "b9", None, None, None]], # one perfect match/all null fields + columns=["patron_id", "barcode", "ptype_code", "pcode3", + "patron_home_library_code"]) +_TEST_ID_DF["patron_id"] = _TEST_ID_DF["patron_id"].astype("string") + + +class 
TestPatronDataHelper: + + def test_barcodes_to_patron_ids(self, mocker): + RESULT = pd.DataFrame( + [["1", "1"], ["2", "2"], ["3", "3"], ["33", "3"]], + columns=["barcode", "patron_id"]) + RESULT["patron_id"] = RESULT["patron_id"].astype("string") + + mock_sierra_client = mocker.MagicMock() + mock_sierra_client.execute_query.return_value = \ + _TEST_SIERRA_BARCODE_RESPONSE + + assert_frame_equal( + RESULT, barcodes_to_patron_ids( + mock_sierra_client, [str(el) for el in range(1, 8)] + ["1",] + )) + + mock_sierra_client.connect.assert_called_once() + mock_sierra_client.execute_query.assert_called_once() + mock_sierra_client.close_connection.assert_called_once() + + # Because the set of barcodes is unordered, it can't be tested + # directly. The workaround is to test the total length of the query + # plus that each barcode appears in it. + query = mock_sierra_client.execute_query.call_args[0][0] + assert len(query) == 157 + for el in range(1, 8): + assert f"'b{el}'" in query + + def test_barcodes_to_patron_ids_unisolated(self, mocker): + mock_sierra_client = mocker.MagicMock() + mock_sierra_client.execute_query.return_value = [] + + barcodes_to_patron_ids(mock_sierra_client, ["1",], + isolate_connection=False) + + mock_sierra_client.connect.assert_not_called() + mock_sierra_client.execute_query.assert_called_once() + mock_sierra_client.close_connection.assert_not_called() + + def test_barcodes_to_patron_ids_with_duplicates(self, mocker): + RESULT = pd.DataFrame( + [["1", "1"], ["2", "2"], ["3", "3"], + ["33", "3"], ["4", "4"], ["4", "44"]], + columns=["barcode", "patron_id"]) + RESULT["patron_id"] = RESULT["patron_id"].astype("string") + + mock_sierra_client = mocker.MagicMock() + mock_sierra_client.execute_query.return_value = \ + _TEST_SIERRA_BARCODE_RESPONSE + + assert_frame_equal( + RESULT, barcodes_to_patron_ids( + mock_sierra_client, [str(el) for el in range(1, 7)], + remove_duplicates=False + )) + + def test_get_sierra_patron_data_from_ids(self, mocker): + RESULT = pd.DataFrame( + [["1", "b1", 11, 12, "aa"], ["2", "b2", 21, 22, "bb"], + ["3", "b3", 31, 32, "cc"], ["33", "b3", 331, 332, "ccc"], + ["4", None, None, None, None], + ["5", "b5", 51, 52, "dd"], ["6", "b6", 61, 62, "ee"], + ["6", "b66", 61, 62, "ee"], ["7", "b7", 71, 72, "ff"], + ["7", "b77", 771, 772, "ffff"]], + columns=["patron_id", "barcode", "ptype_code", "pcode3", + "patron_home_library_code"]) + RESULT["patron_id"] = RESULT["patron_id"].astype("string") + + mock_sierra_client = mocker.MagicMock() + mock_sierra_client.execute_query.return_value = \ + _TEST_SIERRA_IDS_RESPONSE + + assert_frame_equal( + RESULT, get_sierra_patron_data_from_ids( + mock_sierra_client, [str(el) for el in range(1, 9)] + ["1",] + )) + + mock_sierra_client.connect.assert_called_once() + mock_sierra_client.execute_query.assert_called_once() + mock_sierra_client.close_connection.assert_called_once() + + # Because the set of patron ids is unordered, it can't be tested + # directly. The workaround is to test the total length of the query + # plus that each id appears in it. 
+ query = mock_sierra_client.execute_query.call_args[0][0] + assert len(query) == 257 + for el in range(1, 9): + assert str(el) in query + + def test_get_sierra_patron_data_from_ids_unisolated(self, mocker): + mock_sierra_client = mocker.MagicMock() + mock_sierra_client.execute_query.return_value = [] + + get_sierra_patron_data_from_ids(mock_sierra_client, ["1",], + isolate_connection=False) + + mock_sierra_client.connect.assert_not_called() + mock_sierra_client.execute_query.assert_called_once() + mock_sierra_client.close_connection.assert_not_called() + + def test_get_sierra_patron_data_from_ids_without_duplicates(self, mocker): + RESULT = pd.DataFrame( + [["1", "b1", 11, 12, "aa"], ["2", "b2", 21, 22, "bb"], + ["3", "b3", 31, 32, "cc"], ["33", "b3", 331, 332, "ccc"], + ["4", None, None, None, None], ["5", "b5", 51, 52, "dd"], + ["6", "b6", 61, 62, "ee"]], + columns=["patron_id", "barcode", "ptype_code", "pcode3", + "patron_home_library_code"]) + RESULT["patron_id"] = RESULT["patron_id"].astype("string") + + mock_sierra_client = mocker.MagicMock() + mock_sierra_client.execute_query.return_value = \ + _TEST_SIERRA_IDS_RESPONSE + + assert_frame_equal( + RESULT, get_sierra_patron_data_from_ids( + mock_sierra_client, [str(el) for el in range(1, 9)], + remove_duplicates=True + )) + + def test_get_sierra_patron_data_from_barcodes(self, mocker): + RESULT = pd.DataFrame( + [["b1", "1", 11, 12, "aa"], + ["b4", "4", 41, 42, "dd"], + ["b6", "6", None, None, None], + ["b9", "9", None, None, None], + ["b2", "2", 21, 22, "bb"], + ["b3", "3", None, None, None], + ["b5", "5", None, None, None], + ["b7", "7", 71, 72, "gg"], + ["b8", "8", 81, 82, "hh"]], + columns=["barcode", "patron_id", "ptype_code", "pcode3", + "patron_home_library_code"]) + RESULT["patron_id"] = RESULT["patron_id"].astype("string") + TEST_BARCODES = [f"b{i}" for i in range(1, 11)] + ["b1",] + TEST_IDS = pd.Series([str(i) for i in range(1, 10)], + dtype="string", name="patron_id") + mocked_barcodes_method = mocker.patch( + "nypl_py_utils.functions.patron_data_helper.barcodes_to_patron_ids", # noqa: E501 + return_value=_TEST_BARCODE_DF) + mocked_ids_method = mocker.patch( + "nypl_py_utils.functions.patron_data_helper.get_sierra_patron_data_from_ids", # noqa: E501 + return_value=_TEST_ID_DF) + mock_sierra_client = mocker.MagicMock() + + assert_frame_equal( + RESULT, + get_sierra_patron_data_from_barcodes( + mock_sierra_client, TEST_BARCODES).reset_index(drop=True), + check_like=True) + + mock_sierra_client.connect.assert_called_once() + mock_sierra_client.close_connection.assert_called_once() + + mocked_barcodes_method.assert_called_once_with( + mock_sierra_client, TEST_BARCODES, False, True) + assert mocked_ids_method.call_args[0][0] == mock_sierra_client + assert_series_equal(mocked_ids_method.call_args[0][1], TEST_IDS) + assert mocked_ids_method.call_args[0][2] is False + assert mocked_ids_method.call_args[0][3] is False + + def test_get_sierra_patron_data_from_barcodes_unisolated(self, mocker): + mocker.patch( + "nypl_py_utils.functions.patron_data_helper.barcodes_to_patron_ids", # noqa: E501 + return_value=pd.DataFrame([], columns=["barcode", "patron_id"])) + mocker.patch( + "nypl_py_utils.functions.patron_data_helper.get_sierra_patron_data_from_ids", # noqa: E501 + return_value=pd.DataFrame( + [], columns=["patron_id", "barcode", "ptype_code", "pcode3", + "patron_home_library_code"])) + mock_sierra_client = mocker.MagicMock() + + get_sierra_patron_data_from_barcodes( + mock_sierra_client, ["1",], isolate_connection=False) + + 
mock_sierra_client.connect.assert_not_called() + mock_sierra_client.close_connection.assert_not_called() + + def test_get_redshift_patron_data(self, mocker, caplog): + RESULT = pd.DataFrame( + [["obf1", "1"*5, "1"*11], ["obf2", None, "2"*11], + ["obf3", "3"*5, None]], + columns=["patron_id", "postal_code", "geoid"]) + + mock_redshift_client = mocker.MagicMock() + mock_redshift_client.execute_query.return_value = \ + _TEST_REDSHIFT_RESPONSE + + with caplog.at_level(logging.WARNING): + assert_frame_equal( + RESULT, get_redshift_patron_data( + mock_redshift_client, + ["obf1", "obf2", "obf3", "obf4", "obf1"] + )) + + assert caplog.text == "" + mock_redshift_client.connect.assert_called_once() + mock_redshift_client.execute_query.assert_called_once() + mock_redshift_client.close_connection.assert_called_once() + + # Because the set of patron ids is unordered, it can't be tested + # directly. The workaround is to test the total length of the query + # plus that each id appears in it. + query = mock_redshift_client.execute_query.call_args[0][0] + assert len(query) == 175 + for el in ["'obf1'", "'obf2'", "'obf3'", "'obf4'"]: + assert el in query + + def test_get_redshift_patron_data_unisolated(self, mocker): + mock_redshift_client = mocker.MagicMock() + mock_redshift_client.execute_query.return_value = [] + + get_redshift_patron_data(mock_redshift_client, ["1",], + isolate_connection=False) + + mock_redshift_client.connect.assert_not_called() + mock_redshift_client.execute_query.assert_called_once() + mock_redshift_client.close_connection.assert_not_called() + + def test_get_redshift_patron_data_with_duplicates(self, mocker, caplog): + RESULT = pd.DataFrame( + [["obf1", "1"*5, "1"*11], ["obf2", None, "2"*11], + ["obf3", "3"*5, None]], + columns=["patron_id", "postal_code", "geoid"]) + + mock_redshift_client = mocker.MagicMock() + mock_redshift_client.execute_query.return_value = \ + _TEST_REDSHIFT_RESPONSE + [["obf4", "bad_zip", "bad_geoid"], + ["obf4", "bad_zip2", "bad_geoid2"]] + + with caplog.at_level(logging.WARNING): + assert_frame_equal( + RESULT, get_redshift_patron_data( + mock_redshift_client, + ["obf1", "obf2", "obf3", "obf4"] + )) + + assert ("More than one Redshift row found for the following patron " + "ids: obf4") in caplog.text From 0d38a614edca9afeaec295659128abbc7469b9a9 Mon Sep 17 00:00:00 2001 From: aaronfriedman Date: Wed, 20 Nov 2024 14:56:38 -0700 Subject: [PATCH 3/7] Update README and CHANGELOG --- CHANGELOG.md | 1 + README.md | 5 +++-- pyproject.toml | 4 ++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c9bddd..0a772e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # Changelog ## v1.6.0 11/20/24 +- Added patron_data_helper functions - Use executemany instead of execute when appropriate in PostgreSQLClient - Add capability to retry connecting to a database to the MySQL, PostgreSQL, and Redshift clients - Automatically close database connection upon error in the MySQL, PostgreSQL, and Redshift clients diff --git a/README.md b/README.md index 68e93ce..6eca9bc 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ This package contains common Python utility classes and functions. * Creating a logger in the appropriate format * Obfuscating a value using bcrypt * Parsing/building Research Catalog identifiers +* Mapping between barcodes and Sierra patron ids plus getting patron data from Sierra and Redshift using those ids ## Usage ```python @@ -37,7 +38,7 @@ kinesis_client = KinesisClient(...) 
# Do not use any version below 1.0.0 # All available optional dependencies can be found in pyproject.toml. # See the "Managing dependencies" section below for more details. -nypl-py-utils[kinesis-client,config-helper]==1.5.0 +nypl-py-utils[kinesis-client,config-helper]==1.6.0 ``` ## Developing locally @@ -63,7 +64,7 @@ The optional dependency sets also give the developer the option to manually list ### Using PostgreSQLClient in an AWS Lambda Because `psycopg` requires a statically linked version of the `libpq` library, the `PostgreSQLClient` cannot be installed as-is in an AWS Lambda function. Instead, it must be packaged as follows: ```bash -pip install --target ./package nypl-py-utils[postgresql-client]==1.5.0 +pip install --target ./package nypl-py-utils[postgresql-client]==x.y.z pip install \ --platform manylinux2014_x86_64 \ diff --git a/pyproject.toml b/pyproject.toml index e9f73d7..3f81d4f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "nypl_py_utils" -version = "1.5.0" +version = "1.6.0" authors = [ { name="Aaron Friedman", email="aaronfriedman@nypl.org" }, ] @@ -71,7 +71,7 @@ obfuscation-helper = [ "bcrypt>=4.0.1" ] patron-data-helper = [ - "nypl_py_utils[postgresql-client,redshift-client]>=1.5.0", + "nypl_py_utils[postgresql-client,redshift-client]>=1.6.0", "pandas>=2.2.2" ] research-catalog-identifier-helper = [ From 1a597f63447039d0a2f34a372a5b7b7d05bbb25d Mon Sep 17 00:00:00 2001 From: aaronfriedman Date: Wed, 20 Nov 2024 15:08:12 -0700 Subject: [PATCH 4/7] Add test Redshift db --- tests/test_patron_data_helper.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_patron_data_helper.py b/tests/test_patron_data_helper.py index 78287a5..7ba1ed4 100644 --- a/tests/test_patron_data_helper.py +++ b/tests/test_patron_data_helper.py @@ -237,6 +237,7 @@ def test_get_redshift_patron_data(self, mocker, caplog): columns=["patron_id", "postal_code", "geoid"]) mock_redshift_client = mocker.MagicMock() + mock_redshift_client.database = "test_db" mock_redshift_client.execute_query.return_value = \ _TEST_REDSHIFT_RESPONSE @@ -256,7 +257,8 @@ def test_get_redshift_patron_data(self, mocker, caplog): # directly. The workaround is to test the total length of the query # plus that each id appears in it. query = mock_redshift_client.execute_query.call_args[0][0] - assert len(query) == 175 + assert len(query) == 124 + assert "patron_info_test_db" in query for el in ["'obf1'", "'obf2'", "'obf3'", "'obf4'"]: assert el in query From 1ced6e71aaa21811c6645575d82bc2f9d91c99ce Mon Sep 17 00:00:00 2001 From: aaronfriedman Date: Fri, 22 Nov 2024 10:12:19 -0700 Subject: [PATCH 5/7] Remove specific version from README --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 6eca9bc..ff9ba81 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ kinesis_client = KinesisClient(...) # Do not use any version below 1.0.0 # All available optional dependencies can be found in pyproject.toml. # See the "Managing dependencies" section below for more details. 
-nypl-py-utils[kinesis-client,config-helper]==1.6.0 +nypl-py-utils[kinesis-client,config-helper]==1.x.y ``` ## Developing locally @@ -64,7 +64,7 @@ The optional dependency sets also give the developer the option to manually list ### Using PostgreSQLClient in an AWS Lambda Because `psycopg` requires a statically linked version of the `libpq` library, the `PostgreSQLClient` cannot be installed as-is in an AWS Lambda function. Instead, it must be packaged as follows: ```bash -pip install --target ./package nypl-py-utils[postgresql-client]==x.y.z +pip install --target ./package nypl-py-utils[postgresql-client]==1.x.y pip install \ --platform manylinux2014_x86_64 \ From 03b6bddf2b02046df9249570447cd180d2fac5a0 Mon Sep 17 00:00:00 2001 From: aaronfriedman Date: Fri, 22 Nov 2024 10:18:34 -0700 Subject: [PATCH 6/7] Require lower version of py-utils in patron-data-helper --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 3f81d4f..a410fc0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,7 +71,7 @@ obfuscation-helper = [ "bcrypt>=4.0.1" ] patron-data-helper = [ - "nypl_py_utils[postgresql-client,redshift-client]>=1.6.0", + "nypl_py_utils[postgresql-client,redshift-client]>=1.1.5", "pandas>=2.2.2" ] research-catalog-identifier-helper = [ From 9538159632a724c75162fdef504d008f6d576792 Mon Sep 17 00:00:00 2001 From: aaronfriedman Date: Fri, 22 Nov 2024 10:42:36 -0700 Subject: [PATCH 7/7] Move break --- src/nypl_py_utils/classes/mysql_client.py | 3 +-- src/nypl_py_utils/classes/postgresql_client.py | 3 +-- src/nypl_py_utils/classes/redshift_client.py | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/nypl_py_utils/classes/mysql_client.py b/src/nypl_py_utils/classes/mysql_client.py index a755d5b..befe42b 100644 --- a/src/nypl_py_utils/classes/mysql_client.py +++ b/src/nypl_py_utils/classes/mysql_client.py @@ -44,6 +44,7 @@ def connect(self, retry_count=0, backoff_factor=5, **kwargs): user=self.user, password=self.password, **kwargs) + break except (mysql.connector.Error): if attempt_count < retry_count: self.logger.info('Failed to connect -- retrying') @@ -51,8 +52,6 @@ def connect(self, retry_count=0, backoff_factor=5, **kwargs): attempt_count += 1 else: raise - else: - break except Exception as e: self.logger.error( 'Error connecting to {name} database: {error}'.format( diff --git a/src/nypl_py_utils/classes/postgresql_client.py b/src/nypl_py_utils/classes/postgresql_client.py index 82c6b9e..f174d34 100644 --- a/src/nypl_py_utils/classes/postgresql_client.py +++ b/src/nypl_py_utils/classes/postgresql_client.py @@ -38,6 +38,7 @@ def connect(self, retry_count=0, backoff_factor=5, **kwargs): try: try: self.conn = psycopg.connect(self.conn_info, **kwargs) + break except (psycopg.OperationalError, psycopg.errors.ConnectionTimeout): if attempt_count < retry_count: @@ -46,8 +47,6 @@ def connect(self, retry_count=0, backoff_factor=5, **kwargs): attempt_count += 1 else: raise - else: - break except Exception as e: self.logger.error( 'Error connecting to {name} database: {error}'.format( diff --git a/src/nypl_py_utils/classes/redshift_client.py b/src/nypl_py_utils/classes/redshift_client.py index 9f594ef..47cabeb 100644 --- a/src/nypl_py_utils/classes/redshift_client.py +++ b/src/nypl_py_utils/classes/redshift_client.py @@ -39,6 +39,7 @@ def connect(self, retry_count=0, backoff_factor=5): user=self.user, password=self.password, sslmode='verify-full') + break except (redshift_connector.InterfaceError): if 
attempt_count < retry_count: self.logger.info('Failed to connect -- retrying') @@ -46,8 +47,6 @@ def connect(self, retry_count=0, backoff_factor=5): attempt_count += 1 else: raise - else: - break except Exception as e: self.logger.error( 'Error connecting to {name} database: {error}'.format(