Source code for pyRTX.core.parallel_utils

import os, sys, inspect
from multiprocessing import Pool
from pathos.multiprocessing import ProcessingPool
from functools import partial, wraps

"""
A set of utilities focused on increasing the computational efficiency of numerical calculations.
"""


def get_ncores(lst, **kwargs):
    """
    Returns the number of cores for parallel computations.
    If not specified, the number of cores is set to the number available on your machine minus one.

    Parameters
    ----------
    lst : list or numpy.ndarray
        The list of items to be processed in parallel.
    **kwargs : dict
        Additional keyword arguments. If 'n_cores' is present, it will be used as the number of cores.

    Returns
    -------
    int
        The number of cores to use for parallel computation.
    """
    if ('n_cores' in kwargs) and isinstance(kwargs['n_cores'], int) and (os.cpu_count() > kwargs['n_cores'] >= 1):
        n_cores = kwargs['n_cores']
    else:
        n_cores = os.cpu_count() - 1
    n_cores = min(len(lst), n_cores) if len(lst) else 1
    return n_cores

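A minimal usage sketch (not part of the module) illustrating how the core count is resolved; the exact values depend on os.cpu_count() on your machine:

items = list(range(4))

print(get_ncores(items))              # os.cpu_count() - 1, capped at len(items)
print(get_ncores(items, n_cores=2))   # 2, since 1 <= 2 < os.cpu_count()
print(get_ncores(items, n_cores=0))   # out of range: falls back to os.cpu_count() - 1, capped at len(items)
print(get_ncores([]))                 # 1: an empty input always maps to a single core
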
def is_method(func, cls_inst):
    """
    Checks if the wrapped object is a method of a class.

    Parameters
    ----------
    func : function
        The function to check.
    cls_inst : object
        The class instance.

    Returns
    -------
    bool
        True if the function is a method of the class, False otherwise.
    """
    return (not inspect.isclass(cls_inst)) and hasattr(cls_inst, func.__name__) and ('self' in inspect.getfullargspec(func).args)

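A brief sketch of what is_method distinguishes, using a hypothetical class that is not part of pyRTX:

class _Worker:
    def run(self, item):
        return item * 2

def _plain_run(item):
    return item * 2

print(is_method(_Worker.run, _Worker()))   # True: the instance has a 'run' attribute and 'self' is in the signature
print(is_method(_plain_run, 42))           # False: 42 has no attribute named '_plain_run'
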
def get_unwrapped(*args, **kwargs):
    """
    Calls the unwrapped (undecorated) version of a class method and returns its result.

    Parameters
    ----------
    *args : tuple
        The arguments to pass to the method; the first one must be the class instance.
    **kwargs : dict
        The keyword arguments. 'method' must be present and contain the name of the method to unwrap.

    Returns
    -------
    object
        The result of the unwrapped method call.
    """
    cls_inst = args[0]
    cls_attr = kwargs['method']
    return getattr(cls_inst, cls_attr).__wrapped__(*args)

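A sketch of how get_unwrapped recovers the undecorated method; it relies on functools.wraps exposing the original function as `__wrapped__`. The class below is hypothetical and uses the `parallel` decorator defined next:

class _Scaler:
    @parallel
    def scale(self, factor, item):
        return factor * item

s = _Scaler()
# Calls the original, undecorated 'scale' with the instance passed explicitly,
# which is what each worker process does inside the 'parallel' decorator.
print(get_unwrapped(s, 3, 4, method='scale'))   # 12
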
def parallel(func):
    """
    A decorator for parallel computations.

    The decorated object can be a class method or a regular function. It applies the
    function to every item of an iterable, performing the computations in parallel.
    The output is a list containing the return value of every function call.

    The decorated object can accept multiple arguments, but its last positional
    parameter must receive a single item of the input iterable; when calling the
    decorated object, the whole iterable (e.g., a list, numpy array, or range object)
    is passed as the last positional argument. If the keyword argument 'n_cores' is
    specified, the decorated object will run on `n_cores` processes. Otherwise, the
    number of cores is set to the number available on your machine minus one.

    Parameters
    ----------
    func : function
        The function to be decorated.

    Returns
    -------
    function
        The decorated function.

    Examples
    --------
    >>> iterable = [2, 5, 6, 7, 3, 4, 1]
    >>> @parallel
    ... def target_func(some_input, item):
    ...     return some_input * item
    >>> results = target_func(10, iterable, n_cores=5)
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        """
        Wrapper for the target function. It creates a multiprocessing pool to run the
        target function in parallel.
        """
        # Check that the last positional input is an iterable
        # (iter() raises TypeError otherwise).
        if iter(args[-1]):
            lst = args[-1]

            # Get the number of cores to use.
            n_cores = get_ncores(lst, **kwargs)

            # Case 0: sequential computation (n_cores = 1).
            if n_cores == 1:
                results = [func(*args[:-1], it) for it in lst]

            # Case 1: parallel computation for a class method.
            elif is_method(func, args[0]):
                # Map the undecorated method over the iterable with a standard multiprocessing pool.
                task = partial(get_unwrapped, *args[:-1], method=func.__name__)
                with Pool(n_cores) as p:
                    results = p.map(task, lst)

            # Case 2: parallel computation for a regular function.
            else:
                # Use the 'pathos' pool, which serializes with dill rather than pickle.
                task = partial(func, *args[:-1])
                with ProcessingPool(n_cores) as p:
                    results = p.map(task, lst)

        return results
    return wrapper
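
The docstring example above covers a plain function; the sketch below (a hypothetical class, not part of pyRTX) shows the class-method branch, which dispatches to multiprocessing.Pool via get_unwrapped. The `__main__` guard keeps worker processes from re-executing the example when the start method is 'spawn':

class _Integrator:
    @parallel
    def evaluate(self, scale, item):
        return scale * item ** 2

if __name__ == '__main__':
    inst = _Integrator()
    values = range(8)
    # Runs _Integrator.evaluate on every item of 'values' using up to 3 processes.
    results = inst.evaluate(2.0, values, n_cores=3)
    print(results)   # [0.0, 2.0, 8.0, 18.0, 32.0, 50.0, 72.0, 98.0]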