From 372c9cc0f10930fc8ae797491c259d1cb64639fa Mon Sep 17 00:00:00 2001
From: jurraca
Date: Tue, 4 Feb 2025 16:22:41 +0000
Subject: [PATCH 1/4] rpki validation: parse in batches and handle
 concatenated json

The rpki-client '-f' flag can handle multiple ROA files, so instead of
passing files one by one, pass them in batches of 250.

Instead of building a list of `json_results` in memory, write each
batch's output to the result file as it returns. This requires adding
delimiters to the resulting JSON stream, because rpki-client emits the
JSON objects back to back with no delimiter between them.

Finally, this removes the parallelization via ThreadPoolExecutor;
validation is about 2x faster without it. We may want to bring it back
later.
---
 kartograf/rpki/fetch.py | 24 +++++++++++++-----------
 kartograf/rpki/parse.py |  2 +-
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/kartograf/rpki/fetch.py b/kartograf/rpki/fetch.py
index de824c3..7c99b59 100644
--- a/kartograf/rpki/fetch.py
+++ b/kartograf/rpki/fetch.py
@@ -1,7 +1,6 @@
 import subprocess
 import sys
 
-from concurrent.futures import ThreadPoolExecutor
 from threading import Lock
 from pathlib import Path
 import requests
@@ -97,7 +96,7 @@ def validate_rpki_db(context):
     with open(context.debug_log, 'a') as logs:
         logs.write("\n\n=== RPKI Validation ===\n")
 
-    def process_file(file):
+    def process_file(batch):
         result = subprocess.run(["rpki-client",
                                  "-j",
                                  "-n",
@@ -106,7 +105,7 @@ def process_file(file):
                                  "-P",
                                  context.epoch,
                                  ] + tal_options +
-                                ["-f", file],  # -f has to be last
+                                ["-f"] + batch,  # -f has to be last
                                 capture_output=True,
                                 check=False)
 
@@ -114,18 +113,21 @@ def process_file(file):
         stderr_output = result.stderr.decode()
         with debug_file_lock:
             with open(context.debug_log, 'a') as logs:
-                logs.write(f'\nfile: {file}\n')
                 logs.write(stderr_output)
         return result.stdout
 
-    with ThreadPoolExecutor() as executor:
-        results = list(tqdm(executor.map(process_file, files), total=len(files)))
-
-    json_results = [result.decode() for result in results if result]
+    total = len(files)
+    batch_size = 250
 
     with open(result_path, "w") as res_file:
         res_file.write("[")
-        res_file.write(",".join(json_results))
-        res_file.write("]")
+        for i in tqdm(range(0, total, batch_size)):
+            batch = [str(f) for f in files[i:i + batch_size]]
+            results = process_file(batch)
+            normalized = results.replace(b"\n}\n{\n\t", b"\n},\n{\n").decode('utf-8').strip()
+            res_file.write(normalized)
+            res_file.write(",")
+
+        res_file.write("{}]")
 
-    print(f"{len(json_results)} RKPI ROAs validated and saved to {result_path}, file hash: {calculate_sha256(result_path)}")
+    print(f"RPKI ROAs validated and saved to {result_path}, file hash: {calculate_sha256(result_path)}")
diff --git a/kartograf/rpki/parse.py b/kartograf/rpki/parse.py
index 2fc0a72..42cadff 100644
--- a/kartograf/rpki/parse.py
+++ b/kartograf/rpki/parse.py
@@ -64,7 +64,7 @@ def parse_rpki(context):
         if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn):
             if context.debug_log:
                 with open(context.debug_log, 'a') as logs:
-                    logs.write(f"RPKI: parser encountered an invalid IP network: {prefix}")
+                    logs.write(f"RPKI: parser encountered an invalid IP network: {prefix}\n")
             continue
 
         if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):
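
A note on the delimiter handling above: rpki-client's '-f' mode prints
one pretty-printed JSON object per ROA, back to back with no separator,
which is why the patch splices in commas with a byte-level replace keyed
to that exact formatting. A more general way to split concatenated JSON,
sketched here as a standalone illustration rather than anything in the
patch, is json.JSONDecoder.raw_decode:

    import json

    def split_concatenated_json(stream):
        """Decode back-to-back JSON objects from a single string."""
        decoder = json.JSONDecoder()
        objects = []
        idx = 0
        while idx < len(stream):
            # Skip whitespace between objects; raw_decode rejects it.
            while idx < len(stream) and stream[idx].isspace():
                idx += 1
            if idx == len(stream):
                break
            obj, idx = decoder.raw_decode(stream, idx)
            objects.append(obj)
        return objects

    # Two pretty-printed objects with no delimiter, as rpki-client emits them.
    raw = '{\n\t"a": 1\n}\n{\n\t"b": 2\n}\n'
    print(split_concatenated_json(raw))  # [{'a': 1}, {'b': 2}]
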
From ae2f56edfc801ceb11484b1312b9af545cebd810 Mon Sep 17 00:00:00 2001
From: jurraca
Date: Wed, 19 Feb 2025 16:30:54 +0000
Subject: [PATCH 2/4] bring back concurrency: gotta go fast

This commit brings back the ThreadPoolExecutor omitted in the previous
commit, resulting in significant performance gains (~7x).

The output rpki_raw.json gets a different hash (expected, since batches
now land in completion order), while final_result.txt gets the same
hash as a run with the previous commit's linear processing.
---
 kartograf/rpki/fetch.py | 32 +++++++++++++++++++++-----------
 1 file changed, 21 insertions(+), 11 deletions(-)

diff --git a/kartograf/rpki/fetch.py b/kartograf/rpki/fetch.py
index 7c99b59..1a35455 100644
--- a/kartograf/rpki/fetch.py
+++ b/kartograf/rpki/fetch.py
@@ -1,6 +1,7 @@
 import subprocess
 import sys
 
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from threading import Lock
 from pathlib import Path
 import requests
@@ -118,16 +119,25 @@ def process_file(batch):
 
     total = len(files)
     batch_size = 250
-
-    with open(result_path, "w") as res_file:
-        res_file.write("[")
-        for i in tqdm(range(0, total, batch_size)):
-            batch = [str(f) for f in files[i:i + batch_size]]
-            results = process_file(batch)
-            normalized = results.replace(b"\n}\n{\n\t", b"\n},\n{\n").decode('utf-8').strip()
-            res_file.write(normalized)
-            res_file.write(",")
-
-        res_file.write("{}]")
+    batches = []
+    for i in range(0, total, batch_size):
+        batch = [str(f) for f in files[i:i + batch_size]]
+        batches.append(batch)
+
+    total_batches = len(batches)
+    completed = 0
+    with ThreadPoolExecutor() as executor:
+        futures = [executor.submit(process_file, batch) for batch in batches]
+        with open(result_path, "w") as res_file:
+            res_file.write("[")
+            for future in tqdm(as_completed(futures), total=total_batches):
+                result = future.result()
+                if result:
+                    normalized = result.replace(b"\n}\n{\n\t", b"\n},\n{\n").decode('utf-8').strip()
+                    res_file.write(normalized)
+                    completed += 1
+                    if completed < total_batches:
+                        res_file.write(",")
+            res_file.write("]")
 
     print(f"RPKI ROAs validated and saved to {result_path}, file hash: {calculate_sha256(result_path)}")

From bc417c8bd0319083f3f9416d9bedc80c8e010448 Mon Sep 17 00:00:00 2001
From: jurraca
Date: Thu, 6 Mar 2025 22:31:31 +0000
Subject: [PATCH 3/4] fix: use relative filepath in both main and reproduce
 contexts

---
 kartograf/context.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kartograf/context.py b/kartograf/context.py
index da98902..b93cca8 100644
--- a/kartograf/context.py
+++ b/kartograf/context.py
@@ -37,7 +37,7 @@ def __init__(self, args):
                 self.args.reproduce += '/'
             self.data_dir = self.args.reproduce
         else:
-            self.data_dir = str(cwd / "data" / self.epoch_dir)
+            self.data_dir = str(Path("data") / self.epoch_dir)
 
         if Path(self.data_dir).exists() and not self.reproduce:
             print("Not so fast, a folder with that epoch already exists.")
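
Context for the next patch: under PATCH 2/4 above, rpki_raw.json hashes
differently on every run because as_completed() yields futures in
completion order, not submission order. A toy illustration of that
behavior (the work function and its sleep times are invented to force
out-of-order completion):

    import time
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def work(n):
        # Later submissions finish first.
        time.sleep(0.05 * (3 - n))
        return n

    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(work, n) for n in range(3)]
        # Yields in completion order, so concatenating results as they
        # arrive produces a different file ordering run to run.
        print([f.result() for f in as_completed(futures)])  # [2, 1, 0]
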
From 22c7ca87283c603f2ee94b1565633ca755abe050 Mon Sep 17 00:00:00 2001
From: jurraca
Date: Thu, 6 Mar 2025 22:34:14 +0000
Subject: [PATCH 4/4] sort RPKI JSON output

This stores the JSON results in memory, then sorts them by hash_id and
writes them to disk, avoiding the need to handle concatenated JSON
strings in the output file.
---
 kartograf/rpki/fetch.py | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/kartograf/rpki/fetch.py b/kartograf/rpki/fetch.py
index 1a35455..24d19ec 100644
--- a/kartograf/rpki/fetch.py
+++ b/kartograf/rpki/fetch.py
@@ -2,6 +2,7 @@
 import sys
 
 from concurrent.futures import ThreadPoolExecutor, as_completed
+import json
 from threading import Lock
 from pathlib import Path
 import requests
@@ -125,19 +126,18 @@ def process_file(batch):
         batches.append(batch)
 
     total_batches = len(batches)
-    completed = 0
+    results = []
     with ThreadPoolExecutor() as executor:
         futures = [executor.submit(process_file, batch) for batch in batches]
-        with open(result_path, "w") as res_file:
-            res_file.write("[")
-            for future in tqdm(as_completed(futures), total=total_batches):
-                result = future.result()
-                if result:
-                    normalized = result.replace(b"\n}\n{\n\t", b"\n},\n{\n").decode('utf-8').strip()
-                    res_file.write(normalized)
-                    completed += 1
-                    if completed < total_batches:
-                        res_file.write(",")
-            res_file.write("]")
+        for future in tqdm(as_completed(futures), total=total_batches):
+            result = future.result()
+            if result:
+                normalized = result.replace(b"\n}\n{\n\t", b"\n},\n{\n").decode('utf-8').strip()
+                results.append(normalized)
+    results_json = json.loads("[" + ",".join(results) + "]")
+    s = sorted(results_json, key=lambda result: result["hash_id"])
+
+    with open(result_path, 'w') as f:
+        json.dump(s, f)
 
     print(f"RPKI ROAs validated and saved to {result_path}, file hash: {calculate_sha256(result_path)}")
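
Sorting by hash_id before serializing, as the final patch does, makes
the result file byte-identical across runs no matter which threads
finish first. A minimal sketch of that property (only the hash_id key
comes from the patch; the other record fields are invented):

    import hashlib
    import json

    # Hypothetical ROA records arriving in two different completion orders.
    run_a = [{"hash_id": "b2", "asn": 64496}, {"hash_id": "a1", "asn": 64497}]
    run_b = [{"hash_id": "a1", "asn": 64497}, {"hash_id": "b2", "asn": 64496}]

    def file_digest(records):
        # Sort by hash_id before dumping, mirroring the patch.
        ordered = sorted(records, key=lambda r: r["hash_id"])
        return hashlib.sha256(json.dumps(ordered).encode()).hexdigest()

    # Same digest either way, so the output hash is stable across runs.
    assert file_digest(run_a) == file_digest(run_b)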