WorkQueueExecutor
- class coffea.processor.WorkQueueExecutor(status: bool = True, unit: str = 'items', desc: str = 'Processing', compression: int | None = 9, function_name: str | None = None, retries: int = 2, manager_name: str | None = None, port: int | typing.Tuple[int, int] | None = None, filepath: str = '.', events_total: int | None = None, x509_proxy: str | None = None, verbose: bool = False, print_stdout: bool = False, status_display_interval: int | None = 10, debug_log: str | None = None, stats_log: str | None = None, transactions_log: str | None = None, tasks_accum_log: str | None = None, password_file: str | None = None, ssl: bool | typing.Tuple[str, str] = False, environment_file: str | None = None, extra_input_files: typing.List = <factory>, wrapper: str | None = None, resource_monitor: str | None = 'off', resources_mode: str | None = 'max-seen', split_on_exhaustion: bool | None = True, fast_terminate_workers: int | None = None, cores: int | None = None, memory: int | None = None, disk: int | None = None, gpus: int | None = None, treereduction: int = 20, chunksize: int = 100000, dynamic_chunksize: typing.Dict | None = None, custom_init: typing.Callable | None = None, bar_format: str | None = None, chunks_accum_in_mem: int | None = None, master_name: str | None = None, chunks_per_accum: int | None = None)
Bases: ExecutorBase
Execute using Work Queue
For more information, see Work Queue Executor
- Parameters:
items (sequence or generator) – Sequence of input arguments
function (callable) – A function to be called on each input, which returns an accumulator instance
accumulator (Accumulatable) – An accumulator to collect the output of the function
status (bool) – If true (default), enable progress bar
unit (str) – Label of progress bar unit
desc (str) – Label of progress bar description
compression (int, optional) – Compress accumulator outputs in flight with LZ4, at the level specified (default 9). None sets the level to 1 (minimal compression).
Work Queue specific options:
cores (int) – Maximum number of cores for work queue task. If unset, use a whole worker.
memory (int) – Maximum amount of memory (in MB) for work queue task. If unset, use a whole worker.
disk (int) – Maximum amount of disk space (in MB) for work queue task. If unset, use a whole worker.
gpus (int) – Number of GPUs to allocate to each task. If unset, use zero.
resource_monitor (str) – If given, one of ‘off’, ‘measure’, or ‘watchdog’. Default is ‘off’.
- ‘off’: turns off resource monitoring. Overridden to ‘watchdog’ if resources_mode is not set to ‘fixed’.
- ‘measure’: turns on resource monitoring for Work Queue. The resources used per task are measured.
- ‘watchdog’: in addition to measuring resources, tasks are terminated if they go above the cores, memory, or disk specified.
resources_mode (str) – One of ‘fixed’, ‘max-seen’, or ‘max-throughput’. Default is ‘max-seen’. Sets the strategy used to automatically allocate resources to tasks.
- ‘fixed’: allocate the cores, memory, and disk specified for each task.
- ‘max-seen’ or ‘auto’: use the cores, memory, and disk given as maximum values to allocate, but first try each task by allocating the maximum values seen so far. This leads to a good compromise between parallelism and the number of retries.
- ‘max-throughput’: like ‘max-seen’, but first tries the task with an allocation that maximizes overall throughput.
If resources_mode is other than ‘fixed’, preprocessing and accumulation tasks always use the ‘max-seen’ strategy: preprocessing tasks always use the same resources, while accumulation tasks have a resource distribution that increases over time.
split_on_exhaustion (bool) – Whether to split a processing task in half according to its chunksize when it exhausts the cores, memory, or disk allocated to it. If False, a task that exhausts its resources fails permanently. Default is True.
fast_terminate_workers (int) – Terminate workers on which tasks have been running longer than average. The time limit is computed by multiplying the average runtime of tasks by the value of ‘fast_terminate_workers’. Since some tasks are legitimately slow, no single task may trigger fast termination on two distinct workers. A value less than 1 disables this feature.
manager_name (str) – Name to refer to this work queue manager. Sets port to 0 (any available port) if port not given.
port (int or tuple(int, int)) – Port number, or inclusive range of port numbers, for the Work Queue manager program. Defaults to 9123 if manager_name is not given.
password_file (str) – Location of a file containing a password used to authenticate workers.
ssl (bool or tuple(str, str)) – Enable SSL encryption between manager and workers. If a tuple, it should be of the form (key, cert), where key and cert are paths to the files containing the key and certificate in PEM format. If True, a self-signed temporary key and certificate are generated for the session.
extra_input_files (list) – A list of files in the current working directory to send along with each task. Useful for small custom libraries and configuration files needed by the processor.
x509_proxy (str) – Path to the X509 user proxy. If None (the default), use the value of the environment variable X509_USER_PROXY, or fall back to the file /tmp/x509up_u${UID} if it exists. If False, disables the default behavior and no proxy is sent.
environment_file (optional, str) – Conda Python environment tarball to use. If not given, assume that the Python environment is already set up at the execution site.
wrapper (str) – Wrapper script to run/open python environment tarball. Defaults to python_package_run found in PATH.
treereduction (int) – Number of processed chunks per accumulation task. Default is 20.
verbose (bool) – If true, emit a message on each task submission and completion. Default is false.
print_stdout (bool) – If true, print the standard output of each Work Queue task on completion. Default is false.
debug_log (str) – Filename for debug output
stats_log (str) – Filename for tasks statistics output
transactions_log (str) – Filename for tasks lifetime reports output
tasks_accum_log (str) – Filename for the log of tasks that have been processed and accumulated.
filepath (str) – Path to the parent directory where to create the staging directory. Default is “.” (current working directory).
custom_init (function, optional) – A function that takes as an argument the queue’s WorkQueue object. The function is called just before the first work unit is submitted to the queue.
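The interaction between resource_monitor and resources_mode described above can be sketched in plain Python. This is a minimal illustration, not coffea's implementation: the helper names effective_monitor_mode and watchdog_should_kill are hypothetical, and treating a resource_monitor of None like ‘off’ is an assumption.

```python
from typing import Dict, Optional

# Hypothetical sketch of the documented rules; not coffea's actual code.
VALID_MONITOR_MODES = ("off", "measure", "watchdog")
LIMITED_RESOURCES = ("cores", "memory", "disk")


def effective_monitor_mode(resource_monitor: Optional[str], resources_mode: Optional[str]) -> str:
    """Return the monitoring mode that ends up in effect."""
    mode = resource_monitor or "off"  # assumption: None behaves like 'off'
    if mode not in VALID_MONITOR_MODES:
        raise ValueError(f"unknown resource_monitor: {mode!r}")
    # 'off' is overridden to 'watchdog' whenever resources_mode is not 'fixed'.
    if mode == "off" and resources_mode != "fixed":
        mode = "watchdog"
    return mode


def watchdog_should_kill(used: Dict[str, float], limits: Dict[str, Optional[float]]) -> bool:
    """In 'watchdog' mode, a task is terminated once it goes above any specified limit."""
    return any(
        limits.get(name) is not None and used.get(name, 0) > limits[name]
        for name in LIMITED_RESOURCES
    )


# With the defaults (resource_monitor='off', resources_mode='max-seen'),
# monitoring is effectively 'watchdog'.
print(effective_monitor_mode("off", "max-seen"))
```

Note that with the default arguments the override always applies, so tasks are monitored unless resources_mode is explicitly ‘fixed’.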
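The ‘max-seen’ strategy, combined with the rule that unset cores, memory, or disk mean a whole worker, can be approximated as follows. This is a simplified sketch under stated assumptions: the functions maximum_for, first_attempt, and retry_attempt are hypothetical, resources are tracked as plain dictionaries, and when no task has completed yet the first attempt is assumed to start from the maximum values. Work Queue's real allocator is more involved.

```python
from typing import Dict, Iterable, Optional

# Hypothetical sketch of a 'max-seen' style allocation; not Work Queue's allocator.
RESOURCES = ("cores", "memory", "disk")  # memory and disk in MB


def maximum_for(name: str, maxima: Dict[str, Optional[float]], worker: Dict[str, float]) -> float:
    """Upper bound for one resource: the user-given maximum, or a whole worker if unset."""
    value = maxima.get(name)
    return worker[name] if value is None else value


def first_attempt(
    measured: Iterable[Dict[str, float]],
    maxima: Dict[str, Optional[float]],
    worker: Dict[str, float],
) -> Dict[str, float]:
    """First try: the largest usage seen so far, never above the maximum."""
    seen = {name: 0.0 for name in RESOURCES}
    for usage in measured:
        for name in RESOURCES:
            seen[name] = max(seen[name], usage.get(name, 0.0))
    allocation = {}
    for name in RESOURCES:
        cap = maximum_for(name, maxima, worker)
        # Assumption: with nothing measured yet, start directly from the maximum.
        allocation[name] = min(seen[name], cap) if seen[name] > 0 else cap
    return allocation


def retry_attempt(maxima: Dict[str, Optional[float]], worker: Dict[str, float]) -> Dict[str, float]:
    """If the first try exhausts its allocation, retry with the maximum values."""
    return {name: maximum_for(name, maxima, worker) for name in RESOURCES}


worker = {"cores": 8, "memory": 16000, "disk": 20000}
maxima = {"cores": 2, "memory": 4000, "disk": None}  # disk unset -> whole worker
history = [{"cores": 1, "memory": 1500, "disk": 300}, {"cores": 1, "memory": 2500, "disk": 100}]
print(first_attempt(history, maxima, worker))
print(retry_attempt(maxima, worker))
```

Starting small and only retrying at the maximum is what lets more tasks share a worker while bounding how often a task has to be retried.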
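Splitting a processing task in half when it exhausts its allocation (split_on_exhaustion, described above) amounts to halving the entry range of its chunk. The sketch below is a hypothetical illustration: it models a chunk as a half-open (start, stop) range of entries, and the helpers split_in_half and on_exhaustion are not part of coffea. Real work items carry more information than this.

```python
from typing import List, Tuple

# Hypothetical sketch; a chunk is modeled as a half-open range [start, stop) of entries.
Chunk = Tuple[int, int]


def split_in_half(chunk: Chunk) -> List[Chunk]:
    start, stop = chunk
    size = stop - start
    if size < 2:
        raise ValueError("a chunk with fewer than two entries cannot be split")
    middle = start + size // 2
    return [(start, middle), (middle, stop)]


def on_exhaustion(chunk: Chunk, split_on_exhaustion: bool = True) -> List[Chunk]:
    """Decide what happens to a processing task that exhausted its resources."""
    if not split_on_exhaustion:
        # Mirrors the documented behavior: without splitting, the task fails permanently.
        raise RuntimeError(f"task for entries {chunk} exhausted its resources")
    # Each half would be resubmitted as a new, smaller processing task.
    return split_in_half(chunk)


# A default-sized chunk (chunksize=100000) becomes two tasks of 50000 entries each.
print(on_exhaustion((0, 100000)))
```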
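The fast_terminate_workers rule above (average runtime times the configured multiplier, with no task allowed to trigger termination on two distinct workers, and values below 1 disabling the feature) can be sketched as a small bookkeeping class. FastTerminator and its methods are hypothetical names; this is an illustration of the documented rule, not coffea's code.

```python
from statistics import mean
from typing import Dict, List, Optional

# Hypothetical sketch of the documented fast-termination rule; not coffea's code.


class FastTerminator:
    def __init__(self, fast_terminate_workers: Optional[int]):
        self.multiplier = fast_terminate_workers
        self.runtimes: List[float] = []  # runtimes of completed tasks, in seconds
        self.triggered_on: Dict[str, str] = {}  # task id -> worker it already terminated

    def record_completion(self, runtime: float) -> None:
        self.runtimes.append(runtime)

    def time_limit(self) -> Optional[float]:
        """Average runtime times the multiplier, or None when the feature is off."""
        if self.multiplier is None or self.multiplier < 1 or not self.runtimes:
            return None
        return mean(self.runtimes) * self.multiplier

    def should_terminate(self, task_id: str, worker: str, elapsed: float) -> bool:
        limit = self.time_limit()
        if limit is None or elapsed <= limit:
            return False
        earlier = self.triggered_on.get(task_id)
        if earlier is not None and earlier != worker:
            # The task may be legitimately slow: never let it terminate a second worker.
            return False
        self.triggered_on[task_id] = worker
        return True


terminator = FastTerminator(3)
for runtime in (10.0, 20.0, 30.0):
    terminator.record_completion(runtime)
print(terminator.time_limit())  # average of 20.0 s multiplied by 3
```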
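The defaults for manager_name and port described above reduce to a couple of rules: an explicit port or range wins, a named manager otherwise gets port 0 (any available port), and an unnamed one gets 9123. A minimal sketch, assuming the hypothetical helpers resolve_port and ports_to_try:

```python
from typing import List, Optional, Tuple, Union

# Hypothetical sketch of the documented port defaults; not coffea's code.
Port = Union[int, Tuple[int, int]]


def resolve_port(port: Optional[Port], manager_name: Optional[str]) -> Port:
    if port is not None:
        return port  # an explicit port or range always wins
    # Named managers default to 0 (any available port); otherwise use 9123.
    return 0 if manager_name else 9123


def ports_to_try(port: Port) -> List[int]:
    """Expand a port or an inclusive (low, high) range into candidate ports."""
    if isinstance(port, tuple):
        low, high = port
        if low > high:
            raise ValueError(f"invalid port range: {port}")
        return list(range(low, high + 1))  # +1 because the range is inclusive
    return [port]  # 0 means "let the operating system choose"
```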
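The x509_proxy lookup order described above can be written as a short function. This is a hypothetical sketch, not coffea's code: resolve_x509_proxy is an illustrative name, and the environ parameter is an addition so the lookup can be exercised without touching the real environment. The fallback path uses os.getuid(), which exists only on POSIX systems.

```python
import os
from typing import Mapping, Optional, Union

# Hypothetical sketch of the documented proxy lookup; not coffea's code.


def resolve_x509_proxy(
    x509_proxy: Union[str, bool, None] = None,
    environ: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Return the proxy path that would be sent with tasks, or None."""
    if x509_proxy is False:
        return None  # default lookup explicitly disabled
    if isinstance(x509_proxy, str) and x509_proxy:
        return x509_proxy  # an explicit path is used as given
    from_env = environ.get("X509_USER_PROXY")
    if from_env:
        return from_env
    # Fallback location; os.getuid() is only available on POSIX systems.
    fallback = f"/tmp/x509up_u{os.getuid()}"
    return fallback if os.path.exists(fallback) else None
```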
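With treereduction set, processed chunks are accumulated in batches rather than all at once. The sketch below shows the general shape of such a tree reduction using collections.Counter as a stand-in accumulator; tree_reduce is a hypothetical helper, and the real executor schedules each batch as a separate Work Queue accumulation task rather than reducing in a single process.

```python
from collections import Counter
from functools import reduce
from operator import add
from typing import List, Sequence, TypeVar

# Hypothetical sketch of tree-style accumulation; not coffea's scheduler.
T = TypeVar("T")


def tree_reduce(results: Sequence[T], treereduction: int = 20) -> T:
    """Combine results in batches of `treereduction` until one result remains."""
    if treereduction < 2:
        raise ValueError("treereduction must be at least 2")
    if not results:
        raise ValueError("nothing to accumulate")
    level: List[T] = list(results)
    while len(level) > 1:
        # Each batch stands in for one accumulation task.
        level = [
            reduce(add, level[i : i + treereduction])
            for i in range(0, len(level), treereduction)
        ]
    return level[0]


# 45 processed chunks -> 3 accumulation tasks (20 + 20 + 5) -> 1 final task.
chunk_outputs = [Counter(events=10) for _ in range(45)]
print(tree_reduce(chunk_outputs)["events"])
```

Smaller batches mean more, cheaper accumulation tasks; larger batches mean fewer tasks that each hold more partial results in memory.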
Attributes Summary
Methods Summary
__call__(items, function, accumulator) – Call self as a function.
Attributes Documentation
Methods Documentation
- __call__(items: Iterable, function: Callable, accumulator: Addable | MutableSet | MutableMapping)
Call self as a function.
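Conceptually, __call__ applies function to every element of items and merges each result into accumulator, which may be an Addable, a MutableSet, or a MutableMapping. The serial sketch below illustrates only that contract; it is not how WorkQueueExecutor runs (the work is distributed to Work Queue workers), and the helpers add_into, serial_call, and summarize are hypothetical.

```python
from collections.abc import MutableMapping, MutableSet
from typing import Any, Callable, Iterable

# Hypothetical, serial sketch of the __call__ contract; no Work Queue involved.


def add_into(accumulator: Any, other: Any) -> Any:
    """Merge `other` into `accumulator` following the accepted accumulator types."""
    if isinstance(accumulator, MutableSet):
        accumulator |= other  # sets are merged by union
        return accumulator
    if isinstance(accumulator, MutableMapping):
        for key, value in other.items():
            # Mappings are merged key by key; new keys are taken as-is.
            accumulator[key] = add_into(accumulator[key], value) if key in accumulator else value
        return accumulator
    return accumulator + other  # any other Addable, e.g. numbers


def serial_call(items: Iterable, function: Callable, accumulator: Any) -> Any:
    for item in items:
        accumulator = add_into(accumulator, function(item))
    return accumulator


def summarize(item: int) -> dict:
    # Toy per-item output: a count and the set of residues modulo 3.
    return {"n": 1, "residues": {item % 3}}


print(serial_call(range(10), summarize, {"n": 0, "residues": set()}))
```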