From 4a4ab5c4303f503cc0987bc94fbe72066d689420 Mon Sep 17 00:00:00 2001
From: Kartik Ohri
Date: Sat, 1 Mar 2025 19:04:38 +0530
Subject: [PATCH] Compact listen dumps in HDFS

Full dumps, when imported in Spark, are partitioned by listened_at's year
and month for storage in HDFS. Incremental dumps are imported every day and
appended to a single incremental.parquet. Deleted listens are similarly
stored in deleted-listens.parquet and deleted-user-listen-history.parquet.
At run time, the full dump listens are read and concatenated with the
incremental listens, and the deleted listens are filtered out of the union.
When a new full dump is imported, it already contains all listens up to that
point with deletions applied, so the additional parquet files for
incremental and deleted listens are removed. At the moment, this happens
biweekly.

Full dumps are cumbersome to produce, so we want to reduce our dependence on
them in ListenBrainz and Spark. After an initial full dump import to seed
the cluster, we intend to do away with the biweekly full dump imports and
rely on incremental dumps continuously. Hence, we need to rethink how
incremental listens are stored in the Spark cluster and how deletions are
implemented.

The solution I have come up with is to replace the full dump import step
with a compaction step: read all the partitioned base listens, combine them
with the incremental listens, remove the deleted listens, and write the
result back to HDFS in the same partitioned format. Everything else remains
the same.
---
 listenbrainz/spark/request_manage.py    |  8 ++-
 listenbrainz/spark/request_queries.json |  5 ++
 listenbrainz_spark/listens/compact.py   | 66 +++++++++++++++++++++++++
 listenbrainz_spark/listens/dump.py      | 53 ++------------------
 listenbrainz_spark/query_map.py         |  2 +
 5 files changed, 85 insertions(+), 49 deletions(-)
 create mode 100644 listenbrainz_spark/listens/compact.py

diff --git a/listenbrainz/spark/request_manage.py b/listenbrainz/spark/request_manage.py
index 47b08c8c5c..f24da7bb58 100644
--- a/listenbrainz/spark/request_manage.py
+++ b/listenbrainz/spark/request_manage.py
@@ -505,7 +505,7 @@ def request_troi_playlists(slug, create_all):
 
 
 @cli.command(name="request_tags")
-def request_troi_playlists():
+def request_tags():
     """ Generate the tags dataset with percent rank """
     send_request_to_spark_cluster("tags.default")
 
@@ -516,6 +516,12 @@ def request_import_deleted_listens():
     send_request_to_spark_cluster("import.deleted_listens")
 
 
+@cli.command(name="request_compact_listens")
+def request_compact_listens():
+    """ Send a request to spark cluster to compact listens imported from listenbrainz """
+    send_request_to_spark_cluster("import.compact_listens")
+
+
 # Some useful commands to keep our crontabs manageable. These commands do not add new functionality
 # rather combine multiple commands related to a task so that they are always invoked in the correct order.
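As context for the diffs that follow, here is a minimal PySpark sketch of the
compaction flow the commit message describes: union the partitioned base
listens with the incremental listens, filter out the deleted listens with an
anti join, and write the result back partitioned by year and month. The HDFS
paths and the join keys (user_id, recording_msid, listened_at) are
illustrative assumptions rather than the actual ListenBrainz schema; the real
implementation in listenbrainz_spark/listens/compact.py below delegates the
union and deletion filtering to get_listens_from_dump.

    # Minimal sketch of the compaction flow; paths and join keys are assumed.
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("compact-listens-sketch").getOrCreate()

    # Hypothetical locations of the three inputs described above. Dropping the
    # year/month partition columns lets the base listens union cleanly with
    # the incremental listens, which do not carry them.
    base = spark.read.parquet("/listens/base").drop("year", "month")
    incremental = spark.read.parquet("/listens/incremental.parquet")
    deleted = spark.read.parquet("/listens/deleted-listens.parquet")

    # Concatenate base and incremental listens, then drop every listen that
    # has a matching row in the deleted set (a left anti join keeps only the
    # non-matching rows).
    compacted = (
        base.unionByName(incremental)
            .join(deleted, on=["user_id", "recording_msid", "listened_at"], how="left_anti")
    )

    # Re-derive the partition columns and write the compacted listens to a
    # fresh location, partitioned by listened_at's year and month as before.
    (
        compacted
            .withColumn("year", F.year("listened_at"))
            .withColumn("month", F.month("listened_at"))
            .write
            .partitionBy("year", "month")
            .mode("overwrite")
            .parquet("/listens/base-compacted")
    )

Writing to a fresh location and only deleting the old one after the metadata
is updated, as compact.py does below, keeps readers pointed at a consistent
dataset while the compaction runs.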
diff --git a/listenbrainz/spark/request_queries.json b/listenbrainz/spark/request_queries.json
index 2bcacf8558..edca490067 100644
--- a/listenbrainz/spark/request_queries.json
+++ b/listenbrainz/spark/request_queries.json
@@ -226,5 +226,10 @@
         "name": "import.deleted_listens",
         "description": "Import deleted listens from ListenBrainz into spark cluster.",
         "params": []
+    },
+    "import.compact_listens": {
+        "name": "import.compact_listens",
+        "description": "'Compact' listens imported from ListenBrainz in spark cluster.",
+        "params": []
     }
 }
\ No newline at end of file
diff --git a/listenbrainz_spark/listens/compact.py b/listenbrainz_spark/listens/compact.py
new file mode 100644
index 0000000000..b5e8295448
--- /dev/null
+++ b/listenbrainz_spark/listens/compact.py
@@ -0,0 +1,66 @@
+import os
+
+import listenbrainz_spark
+from listenbrainz_spark import hdfs_connection
+from listenbrainz_spark.hdfs.utils import path_exists
+from listenbrainz_spark.listens.cache import unpersist_incremental_df
+from listenbrainz_spark.listens.data import get_listens_from_dump
+from listenbrainz_spark.listens.metadata import get_listens_metadata, generate_new_listens_location, \
+    update_listens_metadata
+
+
+def main():
+    """
+    Compacts listen storage by processing base and incremental listen records.
+
+    Reads base and incremental listen records, removes deleted listens, and stores the final
+    processed data partitioned by year and month in a new HDFS location.
+    """
+    table = "listens_to_compact"
+    old_df = get_listens_from_dump(include_incremental=True, remove_deleted=True)
+    old_df.createOrReplaceTempView(table)
+
+    write_partitioned_listens(table)
+
+
+def write_partitioned_listens(table):
+    """ Read listens from the given table and write them to a new HDFS location partitioned
+    by listened_at's year and month.
""" + query = f""" + select extract(year from listened_at) as year + , extract(month from listened_at) as month + , * + from {table} + """ + new_location = generate_new_listens_location() + new_base_listens_location = os.path.join(new_location, "base") + + listenbrainz_spark \ + .sql_context \ + .sql(query) \ + .write \ + .partitionBy("year", "month") \ + .mode("overwrite") \ + .parquet(new_base_listens_location) + + query = f""" + select max(listened_at) as max_listened_at, max(created) as max_created + from parquet.`{new_base_listens_location}` + """ + result = listenbrainz_spark \ + .sql_context \ + .sql(query) \ + .collect()[0] + + metadata = get_listens_metadata() + if metadata is None: + existing_location = None + else: + existing_location = metadata.location + + update_listens_metadata(new_location, result.max_listened_at, result.max_created) + + unpersist_incremental_df() + + if existing_location and path_exists(existing_location): + hdfs_connection.client.delete(existing_location, recursive=True, skip_trash=True) diff --git a/listenbrainz_spark/listens/dump.py b/listenbrainz_spark/listens/dump.py index 2d2fa22b69..cd1070febe 100644 --- a/listenbrainz_spark/listens/dump.py +++ b/listenbrainz_spark/listens/dump.py @@ -18,6 +18,7 @@ from listenbrainz_spark.hdfs.upload import upload_archive_to_hdfs_temp from listenbrainz_spark.hdfs.utils import path_exists, delete_dir, rename from listenbrainz_spark.listens.cache import unpersist_incremental_df +from listenbrainz_spark.listens.compact import write_partitioned_listens from listenbrainz_spark.listens.metadata import get_listens_metadata, generate_new_listens_location, \ update_listens_metadata from listenbrainz_spark.path import IMPORT_METADATA @@ -203,54 +204,10 @@ def insert_dump_data(dump_id: int, dump_type: DumpType, imported_at: datetime): def process_full_listens_dump(temp_path): - """ Partition the imported full listens parquet dump by year and month """ - metadata = get_listens_metadata() - if metadata is None: - existing_location = None - else: - existing_location = metadata.location - - query = f""" - select extract(year from listened_at) as year - , extract(month from listened_at) as month - , listened_at - , created - , user_id - , recording_msid - , artist_name - , artist_credit_id - , release_name - , release_mbid - , recording_name - , recording_mbid - , artist_credit_mbids - from parquet.`{temp_path}` - """ - new_location = generate_new_listens_location() - base_listens_location = os.path.join(new_location, "base") - - listenbrainz_spark \ - .sql_context \ - .sql(query) \ - .write \ - .partitionBy("year", "month") \ - .mode("overwrite") \ - .parquet(base_listens_location) - - query = f""" - select max(listened_at) as max_listened_at, max(created) as max_created - from parquet.`{base_listens_location}` - """ - result = listenbrainz_spark \ - .sql_context \ - .sql(query) \ - .collect()[0] - update_listens_metadata(new_location, result.max_listened_at, result.max_created) - - unpersist_incremental_df() - - if existing_location and path_exists(existing_location): - hdfs_connection.client.delete(existing_location, recursive=True, skip_trash=True) + """ Partition the imported full listens parquet dump by year and month and store in a new HDFS location. 
""" + table = "unprocessed_full_dump_listens" + read_files_from_HDFS(temp_path).createOrReplaceTempView(table) + write_partitioned_listens(table) hdfs_connection.client.delete(temp_path, recursive=True, skip_trash=True) diff --git a/listenbrainz_spark/query_map.py b/listenbrainz_spark/query_map.py index 071ca0c531..056e15c5f3 100644 --- a/listenbrainz_spark/query_map.py +++ b/listenbrainz_spark/query_map.py @@ -34,6 +34,7 @@ import listenbrainz_spark.echo.echo import listenbrainz_spark.listens.dump import listenbrainz_spark.listens.delete +import listenbrainz_spark.listens.compact functions = { 'echo.echo': listenbrainz_spark.echo.echo.handler, @@ -78,6 +79,7 @@ 'troi.playlists': listenbrainz_spark.troi.periodic_jams.main, 'tags.default': listenbrainz_spark.tags.tags.main, 'import.deleted_listens': listenbrainz_spark.listens.delete.main, + 'import.compact_listens': listenbrainz_spark.listens.compact.main }