Skip to content

Commit

Permalink
algorithm redesigned + test
Browse files Browse the repository at this point in the history
  • Loading branch information
grzegorzbor committed Apr 20, 2024
1 parent 1f284cd commit b11ed43
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 89 deletions.
127 changes: 41 additions & 86 deletions agnpy/spectra/spectra.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,42 @@ def plot(self, gamma=None, gamma_power=0, ax=None, **kwargs):

return ax

def evaluate_time(self, time, energy_loss_function, subintervals_count=1):
unit_time_interval = time / subintervals_count

def gamma_recalculated_after_loss(gamma):
old_energy = (gamma * m_e * c ** 2).to("J")
energy_loss_per_time = energy_loss_function(gamma).to("J s-1")
energy_loss = (energy_loss_per_time * unit_time_interval).to("J")
new_energy = old_energy - energy_loss
if np.any(new_energy < 0):
raise ValueError("Energy loss formula returned value higher then original energy")
new_gamma = (new_energy / (m_e * c ** 2)).value
return new_gamma

gammas = self.gamma_values()
n_array = self.__call__(gammas)

# for each gamma point create a narrow bin, calculate the energy loss for start and end of the bin,
# and scale up density by the bin narrowing factor
for r in range(subintervals_count):
bin_size_factor = 0.0001
bins_width = gammas * bin_size_factor
bins_end_recalc = gamma_recalculated_after_loss(gammas + bins_width)
gammas = gamma_recalculated_after_loss(gammas)
narrowed_bins = bins_end_recalc - gammas
if np.any(narrowed_bins <= 0):
raise ValueError(
"Energy loss formula returned too big value - use shorter time intervals")
density_increase = bins_width / narrowed_bins
if np.any(density_increase < 1):
# not possible for simple scenarios because higher gammas should lose more energy,
# but maybe for more complex scenarios it could happen? then we should revise this assertion
raise AssertionError("Unexpected situation, bin has been widened instead of narrowed down")
n_array = n_array * density_increase

return InterpolatedDistribution(gammas, n_array)


class PowerLaw(ParticleDistribution):
r"""Class describing a power-law particle distribution.
Expand Down Expand Up @@ -255,87 +291,6 @@ def __call__(self, gamma):
def gamma_values(self):
return np.logspace(np.log10(self.gamma_min), np.log10(self.gamma_max), 200)

def evaluate_time(self, time, energy_loss_function, intervals=1):
class Bin:
def __init__(self, n, gamma_start, gamma_end):
self.n = n
self.gamma_start = gamma_start
self.gamma_end = gamma_end

gamma_values = self.gamma_values()
n_values = self.__call__(gamma_values)
bin_size_sqrt = math.sqrt(gamma_values[1]/gamma_values[0])
original_bins = []

# fill bins with initial data
for index, gamma_value in enumerate(gamma_values):
n = n_values[index]
if index == 0:
gamma_start = gamma_value / bin_size_sqrt
else:
gamma_start = original_bins[index-1].gamma_end
original_bins.append(Bin(n, gamma_start, gamma_value * bin_size_sqrt))

unit_time_interval = time / intervals

def gamma_recalculated_after_loss(gamma):
old_energy = (gamma * m_e * c ** 2).to("J")
energy_loss_per_time = energy_loss_function(gamma).to("J s-1")
energy_loss = (energy_loss_per_time * unit_time_interval).to("J")
new_energy = old_energy - energy_loss
if new_energy < 0:
raise ValueError(
"Energy loss formula returned value higher then original energy, for gamma " + "{:e}".format(gamma))
new_gamma = (new_energy / (m_e * c ** 2)).value
return new_gamma

# move and narrow down the bins according to energy loss function
translated_bins = original_bins.copy()
for r in range(intervals):
for index, bin in enumerate(translated_bins):
bin_integral = bin.n * (bin.gamma_end - bin.gamma_start)
if index == 0:
new_bin_start = gamma_recalculated_after_loss(bin.gamma_start)
else:
new_bin_start = translated_bins[index-1].gamma_end
new_bin_end = gamma_recalculated_after_loss(bin.gamma_end)
new_bin_width = new_bin_end - new_bin_start
if new_bin_width <= 0:
raise ValueError(
"Energy loss formula returned higher energy loss for bin-end than for bin-start;"
" use shorter time intervals to fix it")
new_n = bin_integral / (new_bin_width)
translated_bins[index] = (Bin(new_n, new_bin_start, new_bin_end))

# remap the translated bins onto the original gamma points
final_bins = []
unit = u.Unit("cm-3")
for bin in original_bins:
final_bins.append(Bin(Quantity(0) * unit, bin.gamma_start, bin.gamma_end))
for index, bin in enumerate(final_bins):
for translated_bin in translated_bins:
gamma_bin_min = bin.gamma_start
gamma_bin_max = bin.gamma_end
gamma_new_bin_min = translated_bin.gamma_start
gamma_new_bin_max = translated_bin.gamma_end
if gamma_new_bin_max > gamma_bin_min and gamma_new_bin_min < gamma_bin_max:
if gamma_new_bin_min < gamma_bin_min < gamma_new_bin_max:
overlap = gamma_new_bin_max - gamma_bin_min
elif gamma_new_bin_min < gamma_bin_max < gamma_new_bin_max:
overlap = gamma_bin_max - gamma_new_bin_min
elif gamma_bin_min <= gamma_new_bin_min and gamma_new_bin_max <= gamma_bin_max:
overlap = gamma_new_bin_max - gamma_new_bin_min
else:
raise AssertionError("Unexpected situation, bin has been widened instead of narrowed down")
n = overlap * translated_bin.n
final_bin = final_bins[index]
final_bin.n = final_bin.n + n
for bin in final_bins:
bin.n = bin.n / (bin.gamma_end - bin.gamma_start)


return gamma_values, np.array(list(map(lambda bin: bin.n.value, final_bins))) * unit

@staticmethod
def evaluate_SSA_integrand(gamma, k, p, gamma_min, gamma_max):
r"""Analytical integrand for the synchrotron self-absorption:
Expand Down Expand Up @@ -815,7 +770,7 @@ class InterpolatedDistribution(ParticleDistribution):
function to be used for integration, default is :class:`~numpy.trapz`
"""

def __init__(self, gamma, n, norm=1, mass=m_e, integrator=np.trapz):
def __init__(self, gamma, n, norm=1, mass=m_e, integrator=np.trapz, gamma_min=None, gamma_max=None):
super().__init__(mass, integrator, "InterpolatedDistribution")
if n.unit != u.Unit("cm-3"):
raise ValueError(
Expand All @@ -827,9 +782,9 @@ def __init__(self, gamma, n, norm=1, mass=m_e, integrator=np.trapz):
# scaling parameter
self.norm = norm
# perform the interpolation
self.log10_interp = self.log10_interpolation(gamma, n)
self.log10_interp = self.log10_interpolation(gamma, n, gamma_min, gamma_max)

def log10_interpolation(self, gamma, n):
def log10_interpolation(self, gamma, n, gamma_min=None, gamma_max=None):
"""Returns the function interpolating in log10 the particle spectrum as
a function of the Lorentz factor.
TODO: make possible to pass arguments to CubicSpline.
Expand All @@ -843,8 +798,8 @@ def log10_interpolation(self, gamma, n):

# min and max lorentz factor are now the first gamma values for which
# the input distribution is not null
self.gamma_min = np.min(_gamma)
self.gamma_max = np.max(_gamma)
self.gamma_min = gamma_min if gamma_min else np.min(_gamma)
self.gamma_max = gamma_max if gamma_max else np.max(_gamma)

return interpolator

Expand Down
48 changes: 47 additions & 1 deletion agnpy/spectra/tests/test_spectra.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
from pathlib import Path
import numpy as np
import astropy.units as u
from astropy.constants import m_e, m_p
from astropy.constants import m_e, m_p, c
import pytest
from astropy.coordinates import Distance

from agnpy import Blob, Synchrotron
from agnpy.spectra import (
PowerLaw,
BrokenPowerLaw,
Expand Down Expand Up @@ -740,3 +743,46 @@ def test_interpolation_physical(self):
assert u.allclose(
n_e(gamma_init), 2 * n_init, atol=0 * u.Unit("cm-3"), rtol=1e-3
)


class TestSpectraTimeEvolution:

@pytest.mark.parametrize("p", [0.5, 1.2, 2.4])
@pytest.mark.parametrize("time", [1, 60, 120] * u.s)
def test_compare_numerical_results_with_analytical_calculation(self, p, time):
"""Test time evolution of spectral electron density for synchrotron energy losses.
Use a simple power low spectrum for easy calculation of analytical results."""
gamma_min = 1e2
gamma_max = 1e7
k = 0.1 * u.Unit("cm-3")
initial_n_e = PowerLaw(k, p, gamma_min=gamma_min, gamma_max=gamma_max, mass=m_e)
blob = Blob(1e16 * u.cm, Distance(1e27, unit=u.cm).z, 0, 10, 1 * u.G, n_e=initial_n_e)
synch = Synchrotron(blob)
evaluated_n_e = initial_n_e.evaluate_time(time, synch.electron_energy_loss_per_time, subintervals_count=50)

def gamma_before(gamma_after_time, time):
"""Reverse-time calculation of the gamma value before the synchrotron energy loss,
using formula -dE/dt ~ (E ** 2)"""
coef = synch._electron_energy_loss_formula_prefix() / (m_e * c ** 2)
return 1 / ((1 / gamma_after_time) - time * coef)

def integral_analytical(gamma_min, gamma_max):
"""Integral for the power-law distribution"""
return k * (gamma_max ** (1 - p) - gamma_min ** (1 - p)) / (1 - p)

assert u.isclose(
gamma_before(evaluated_n_e.gamma_max, time),
gamma_max,
rtol=0.05
)
assert u.isclose(
evaluated_n_e.integrate(evaluated_n_e.gamma_min, evaluated_n_e.gamma_max),
integral_analytical(gamma_before(evaluated_n_e.gamma_min, time), gamma_before(evaluated_n_e.gamma_max, time)),
rtol=0.05
)
assert u.isclose(
# only the highest energy range where most losses occur
evaluated_n_e.integrate(evaluated_n_e.gamma_max / 10, evaluated_n_e.gamma_max),
integral_analytical(gamma_before(evaluated_n_e.gamma_max / 10, time), gamma_before(evaluated_n_e.gamma_max, time)),
rtol=0.05
)
7 changes: 5 additions & 2 deletions agnpy/synchrotron/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,13 @@ def electron_energy_loss_per_time(self, gamma):
# (4/3 * sigma_T * c * magn_energy_dens) * gamma^2
# The part in brackets is fixed, so as an improvement we could calculate it once and cache,
# and later we could use the formula (fixed * gamma^2)
magn_energy_dens = (self.blob.B.to("T") ** 2) / (2 * mu0)
fixed = (4 / 3) * sigma_T * c * magn_energy_dens
fixed = self._electron_energy_loss_formula_prefix()
return (fixed * gamma ** 2).to("erg/s")

def _electron_energy_loss_formula_prefix(self):
magn_energy_dens = (self.blob.B.to("T") ** 2) / (2 * mu0)
return (4 / 3) * sigma_T * c * magn_energy_dens

@staticmethod
def evaluate_tau_ssa(
nu,
Expand Down

0 comments on commit b11ed43

Please sign in to comment.