#!/usr/bin/env python
"""
This module contains dyPolyChord's high-level functionality for
performing dynamic nested sampling calculations. This is done using the
algorithm described in Appendix F of "Dynamic nested sampling: an
improved algorithm for parameter estimation and evidence calculation"
(Higson et al., 2019). For more details see the dyPolyChord
doumentation at https://dypolychord.readthedocs.io/en/latest/.
"""
from __future__ import division # Enforce float division for Python2
import copy
import os
import traceback
import sys
import shutil
import warnings
import numpy as np
import scipy.signal
import nestcheck.data_processing
import nestcheck.io_utils
import nestcheck.write_polychord_output
import dyPolyChord.nlive_allocation
import dyPolyChord.output_processing
# The only main public-facing API for this module is the run_dypolychord
# function.
__all__ = ['run_dypolychord']
[docs]@nestcheck.io_utils.timing_decorator
def run_dypolychord(run_polychord, dynamic_goal, settings_dict_in, **kwargs):
r"""Performs dynamic nested sampling using the algorithm described in
Appendix F of "Dynamic nested sampling: an improved algorithm for
parameter estimation and evidence calculation" (Higson et al., 2019).
The likelihood, prior and PolyChord sampler are contained in the
run_polychord callable (first argument).
Dynamic nested sampling is performed in 4 steps:
1) Generate an initial nested sampling run with a constant number of live
points n_init. This process is run in chunks using PolyChord's max_ndead
setting to allow periodic saving of .resume files so the initial run can
be resumed at different points.
2) Calculate an allocation of the number of live points at each likelihood
for use in step 3. Also clean up resume files and save relevant
information.
3) Generate a dynamic nested sampling run using the calculated live point
allocation from step 2.
4) Combine the initial and dynamic runs and write combined output files
in the PolyChord format. Remove the intermediate output files produced
which are no longer needed.
The output files are of the same format produced by PolyChord, and
contain posterior samples and an estimate of the Bayesian evidence.
Further analysis, including estimating uncertainties, can be performed
with the nestcheck package.
Like for PolyChord, the output files are saved in base_dir (specified
in settings_dict_in, default value is 'chains'). Their names are
determined by file_root (also specified in settings_dict_in).
dyPolyChord ensures the following following files are always produced:
* [base_dir]/[file_root].stats: run statistics including an estimate of
the Bayesian evidence;
* [base_dir]/[file_root]_dead.txt: posterior samples;
* [base_dir]/[file_root]_dead-birth.txt: as above but with an extra
column containing information about when points were sampled.
For more information about the output format, see PolyChord's
documentation. Note that dyPolyChord is not able to produce all of the
types of output files made by PolyChord - see check_settings'
documentation for more information.
In addition, a number of intermediate files are produced during the dynamic
nested sampling process which are removed by default when the process
finishes. See clean_extra_output's documentation for more details.
Parameters
----------
run_polychord: callable
A callable which performs nested sampling using PolyChord
for a given likelihood and prior, and takes a settings dictionary as
its argument. Note that the likelihood and prior must be specified
within the run_polychord callable. For helper functions for creating
such callables, see the documentation for dyPolyChord.pypolychord
(Python likelihoods) and dyPolyChord.polychord (C++ and Fortran
likelihoods). Examples can be found at:
https://dypolychord.readthedocs.io/en/latest/demo.html
dynamic_goal: float or int
Number in [0, 1] which determines how to allocate computational effort
between parameter estimation and evidence calculation. See the dynamic
nested sampling paper for more details.
settings_dict: dict
PolyChord settings to use (see check_settings for information on
allowed and default settings).
nlive_const: int, optional
Used to calculate total number of samples if max_ndead not specified in
settings. The total number of samples used in this case is the
estimated number that would be taken by a nested sampling run with a
constant number of live points nlive_const.
ninit: int, optional
Number of live points to use for the initial exploratory run (Step 1).
ninit_step: int, optional
Number of samples taken between saving .resume files in Step 1.
seed_increment: int, optional
If random seeding is used (PolyChord seed setting >= 0), this increment
is added to PolyChord's random seed each time it is run to avoid
repeated points.
When running in parallel using MPI, PolyChord hashes the seed with the
MPI rank using IEOR. Hence you need seed_increment to be > number of
processors to ensure no two processes use the same seed.
When running repeated results you need to increment the seed used for
each run by some number > seed_increment.
smoothing_filter: func or None, optional
Smoothing function to apply to the nlive allocation array of target
live points. Use smoothing_filter=None for no smoothing.
comm: None or mpi4py MPI.COMM object, optional
For MPI parallelisation.
stats_means_errs: bool, optional
Whether to include estimates of the log evidence logZ and the
parameter mean values and their uncertainties in the .stats file.
This is passed to nestcheck's write_run_output; see its documentation
for more details.
clean: bool, optional
Clean the additional output files made by dyPolyChord, leaving only
output files for the combined run in PolyChord format.
When debugging this can be set to False to allow inspection of
intermediate output.
resume_dyn_run: bool, optional
Resume a partially completed dyPolyChord run using its cached output
files. Resuming is only possible if the initial exploratory run
finished and the process reached the dynamic run stage. If the run
is resumed with different settings to what were used the first time
then this may give unexpected results.
"""
try:
nlive_const = kwargs.pop('nlive_const', settings_dict_in['nlive'])
except KeyError:
# If nlive_const is not specified in the arguments or the settings
# dictionary, default to nlive_const = 100
nlive_const = kwargs.pop('nlive_const', 100)
ninit = kwargs.pop('ninit', 10)
init_step = kwargs.pop('init_step', ninit)
seed_increment = kwargs.pop('seed_increment', 100)
default_smoothing = (lambda x: scipy.signal.savgol_filter(
x, 1 + (2 * ninit), 3, mode='nearest'))
smoothing_filter = kwargs.pop('smoothing_filter', default_smoothing)
comm = kwargs.pop('comm', None)
stats_means_errs = kwargs.pop('stats_means_errs', True)
clean = kwargs.pop('clean', True)
resume_dyn_run = kwargs.pop('resume_dyn_run', False)
if kwargs:
raise TypeError('Unexpected **kwargs: {0}'.format(kwargs))
# Set Up
# ------
# Set up rank if running with MPI
if comm is not None:
rank = comm.Get_rank()
else:
rank = 0
settings_dict = None # Define for rank != 0
if rank == 0:
settings_dict_in, output_settings = check_settings(
settings_dict_in)
if (settings_dict_in['seed'] >= 0 and comm is not None and
comm.Get_size() > 1):
warnings.warn((
'N.B. random seeded results will not be reproducible when '
'running dyPolyChord with multiple MPI processes. You have '
'seed={} and {} MPI processes.').format(
settings_dict_in['seed'], comm.Get_size()), UserWarning)
root_name = os.path.join(settings_dict_in['base_dir'],
settings_dict_in['file_root'])
if resume_dyn_run:
# Check if the files we need to resume the dynamic run all exist.
files_needed = ['_dyn_info.pkl', '_init_dead.txt', '_dyn_dead.txt',
'_dyn.resume']
files_needed = [root_name + ending for ending in files_needed]
files_exist = [os.path.isfile(name) for name in files_needed]
if all(files_exist):
skip_initial_run = True
print('resume_dyn_run=True so I am skipping the initial '
'exploratory run and resuming the dynamic run')
else:
skip_initial_run = False
# Only print a message if some of the files are present
if any(files_exist):
msg = (
'resume_dyn_run=True but I could not resume as not '
'all of the files I need are present. Perhaps the initial '
'exploratory run did not finish? The dyPolyChord process '
'can only be resumed from after the dynamic run '
'starts.\nFiles I am missing are:')
for i, name in enumerate(files_needed):
if not files_exist[i]:
msg += '\n' + name
print(msg)
else:
skip_initial_run = False
if skip_initial_run:
dyn_info = nestcheck.io_utils.pickle_load(root_name + '_dyn_info')
else:
# Step 1: do initial run
# ----------------------
if rank == 0:
# Make a copy of settings_dict so we don't edit settings
settings_dict = copy.deepcopy(settings_dict_in)
settings_dict['file_root'] = settings_dict['file_root'] + '_init'
settings_dict['nlive'] = ninit
if dynamic_goal == 0:
# We definitely won't need to resume midway through in this case,
# so just run PolyChord normally
run_polychord(settings_dict, comm=comm)
if rank == 0:
final_seed = settings_dict['seed']
if settings_dict['seed'] >= 0:
final_seed += seed_increment
step_ndead = None
resume_outputs = None
else:
step_ndead, resume_outputs, final_seed = run_and_save_resumes(
run_polychord, settings_dict, init_step, seed_increment,
comm=comm)
# Step 2: calculate an allocation of live points
# ----------------------------------------------
if rank == 0:
try:
# Get settings for dynamic run based on initial run
dyn_info = process_initial_run(
settings_dict_in, nlive_const=nlive_const,
smoothing_filter=smoothing_filter,
step_ndead=step_ndead, resume_outputs=resume_outputs,
ninit=ninit, dynamic_goal=dynamic_goal,
final_seed=final_seed)
except: # pragma: no cover
# We need a bare except statement here to ensure that if
# any type of error occurs in the rank == 0 process when
# running in parallel with MPI then we also abort all the
# other processes.
if comm is None or comm.Get_size() == 1:
raise # Just one process so raise error normally.
else:
# Print error info.
traceback.print_exc(file=sys.stdout)
print('Error in process with rank == 0: forcing MPI abort')
sys.stdout.flush() # Make sure message prints before abort
comm.Abort(1)
# Step 3: do dynamic run
# ----------------------
# Get settings for dynamic run
if rank == 0:
settings_dict = get_dynamic_settings(settings_dict_in, dyn_info)
if resume_dyn_run:
settings_dict['write_resume'] = True
settings_dict['read_resume'] = True
# Do the run
run_polychord(settings_dict, comm=comm)
# Step 4: process output and tidy
# -------------------------------
if rank == 0:
try:
# Combine initial and dynamic runs
run = dyPolyChord.output_processing.process_dypolychord_run(
settings_dict_in['file_root'], settings_dict_in['base_dir'],
dynamic_goal=dynamic_goal)
# Save combined output in PolyChord format
nestcheck.write_polychord_output.write_run_output(
run, stats_means_errs=stats_means_errs, **output_settings)
if clean:
# Remove temporary files
root_name = os.path.join(settings_dict_in['base_dir'],
settings_dict_in['file_root'])
dyPolyChord.output_processing.clean_extra_output(root_name)
except: # pragma: no cover
# We need a bare except statement here to ensure that if
# any type of error occurs in the rank == 0 process when
# running in parallel with MPI then we also abort all the
# other processes.
if comm is None or comm.Get_size() == 1:
raise # Just one process so raise error normally.
else:
# Print error info.
traceback.print_exc(file=sys.stdout)
print('Error in process with rank == 0: forcing MPI abort')
sys.stdout.flush() # Make sure message prints before abort
comm.Abort(1)
# Helper functions
# ----------------
def check_settings(settings_dict_in):
"""
Check the input dictionary of PolyChord settings and add default values.
Some setting values are mandatory for dyPolyChord - if one of these is set
to a value which is not allowed then a UserWarning is issued and the
program proceeds with the mandatory setting.
Parameters
----------
settings_dict_in: dict
PolyChord settings to use.
Returns
-------
settings_dict: dict
Updated settings dictionary including default and mandatory values.
output_settings: dict
Settings for writing output files which are saved until the final
output files are calculated at the end.
"""
default_settings = {'nlive': 100,
'num_repeats': 20,
'file_root': 'temp',
'base_dir': 'chains',
'seed': -1,
'do_clustering': True,
'max_ndead': -1,
'equals': True,
'posteriors': True}
mandatory_settings = {'nlives': {},
'write_dead': True,
'write_stats': True,
'write_paramnames': False,
'write_prior': False,
'write_live': False,
'write_resume': False,
'read_resume': False,
'cluster_posteriors': False,
'boost_posterior': 0.0}
settings_dict = copy.deepcopy(settings_dict_in)
# Assign default settings.
for key, value in default_settings.items():
if key not in settings_dict:
settings_dict[key] = value
# Produce warning if settings_dict_in has different values for any
# mandatory settings.
for key, value in mandatory_settings.items():
if key in settings_dict_in and settings_dict_in[key] != value:
warnings.warn((
'dyPolyChord currently only allows the setting {0}={1}, '
'so I am proceeding with this. You tried to specify {0}={2}.'
.format(key, value, settings_dict_in[key])), UserWarning)
settings_dict[key] = value
# Extract output settings (not needed until later)
output_settings = {}
for key in ['posteriors', 'equals']:
output_settings[key] = settings_dict[key]
settings_dict[key] = False
return settings_dict, output_settings
def run_and_save_resumes(run_polychord, settings_dict_in, init_step,
seed_increment, comm=None):
"""
Run PolyChord while pausing after every init_step samples (dead points)
generated to save a resume file before continuing.
Parameters
----------
run_polychord: callable
Callable which runs PolyChord with the desired likelihood and prior,
and takes a settings dictionary as its argument.
settings_dict: dict
PolyChord settings to use (see check_settings for information on
allowed and default settings).
ninit_step: int, optional
Number of samples taken between saving .resume files in Step 1.
seed_increment: int, optional
If seeding is used (PolyChord seed setting >= 0), this increment is
added to PolyChord's random seed each time it is run to avoid
repeated points.
When running in parallel using MPI, PolyChord hashes the seed with the
MPI rank using IEOR. Hence you need seed_increment to be > number of
processors to ensure no two processes use the same seed.
When running repeated results you need to increment the seed used for
each run by some number > seed_increment.
comm: None or mpi4py MPI.COMM object, optional
For MPI parallelisation.
Returns
-------
step_ndead: list of ints
Numbers of dead points at which resume files are saved.
resume_outputs: dict
Dictionary containing run output (contents of .stats file) at each
resume. Keys are elements of step_ndead.
final_seed: int
Random seed. This is incremented after each run so it can be used
when resuming without generating correlated points.
"""
settings_dict = copy.deepcopy(settings_dict_in)
# set up rank if running with MPI
if comm is not None:
# Define variables for rank != 0
step_ndead = None
resume_outputs = None
final_seed = None
# Get rank
rank = comm.Get_rank()
else:
rank = 0
if rank == 0:
root_name = os.path.join(settings_dict['base_dir'],
settings_dict['file_root'])
try:
os.remove(root_name + '.resume')
except OSError:
pass
settings_dict['write_resume'] = True
settings_dict['read_resume'] = True
step_ndead = []
resume_outputs = {}
add_points = True
while add_points:
if rank == 0:
settings_dict['max_ndead'] = (len(step_ndead) + 1) * init_step
run_polychord(settings_dict, comm=comm)
if rank == 0:
try:
if settings_dict['seed'] >= 0:
settings_dict['seed'] += seed_increment
run_output = nestcheck.data_processing.process_polychord_stats(
settings_dict['file_root'], settings_dict['base_dir'])
# Store run outputs for getting number of likelihood calls
# while accounting for resuming a run.
resume_outputs[run_output['ndead']] = run_output
step_ndead.append(run_output['ndead'] - settings_dict['nlive'])
if len(step_ndead) >= 2 and step_ndead[-1] == step_ndead[-2]:
add_points = False
# store resume file in new file path
shutil.copyfile(
root_name + '.resume',
root_name + '_' + str(step_ndead[-1]) + '.resume')
except: # pragma: no cover
# We need a bare except statement here to ensure that if
# any type of error occurs in the rank == 0 process when
# running in parallel with MPI then we also abort all the
# other processes.
if comm is None or comm.Get_size() == 1:
raise # Just one process so raise error normally.
else:
# Print error info.
traceback.print_exc(file=sys.stdout)
print('Error in process with rank == 0: forcing MPI abort')
sys.stdout.flush() # Make sure message prints before abort
comm.Abort(1)
if comm is not None:
add_points = comm.bcast(add_points, root=0)
if rank == 0:
final_seed = settings_dict['seed']
return step_ndead, resume_outputs, final_seed
def process_initial_run(settings_dict_in, **kwargs):
"""Loads the initial exploratory run and analyses it to create information
about the second, dynamic run. This information is returned as a dictionary
and also cached.
Parameters
----------
settings_dict_in: dict
Initial PolyChord settings (see check_settings for information on
allowed and default settings).
dynamic_goal: float or int
Number in [0, 1] which determines how to allocate computational effort
between parameter estimation and evidence calculation. See the dynamic
nested sampling paper for more details.
nlive_const: int
Used to calculate total number of samples if max_ndead not specified in
settings. The total number of samples used is the estimated number that
would be taken by a nested sampling run with a constant number of live
points nlive_const.
ninit: int
Number of live points to use for the initial exploratory run (Step 1).
smoothing_filter: func
Smoothing to apply to the nlive allocation (if any).
step_ndead: list of ints
Numbers of dead points at which resume files are saved.
resume_outputs: dict
Dictionary containing run output (contents of .stats file) at each
resume. Keys are elements of step_ndead.
final_seed: int
Random seed at the end of the initial run.
Returns
-------
dyn_info: dict
Information about the second dynamic run which is calculated from
analysing the initial exploratory run.
"""
dynamic_goal = kwargs.pop('dynamic_goal')
nlive_const = kwargs.pop('nlive_const')
ninit = kwargs.pop('ninit')
smoothing_filter = kwargs.pop('smoothing_filter')
step_ndead = kwargs.pop('step_ndead')
resume_outputs = kwargs.pop('resume_outputs')
final_seed = kwargs.pop('final_seed')
if kwargs:
raise TypeError('Unexpected **kwargs: {0}'.format(kwargs))
init_run = nestcheck.data_processing.process_polychord_run(
settings_dict_in['file_root'] + '_init',
settings_dict_in['base_dir'])
# Calculate max number of samples
if settings_dict_in['max_ndead'] > 0:
samp_tot = settings_dict_in['max_ndead']
assert (settings_dict_in['max_ndead']
> init_run['logl'].shape[0]), (
'all points used in inital run - '
'none left for dynamic run!')
else:
samp_tot = init_run['logl'].shape[0] * (nlive_const / ninit)
assert nlive_const > ninit
dyn_info = dyPolyChord.nlive_allocation.allocate(
init_run, samp_tot, dynamic_goal,
smoothing_filter=smoothing_filter)
dyn_info['final_seed'] = final_seed
dyn_info['ninit'] = ninit
root_name = os.path.join(settings_dict_in['base_dir'],
settings_dict_in['file_root'])
if dyn_info['peak_start_ind'] != 0:
# subtract 1 as ndead=1 corresponds to point 0
resume_steps = np.asarray(step_ndead) - 1
# Work out which resume file to load. This is the first resume file
# before dyn_info['peak_start_ind']. If there are no such files then we
# do not reload and instead start the second dynamic run by sampling
# from the entire prior.
indexes_before_peak = np.where(
resume_steps < dyn_info['peak_start_ind'])[0]
if indexes_before_peak.shape[0] > 0:
resume_ndead = step_ndead[indexes_before_peak[-1]]
# copy resume step to dynamic file root
shutil.copyfile(
root_name + '_init_' + str(resume_ndead) + '.resume',
root_name + '_dyn.resume')
# Save resume info
dyn_info['resume_ndead'] = resume_ndead
try:
dyn_info['resume_nlike'] = (
resume_outputs[resume_ndead]['nlike'])
except KeyError:
pass # protect from error reading nlike from .stats file
if dynamic_goal != 0:
# Remove all the temporary resume files. Use set to avoid
# duplicates as these cause OSErrors.
for snd in set(step_ndead):
os.remove(root_name + '_init_' + str(snd) + '.resume')
nestcheck.io_utils.pickle_save(
dyn_info, root_name + '_dyn_info', overwrite_existing=True)
return dyn_info
def get_dynamic_settings(settings_dict_in, dyn_info):
"""Loads the initial exploratory run and analyses it to create
information about the second, dynamic run. This information is returned
in a dictionary.
Parameters
----------
settings_dict_in: dict
Initial PolyChord settings (see check_settings for information on
allowed and default settings).
dynamic_goal: float or int
Number in (0, 1) which determines how to allocate computational effort
between parameter estimation and evidence calculation. See the dynamic
nested sampling paper for more details.
Returns
-------
settings_dict: dict
PolyChord settings for dynamic run.
"""
settings_dict = copy.deepcopy(settings_dict_in)
settings_dict['seed'] = dyn_info['final_seed']
if settings_dict['seed'] >= 0:
assert settings_dict_in['seed'] >= 0, (
'if input seed was <0 it should not have been edited')
if dyn_info['peak_start_ind'] != 0:
settings_dict['nlive'] = dyn_info['ninit']
else:
settings_dict['nlive'] = dyn_info['nlives_dict'][
min(dyn_info['nlives_dict'].keys())]
settings_dict['nlives'] = dyn_info['nlives_dict']
# To write .ini files correctly, read_resume must be type bool not
# np.bool
settings_dict['read_resume'] = (
bool(dyn_info['peak_start_ind'] != 0))
settings_dict['file_root'] = settings_dict_in['file_root'] + '_dyn'
return settings_dict