Skip to content

Commit

Permalink
more os.path -> pathlib
Browse files Browse the repository at this point in the history
  • Loading branch information
jurraca committed Jan 30, 2025
1 parent e564fda commit 2140739
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 23 deletions.
6 changes: 3 additions & 3 deletions kartograf/rpki/parse.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
import os
from pathlib import Path
from typing import Dict

from kartograf.bogon import (
Expand All @@ -13,8 +13,8 @@

@timed
def parse_rpki(context):
raw_input = os.path.join(context.out_dir_rpki, "rpki_raw.json")
rpki_res = os.path.join(context.out_dir_rpki, "rpki_final.txt")
raw_input = Path(context.out_dir_rpki) / "rpki_raw.json"
rpki_res = Path(context.out_dir_rpki) / "rpki_final.txt"

output_cache: Dict[str, [str, str]] = {}

Expand Down
20 changes: 11 additions & 9 deletions tests/context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import csv
import json
import os
from pathlib import Path

from types import SimpleNamespace
from kartograf.context import Context
Expand Down Expand Up @@ -32,25 +33,26 @@ def load_rpki_csv_to_json(self, csv_path):
row["vrps"] = vrps
rpki_data.append(row)

output_path = os.path.join(self.out_dir_rpki, 'rpki_raw.json')
output_path = Path(self.out_dir_rpki) / 'rpki_raw.json'
with open(output_path, 'w') as jsonfile:
json.dump(rpki_data, jsonfile, indent=2)


def create_test_context(tmp_path, epoch):
current_path = os.getcwd()
current_path = Path.cwd()
fixtures_path = Path(__file__).parent / "data"
os.chdir(tmp_path) # Use temporary directory

TEST_ARGS.epoch = epoch
context = Context(TEST_ARGS)
context.data_dir = os.path.join(tmp_path, "data/", context.epoch_dir)
context.data_dir_rpki = os.path.join(context.data_dir, "rpki/")
context.out_dir = os.path.join(tmp_path, "out/", context.epoch_dir)
context.out_dir_rpki = os.path.join(context.out_dir, "rpki/")
context.data_dir = Path(tmp_path) / "data" / context.epoch_dir
context.data_dir_rpki = Path(context.data_dir) / "rpki"
context.out_dir = Path(tmp_path) / "out" / context.epoch_dir
context.out_dir_rpki = Path(context.out_dir) / "rpki"

os.makedirs(context.data_dir_rpki, exist_ok=True)
os.makedirs(context.out_dir_rpki, exist_ok=True)
Path.mkdir(context.data_dir_rpki, exist_ok=True, parents=True)
Path.mkdir(context.out_dir_rpki, exist_ok=True, parents=True)

load_rpki_csv_to_json(context, os.path.join(current_path, "tests/data/rpki_raw.csv"))
load_rpki_csv_to_json(context, fixtures_path / "rpki_raw.csv")
os.chdir(current_path)
return context
8 changes: 4 additions & 4 deletions tests/test_merge.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Test merging multiple sets of networks, as if they were independent AS files.
"""
import os
from pathlib import Path

from kartograf.merge import general_merge

Expand Down Expand Up @@ -39,10 +39,10 @@ def test_merge_from_fixtures(tmp_path):
'''
Assert that general_merge merges subnets correctly.
'''
data_dir = os.path.join(os.path.dirname(__file__), "data")
base_nets, base_subnet_count = __read_test_vectors(os.path.join(data_dir, "base_file.csv"))
data_dir = Path(__file__).parent / "data"
base_nets, base_subnet_count = __read_test_vectors(data_dir / "base_file.csv")
base_path = tmp_path / "base.txt"
extra_nets, extra_subnet_count = __read_test_vectors(os.path.join(data_dir, "extra_file.csv"))
extra_nets, extra_subnet_count = __read_test_vectors(data_dir / "extra_file.csv")
extra_path = tmp_path / "extra.txt"
# write the networks to disk, generating ASNs for each network
generate_ip_file(base_path, build_file_lines(base_nets, generate_asns(len(base_nets))))
Expand Down
14 changes: 7 additions & 7 deletions tests/test_rpki_parser.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
import os
from pathlib import Path

from kartograf.rpki.parse import parse_rpki
from .context import create_test_context
Expand Down Expand Up @@ -27,11 +27,11 @@ def test_roa_validations(tmp_path, capsys):
parse_rpki(context)

# Check that rpki_final.txt was created
final_path = os.path.join(context.out_dir_rpki, "rpki_final.txt")
assert os.path.exists(final_path), "rpki_final.txt should exist"
final_path = Path(context.out_dir_rpki) / "rpki_final.txt"
assert Path.exists(final_path), "rpki_final.txt should exist"

# Read the raw JSON to compare counts
with open(os.path.join(context.out_dir_rpki, "rpki_raw.json"), "r") as f:
with open(Path(context.out_dir_rpki) / "rpki_raw.json", "r") as f:
raw_data = json.load(f)

# Count entries in final output
Expand Down Expand Up @@ -79,14 +79,14 @@ def test_roa_incompletes(tmp_path, capsys):
]

# Write test data to rpki_raw.json
with open(os.path.join(context.out_dir_rpki, "rpki_raw.json"), "w") as f:
with open(Path(context.out_dir_rpki) / "rpki_raw.json", "w") as f:
json.dump(test_data, f)

parse_rpki(context)

# Check that rpki_final.txt was created
final_path = os.path.join(context.out_dir_rpki, "rpki_final.txt")
assert os.path.exists(final_path), "rpki_final.txt should exist"
final_path = Path(context.out_dir_rpki) / "rpki_final.txt"
assert Path.exists(final_path), "rpki_final.txt should exist"

# Count entries in final output
with open(final_path, "r") as f:
Expand Down

0 comments on commit 2140739

Please sign in to comment.