diff --git a/kartograf/context.py b/kartograf/context.py
index da98902..b93cca8 100644
--- a/kartograf/context.py
+++ b/kartograf/context.py
@@ -37,7 +37,7 @@ def __init__(self, args):
                 self.args.reproduce += '/'
             self.data_dir = self.args.reproduce
         else:
-            self.data_dir = str(cwd / "data" / self.epoch_dir)
+            self.data_dir = str(Path("data") / self.epoch_dir)
 
         if Path(self.data_dir).exists() and not self.reproduce:
             print("Not so fast, a folder with that epoch already exists.")
diff --git a/kartograf/rpki/fetch.py b/kartograf/rpki/fetch.py
index de824c3..24d19ec 100644
--- a/kartograf/rpki/fetch.py
+++ b/kartograf/rpki/fetch.py
@@ -1,7 +1,8 @@
 import subprocess
 import sys
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import json
 from threading import Lock
 from pathlib import Path
 import requests
@@ -97,7 +98,7 @@ def validate_rpki_db(context):
         with open(context.debug_log, 'a') as logs:
             logs.write("\n\n=== RPKI Validation ===\n")
 
-    def process_file(file):
+    def process_file(batch):
         result = subprocess.run(["rpki-client",
                                  "-j",
                                  "-n",
@@ -106,7 +107,7 @@ def process_file(file):
                                  "-P",
                                  context.epoch,
                                  ] + tal_options +
-                                ["-f", file],  # -f has to be last
+                                ["-f"] + batch,  # -f has to be last
                                 capture_output=True,
                                 check=False)
 
@@ -114,18 +115,29 @@ def process_file(file):
         stderr_output = result.stderr.decode()
         with debug_file_lock:
             with open(context.debug_log, 'a') as logs:
-                logs.write(f'\nfile: {file}\n')
                 logs.write(stderr_output)
 
         return result.stdout
-    with ThreadPoolExecutor() as executor:
-        results = list(tqdm(executor.map(process_file, files), total=len(files)))
-
-    json_results = [result.decode() for result in results if result]
+    total = len(files)
+    batch_size = 250
+    batches = []
+    for i in range(0, total, batch_size):
+        batch = [str(f) for f in files[i:i + batch_size]]
+        batches.append(batch)
 
-    with open(result_path, "w") as res_file:
-        res_file.write("[")
-        res_file.write(",".join(json_results))
-        res_file.write("]")
-
-    print(f"{len(json_results)} RKPI ROAs validated and saved to {result_path}, file hash: {calculate_sha256(result_path)}")
+    total_batches = len(batches)
+    results = []
+    with ThreadPoolExecutor() as executor:
+        futures = [executor.submit(process_file, batch) for batch in batches]
+        for future in tqdm(as_completed(futures), total=total_batches):
+            result = future.result()
+            if result:
+                normalized = result.replace(b"\n}\n{\n\t", b"\n},\n{\n").decode('utf-8').strip()
+                results.append(normalized)
+    results_json = json.loads("[" + ",".join(results) + "]")
+    s = sorted(results_json, key=lambda result: result["hash_id"])
+
+    with open(result_path, 'w') as f:
+        json.dump(s, f)
+
+    print(f"RPKI ROAs validated and saved to {result_path}, file hash: {calculate_sha256(result_path)}")
diff --git a/kartograf/rpki/parse.py b/kartograf/rpki/parse.py
index 2fc0a72..42cadff 100644
--- a/kartograf/rpki/parse.py
+++ b/kartograf/rpki/parse.py
@@ -64,7 +64,7 @@ def parse_rpki(context):
         if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn):
             if context.debug_log:
                 with open(context.debug_log, 'a') as logs:
-                    logs.write(f"RPKI: parser encountered an invalid IP network: {prefix}")
+                    logs.write(f"RPKI: parser encountered an invalid IP network: {prefix} \n")
             continue
 
         if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):