diff --git a/kartograf/rpki/parse.py b/kartograf/rpki/parse.py
index e828e21..47f72ea 100644
--- a/kartograf/rpki/parse.py
+++ b/kartograf/rpki/parse.py
@@ -1,5 +1,5 @@
 import json
-import os
+from pathlib import Path
 from typing import Dict
 
 from kartograf.bogon import (
@@ -13,8 +13,8 @@
 
 @timed
 def parse_rpki(context):
-    raw_input = os.path.join(context.out_dir_rpki, "rpki_raw.json")
-    rpki_res = os.path.join(context.out_dir_rpki, "rpki_final.txt")
+    raw_input = Path(context.out_dir_rpki) / "rpki_raw.json"
+    rpki_res = Path(context.out_dir_rpki) / "rpki_final.txt"
 
     output_cache: Dict[str, [str, str]] = {}
 
diff --git a/tests/context.py b/tests/context.py
index db05a9c..f9789e3 100644
--- a/tests/context.py
+++ b/tests/context.py
@@ -1,6 +1,7 @@
 import csv
 import json
 import os
+from pathlib import Path
 from types import SimpleNamespace
 
 from kartograf.context import Context
@@ -32,25 +33,26 @@ def load_rpki_csv_to_json(self, csv_path):
         row["vrps"] = vrps
         rpki_data.append(row)
 
-    output_path = os.path.join(self.out_dir_rpki, 'rpki_raw.json')
+    output_path = Path(self.out_dir_rpki) / 'rpki_raw.json'
     with open(output_path, 'w') as jsonfile:
         json.dump(rpki_data, jsonfile, indent=2)
 
 
 def create_test_context(tmp_path, epoch):
-    current_path = os.getcwd()
+    current_path = Path.cwd()
+    fixtures_path = Path(__file__).parent / "data"
     os.chdir(tmp_path)  # Use temporary directory
 
     TEST_ARGS.epoch = epoch
     context = Context(TEST_ARGS)
-    context.data_dir = os.path.join(tmp_path, "data/", context.epoch_dir)
-    context.data_dir_rpki = os.path.join(context.data_dir, "rpki/")
-    context.out_dir = os.path.join(tmp_path, "out/", context.epoch_dir)
-    context.out_dir_rpki = os.path.join(context.out_dir, "rpki/")
+    context.data_dir = Path(tmp_path) / "data" / context.epoch_dir
+    context.data_dir_rpki = context.data_dir / "rpki"
+    context.out_dir = Path(tmp_path) / "out" / context.epoch_dir
+    context.out_dir_rpki = context.out_dir / "rpki"
 
-    os.makedirs(context.data_dir_rpki, exist_ok=True)
-    os.makedirs(context.out_dir_rpki, exist_ok=True)
+    context.data_dir_rpki.mkdir(parents=True, exist_ok=True)
+    context.out_dir_rpki.mkdir(parents=True, exist_ok=True)
 
-    load_rpki_csv_to_json(context, os.path.join(current_path, "tests/data/rpki_raw.csv"))
+    load_rpki_csv_to_json(context, fixtures_path / "rpki_raw.csv")
     os.chdir(current_path)
     return context
diff --git a/tests/test_merge.py b/tests/test_merge.py
index 79c2b6e..09c8729 100644
--- a/tests/test_merge.py
+++ b/tests/test_merge.py
@@ -1,7 +1,7 @@
 """
 Test merging multiple sets of networks, as if they were independent AS files.
 """
-import os
+from pathlib import Path
 
 from kartograf.merge import general_merge
 
@@ -39,10 +39,10 @@ def test_merge_from_fixtures(tmp_path):
     '''
     Assert that general_merge merges subnets correctly.
     '''
-    data_dir = os.path.join(os.path.dirname(__file__), "data")
-    base_nets, base_subnet_count = __read_test_vectors(os.path.join(data_dir, "base_file.csv"))
+    data_dir = Path(__file__).parent / "data"
+    base_nets, base_subnet_count = __read_test_vectors(data_dir / "base_file.csv")
     base_path = tmp_path / "base.txt"
-    extra_nets, extra_subnet_count = __read_test_vectors(os.path.join(data_dir, "extra_file.csv"))
+    extra_nets, extra_subnet_count = __read_test_vectors(data_dir / "extra_file.csv")
     extra_path = tmp_path / "extra.txt"
     # write the networks to disk, generating ASNs for each network
     generate_ip_file(base_path, build_file_lines(base_nets, generate_asns(len(base_nets))))
diff --git a/tests/test_rpki_parser.py b/tests/test_rpki_parser.py
index d51543d..6a82ba3 100644
--- a/tests/test_rpki_parser.py
+++ b/tests/test_rpki_parser.py
@@ -1,5 +1,5 @@
 import json
-import os
+from pathlib import Path
 
 from kartograf.rpki.parse import parse_rpki
 from .context import create_test_context
@@ -27,11 +27,11 @@
     parse_rpki(context)
 
     # Check that rpki_final.txt was created
-    final_path = os.path.join(context.out_dir_rpki, "rpki_final.txt")
-    assert os.path.exists(final_path), "rpki_final.txt should exist"
+    final_path = Path(context.out_dir_rpki) / "rpki_final.txt"
+    assert final_path.exists(), "rpki_final.txt should exist"
 
     # Read the raw JSON to compare counts
-    with open(os.path.join(context.out_dir_rpki, "rpki_raw.json"), "r") as f:
+    with open(Path(context.out_dir_rpki) / "rpki_raw.json", "r") as f:
         raw_data = json.load(f)
 
     # Count entries in final output
@@ -79,14 +79,14 @@
     ]
 
     # Write test data to rpki_raw.json
-    with open(os.path.join(context.out_dir_rpki, "rpki_raw.json"), "w") as f:
+    with open(Path(context.out_dir_rpki) / "rpki_raw.json", "w") as f:
         json.dump(test_data, f)
 
     parse_rpki(context)
 
     # Check that rpki_final.txt was created
-    final_path = os.path.join(context.out_dir_rpki, "rpki_final.txt")
-    assert os.path.exists(final_path), "rpki_final.txt should exist"
+    final_path = Path(context.out_dir_rpki) / "rpki_final.txt"
+    assert final_path.exists(), "rpki_final.txt should exist"
 
     # Count entries in final output
     with open(final_path, "r") as f: