Source code for ligo.skymap.extern.ptemcee.interruptible_pool

# Copied and adapted from https://github.com/willvousden/ptemcee
#
# The MIT License (MIT)
#
# Copyright (c) 2010-2013 Daniel Foreman-Mackey & contributors.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""
Python's multiprocessing.Pool class doesn't interact well with
``KeyboardInterrupt`` signals, as documented in places such as:

* `<http://stackoverflow.com/questions/1408356/>`_
* `<http://stackoverflow.com/questions/11312525/>`_
* `<http://noswap.com/blog/python-multiprocessing-keyboardinterrupt>`_

Various workarounds have been shared. Here, we adapt the one proposed in the
last link above, by John Reese, and shared as

* `<https://github.com/jreese/multiprocessing-keyboardinterrupt/>`_

Our version is a drop-in replacement for multiprocessing.Pool ... as long as
the map() method is the only one that needs to be interrupt-friendly.

Contributed by Peter K. G. Williams <peter@newton.cx>.

*Added in version 2.1.0*

"""

__all__ = ["InterruptiblePool"]

import signal
import functools
from multiprocessing.pool import Pool
from multiprocessing import TimeoutError


def _initializer_wrapper(actual_initializer, *rest):
    """
    We ignore SIGINT. It's up to our parent to kill us in the typical
    condition of this arising from ``^C`` on a terminal. If someone is
    manually killing us with that signal, well... nothing will happen.

    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    if actual_initializer is not None:
        actual_initializer(*rest)


[docs] class InterruptiblePool(Pool): """ A modified version of :class:`multiprocessing.pool.Pool` that has better behavior with regard to ``KeyboardInterrupts`` in the :func:`map` method. :param processes: (optional) The number of worker processes to use; defaults to the number of CPUs. :param initializer: (optional) Either ``None``, or a callable that will be invoked by each worker process when it starts. :param initargs: (optional) Arguments for *initializer*; it will be called as ``initializer(*initargs)``. :param kwargs: (optional) Extra arguments. Python 2.7 supports a ``maxtasksperchild`` parameter. """ wait_timeout = 3600 def __init__(self, processes=None, initializer=None, initargs=(), **kwargs): new_initializer = functools.partial(_initializer_wrapper, initializer) super(InterruptiblePool, self).__init__(processes, new_initializer, initargs, **kwargs)
[docs] def map(self, func, iterable, chunksize=None): """ Equivalent of ``map()`` built-in, without swallowing ``KeyboardInterrupt``. :param func: The function to apply to the items. :param iterable: An iterable of items that will have `func` applied to them. """ # The key magic is that we must call r.get() with a timeout, because # a Condition.wait() without a timeout swallows KeyboardInterrupts. r = self.map_async(func, iterable, chunksize) while True: try: return r.get(self.wait_timeout) except TimeoutError: pass except KeyboardInterrupt: self.terminate() self.join() raise
# Other exceptions propagate up.