Compact listen dumps in HDFS
Full dumps, when imported into Spark, are partitioned by listened_at's year
and month for storage in HDFS. Incremental dumps are imported every day
and appended to a single incremental.parquet. Deleted listens are similarly
stored in deleted-listens.parquet and deleted-user-listen-history.parquet.
At run time, the full dumps are read, concatenated with the incremental
dumps, and the deleted listens are filtered out of the union. When a new
full dump is imported, it contains all listens up to that point with the
deleted listens already removed, so the extra parquet files for incremental
and deleted listens are removed as well. This currently happens on a
biweekly schedule.
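
A minimal PySpark sketch of that runtime merge, assuming hypothetical HDFS
paths and assuming deleted listens are matched on user_id, recording_msid and
listened_at (the real paths and join keys live in the listens helpers; only
one of the two deletion files is shown for brevity):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical locations; the real ones are tracked by the listens metadata module.
base = spark.read.parquet("/listenbrainz/listens/base")  # partitioned by year/month
incremental = spark.read.parquet("/listenbrainz/listens/incremental.parquet")
deleted = spark.read.parquet("/listenbrainz/listens/deleted-listens.parquet")

# Concatenate the full-dump listens with the incremental listens, then drop deleted ones.
merged = base.unionByName(incremental, allowMissingColumns=True)
listens = merged.join(
    deleted,
    on=["user_id", "recording_msid", "listened_at"],  # assumed match keys
    how="left_anti",
)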

Full dumps are cumbersome to produce, so we want to reduce our dependence
on them inside ListenBrainz and Spark. After an initial full dump import
to seed the cluster, we intend to get rid of the biweekly full dump imports
and rely on incremental dumps continuously. Hence, we need to rethink how
incremental listens are stored in the Spark cluster and how deletions are
implemented.

The solution I have come up with is to replace the full dump import step with
a compaction step which reads all the partitioned base listens, combines them
with the incremental listens, removes the deleted listens, and writes the
result back to HDFS in the partitioned format. Everything else remains the same.
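
Sketched end to end under the same assumptions as above, the compaction step
boils down to the following; the actual implementation is the new
listenbrainz_spark/listens/compact.py shown below:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Same merge as in the runtime sketch above: base plus incremental, minus deleted listens.
base = spark.read.parquet("/listenbrainz/listens/base")
incremental = spark.read.parquet("/listenbrainz/listens/incremental.parquet")
deleted = spark.read.parquet("/listenbrainz/listens/deleted-listens.parquet")
listens = base.unionByName(incremental, allowMissingColumns=True) \
    .join(deleted, on=["user_id", "recording_msid", "listened_at"], how="left_anti")

# Recompute the partition columns and rewrite everything as the new base; the standalone
# incremental and deleted-listen parquet files can then be discarded.
listens.withColumn("year", F.year("listened_at")) \
    .withColumn("month", F.month("listened_at")) \
    .write.partitionBy("year", "month").mode("overwrite") \
    .parquet("/listenbrainz/listens-compacted/base")  # hypothetical new location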
amCap1712 committed Mar 4, 2025
1 parent 194565e commit 4a4ab5c
Showing 5 changed files with 85 additions and 49 deletions.
8 changes: 7 additions & 1 deletion listenbrainz/spark/request_manage.py
@@ -505,7 +505,7 @@ def request_troi_playlists(slug, create_all):


@cli.command(name="request_tags")
def request_troi_playlists():
def request_tags():
""" Generate the tags dataset with percent rank """
send_request_to_spark_cluster("tags.default")

@@ -516,6 +516,12 @@ def request_import_deleted_listens():
send_request_to_spark_cluster("import.deleted_listens")


@cli.command(name="request_compact_listens")
def request_compact_listens():
""" Send a request to spark cluster to compact listens imported from listenbrainz """
send_request_to_spark_cluster("import.compact_listens")


# Some useful commands to keep our crontabs manageable. These commands do not add new functionality
# rather combine multiple commands related to a task so that they are always invoked in the correct order.

5 changes: 5 additions & 0 deletions listenbrainz/spark/request_queries.json
@@ -226,5 +226,10 @@
"name": "import.deleted_listens",
"description": "Import deleted listens from ListenBrainz into spark cluster.",
"params": []
},
"import.compact_listens": {
"name": "import.compact_listens",
"description": "'Compact' listens imported from ListenBrainz in spark cluster.",
"params": []
}
}
66 changes: 66 additions & 0 deletions listenbrainz_spark/listens/compact.py
@@ -0,0 +1,66 @@
import os

import listenbrainz_spark
from listenbrainz_spark import hdfs_connection
from listenbrainz_spark.hdfs.utils import path_exists
from listenbrainz_spark.listens.cache import unpersist_incremental_df
from listenbrainz_spark.listens.data import get_listens_from_dump
from listenbrainz_spark.listens.metadata import get_listens_metadata, generate_new_listens_location, \
update_listens_metadata


def main():
"""
Compacts listen storage by processing base and incremental listen records.
Reads base and incremental listen records, removes deleted listens, and stores the final
processed data partitioned by year and month in a new HDFS location.
"""
table = "listens_to_compact"
old_df = get_listens_from_dump(include_incremental=True, remove_deleted=True)
old_df.createOrReplaceTempView(table)

write_partitioned_listens(table)


def write_partitioned_listens(table):
""" Read listens from the given table and write them to a new HDFS location partitioned
by listened_at's year and month. """
query = f"""
select extract(year from listened_at) as year
, extract(month from listened_at) as month
, *
from {table}
"""
new_location = generate_new_listens_location()
new_base_listens_location = os.path.join(new_location, "base")

listenbrainz_spark \
.sql_context \
.sql(query) \
.write \
.partitionBy("year", "month") \
.mode("overwrite") \
.parquet(new_base_listens_location)

query = f"""
select max(listened_at) as max_listened_at, max(created) as max_created
from parquet.`{new_base_listens_location}`
"""
result = listenbrainz_spark \
.sql_context \
.sql(query) \
.collect()[0]

metadata = get_listens_metadata()
if metadata is None:
existing_location = None
else:
existing_location = metadata.location

update_listens_metadata(new_location, result.max_listened_at, result.max_created)

unpersist_incremental_df()

if existing_location and path_exists(existing_location):
hdfs_connection.client.delete(existing_location, recursive=True, skip_trash=True)
53 changes: 5 additions & 48 deletions listenbrainz_spark/listens/dump.py
@@ -18,6 +18,7 @@
from listenbrainz_spark.hdfs.upload import upload_archive_to_hdfs_temp
from listenbrainz_spark.hdfs.utils import path_exists, delete_dir, rename
from listenbrainz_spark.listens.cache import unpersist_incremental_df
from listenbrainz_spark.listens.compact import write_partitioned_listens
from listenbrainz_spark.listens.metadata import get_listens_metadata, generate_new_listens_location, \
update_listens_metadata
from listenbrainz_spark.path import IMPORT_METADATA
@@ -203,54 +204,10 @@ def insert_dump_data(dump_id: int, dump_type: DumpType, imported_at: datetime):


def process_full_listens_dump(temp_path):
""" Partition the imported full listens parquet dump by year and month """
metadata = get_listens_metadata()
if metadata is None:
existing_location = None
else:
existing_location = metadata.location

query = f"""
select extract(year from listened_at) as year
, extract(month from listened_at) as month
, listened_at
, created
, user_id
, recording_msid
, artist_name
, artist_credit_id
, release_name
, release_mbid
, recording_name
, recording_mbid
, artist_credit_mbids
from parquet.`{temp_path}`
"""
new_location = generate_new_listens_location()
base_listens_location = os.path.join(new_location, "base")

listenbrainz_spark \
.sql_context \
.sql(query) \
.write \
.partitionBy("year", "month") \
.mode("overwrite") \
.parquet(base_listens_location)

query = f"""
select max(listened_at) as max_listened_at, max(created) as max_created
from parquet.`{base_listens_location}`
"""
result = listenbrainz_spark \
.sql_context \
.sql(query) \
.collect()[0]
update_listens_metadata(new_location, result.max_listened_at, result.max_created)

unpersist_incremental_df()

if existing_location and path_exists(existing_location):
hdfs_connection.client.delete(existing_location, recursive=True, skip_trash=True)
""" Partition the imported full listens parquet dump by year and month and store in a new HDFS location. """
table = "unprocessed_full_dump_listens"
read_files_from_HDFS(temp_path).createOrReplaceTempView(table)
write_partitioned_listens(table)
hdfs_connection.client.delete(temp_path, recursive=True, skip_trash=True)


2 changes: 2 additions & 0 deletions listenbrainz_spark/query_map.py
@@ -34,6 +34,7 @@
import listenbrainz_spark.echo.echo
import listenbrainz_spark.listens.dump
import listenbrainz_spark.listens.delete
import listenbrainz_spark.listens.compact

functions = {
'echo.echo': listenbrainz_spark.echo.echo.handler,
@@ -78,6 +79,7 @@
'troi.playlists': listenbrainz_spark.troi.periodic_jams.main,
'tags.default': listenbrainz_spark.tags.tags.main,
'import.deleted_listens': listenbrainz_spark.listens.delete.main,
'import.compact_listens': listenbrainz_spark.listens.compact.main
}

