-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
515 additions
and
804 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,124 +1,104 @@ | ||
import tempfile | ||
import os | ||
from os import path | ||
|
||
import h5py | ||
import numpy as np | ||
import pytest | ||
|
||
from MDANSE.Framework.InputData.HDFTrajectoryInputData import HDFTrajectoryInputData | ||
from MDANSE.Framework.InstrumentResolutions.IInstrumentResolution import ( | ||
IInstrumentResolution, | ||
) | ||
from MDANSE.Framework.InputData.HDFTrajectoryInputData import \ | ||
HDFTrajectoryInputData | ||
from MDANSE.Framework.InstrumentResolutions.IInstrumentResolution import \ | ||
IInstrumentResolution | ||
from MDANSE.Framework.Jobs.IJob import IJob | ||
from test_helpers.compare_mdt import compare_mdt | ||
from test_helpers.paths import CONV_DIR, RESULTS_DIR | ||
|
||
|
||
short_traj = os.path.join( | ||
os.path.dirname(os.path.realpath(__file__)), | ||
"..", | ||
"Converted", | ||
"short_trajectory_after_changes.mdt", | ||
) | ||
result_dir = os.path.join( | ||
os.path.dirname(os.path.realpath(__file__)), | ||
"..", | ||
"Results", | ||
) | ||
short_traj = CONV_DIR / "short_trajectory_after_changes.mdt" | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def trajectory(): | ||
trajectory = HDFTrajectoryInputData(short_traj) | ||
yield trajectory | ||
|
||
@pytest.mark.parametrize("resolution_generator", IInstrumentResolution.subclasses()) | ||
def test_disf(tmp_path, trajectory, resolution_generator): | ||
temp_name = tmp_path / "output" | ||
out_file = temp_name.with_suffix(".mda") | ||
log_file = temp_name.with_suffix(".log") | ||
|
||
parameters = { | ||
"atom_selection": None, | ||
"atom_transmutation": None, | ||
"frames": (0, 10, 1, 5), | ||
"instrument_resolution": ("Ideal", {}), | ||
"q_vectors": ( | ||
"SphericalLatticeQVectors", | ||
{"seed": 0, "shells": (5.0, 36, 10.0), "n_vectors": 10, "width": 9.0}, | ||
), | ||
"running_mode": ("single-core",), | ||
"trajectory": short_traj, | ||
"weights": "b_incoherent2", | ||
} | ||
|
||
parameters["output_files"] = (temp_name, ("MDAFormat",), "INFO") | ||
instance = IInstrumentResolution.create(resolution_generator) | ||
resolution_defaults = { | ||
name: value[1]["default"] for name, value in instance.settings.items() | ||
} | ||
|
||
print(resolution_generator) | ||
print(resolution_defaults) | ||
|
||
def test_disf(trajectory): | ||
parameters = {} | ||
parameters["atom_selection"] = None | ||
parameters["atom_transmutation"] = None | ||
parameters["frames"] = (0, 10, 1, 5) | ||
parameters["instrument_resolution"] = ("Ideal", {}) | ||
parameters["q_vectors"] = ( | ||
"SphericalLatticeQVectors", | ||
{"seed": 0, "shells": (5.0, 36, 10.0), "n_vectors": 10, "width": 9.0}, | ||
parameters["instrument_resolution"] = ( | ||
resolution_generator, | ||
resolution_defaults, | ||
) | ||
parameters["running_mode"] = ("single-core",) | ||
parameters["trajectory"] = short_traj | ||
parameters["weights"] = "b_incoherent2" | ||
for resolution_generator in IInstrumentResolution.subclasses(): | ||
temp_name = tempfile.mktemp() | ||
parameters["output_files"] = (temp_name, ("MDAFormat",), "INFO") | ||
instance = IInstrumentResolution.create(resolution_generator) | ||
resolution_defaults = { | ||
name: value[1]["default"] for name, value in instance.settings.items() | ||
} | ||
print(resolution_generator) | ||
print(resolution_defaults) | ||
parameters["instrument_resolution"] = ( | ||
resolution_generator, | ||
resolution_defaults, | ||
) | ||
disf = IJob.create("DynamicIncoherentStructureFactor") | ||
disf.run(parameters, status=True) | ||
assert path.exists(temp_name + ".mda") | ||
assert path.isfile(temp_name + ".mda") | ||
os.remove(temp_name + ".mda") | ||
assert path.exists(temp_name + ".log") | ||
assert path.isfile(temp_name + ".log") | ||
os.remove(temp_name + ".log") | ||
|
||
|
||
list_of_resolutions = IInstrumentResolution.subclasses() | ||
|
||
|
||
@pytest.mark.parametrize("resolution_generator", list_of_resolutions) | ||
def test_dos(trajectory, resolution_generator): | ||
parameters = {} | ||
parameters["atom_selection"] = None | ||
parameters["atom_transmutation"] = None | ||
parameters["frames"] = (0, 10, 1, 5) | ||
parameters["instrument_resolution"] = ("Ideal", {}) | ||
parameters["running_mode"] = ("single-core",) | ||
parameters["trajectory"] = short_traj | ||
parameters["weights"] = "b_incoherent2" | ||
temp_name = tempfile.mktemp() | ||
|
||
disf = IJob.create("DynamicIncoherentStructureFactor") | ||
disf.run(parameters, status=True) | ||
|
||
assert out_file.is_file() | ||
assert log_file.is_file() | ||
|
||
|
||
@pytest.mark.parametrize("resolution_generator", IInstrumentResolution.subclasses()) | ||
def test_dos(tmp_path, trajectory, resolution_generator): | ||
temp_name = tmp_path / "output" | ||
out_file = temp_name.with_suffix(".mda") | ||
log_file = temp_name.with_suffix(".log") | ||
text_file = tmp_path / "output_text.tar" | ||
|
||
parameters = { | ||
"atom_selection": None, | ||
"atom_transmutation": None, | ||
"frames": (0, 10, 1, 5), | ||
"instrument_resolution": ("Ideal", {}), | ||
"running_mode": ("single-core",), | ||
"trajectory": short_traj, | ||
"weights": "b_incoherent2", | ||
} | ||
|
||
parameters["output_files"] = (temp_name, ("MDAFormat", "TextFormat"), "INFO") | ||
|
||
instance = IInstrumentResolution.create(resolution_generator) | ||
resolution_defaults = { | ||
name: value[1]["default"] for name, value in instance.settings.items() | ||
} | ||
|
||
print(resolution_generator) | ||
print(resolution_defaults) | ||
|
||
parameters["instrument_resolution"] = ( | ||
resolution_generator, | ||
resolution_defaults, | ||
) | ||
|
||
disf = IJob.create("DensityOfStates") | ||
disf.run(parameters, status=True) | ||
assert path.exists(temp_name + ".mda") | ||
assert path.isfile(temp_name + ".mda") | ||
result_file = os.path.join(result_dir, f"dos_{resolution_generator}.mda") | ||
|
||
with h5py.File(temp_name + ".mda") as actual, h5py.File(result_file) as desired: | ||
for key in [ | ||
"dos_Cu", | ||
"dos_S", | ||
"dos_Sb", | ||
"dos_total", | ||
"vacf_Cu", | ||
"vacf_S", | ||
"vacf_Sb", | ||
"vacf_total", | ||
]: | ||
np.testing.assert_array_almost_equal( | ||
actual[f"/{key}"] * actual[f"/{key}"].attrs["scaling_factor"], | ||
desired[f"/{key}"], | ||
) | ||
|
||
os.remove(temp_name + ".mda") | ||
assert path.exists(temp_name + "_text.tar") | ||
assert path.isfile(temp_name + "_text.tar") | ||
os.remove(temp_name + "_text.tar") | ||
assert path.exists(temp_name + ".log") | ||
assert path.isfile(temp_name + ".log") | ||
os.remove(temp_name + ".log") | ||
|
||
assert out_file.is_file() | ||
assert log_file.is_file() | ||
assert text_file.is_file() | ||
|
||
result_file = RESULTS_DIR / f"dos_{resolution_generator}.mda" | ||
|
||
keys = [f"{fn}_{elem}" | ||
for fn in ("dos", "vacf") | ||
for elem in ("Cu", "S", "Sb", "total")] | ||
|
||
compare_mdt(out_file, result_file, keys, normalised=True) |
Oops, something went wrong.