# Source code for bpc_utils.multiprocessing
"""Parallel execution support for BPC."""
import os
from .misc import nullcontext
from .typing import TYPE_CHECKING
if TYPE_CHECKING:
from types import ModuleType # isort: split
from .typing import Callable, ContextManager, Iterable, List, Mapping, Optional, T, Tuple
# multiprocessing support detection and CPU_CNT retrieval
try: # try first
import multiprocessing
except ImportError: # pragma: no cover
multiprocessing = None # type: ignore[assignment]
else: # CPU number if multiprocessing supported
if os.name == 'posix' and 'SC_NPROCESSORS_CONF' in getattr(os, 'sysconf_names'): # pragma: no cover
CPU_CNT = getattr(os, 'sysconf')('SC_NPROCESSORS_CONF')
elif hasattr(os, 'sched_getaffinity'): # pragma: no cover
CPU_CNT = len(getattr(os, 'sched_getaffinity')(0))
else: # pragma: no cover
CPU_CNT = os.cpu_count() or 1
finally: # alias and aftermath
mp = multiprocessing # type: Optional[ModuleType]
del multiprocessing
parallel_available = mp is not None and CPU_CNT > 1
[docs]def _mp_map_wrapper(args: 'Tuple[Callable[..., T], Iterable[object], Mapping[str, object]]') -> 'T':
"""Map wrapper function for :mod:`multiprocessing`.
Args:
args: the function to execute, the positional arguments and the keyword arguments packed into a tuple
Returns:
the function execution result
"""
func, posargs, kwargs = args
return func(*posargs, **kwargs)
[docs]def _mp_init_lock(lock: 'ContextManager[None]') -> None: # pragma: no cover
"""Initialize lock for :mod:`multiprocessing`.
Args:
lock: the lock to be shared among tasks
"""
global task_lock # pylint: disable=global-statement
task_lock = lock
def map_tasks(func: 'Callable[..., T]', iterable: 'Iterable[object]', posargs: 'Optional[Iterable[object]]' = None,
              kwargs: 'Optional[Mapping[str, object]]' = None, *,
              processes: 'Optional[int]' = None, chunksize: 'Optional[int]' = None) -> 'List[T]':
    """Execute tasks in parallel if :mod:`multiprocessing` is available, otherwise execute them sequentially.

    Args:
        func: the task function to execute
        iterable: the items to process
        posargs: additional positional arguments to pass to ``func``
        kwargs: keyword arguments to pass to ``func``
        processes: the number of worker processes (default: auto determine)
        chunksize: chunk size for multiprocessing

    Returns:
        the return values of the task function applied on the input items and additional arguments

    """
    global task_lock  # pylint: disable=global-statement

    # Materialize ``posargs`` exactly once: the original per-item expansion
    # would silently exhaust a one-shot iterator after the first item.
    posargs = () if posargs is None else tuple(posargs)
    if kwargs is None:
        kwargs = {}

    # sequential execution (``processes == 1`` is checked first so an explicit
    # request for a single process never consults multiprocessing availability)
    if processes == 1 or not parallel_available:
        return [func(item, *posargs, **kwargs) for item in iterable]

    # parallel execution
    processes = processes or CPU_CNT
    lock = mp.Lock()  # type: ignore[union-attr]
    try:
        with mp.Pool(processes=processes, initializer=_mp_init_lock, initargs=(lock,)) as pool:  # type: ignore[union-attr]
            result = pool.map(
                _mp_map_wrapper,
                [(func, (item,) + posargs, kwargs) for item in iterable],
                chunksize
            )  # type: List[T]
    finally:
        # reset the module-level lock even if ``pool.map`` raises, so a stale
        # lock from a dead pool is never handed out by :func:`TaskLock`
        task_lock = nullcontext()
    return result
# default lock handed out by :func:`TaskLock`: a no-op context manager,
# replaced by a real lock in workers via :func:`_mp_init_lock`
task_lock = nullcontext()  # type: ContextManager[None]
def TaskLock() -> 'ContextManager[None]':
    """Function that returns a lock for possibly concurrent tasks.

    Returns:
        a lock for possibly concurrent tasks

    """
    return task_lock
# public API of this module
__all__ = ['map_tasks', 'TaskLock']