Source code for ligo.skymap.extern.ptemcee.sampler

# Copied and adapted from https://github.com/willvousden/ptemcee
#
# The MIT License (MIT)
#
# Copyright (c) 2010-2013 Daniel Foreman-Mackey & contributors.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

__all__ = ["Sampler", "default_beta_ladder"]

import numpy as np
import multiprocessing as multi
from . import util

[docs] def default_beta_ladder(ndim, ntemps=None, Tmax=None): """ Returns a ladder of :math:`\beta \\equiv 1/T` under a geometric spacing that is determined by the arguments ``ntemps`` and ``Tmax``. The temperature selection algorithm works as follows: Ideally, ``Tmax`` should be specified such that the tempered posterior looks like the prior at this temperature. If using adaptive parallel tempering, per `arXiv:1501.05823 <http://arxiv.org/abs/1501.05823>`_, choosing ``Tmax = inf`` is a safe bet, so long as ``ntemps`` is also specified. :param ndim: The number of dimensions in the parameter space. :param ntemps: (optional) If set, the number of temperatures to generate. :param Tmax: (optional) If set, the maximum temperature for the ladder. Temperatures are chosen according to the following algorithm: * If neither ``ntemps`` nor ``Tmax`` is specified, raise an exception (insufficient information). * If ``ntemps`` is specified but not ``Tmax``, return a ladder spaced so that a Gaussian posterior would have a 25% temperature swap acceptance ratio. * If ``Tmax`` is specified but not ``ntemps``: * If ``Tmax = inf``, raise an exception (insufficient information). * Else, space chains geometrically as above (for 25% acceptance) until ``Tmax`` is reached. * If ``Tmax`` and ``ntemps`` are specified: * If ``Tmax = inf``, place one chain at ``inf`` and ``ntemps-1`` in a 25% geometric spacing. * Else, use the unique geometric spacing defined by ``ntemps`` and ``Tmax``. """ if type(ndim) != int or ndim < 1: raise ValueError('Invalid number of dimensions specified.') if ntemps is None and Tmax is None: raise ValueError('Must specify one of ``ntemps`` and ``Tmax``.') if Tmax is not None and Tmax <= 1: raise ValueError('``Tmax`` must be greater than 1.') if ntemps is not None and (type(ntemps) != int or ntemps < 1): raise ValueError('Invalid number of temperatures specified.') tstep = np.array([25.2741, 7., 4.47502, 3.5236, 3.0232, 2.71225, 2.49879, 2.34226, 2.22198, 2.12628, 2.04807, 1.98276, 1.92728, 1.87946, 1.83774, 1.80096, 1.76826, 1.73895, 1.7125, 1.68849, 1.66657, 1.64647, 1.62795, 1.61083, 1.59494, 1.58014, 1.56632, 1.55338, 1.54123, 1.5298, 1.51901, 1.50881, 1.49916, 1.49, 1.4813, 1.47302, 1.46512, 1.45759, 1.45039, 1.4435, 1.4369, 1.43056, 1.42448, 1.41864, 1.41302, 1.40761, 1.40239, 1.39736, 1.3925, 1.38781, 1.38327, 1.37888, 1.37463, 1.37051, 1.36652, 1.36265, 1.35889, 1.35524, 1.3517, 1.34825, 1.3449, 1.34164, 1.33847, 1.33538, 1.33236, 1.32943, 1.32656, 1.32377, 1.32104, 1.31838, 1.31578, 1.31325, 1.31076, 1.30834, 1.30596, 1.30364, 1.30137, 1.29915, 1.29697, 1.29484, 1.29275, 1.29071, 1.2887, 1.28673, 1.2848, 1.28291, 1.28106, 1.27923, 1.27745, 1.27569, 1.27397, 1.27227, 1.27061, 1.26898, 1.26737, 1.26579, 1.26424, 1.26271, 1.26121, 1.25973]) if ndim > tstep.shape[0]: # An approximation to the temperature step at large # dimension tstep = 1.0 + 2.0*np.sqrt(np.log(4.0))/np.sqrt(ndim) else: tstep = tstep[ndim-1] appendInf = False if Tmax == np.inf: appendInf = True Tmax = None ntemps = ntemps - 1 if ntemps is not None: if Tmax is None: # Determine Tmax from ntemps. Tmax = tstep ** (ntemps - 1) else: if Tmax is None: raise ValueError('Must specify at least one of ``ntemps'' and finite ``Tmax``.') # Determine ntemps from Tmax. ntemps = int(np.log(Tmax) / np.log(tstep) + 2) betas = np.logspace(0, -np.log10(Tmax), ntemps) if appendInf: # Use a geometric spacing, but replace the top-most temperature with infinity. betas = np.concatenate((betas, [0])) return betas
class LikePriorEvaluator: """ Wrapper class for logl and logp. """ def __init__(self, logl, logp, loglargs=[], logpargs=[], loglkwargs={}, logpkwargs={}): self.logl = logl self.logp = logp self.loglargs = loglargs self.logpargs = logpargs self.loglkwargs = loglkwargs self.logpkwargs = logpkwargs def __call__(self, x): s = x.shape x = x.reshape((-1, x.shape[-1])) lp = self.logp(x, *self.logpargs, **self.logpkwargs) if np.any(np.isnan(lp)): raise ValueError('Prior function returned NaN.') # Can't return -inf, since this messes with beta=0 behaviour. ll = np.empty_like(lp) bad = (lp == -np.inf) ll[bad] = 0 ll[~bad] = self.logl(x[~bad], *self.loglargs, **self.loglkwargs) if np.any(np.isnan(ll)): raise ValueError('Log likelihood function returned NaN.') return ll.reshape(s[:-1]), lp.reshape(s[:-1])
[docs] class Sampler: """ A parallel-tempered ensemble sampler, using :class:`EnsembleSampler` for sampling within each parallel chain. :param nwalkers: The number of ensemble walkers at each temperature. :param dim: The dimension of parameter space. :param betas: (optional) Array giving the inverse temperatures, :math:`\\beta=1/T`, used in the ladder. The default is chosen according to :meth:`default_beta_ladder` using ``ntemps`` and ``Tmax``. :param ntemps: (optional) If set, the number of temperatures to use. :param Tmax: (optional) If set, the maximum temperature for the ladder. :param logl: The log-likelihood function. :param logp: The log-prior function. :param threads: (optional) The number of parallel threads to use in sampling. :param pool: (optional) Alternative to ``threads``. Any object that implements a ``map`` method compatible with the built-in ``map`` will do here. For example, :class:`multi.Pool` will do. :param a: (optional) Proposal scale factor. :param loglargs: (optional) Positional arguments for the log-likelihood function. :param logpargs: (optional) Positional arguments for the log-prior function. :param loglkwargs: (optional) Keyword arguments for the log-likelihood function. :param logpkwargs: (optional) Keyword arguments for the log-prior function. :param adaptation_lag: (optional) Time lag for temperature dynamics decay. Default: 10000. :param adaptation_time: (optional) Time-scale for temperature dynamics. Default: 100. :param vectorize: (optional) If ``True``, ``logl`` and ``logp`` are expected to accept a list of position vectors instead of just one. Note that ``pool`` will be ignored if this is ``True``. (default: ``False``) """ def __init__(self, nwalkers, dim, logl, logp, ntemps=None, Tmax=None, betas=None, threads=1, pool=None, a=2.0, loglargs=[], logpargs=[], loglkwargs={}, logpkwargs={}, adaptation_lag=10000, adaptation_time=100, random=None, vectorize=False): self._vectorize = vectorize if vectorize: pool = None if random is None: self._random = np.random.mtrand.RandomState() else: self._random = random self._likeprior = LikePriorEvaluator(logl, logp, loglargs, logpargs, loglkwargs, logpkwargs) self.a = a self.nwalkers = nwalkers self.dim = dim self.adaptation_time = adaptation_time self.adaptation_lag = adaptation_lag # Set temperature ladder. Append beta=0 to generated ladder. if betas is not None: self._betas = np.array(betas).copy() else: self._betas = default_beta_ladder(self.dim, ntemps=ntemps, Tmax=Tmax) # Make sure ladder is ascending in temperature. self._betas[::-1].sort() if self.nwalkers % 2 != 0: raise ValueError('The number of walkers must be even.') if self.nwalkers < 2 * self.dim: raise ValueError('The number of walkers must be greater than ``2*dimension``.') self.pool = pool if threads > 1 and pool is None: self.pool = multi.Pool(threads) self.reset()
[docs] def reset(self, random=None, betas=None, time=None): """ Clear the ``time``, ``chain``, ``logposterior``, ``loglikelihood``, ``acceptance_fraction``, ``tswap_acceptance_fraction`` stored properties. """ # Reset chain. self._chain = None self._logposterior = None self._loglikelihood = None self._beta_history = None # Reset sampler state. self._time = 0 self._p0 = None self._logposterior0 = None self._loglikelihood0 = None if betas is not None: self._betas = betas self.nswap = np.zeros(self.ntemps, dtype=np.float64) self.nswap_accepted = np.zeros(self.ntemps, dtype=np.float64) self.nprop = np.zeros((self.ntemps, self.nwalkers), dtype=np.float64) self.nprop_accepted = np.zeros((self.ntemps, self.nwalkers), dtype=np.float64) if random is not None: self._random = random if time is not None: self._time = time
[docs] def run_mcmc(self, *args, **kwargs): """ Identical to ``sample``, but returns the final ensemble and discards intermediate ensembles. """ for x in self.sample(*args, **kwargs): pass return x
[docs] def sample(self, p0=None, iterations=1, thin=1, storechain=True, adapt=False, swap_ratios=False): """ Advance the chains ``iterations`` steps as a generator. :param p0: The initial positions of the walkers. Shape should be ``(ntemps, nwalkers, dim)``. Can be omitted if resuming from a previous run. :param iterations: (optional) The number of iterations to perform. :param thin: (optional) The number of iterations to perform between saving the state to the internal chain. :param storechain: (optional) If ``True`` store the iterations in the ``chain`` property. :param adapt: (optional) If ``True``, the temperature ladder is dynamically adapted as the sampler runs to achieve uniform swap acceptance ratios between adjacent chains. See `arXiv:1501.05823 <http://arxiv.org/abs/1501.05823>`_ for details. :param swap_ratios: (optional) If ``True``, also yield the instantaneous inter-chain acceptance ratios in the fourth element of the yielded tuple. At each iteration, this generator yields * ``p``, the current position of the walkers. * ``logpost``, the current posterior values for the walkers. * ``loglike``, the current likelihood values for the walkers. * ``ratios``, the instantaneous inter-chain acceptance ratios (if requested). """ # Set initial walker positions. if p0 is not None: # Start anew. self._p0 = p = np.array(p0).copy() self._logposterior0 = None self._loglikelihood0 = None elif self._p0 is not None: # Now, where were we? p = self._p0 else: raise ValueError('Initial walker positions not specified.') # Check for dodgy inputs. if np.any(np.isinf(p)): raise ValueError('At least one parameter value was infinite.') if np.any(np.isnan(p)): raise ValueError('At least one parameter value was NaN.') # If we have no likelihood or prior values, compute them. if self._logposterior0 is None or self._loglikelihood0 is None: logl, logp = self._evaluate(p) logpost = self._tempered_likelihood(logl) + logp self._loglikelihood0 = logl self._logposterior0 = logpost else: logl = self._loglikelihood0 logpost = self._logposterior0 if (logpost == -np.inf).any(): raise ValueError('Attempting to start with samples outside posterior support.') # Expand the chain in advance of the iterations if storechain: isave = self._expand_chain(iterations // thin) for i in range(iterations): for j in [0, 1]: # Get positions of walkers to be updated and walker to be sampled. jupdate = j jsample = (j + 1) % 2 pupdate = p[:, jupdate::2, :] psample = p[:, jsample::2, :] zs = np.exp(self._random.uniform(low=-np.log(self.a), high=np.log(self.a), size=(self.ntemps, self.nwalkers//2))) qs = np.zeros((self.ntemps, self.nwalkers//2, self.dim)) for k in range(self.ntemps): js = self._random.randint(0, high=self.nwalkers // 2, size=self.nwalkers // 2) qs[k, :, :] = psample[k, js, :] + zs[k, :].reshape( (self.nwalkers // 2, 1)) * (pupdate[k, :, :] - psample[k, js, :]) qslogl, qslogp = self._evaluate(qs) qslogpost = self._tempered_likelihood(qslogl) + qslogp logpaccept = self.dim*np.log(zs) + qslogpost \ - logpost[:, jupdate::2] logr = np.log(self._random.uniform(low=0.0, high=1.0, size=(self.ntemps, self.nwalkers//2))) accepts = logr < logpaccept accepts = accepts.flatten() pupdate.reshape((-1, self.dim))[accepts, :] = \ qs.reshape((-1, self.dim))[accepts, :] logpost[:, jupdate::2].reshape((-1,))[accepts] = \ qslogpost.reshape((-1,))[accepts] logl[:, jupdate::2].reshape((-1,))[accepts] = \ qslogl.reshape((-1,))[accepts] accepts = accepts.reshape((self.ntemps, self.nwalkers//2)) self.nprop[:, jupdate::2] += 1.0 self.nprop_accepted[:, jupdate::2] += accepts p, ratios = self._temperature_swaps(self._betas, p, logpost, logl) # TODO Should the notion of a "complete" iteration really include the temperature # adjustment? if adapt and self.ntemps > 1: dbetas = self._get_ladder_adjustment(self._time, self._betas, ratios) self._betas += dbetas logpost += self._tempered_likelihood(logl, betas=dbetas) if (self._time + 1) % thin == 0: if storechain: self._chain[:, :, isave, :] = p self._logposterior[:, :, isave] = logpost self._loglikelihood[:, :, isave] = logl self._beta_history[:, isave] = self._betas isave += 1 self._time += 1 if swap_ratios: yield p, logpost, logl, ratios else: yield p, logpost, logl
def _evaluate(self, ps): if self._vectorize: return self._likeprior(ps) mapf = map if self.pool is None else self.pool.map results = list(mapf(self._likeprior, ps.reshape((-1, self.dim)))) logl = np.fromiter((r[0] for r in results), np.float64, count=len(results)).reshape((self.ntemps, -1)) logp = np.fromiter((r[1] for r in results), np.float64, count=len(results)).reshape((self.ntemps, -1)) return logl, logp def _tempered_likelihood(self, logl, betas=None): """ Compute tempered log likelihood. This is usually a mundane multiplication, except for the special case where beta == 0 *and* we're outside the likelihood support. Here, we find a singularity that demands more careful attention; we allow the likelihood to dominate the temperature, since wandering outside the likelihood support causes a discontinuity. """ if betas is None: betas = self._betas betas = betas.reshape((-1, 1)) with np.errstate(invalid='ignore'): loglT = logl * betas loglT[np.isnan(loglT)] = -np.inf return loglT def _temperature_swaps(self, betas, p, logpost, logl): """ Perform parallel-tempering temperature swaps on the state in ``p`` with associated ``logpost`` and ``logl``. """ ntemps = len(betas) ratios = np.zeros(ntemps - 1) for i in range(ntemps - 1, 0, -1): bi = betas[i] bi1 = betas[i - 1] dbeta = bi1 - bi iperm = self._random.permutation(self.nwalkers) i1perm = self._random.permutation(self.nwalkers) raccept = np.log(self._random.uniform(size=self.nwalkers)) paccept = dbeta * (logl[i, iperm] - logl[i - 1, i1perm]) self.nswap[i] += self.nwalkers self.nswap[i - 1] += self.nwalkers asel = (paccept > raccept) nacc = np.sum(asel) self.nswap_accepted[i] += nacc self.nswap_accepted[i - 1] += nacc ratios[i - 1] = nacc / self.nwalkers ptemp = np.copy(p[i, iperm[asel], :]) logltemp = np.copy(logl[i, iperm[asel]]) logprtemp = np.copy(logpost[i, iperm[asel]]) p[i, iperm[asel], :] = p[i - 1, i1perm[asel], :] logl[i, iperm[asel]] = logl[i - 1, i1perm[asel]] logpost[i, iperm[asel]] = logpost[i - 1, i1perm[asel]] \ - dbeta * logl[i - 1, i1perm[asel]] p[i - 1, i1perm[asel], :] = ptemp logl[i - 1, i1perm[asel]] = logltemp logpost[i - 1, i1perm[asel]] = logprtemp + dbeta * logltemp return p, ratios def _get_ladder_adjustment(self, time, betas0, ratios): """ Execute temperature adjustment according to dynamics outlined in `arXiv:1501.05823 <http://arxiv.org/abs/1501.05823>`_. """ betas = betas0.copy() # Modulate temperature adjustments with a hyperbolic decay. decay = self.adaptation_lag / (time + self.adaptation_lag) kappa = decay / self.adaptation_time # Construct temperature adjustments. dSs = kappa * (ratios[:-1] - ratios[1:]) # Compute new ladder (hottest and coldest chains don't move). deltaTs = np.diff(1 / betas[:-1]) deltaTs *= np.exp(dSs) betas[1:-1] = 1 / (np.cumsum(deltaTs) + 1 / betas[0]) # Don't mutate the ladder here; let the client code do that. return betas - betas0 def _expand_chain(self, nsave): """ Expand ``self._chain``, ``self._logposterior``, ``self._loglikelihood``, and ``self._beta_history`` ahead of run to make room for new samples. :param nsave: The number of additional iterations for which to make room. :return ``isave``: Returns the index at which to begin inserting new entries. """ if self._chain is None: isave = 0 self._chain = np.zeros((self.ntemps, self.nwalkers, nsave, self.dim)) self._logposterior = np.zeros((self.ntemps, self.nwalkers, nsave)) self._loglikelihood = np.zeros((self.ntemps, self.nwalkers, nsave)) self._beta_history = np.zeros((self.ntemps, nsave)) else: isave = self._chain.shape[2] self._chain = np.concatenate((self._chain, np.zeros((self.ntemps, self.nwalkers, nsave, self.dim))), axis=2) self._logposterior = np.concatenate((self._logposterior, np.zeros((self.ntemps, self.nwalkers, nsave))), axis=2) self._loglikelihood = np.concatenate((self._loglikelihood, np.zeros((self.ntemps, self.nwalkers, nsave))), axis=2) self._beta_history = np.concatenate((self._beta_history, np.zeros((self.ntemps, nsave))), axis=1) return isave
[docs] def log_evidence_estimate(self, logls=None, fburnin=0.1): """ Thermodynamic integration estimate of the evidence for the sampler. :param logls: (optional) The log-likelihoods to use for computing the thermodynamic evidence. If ``None`` (the default), use the stored log-likelihoods in the sampler. Should be of shape ``(Ntemps, Nwalkers, Nsamples)``. :param fburnin: (optional) The fraction of the chain to discard as burnin samples; only the final ``1-fburnin`` fraction of the samples will be used to compute the evidence; the default is ``fburnin = 0.1``. :return ``(logZ, dlogZ)``: Returns an estimate of the log-evidence and the error associated with the finite number of temperatures at which the posterior has been sampled. For details, see ``thermodynamic_integration_log_evidence``. """ if logls is None: if self.loglikelihood is not None: logls = self.loglikelihood else: raise ValueError('No log likelihood values available.') istart = int(logls.shape[2] * fburnin + 0.5) mean_logls = np.mean(np.mean(logls, axis=1)[:, istart:], axis=1) return util.thermodynamic_integration_log_evidence(self._betas, mean_logls)
@property def random(self): """ Returns the random number generator for the sampler. """ return self._random @property def betas(self): """ Returns the current inverse temperature ladder of the sampler. """ return self._betas @property def time(self): """ Returns the current time, in iterations, of the sampler. """ return self._time @property def chain(self): """ Returns the stored chain of samples; shape ``(Ntemps, Nwalkers, Nsteps, Ndim)``. """ return self._chain @property def flatchain(self): """Returns the stored chain, but flattened along the walker axis, so of shape ``(Ntemps, Nwalkers*Nsteps, Ndim)``. """ s = self.chain.shape return self._chain.reshape((s[0], -1, s[3])) @property def logprobability(self): """ Matrix of logprobability values; shape ``(Ntemps, Nwalkers, Nsteps)``. """ return self._logposterior @property def loglikelihood(self): """ Matrix of log-likelihood values; shape ``(Ntemps, Nwalkers, Nsteps)``. """ return self._loglikelihood @property def beta_history(self): """ Matrix of inverse temperatures; shape ``(Ntemps, Nsteps)``. """ return self._beta_history @property def tswap_acceptance_fraction(self): """ Returns an array of accepted temperature swap fractions for each temperature; shape ``(ntemps, )``. """ return self.nswap_accepted / self.nswap @property def ntemps(self): """ The number of temperature chains. """ return len(self._betas) @property def acceptance_fraction(self): """ Matrix of shape ``(Ntemps, Nwalkers)`` detailing the acceptance fraction for each walker. """ return self.nprop_accepted / self.nprop @property def acor(self): """ Returns a matrix of autocorrelation lengths for each parameter in each temperature of shape ``(Ntemps, Ndim)``. """ return self.get_autocorr_time()
[docs] def get_autocorr_time(self, window=50): """ Returns a matrix of autocorrelation lengths for each parameter in each temperature of shape ``(Ntemps, Ndim)``. :param window: (optional) The size of the windowing function. This is equivalent to the maximum number of lags to use. (default: 50) """ acors = np.zeros((self.ntemps, self.dim)) for i in range(self.ntemps): x = np.mean(self._chain[i, :, :, :], axis=0) acors[i, :] = util.autocorr_integrated_time(x, window=window) return acors