
declearn.utils.run_as_processes

Run coroutines concurrently within individual processes.

Parameters:

*routines : Union[Tuple[Callable[..., Any], Tuple[Any, ...]], Tuple[Callable[..., Any], Dict[str, Any]], Tuple[Callable[..., Any], Tuple[Any, ...], Dict[str, Any]]], default=()
    Sequence of routines that need running concurrently, each formatted as either:
    - a 3-element tuple containing the function to run, a tuple of positional args and a dict of kwargs;
    - a 2-element tuple containing the function to run and a tuple storing its (positional) arguments;
    - a 2-element tuple containing the function to run and a dict storing its keyword arguments.

auto_stop : bool, default=True
    Whether to automatically interrupt all running routines as soon as one fails and raises an exception. This can avoid infinite runtime (e.g. if one routine waits for a failed one to send information), but may also prevent some exceptions from being caught, due to the early stopping of routines that would have failed later. Hence it may be disabled in contexts where it is preferable to wait for all routines to fail rather than assume that they are co-dependent.

Returns:

success : bool
    Whether all routines were run without raising an exception.
outputs : list[RuntimeError or Any]
    List of routine-wise output values or RuntimeError exceptions, each of which either wraps an actual exception and its traceback, or indicates that the process was interrupted while running.
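
For illustration, a minimal usage sketch showing the three accepted routine formats. The serve and train functions are hypothetical placeholders defined at module level (as required for pickling by multiprocessing); only run_as_processes itself is part of declearn:

from declearn.utils import run_as_processes

def serve(n_clients: int) -> str:
    # hypothetical stand-in for a server-side routine
    return f"served {n_clients} clients"

def train(name: str, epochs: int = 1) -> str:
    # hypothetical stand-in for a client-side routine
    return f"{name}: trained for {epochs} epoch(s)"

if __name__ == "__main__":
    success, outputs = run_as_processes(
        (serve, (2,)),                        # function + positional args
        (train, {"name": "a", "epochs": 3}),  # function + keyword args
        (train, ("b",), {"epochs": 2}),       # function + args + kwargs
        auto_stop=True,
    )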

Source code in declearn/utils/_multiprocess.py
def run_as_processes(
    *routines: Union[
        Tuple[Callable[..., Any], Tuple[Any, ...]],
        Tuple[Callable[..., Any], Dict[str, Any]],
        Tuple[Callable[..., Any], Tuple[Any, ...], Dict[str, Any]],
    ],
    auto_stop: bool = True,
) -> Tuple[bool, List[Union[Any, RuntimeError]]]:
    """Run coroutines concurrently within individual processes.

    Parameters
    ----------
    *routines: tuple(function, tuple(any, ...))
        Sequence of routines that need running concurrently, each
        formatted as either:
        - a 3-element tuple containing the function to run,
            a tuple of positional args and a dict of kwargs.
        - a 2-element tuple containing the function to run,
            and a tuple storing its (positional) arguments.
        - a 2-element tuple containing the function to run,
            and a dict storing its keyword arguments.
    auto_stop: bool, default=True
        Whether to automatically interrupt all running routines as
        soon as one fails and raises an exception. This can avoid
        infinite runtime (e.g. if one routine waits for a failed one
        to send information), but may also prevent some exceptions
        from being caught, due to the early stopping of routines
        that would have failed later. Hence it may be disabled in
        contexts where it is preferable to wait for all routines to
        fail rather than assume that they are co-dependent.

    Returns
    -------
    success: bool
        Whether all routines were run without raising an exception.
    outputs: list[RuntimeError or Any]
        List of routine-wise output values or RuntimeError exceptions,
        each of which either wraps an actual exception and its
        traceback, or indicates that the process was interrupted
        while running.
    """
    # Wrap routines into named processes and set up exceptions catching.
    queue = (
        mp.Manager().Queue()
    )  # type: Queue  # Queue[Tuple[str, Union[Any, RuntimeError]]] (py >=3.9)
    processes, names = prepare_routine_processes(routines, queue)
    # Run the processes concurrently.
    run_processes(processes, auto_stop)
    # Return success flag and re-ordered outputs and exceptions.
    success = all(process.exitcode == 0 for process in processes)
    dequeue = dict([queue.get_nowait() for _ in range(queue.qsize())])
    int_err = RuntimeError("Process was interrupted while running.")
    outputs = [dequeue.get(name, int_err) for name in names]
    return success, outputs
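
As a follow-up sketch, reusing the hypothetical serve and train routines from the example above, the returned values might be post-processed as follows (the error-reporting logic here is illustrative, not part of the declearn API):

    success, outputs = run_as_processes((serve, (2,)), (train, ("a",)))
    if success:
        results = outputs  # plain return values, in routine order
    else:
        for index, output in enumerate(outputs):
            if isinstance(output, RuntimeError):
                # wraps the child process's exception and traceback,
                # or signals that the process was interrupted
                print(f"routine {index} failed: {output}")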