import os
import six
from time import time
import numpy as np
import mceq_config as config
from MCEq.misc import normalize_hadronic_model_name, info
from MCEq.particlemanager import ParticleManager
import MCEq.data
[docs]class MCEqRun(object):
"""Main class for handling the calculation.
This class is the main user interface for the caclulation. It will
handle initialization and various error/configuration checks. The
setup has to be accomplished before invoking the integration routine
is :func:`MCeqRun.solve`. Changes of configuration, such as:
- interaction model in :meth:`MCEqRun.set_interaction_model`,
- primary flux in :func:`MCEqRun.set_primary_model`,
- zenith angle in :func:`MCEqRun.set_theta_deg`,
- density profile in :func:`MCEqRun.set_density_model`,
- member particles of the special ``obs_`` group in :func:`MCEqRun.set_obs_particles`,
can be made on an active instance of this class, while calling
:func:`MCEqRun.solve` subsequently to calculate the solution
corresponding to the settings.
The result can be retrieved by calling :func:`MCEqRun.get_solution`.
Args:
interaction_model (string): PDG ID of the particle
density_model (string,sting,string): model type, location, season
primary_model (class, param_tuple): classes derived from
:class:`crflux.models.PrimaryFlux` and its parameters as tuple
theta_deg (float): zenith angle :math:`\\theta` in degrees,
measured positively from vertical direction
adv_set (dict): advanced settings, see :mod:`mceq_config`
obs_ids (list): list of particle name strings. Those lepton decay
products will be scored in the special ``obs_`` categories
"""
def __init__(self, interaction_model, primary_model, theta_deg, **kwargs):
self._mceq_db = MCEq.data.HDF5Backend()
interaction_model = normalize_hadronic_model_name(interaction_model)
# Save atmospheric parameters
self.density_config = kwargs.pop('density_model', config.density_model)
self.theta_deg = theta_deg
#: Interface to interaction tables of the HDF5 database
self._interactions = MCEq.data.Interactions(mceq_hdf_db=self._mceq_db)
#: handler for cross-section data of type :class:`MCEq.data.HadAirCrossSections`
self._int_cs = MCEq.data.InteractionCrossSections(
mceq_hdf_db=self._mceq_db)
#: handler for cross-section data of type :class:`MCEq.data.HadAirCrossSections`
self._cont_losses = MCEq.data.ContinuousLosses(mceq_hdf_db=self._mceq_db,
material=config.dedx_material)
#: Interface to decay tables of the HDF5 database
self._decays = MCEq.data.Decays(mceq_hdf_db=self._mceq_db)
#: Particle manager (initialized/updated in set_interaction_model)
self.pman = None
# Particle list to keep track of previously initialized particles
self._particle_list = None
# General Matrix dimensions and shortcuts, controlled by
# grid of yield matrices
self._energy_grid = self._mceq_db.energy_grid
# Initialize solution vector
self._solution = np.zeros(1)
# Initialize empty state (particle density) vector
self._phi0 = np.zeros(1)
# Initialize matrix builder (initialized in set_interaction_model)
self.matrix_builder = None
# Save initial condition (primary flux) to restore after dimensional resizing
self._restore_initial_condition = []
# Set interaction model and compute grids and matrices
self.set_interaction_model(
interaction_model,
particle_list = kwargs.pop('particle_list', None),
build_matrices = kwargs.pop('build_matrices', True)
)
# Default GPU device id for CUDA
self._cuda_device = kwargs.pop('cuda_gpu_id', config.cuda_gpu_id)
# Print particle list after tracking particles have been initialized
self.pman.print_particle_tables(2)
# Set atmosphere and geometry
self.integration_path, self.int_grid, self.grid_var = None, None, None
self.set_density_model(self.density_config)
# Set initial flux condition
if primary_model is not None:
self.set_primary_model(*primary_model)
@property
def e_grid(self):
"""Energy grid (bin centers)"""
return self._energy_grid.c
@property
def e_bins(self):
"""Energy grid (bin edges)"""
return self._energy_grid.b
@property
def e_widths(self):
"""Energy grid (bin widths)"""
return self._energy_grid.w
@property
def dim(self):
"""Energy grid (dimension)"""
return self._energy_grid.d
@property
def dim_states(self):
"""Number of cascade particles times dimension of grid
(dimension of the equation system)"""
return self.pman.dim_states
[docs] def closest_energy(self, kin_energy):
"""Convenience function to obtain the nearest grid energy
to the `energy` argument, provided as kinetik energy in lab. frame."""
eidx = (np.abs(self._energy_grid.c - kin_energy)).argmin()
return self._energy_grid.c[eidx]
[docs] def get_solution(self,
particle_name,
mag=0.,
grid_idx=None,
integrate=False,
return_as=config.return_as):
"""Retrieves solution of the calculation on the energy grid.
Some special prefixes are accepted for lepton names:
- the total flux of muons, muon neutrinos etc. from all sources/mothers
can be retrieved by the prefix ``total_``, i.e. ``total_numu``
- the conventional flux of muons, muon neutrinos etc. from all sources
can be retrieved by the prefix ``conv_``, i.e. ``conv_numu``
- correspondigly, the flux of leptons which originated from the decay
of a charged pion carries the prefix ``pi_`` and from a kaon ``k_``
- conventional leptons originating neither from pion nor from kaon
decay are collected in a category without any prefix, e.g. ``numu`` or
``mu+``
Args:
particle_name (str): The name of the particle such, e.g.
``total_mu+`` for the total flux spectrum of positive muons or
``pr_antinumu`` for the flux spectrum of prompt anti muon neutrinos
mag (float, optional): 'magnification factor': the solution is
multiplied by ``sol`` :math:`= \\Phi \\cdot E^{mag}`
grid_idx (int, optional): if the integrator has been configured to save
intermediate solutions on a depth grid, then ``grid_idx`` specifies
the index of the depth grid for which the solution is retrieved. If
not specified the flux at the surface is returned
integrate (bool, optional): return averge particle number instead of
flux (multiply by bin width)
Returns:
(numpy.array): flux of particles on energy grid :attr:`e_grid`
"""
res = np.zeros(self._energy_grid.d)
ref = self.pman.pname2pref
sol = None
if grid_idx is not None and len(self.grid_sol) == 0:
raise Exception(
'Solution not has not been computed on grid. Check input.')
if grid_idx is None:
sol = np.copy(self._solution)
elif grid_idx >= len(self.grid_sol) or grid_idx is None:
sol = self.grid_sol[-1, :]
else:
sol = self.grid_sol[grid_idx, :]
def sum_lr(lep_str, prefix):
result = np.zeros(self.dim)
nsuccess = 0
for ls in lep_str, lep_str + '_l', lep_str + '_r':
if prefix + ls not in ref:
info(
15, 'No separate left and right handed particles,',
'or, unavailable particle prefix {0}.'.format(prefix +
ls))
continue
result += sol[ref[prefix + ls].lidx:ref[prefix + ls].uidx]
nsuccess += 1
if nsuccess == 0 and config.excpt_on_missing_particle:
raise Exception(
'Requested particle {0} not found.'.format(particle_name))
return result
lep_str = particle_name.split(
'_')[1] if '_' in particle_name else particle_name
if particle_name.startswith('total_'):
# Note: This has changed from previous MCEq versions,
# since pi_ and k_ prefixes are mere tracking counters
# and no full particle species anymore
res = sum_lr(lep_str, prefix='')
elif particle_name.startswith('conv_'):
# Note: This changed from previous MCEq versions,
# conventional is defined as total - prompt
res = (self.get_solution('total_' + lep_str,
mag=0,
grid_idx=grid_idx,
integrate=False,
return_as='kinetic energy') -
self.get_solution('pr_' + lep_str,
mag=0,
grid_idx=grid_idx,
integrate=False,
return_as='kinetic energy'))
elif particle_name.startswith('pr_'):
if 'prcas_' + lep_str in ref:
res += sum_lr(lep_str, prefix='prcas_')
if 'prres_' + lep_str in ref:
res += sum_lr(lep_str, prefix='prres_')
if 'em_' + lep_str in ref:
res += sum_lr(lep_str, prefix='em_')
else:
try:
res = sum_lr(particle_name, prefix='')
except KeyError:
info(10,
'Requested particle {0} not found.'.format(particle_name))
# When returning in Etot, interpolate on different grid
if return_as == 'total energy':
etot_bins = self.e_bins + ref[lep_str].mass
etot_grid = np.sqrt(etot_bins[1:] * etot_bins[:-1])
if not integrate:
return etot_grid, res * etot_grid**mag
else:
return etot_grid, res * etot_grid**mag * (etot_bins[1:] -
etot_bins[:-1])
elif return_as == 'kinetic energy':
if not integrate:
return res * self._energy_grid.c**mag
else:
return res * self._energy_grid.c**mag * self._energy_grid.w
elif return_as == 'total momentum':
ptot_bins = np.sqrt((self.e_bins + ref[lep_str].mass)**2 -
ref[lep_str].mass**2)
ptot_grid = np.sqrt(ptot_bins[1:] * ptot_bins[:-1])
dEkindp = ptot_grid / np.sqrt(ptot_grid**2 + ref[lep_str].mass**2)
res *= dEkindp
if not integrate:
return ptot_grid, res * ptot_grid**mag
else:
return ptot_grid, res * ptot_grid**mag * (ptot_bins[1:] -
ptot_bins[:-1])
else:
raise Exception(
"Unknown 'return_as' variable choice.",
'the options are "kinetic energy", "total energy", "total momentum"'
)
[docs] def set_interaction_model(self,
interaction_model,
particle_list=None,
update_particle_list=True,
force=False,
build_matrices=True):
"""Sets interaction model and/or an external charm model for calculation.
Decay and interaction matrix will be regenerated automatically
after performing this call.
Args:
interaction_model (str): name of interaction model
charm_model (str, optional): name of charm model
force (bool): force loading interaction model
"""
interaction_model = normalize_hadronic_model_name(interaction_model)
info(1, interaction_model)
if not force and (self._interactions.iam == interaction_model
) and particle_list != self._particle_list:
info(2, 'Skip, since current model identical to',
interaction_model + '.')
return
self._int_cs.load(interaction_model)
# TODO: simplify this, stuff not needed anymore
if not update_particle_list and self._particle_list is not None:
info(10, 'Re-using particle list.')
self._interactions.load(interaction_model,
parent_list=self._particle_list)
self.pman.set_interaction_model(self._int_cs, self._interactions)
self.pman.set_decay_channels(self._decays)
self.pman.set_continuous_losses(self._cont_losses)
elif self._particle_list is None:
info(10, 'New initialization of particle list.')
# First initialization
if particle_list is None:
self._interactions.load(interaction_model)
else:
self._interactions.load(interaction_model,
parent_list=particle_list)
self._decays.load(parent_list=self._interactions.particles)
self._particle_list = self._interactions.particles + self._decays.particles
# Create particle database
self.pman = ParticleManager(self._particle_list, self._energy_grid,
self._int_cs)
self.pman.set_interaction_model(self._int_cs, self._interactions)
self.pman.set_decay_channels(self._decays)
self.pman.set_continuous_losses(self._cont_losses)
self.matrix_builder = MatrixBuilder(self.pman)
elif (update_particle_list and particle_list != self._particle_list):
info(10, 'Updating particle list.')
# Updated particle list received
if particle_list is None:
self._interactions.load(interaction_model)
else:
self._interactions.load(interaction_model,
parent_list=particle_list)
self._decays.load(parent_list=self._interactions.particles)
self._particle_list = self._interactions.particles + self._decays.particles
self.pman.set_interaction_model(
self._int_cs,
self._interactions,
updated_parent_list=self._particle_list)
self.pman.set_decay_channels(self._decays)
self.pman.set_continuous_losses(self._cont_losses)
else:
raise Exception('Should not happen in practice.')
self._resize_vectors_and_restore()
# initialize matrices
if not build_matrices:
return
self.int_m, self.dec_m = self.matrix_builder.construct_matrices(
skip_decay_matrix=False)
def _resize_vectors_and_restore(self):
"""Update solution and grid vectors if the number of particle species
or the interaction models change. The previous state, such as the
initial spectrum, are restored."""
# Update dimensions if particle dimensions changed
self._phi0 = np.zeros(self.dim_states)
self._solution = np.zeros(self.dim_states)
# Restore insital condition if present
if len(self._restore_initial_condition) > 0:
for con in self._restore_initial_condition:
con[0](*con[1:])
[docs] def set_primary_model(self, mclass, tag):
"""Sets primary flux model.
This functions is quick and does not require re-generation of
matrices.
Args:
interaction_model (:class:`CRFluxModel.PrimaryFlux`): reference
to primary model **class**
tag (tuple): positional argument list for model class
"""
info(1, mclass.__name__, tag if tag is not None else '')
# Save primary flux model for restauration after interaction model changes
self._restore_initial_condition = [
(self.set_primary_model, mclass, tag)]
# Initialize primary model object
self.pmodel = mclass(tag)
self.get_nucleon_spectrum = np.vectorize(self.pmodel.p_and_n_flux)
try:
self.dim_states
except AttributeError:
self.finalize_pmodel = True
# Save initial condition
minimal_energy = 3.
if (2212, 0) in self.pman:
e_tot = self._energy_grid.c + self.pman[(2212, 0)].mass
else:
info(
10,
'No protons in eqn system, quering primary flux with kinetic energy.'
)
e_tot = self._energy_grid.c
min_idx = np.argmin(np.abs(e_tot - minimal_energy))
self._phi0 *= 0
p_top, n_top = self.get_nucleon_spectrum(e_tot[min_idx:])[1:]
if (2212, 0) in self.pman:
self._phi0[min_idx + self.pman[(2212, 0)].lidx:self.pman[(
2212, 0)].uidx] = 1e-4 * p_top
else:
info(
1,
'Warning protons not part of equation system, can not set primary flux.'
)
if (2112, 0) in self.pman and not self.pman[(2112, 0)].is_resonance:
self._phi0[min_idx + self.pman[(2112, 0)].lidx:self.pman[(
2112, 0)].uidx] = 1e-4 * n_top
elif (2212, 0) in self.pman:
info(2, 'Neutrons not part of equation system,',
'substituting initial flux with protons.')
self._phi0[min_idx + self.pman[(2212, 0)].lidx:self.pman[(
2212, 0)].uidx] += 1e-4 * n_top
[docs] def set_single_primary_particle(self, E, corsika_id=None, pdg_id=None, append=False):
"""Set type and kinetic energy of a single primary nucleus to
calculation of particle yields.
The functions uses the superposition theorem, where the flux of
a nucleus with mass A and charge Z is modeled by using Z protons
and A-Z neutrons at energy :math:`E_{nucleon}= E_{nucleus} / A`
The nucleus type is defined via :math:`\\text{CORSIKA ID} = A*100 + Z`. For
example iron has the CORSIKA ID 5226.
Single leptons or hadrons can be defined by specifiying `pdg_id` instead of
`corsika_id`.
The `append` argument can be used to compose an initial state with
multiple particles. If it is `False` the initial condition is reset to zero
before adding the particle.
A continuous input energy range is allowed between
:math:`50*A~ \\text{GeV} < E_\\text{nucleus} < 10^{10}*A \\text{GeV}`.
Args:
E (float): kinetic energy of a nucleus in GeV
corsika_id (int): ID of a nucleus (see text)
pdg_id (int): PDG ID of a particle
append (bool): If True, keep previous state and append a new particle.
"""
import warnings
from scipy.linalg import solve
from MCEq.misc import getAZN_corsika, getAZN
if corsika_id and pdg_id:
raise Exception('Provide either corsika or PDG ID')
info(
2, 'CORSIKA ID {0}, PDG ID {1}, energy {2:5.3g} GeV'.format(
corsika_id, pdg_id, E))
if append == False:
self._restore_initial_condition = [(self.set_single_primary_particle, E,
corsika_id, pdg_id)]
self._phi0 *= 0.
else:
self._restore_initial_condition.append((self.set_single_primary_particle, E,
corsika_id, pdg_id))
egrid = self._energy_grid.c
ebins = self._energy_grid.b
ewidths = self._energy_grid.w
if corsika_id:
n_nucleons, n_protons, n_neutrons = getAZN_corsika(corsika_id)
elif pdg_id:
n_nucleons, n_protons, n_neutrons = getAZN(pdg_id)
En = E / float(n_nucleons) if n_nucleons > 0 else E
if En < np.min(self._energy_grid.c):
raise Exception('energy per nucleon too low for primary ' +
str(corsika_id))
info(3, ('superposition: n_protons={0}, n_neutrons={1}, ' +
'energy per nucleon={2:5.3g} GeV').format(
n_protons, n_neutrons, En))
cenbin = np.argwhere(En < ebins)[0][0] - 1
# Equalize the first three moments for 3 normalizations around the central
# bin
emat = np.vstack(
(ewidths[cenbin - 1:cenbin + 2],
ewidths[cenbin - 1:cenbin + 2] * egrid[cenbin - 1:cenbin + 2],
ewidths[cenbin - 1:cenbin + 2] * egrid[cenbin - 1:cenbin + 2]**2))
if n_nucleons == 0:
# This case handles other exotic projectiles
b_particle = np.array([1., En, En**2])
lidx = self.pman[pdg_id].lidx
with warnings.catch_warnings():
warnings.simplefilter('ignore')
self._phi0[lidx + cenbin - 1:lidx + cenbin + 2] += solve(
emat, b_particle)
return
if n_protons > 0:
b_protons = np.array(
[n_protons, En * n_protons, En**2 * n_protons])
p_lidx = self.pman[2212].lidx
with warnings.catch_warnings():
warnings.simplefilter('ignore')
self._phi0[p_lidx + cenbin - 1:p_lidx + cenbin + 2] += solve(
emat, b_protons)
if n_neutrons > 0:
b_neutrons = np.array(
[n_neutrons, En * n_neutrons, En**2 * n_neutrons])
n_lidx = self.pman[2112].lidx
with warnings.catch_warnings():
warnings.simplefilter('ignore')
self._phi0[n_lidx + cenbin - 1:n_lidx + cenbin + 2] += solve(
emat, b_neutrons)
[docs] def set_initial_spectrum(self, spectrum, pdg_id=None, append=False):
"""Set a user-defined spectrum for an arbitrary species as initial condition.
This function is an equivalent to :func:`set_single_primary_particle`. It
allows to define an arbitrary spectrum for each available particle species
as initial condition for the integration. Set the `append` argument to `True`
for subsequent species to define initial spectra combined from different particles.
The (differential) spectrum has to be distributed on the energy grid as dN/dptot, i.e.
divided by the bin widths and with the total momentum units in GeV(/c).
Args:
spectrum (np.array): spectrum dN/dptot
pdg_id (int): PDG ID in case of a particle
"""
from MCEq.misc import getAZN_corsika, getAZN
info(2, 'PDG ID {0}'.format(pdg_id))
if not append:
self._restore_initial_condition = [(self.set_initial_spectrum,
pdg_id, append)]
self._phi0 *= 0
else:
self._restore_initial_condition.append((self.set_initial_spectrum,
pdg_id, append))
egrid = self._energy_grid.c
ebins = self._energy_grid.b
ewidths = self._energy_grid.w
if len(spectrum) != self.dim:
raise Exception(
'Lengths of spectrum and energy grid do not match.')
self._phi0[self.pman[pdg_id].lidx:self.pman[pdg_id].uidx] += spectrum
[docs] def set_density_model(self, density_config):
"""Sets model of the atmosphere.
To choose, for example, a CORSIKA parametrization for the Southpole in January,
do the following::
mceq_instance.set_density_model(('CORSIKA', ('PL_SouthPole', 'January')))
More details about the choices can be found in :mod:`MCEq.geometry.density_profiles`. Calling
this method will issue a recalculation of the interpolation and the integration path.
From version 1.2 and above, the `density_config` parameter can be a reference to
an instance of a density class directly. The class has to be derived either from
:class:`MCEq.geometry.density_profiles.EarthsAtmosphere` or
:class:`MCEq.geometry.density_profiles.GeneralizedTarget`.
Args:
density_config (tuple of strings): (parametrization type, arguments)
"""
import MCEq.geometry.density_profiles as dprof
# Check if string arguments or an instance of the density class is provided
if not isinstance(density_config, (dprof.EarthsAtmosphere, dprof.GeneralizedTarget)):
base_model, model_config = density_config
available_models = [
'MSIS00', 'MSIS00_IC', 'CORSIKA', 'AIRS', 'Isothermal',
'GeneralizedTarget'
]
if base_model not in available_models:
info(0, 'Unknown density model. Available choices are:\n',
'\n'.join(available_models))
raise Exception('Choose a different profile.')
info(1, 'Setting density profile to', base_model, model_config)
if base_model == 'MSIS00':
self.density_model = dprof.MSIS00Atmosphere(*model_config)
elif base_model == 'MSIS00_IC':
self.density_model = dprof.MSIS00IceCubeCentered(*model_config)
elif base_model == 'CORSIKA':
self.density_model = dprof.CorsikaAtmosphere(*model_config)
elif base_model == 'AIRS':
self.density_model = dprof.AIRSAtmosphere(*model_config)
elif base_model == 'Isothermal':
self.density_model = dprof.IsothermalAtmosphere(*model_config)
elif base_model == 'GeneralizedTarget':
self.density_model = dprof.GeneralizedTarget()
else:
raise Exception('Unknown atmospheric base model.')
self.density_config = density_config
else:
self.density_model = density_config
self.density_config = density_config
if self.theta_deg is not None and isinstance(self.density_model, dprof.EarthsAtmosphere):
self.set_theta_deg(self.theta_deg)
elif isinstance(self.density_model, dprof.GeneralizedTarget):
self.integration_path = None
else:
raise Exception('Density model not supported.')
# TODO: Make the pman aware of that density might have changed and
# indices as well
# self.pmod._gen_list_of_particles()
[docs] def set_theta_deg(self, theta_deg):
"""Sets zenith angle :math:`\\theta` as seen from a detector.
Currently only 'down-going' angles (0-90 degrees) are supported.
Args:
theta_deg (float): zenith angle in the range 0-90 degrees
"""
import MCEq.geometry.density_profiles as dprof
info(2, 'Zenith angle {0:6.2f}'.format(theta_deg))
if isinstance(self.density_model, dprof.GeneralizedTarget):
raise Exception('GeneralizedTarget does not support angles.')
if self.density_model.theta_deg == theta_deg:
info(2,
'Theta selection correponds to cached value, skipping calc.')
return
self.density_model.set_theta(theta_deg)
self.integration_path = None
[docs] def set_mod_pprod(self,
prim_pdg,
sec_pdg,
x_func,
x_func_args,
delay_init=False):
"""Sets combination of projectile/secondary for error propagation.
The production spectrum of ``sec_pdg`` in interactions of
``prim_pdg`` is modified according to the function passed to
:func:`InteractionYields.init_mod_matrix`
Args:
prim_pdg (int): interacting (primary) particle PDG ID
sec_pdg (int): secondary particle PDG ID
x_func (object): reference to function
x_func_args (tuple): arguments passed to ``x_func``
delay_init (bool): Prevent init of mceq matrices if you are
planning to add more modifications
"""
info(
1, '{0}/{1}, {2}, {3}'.format(prim_pdg, sec_pdg, x_func.__name__,
str(x_func_args)))
init = self._interactions._set_mod_pprod(prim_pdg, sec_pdg, x_func,
x_func_args)
# Need to regenerate matrices completely
return int(init)
[docs] def unset_mod_pprod(self, dont_fill=False):
"""Removes modifications from :func:`MCEqRun.set_mod_pprod`.
Args:
skip_fill (bool): If `true` do not regenerate matrices
(has to be done at a later step by hand)
"""
from collections import defaultdict
info(1, 'Particle production modifications reset to defaults.')
self._interactions.mod_pprod = defaultdict(lambda: {})
# Need to regenerate matrices completely
if not dont_fill:
self.regenerate_matrices()
[docs] def regenerate_matrices(self, skip_decay_matrix=False):
"""Call this function after applying particle prod. modifications aka
Barr parameters"""
# TODO: Not all particles need to be reset and there is some performance loss
# This can be optmized by refreshing only the particles that change or through
# lazy evaluation, i.e. hadronic channels dict. calls data.int..get_matrix on demand
self.pman.set_interaction_model(self._int_cs,
self._interactions,
force=True)
self.int_m, self.dec_m = self.matrix_builder.construct_matrices(
skip_decay_matrix=skip_decay_matrix)
[docs] def solve(self, int_grid=None, grid_var='X', **kwargs):
"""Launches the solver.
The setting `integrator` in the config file decides which solver
to launch.
Args:
int_grid (list): list of depths at which results are recorded
grid_var (str): Can be depth `X` or something else (currently only `X` supported)
kwargs (dict): Arguments are passed directly to the solver methods.
"""
info(2, "Launching {0} solver".format(config.integrator))
if not kwargs.pop('skip_integration_path', False):
if int_grid is not None and np.any(np.diff(int_grid) < 0):
raise Exception('The X values in int_grid are required to be strickly',
'increasing.')
# Calculate integration path if not yet happened
self._calculate_integration_path(int_grid, grid_var)
else:
info(2,'Warning: integration path calculation skipped.')
phi0 = np.copy(self._phi0)
nsteps, dX, rho_inv, grid_idcs = self.integration_path
info(2, 'for {0} integration steps.'.format(nsteps))
import MCEq.solvers
start = time()
if config.kernel_config.lower() == 'numpy':
kernel = MCEq.solvers.solv_numpy
args = (nsteps, dX, rho_inv, self.int_m, self.dec_m, phi0,
grid_idcs)
elif (config.kernel_config.lower() == 'cuda'):
kernel = MCEq.solvers.solv_CUDA_sparse
try:
self.cuda_context.set_matrices(self.int_m, self.dec_m)
except AttributeError:
from MCEq.solvers import CUDASparseContext
self.cuda_context = CUDASparseContext(
self.int_m, self.dec_m, device_id=self._cuda_device)
args = (nsteps, dX, rho_inv, self.cuda_context, phi0, grid_idcs)
elif (config.kernel_config.lower() == 'mkl'):
kernel = MCEq.solvers.solv_MKL_sparse
args = (nsteps, dX, rho_inv, self.int_m, self.dec_m, phi0,
grid_idcs)
else:
raise Exception(
"Unsupported integrator setting '{0}'.".format(
config.kernel_config))
self._solution, self.grid_sol = kernel(*args)
info(
2, 'time elapsed during integration: {0:5.2f}sec'.format(time() -
start))
def _calculate_integration_path(self, int_grid, grid_var, force=False):
if (self.integration_path and np.alltrue(int_grid == self.int_grid)
and np.alltrue(self.grid_var == grid_var) and not force):
info(5, 'skipping calculation.')
return
self.int_grid, self.grid_var = int_grid, grid_var
if grid_var != 'X':
raise NotImplementedError(
'the choice of grid variable other than the depth X are not possible, yet.'
)
max_X = self.density_model.max_X
ri = self.density_model.r_X2rho
max_lint = self.matrix_builder.max_lint
max_ldec = self.matrix_builder.max_ldec
info(2, 'X_surface = {0:7.2f}g/cm2'.format(max_X))
dX_vec = []
rho_inv_vec = []
X = 0.0
step = 0
grid_step = 0
grid_idcs = []
if True or (max_ldec / self.density_model.max_den > max_lint
and config.leading_process == 'decays'):
info(3, "using decays as leading eigenvalues")
def delta_X(X):
return config.stability_margin / (max_ldec * ri(X))
elif config.leading_process == 'interactions':
info(2, "using interactions as leading eigenvalues")
def delta_X(X):
return config.stability_margin / max_lint
else:
def delta_X(X):
dX = min(
config.stability_margin / (max_ldec * ri(X)),
config.stability_margin / max_lint)
# if dX/self.density_model.max_X < 1e-7:
# raise Exception(
# 'Stiffness warning: dX <= 1e-7. Check configuration or' +
# 'manually call MCEqRun._calculate_integration_path(int_grid, "X", force=True).')
return dX
dXmax = config.dXmax
while X < max_X:
dX = min(delta_X(X), dXmax)
if (np.any(int_grid) and (grid_step < len(int_grid))
and (X + dX >= int_grid[grid_step])):
dX = int_grid[grid_step] - X
grid_idcs.append(step)
grid_step += 1
dX_vec.append(dX)
rho_inv_vec.append(ri(X))
X = X + dX
step += 1
# Integrate
dX_vec = np.array(dX_vec)
rho_inv_vec = np.array(rho_inv_vec)
self.integration_path = len(dX_vec), dX_vec, rho_inv_vec, grid_idcs
[docs] def n_particles(self, label, grid_idx=None, min_energy_cutoff=1e-1):
"""Returns number of particles of type `label` at a grid step above
an energy threshold for counting.
Args:
label (str): Particle name
grid_idx (int): Depth grid index (for profiles)
min_energy_cutoff (float): Energy threshold > mceq_config.e_min
"""
ie_min = np.argmin(
np.abs(self.e_bins -
self.e_bins[self.e_bins >= min_energy_cutoff][0]))
info(
10,
'Energy cutoff for particle number calculation {0:4.3e} GeV'.format(
self.e_bins[ie_min]))
info(
15,
'First bin is between {0:3.2e} and {1:3.2e} with midpoint {2:3.2e}'
.format(self.e_bins[ie_min], self.e_bins[ie_min + 1],
self.e_grid[ie_min]))
return np.sum(
self.get_solution(label, mag=0, integrate=True, grid_idx=grid_idx)[ie_min:])
[docs] def n_mu(self, grid_idx=None, min_energy_cutoff=1e-1):
"""Returns the number of positive and negative muons at a grid step above
`min_energy_cutoff`.
Args:
grid_idx (int): Depth grid index (for profiles)
min_energy_cutoff (float): Energy threshold > mceq_config.e_min
"""
return (self.n_particles('total_mu+', grid_idx=grid_idx, min_energy_cutoff=min_energy_cutoff) +
self.n_particles('total_mu-', grid_idx=grid_idx, min_energy_cutoff=min_energy_cutoff))
[docs] def n_e(self, grid_idx=None, min_energy_cutoff=1e-1):
"""Returns the number of electrons plus positrons at a grid step above
`min_energy_cutoff`.
Args:
grid_idx (int): Depth grid index (for profiles)
min_energy_cutoff (float): Energy threshold > mceq_config.e_min
"""
return (self.n_particles('e+', grid_idx=grid_idx, min_energy_cutoff=min_energy_cutoff) +
self.n_particles('e-', grid_idx=grid_idx, min_energy_cutoff=min_energy_cutoff))
[docs] def z_factor(self, projectile_pdg, secondary_pdg, definition='primary_e'):
"""Energy dependent Z-factor according to Thunman et al. (1996)"""
proj = self.pman[projectile_pdg]
sec = self.pman[secondary_pdg]
if not proj.is_projectile:
raise Exception('{0} is not a projectile particle.'.format(
proj.name))
info(
10, 'Computing e-dependent Zfactor for {0} -> {1}'.format(
proj.name, sec.name))
if not proj.is_secondary(sec):
raise Exception('{0} is not a secondary particle of {1}.'.format(
sec.name, proj.name))
if proj == 2112:
nuc_flux = self.pmodel.p_and_n_flux(self.e_grid)[2]
else:
nuc_flux = self.pmodel.p_and_n_flux(self.e_grid)[1]
zfac = np.zeros(self.dim)
smat = proj.hadr_yields[sec]
proj_cs = proj.inel_cross_section()
zfac = np.zeros_like(self.e_grid)
# Definition wrt CR energy (different from Thunman) on x-axis
if definition == 'primary_e':
min_energy = 2.
for p_eidx, e in enumerate(self.e_grid):
if e < min_energy:
min_idx = p_eidx + 1
continue
zfac[p_eidx] = np.sum(
smat[min_idx:p_eidx + 1, p_eidx] * nuc_flux[p_eidx] /
nuc_flux[min_idx:p_eidx + 1] * proj_cs[p_eidx] /
proj_cs[min_idx:p_eidx + 1])
return zfac
else:
# Like in Thunman et al. 1996
for p_eidx, _ in enumerate(self.e_grid):
zfac[p_eidx] = np.sum(smat[p_eidx, p_eidx:] *
nuc_flux[p_eidx:] / nuc_flux[p_eidx] *
proj_cs[p_eidx:] / proj_cs[p_eidx])
return zfac
[docs] def decay_z_factor(self, parent_pdg, child_pdg):
"""Energy dependent Z-factor according to Lipari (1993)."""
proj = self.pman[parent_pdg]
sec = self.pman[child_pdg]
if proj.is_stable:
raise Exception('{0} does not decay.'.format(proj.name))
info(
10, 'Computing e-dependent decay Zfactor for {0} -> {1}'.format(
proj.name, sec.name))
if not proj.is_child(sec):
raise Exception('{0} is not a a child particle of {1}.'.format(
sec.name, proj.name))
cr_gamma = self.pmodel.nucleon_gamma(self.e_grid)
zfac = np.zeros(self.dim)
zfac = np.zeros_like(self.e_grid)
for p_eidx, e in enumerate(self.e_grid):
# if e < min_energy:
# min_idx = p_eidx + 1
# continue
xlab, xdist = proj.dNdec_dxlab(e, sec)
zfac[p_eidx] = np.trapz(xlab**(-cr_gamma[p_eidx] - 2.) * xdist,
x=xlab)
return zfac
[docs]class MatrixBuilder(object):
"""This class constructs the interaction and decay matrices."""
def __init__(self, particle_manager):
self._pman = particle_manager
self._energy_grid = self._pman._energy_grid
self.int_m = None
self.dec_m = None
self._construct_differential_operator()
[docs] def construct_matrices(self, skip_decay_matrix=False):
r"""Constructs the matrices for calculation.
These are:
- :math:`\boldsymbol{M}_{int} = (-\boldsymbol{1} + \boldsymbol{C}){\boldsymbol{\Lambda}}_{int}`,
- :math:`\boldsymbol{M}_{dec} = (-\boldsymbol{1} + \boldsymbol{D}){\boldsymbol{\Lambda}}_{dec}`.
For debug_levels >= 2 some general information about matrix shape and the number of
non-zero elements is printed. The intermediate matrices :math:`\boldsymbol{C}` and
:math:`\boldsymbol{D}` are deleted afterwards to save memory.
Set the ``skip_decay_matrix`` flag to avoid recreating the decay matrix. This is not necessary
if, for example, particle production is modified, or the interaction model is changed.
Args:
skip_decay_matrix (bool): Omit re-creating D matrix
"""
from itertools import product
info(
3, "Start filling matrices. Skip_decay_matrix = {0}".format(
skip_decay_matrix))
self._fill_matrices(skip_decay_matrix=skip_decay_matrix)
cparts = self._pman.cascade_particles
# interaction part
# -I + C
# In first interaction mode it is just C
self.max_lint = 0.
for parent, child in product(cparts, cparts):
idx = (child.mceqidx, parent.mceqidx)
# Main diagonal
if child.mceqidx == parent.mceqidx and parent.can_interact:
# Subtract unity from the main diagonals
info(10, 'subtracting main C diagonal from', child.name,
parent.name)
self.C_blocks[idx][np.diag_indices(self.dim)] -= 1.
if idx in self.C_blocks:
# Multiply with Lambda_int and keep track the maximal
# interaction length for the calculation of integration steps
self.max_lint = np.max([
self.max_lint,
np.max(parent.inverse_interaction_length())
])
self.C_blocks[idx] *= parent.inverse_interaction_length()
if child.mceqidx == parent.mceqidx and parent.has_contloss:
if config.enable_muon_energy_loss and abs(
parent.pdg_id[0]) == 13:
info(5, 'Cont. loss for', parent.name)
self.C_blocks[idx] += self.cont_loss_operator(
parent.pdg_id)
if config.enable_em_ion and abs(parent.pdg_id[0]) == 11:
info(5, 'Cont. loss for', parent.name)
self.C_blocks[idx] += self.cont_loss_operator(
parent.pdg_id)
self.int_m = self._csr_from_blocks(self.C_blocks)
# -I + D
if not skip_decay_matrix or self.dec_m is None:
self.max_ldec = 0.
for parent, child in product(cparts, cparts):
idx = (child.mceqidx, parent.mceqidx)
# Main diagonal
if child.mceqidx == parent.mceqidx and not parent.is_stable:
# Subtract unity from the main diagonals
info(10, 'subtracting main D diagonal from', child.name,
parent.name)
self.D_blocks[idx][np.diag_indices(self.dim)] -= 1.
if idx not in self.D_blocks:
info(25, parent.pdg_id[0], child.pdg_id, 'not in D_blocks')
continue
# Multiply with Lambda_dec and keep track of the
# maximal decay length for the calculation of integration steps
self.max_ldec = max(
[self.max_ldec,
np.max(parent.inverse_decay_length())])
self.D_blocks[idx] *= parent.inverse_decay_length()
self.dec_m = self._csr_from_blocks(self.D_blocks)
for mname, mat in [('C', self.int_m), ('D', self.dec_m)]:
mat_density = (float(mat.nnz) / float(np.prod(mat.shape)))
info(5, "{0} Matrix info:".format(mname))
info(5, " density : {0:3.2%}".format(mat_density))
info(5, " shape : {0} x {1}".format(*mat.shape))
info(5, " nnz : {0}".format(mat.nnz))
info(10, " sum :", mat.sum())
info(3, "Done filling matrices.")
return self.int_m, self.dec_m
def _average_operator(self, op_mat):
"""Averages the continuous loss operator by performing
1/max_step explicit euler steps"""
n_steps = int(1. / config.loss_step_for_average)
info(
10,
'Averaging continuous loss using {0} intermediate steps.'.format(
n_steps))
op_step = np.eye(
self._energy_grid.d) + op_mat * config.loss_step_for_average
return np.linalg.matrix_power(op_step, n_steps) - np.eye(
self._energy_grid.d)
[docs] def cont_loss_operator(self, pdg_id):
"""Returns continuous loss operator that can be summed with appropriate
position in the C matrix."""
op_mat = -np.diag(1 / self._energy_grid.c).dot(
self.op_matrix.dot(np.diag(self._pman[pdg_id].dEdX)))
if config.average_loss_operator:
return self._average_operator(op_mat)
else:
return op_mat
@property
def dim(self):
"""Energy grid (dimension)"""
return self._pman.dim
@property
def dim_states(self):
"""Number of cascade particles times dimension of grid
(dimension of the equation system)"""
return self._pman.dim_states
def _zero_mat(self):
"""Returns a new square zero valued matrix with dimensions of grid.
"""
return np.zeros((self._pman.dim, self._pman.dim))
def _csr_from_blocks(self, blocks):
"""Construct a csr matrix from a dictionary of submatrices (blocks)
Note::
It's super pain the a** to construct a properly indexed sparse matrix
directly from the blocks, since bmat totally messes up the order.
"""
from scipy.sparse import csr_matrix
new_mat = np.zeros((self.dim_states, self.dim_states))
for (c, p), d in six.iteritems(blocks):
rc, rp = self._pman.mceqidx2pref[c], self._pman.mceqidx2pref[p]
try:
new_mat[rc.lidx:rc.uidx, rp.lidx:rp.uidx] = d
except ValueError:
raise Exception(
'Dimension mismatch: matrix {0}x{1}, p={2}:({3},{4}), c={5}:({6},{7})'
.format(self.dim_states, self.dim_states, rp.name, rp.lidx,
rp.uidx, rc.name, rc.lidx, rc.uidx))
return csr_matrix(new_mat)
def _follow_chains(self, p, pprod_mat, p_orig, idcs, propmat, reclev=0):
"""Some recursive magic.
"""
info(40, reclev * '\t', 'entering with', p.name)
# print 'orig, p', p_orig.pdg_id, p.pdg_id
for d in p.children:
info(40, reclev * '\t', 'following to', d.name)
if not d.is_resonance:
# print 'adding stuff', p_orig.pdg_id, p.pdg_id, d.pdg_id
dprop = self._zero_mat()
p._assign_decay_idx(d, idcs, d.hadridx, dprop)
propmat[(d.mceqidx, p_orig.mceqidx)] += dprop.dot(pprod_mat)
if config.debug_level >= 20:
pstr = 'res'
dstr = 'Mchain'
if idcs == p.hadridx:
pstr = 'prop'
dstr = 'Mprop'
info(
40, reclev * '\t',
'setting {0}[({1},{3})->({2},{4})]'.format(
dstr, p_orig.name, d.name, pstr, 'prop'))
if d.is_mixed or d.is_resonance:
dres = self._zero_mat()
p._assign_decay_idx(d, idcs, d.residx, dres)
reclev += 1
self._follow_chains(d, dres.dot(pprod_mat), p_orig, d.residx,
propmat, reclev)
else:
info(20, reclev * '\t', '\t terminating at', d.name)
def _fill_matrices(self, skip_decay_matrix=False):
"""Generates the interaction and decay matrices from scratch.
"""
from collections import defaultdict
# Fill decay matrix blocks
if not skip_decay_matrix or self.dec_m is None:
# Initialize empty D matrix
self.D_blocks = defaultdict(lambda: self._zero_mat())
for p in self._pman.cascade_particles:
# Fill parts of the D matrix related to p as mother
if not p.is_stable and bool(p.children) and not p.is_tracking:
self._follow_chains(p,
np.diag(np.ones((self.dim))),
p,
p.hadridx,
self.D_blocks,
reclev=0)
else:
info(20, p.name, 'stable or not added to D matrix')
# Initialize empty C blocks
self.C_blocks = defaultdict(lambda: self._zero_mat())
for p in self._pman.cascade_particles:
# if p doesn't interact, skip interaction matrices
if not p.is_projectile:
if p.is_hadron:
info(
1, 'No interactions by {0} ({1}).'.format(
p.name, p.pdg_id))
continue
for s in p.hadr_secondaries:
# if s not in self.pman.cascade_particles:
# print 'Doing nothing with', p.pdg_id, s.pdg_id
# continue
if not s.is_resonance:
cmat = self._zero_mat()
p._assign_hadr_dist_idx(s, p.hadridx, s.hadridx, cmat)
self.C_blocks[(s.mceqidx, p.mceqidx)] += cmat
cmat = self._zero_mat()
p._assign_hadr_dist_idx(s, p.hadridx, s.residx, cmat)
self._follow_chains(s,
cmat,
p,
s.residx,
self.C_blocks,
reclev=1)
def _construct_differential_operator(self):
"""Constructs a derivative operator for the contiuous losses.
This implmentation uses a 6th-order finite differences operator,
only depends on the energy grid. This is an operator for a sub-matrix
of dimension (energy grid, energy grid) for a single particle. It
can be likewise applied to all particle species. The dEdX values are
applied later in ...
"""
# First rows of operator matrix (values are truncated at the edges
# of a matrix.)
diags_leftmost = [0, 1, 2, 3]
coeffs_leftmost = [-11, 18, -9, 2]
denom_leftmost = 6
diags_left_1 = [-1, 0, 1, 2, 3]
coeffs_left_1 = [-3, -10, 18, -6, 1]
denom_left_1 = 12
diags_left_2 = [-2, -1, 0, 1, 2, 3]
coeffs_left_2 = [3, -30, -20, 60, -15, 2]
denom_left_2 = 60
# Centered diagonals
# diags = [-3, -2, -1, 1, 2, 3]
# coeffs = [-1, 9, -45, 45, -9, 1]
# denom = 60.
diags = diags_left_2
coeffs = coeffs_left_2
denom = 60.
# Last rows at the right of operator matrix
diags_right_2 = [-d for d in diags_left_2[::-1]]
coeffs_right_2 = [-d for d in coeffs_left_2[::-1]]
denom_right_2 = denom_left_2
diags_right_1 = [-d for d in diags_left_1[::-1]]
coeffs_right_1 = [-d for d in coeffs_left_1[::-1]]
denom_right_1 = denom_left_1
diags_rightmost = [-d for d in diags_leftmost[::-1]]
coeffs_rightmost = [-d for d in coeffs_leftmost[::-1]]
denom_rightmost = denom_leftmost
h = np.log(self._energy_grid.b[1:] / self._energy_grid.b[:-1])
dim_e = self._energy_grid.d
last = dim_e - 1
op_matrix = np.zeros((dim_e, dim_e))
op_matrix[0, np.asarray(diags_leftmost)] = np.asarray(
coeffs_leftmost) / (denom_leftmost * h[0])
op_matrix[1, 1 +
np.asarray(diags_left_1)] = np.asarray(coeffs_left_1) / (
denom_left_1 * h[1])
op_matrix[2, 2 +
np.asarray(diags_left_2)] = np.asarray(coeffs_left_2) / (
denom_left_2 * h[2])
op_matrix[last, last + np.asarray(diags_rightmost)] = np.asarray(
coeffs_rightmost) / (denom_rightmost * h[last])
op_matrix[last - 1, last - 1 +
np.asarray(diags_right_1)] = np.asarray(coeffs_right_1) / (
denom_right_1 * h[last - 1])
op_matrix[last - 2, last - 2 +
np.asarray(diags_right_2)] = np.asarray(coeffs_right_2) / (
denom_right_2 * h[last - 2])
for row in range(3, dim_e - 3):
op_matrix[row, row +
np.asarray(diags)] = np.asarray(coeffs) / (denom *
h[row])
self.op_matrix = op_matrix