- class coffea.processor.WorkQueueExecutor(status: bool = True, unit: str = 'items', desc: str = 'Processing', compression: int | None = 9, function_name: str | None = None, retries: int = 2, manager_name: str | None = None, port: int | typing.Tuple[int, int] | None = None, filepath: str = '.', events_total: int | None = None, x509_proxy: str | None = None, verbose: bool = False, print_stdout: bool = False, status_display_interval: int | None = 10, debug_log: str | None = None, stats_log: str | None = None, transactions_log: str | None = None, tasks_accum_log: str | None = None, password_file: str | None = None, ssl: bool | typing.Tuple[str, str] = False, environment_file: str | None = None, extra_input_files: typing.List = <factory>, wrapper: str | None = None, resource_monitor: str | None = 'off', resources_mode: str | None = 'max-seen', split_on_exhaustion: bool | None = True, fast_terminate_workers: int | None = None, cores: int | None = None, memory: int | None = None, disk: int | None = None, gpus: int | None = None, treereduction: int = 20, chunksize: int = 100000, dynamic_chunksize: typing.Dict | None = None, custom_init: typing.Callable | None = None, bar_format: str | None = None, chunks_accum_in_mem: int | None = None, master_name: str | None = None, chunks_per_accum: int | None = None)
Execute using Work Queue
For more information, see Work Queue Executor
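As a minimal sketch of how the constructor might be used: the options below use only keyword names that appear in the signature above. The values (manager name, resource sizes) are illustrative assumptions, and `make_executor` is a hypothetical helper, not part of coffea. It imports coffea lazily so that the options can be inspected without coffea or Work Queue installed.

```python
# Illustrative values only; the keyword names come from the class signature.
executor_options = {
    "manager_name": "coffea-wq-example",  # hypothetical manager name
    "cores": 2,                           # cores per task
    "memory": 4000,                       # MB per task
    "disk": 2000,                         # MB per task
    "resources_mode": "max-seen",         # documented default strategy
    "retries": 2,
    "verbose": True,
}


def make_executor(options):
    """Build a WorkQueueExecutor from keyword options.

    Hypothetical helper: calling it requires coffea and Work Queue.
    """
    from coffea import processor  # imported here so the dict works without coffea

    return processor.WorkQueueExecutor(**options)
```

Calling `make_executor(executor_options)` should yield an executor object that is invoked through its `__call__` method, provided that workers are available to connect to the manager.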
- Parameters:
items (sequence or generator) – Sequence of input arguments
function (callable) – A function to be called on each input, which returns an accumulator instance
accumulator (Accumulatable) – An accumulator to collect the output of the function
status (bool) – If true (default), enable progress bar
unit (str) – Label of progress bar unit
desc (str) – Label of progress bar description
compression (int, optional) – Compress accumulator outputs in flight with LZ4, at the level specified (default 9). None sets the level to 1 (minimal compression).
Work Queue specific options:
cores (int) – Maximum number of cores for work queue task. If unset, use a whole worker.
memory (int) – Maximum amount of memory (in MB) for work queue task. If unset, use a whole worker.
disk (int) – Maximum amount of disk space (in MB) for work queue task. If unset, use a whole worker.
gpus (int) – Number of GPUs to allocate to each task. If unset, use zero.
resource_monitor (str) –
If given, one of ‘off’, ‘measure’, or ‘watchdog’. Default is ‘off’.
- ‘off’: turns off resource monitoring. Overridden to ‘watchdog’ if resources_mode is not set to ‘fixed’.
- ‘measure’: turns on resource monitoring for Work Queue. The resources used per task are measured.
- ‘watchdog’: in addition to measuring resources, tasks are terminated if they go above the cores, memory, or disk specified.
resources_mode (str) –
One of ‘fixed’, ‘max-seen’, or ‘max-throughput’. Default is ‘max-seen’. Sets the strategy to automatically allocate resources to tasks.
- ‘fixed’: allocate the cores, memory, and disk specified for each task.
- ‘max-seen’ or ‘auto’: use the cores, memory, and disk given as maximum values to allocate, but first try each task by allocating the maximum values seen. Leads to a good compromise between parallelism and number of retries.
- ‘max-throughput’: like ‘max-seen’, but first tries the task with an allocation that maximizes overall throughput.
If resources_mode is other than ‘fixed’, preprocessing and accumulation tasks always use the ‘max-seen’ strategy, as the former tasks always use the same resources, while the latter have a distribution of resources that increases over time.
split_on_exhaustion (bool) – Whether to split a processing task in half according to its chunksize when it exhausts the cores, memory, or disk allocated to it. If False, a task that exhausts resources permanently fails. Default is True.
fast_terminate_workers (int) – Terminate workers on which tasks have been running longer than average. The time limit is computed by multiplying the average runtime of tasks by the value of ‘fast_terminate_workers’. Since there are legitimately slow tasks, no task may trigger fast termination in two distinct workers. Less than 1 disables it.
manager_name (str) – Name to refer to this work queue manager. Sets port to 0 (any available port) if port not given.
port (int or tuple(int, int)) – Port number or range of ports (inclusive) for the Work Queue manager program. Defaults to 9123 if manager_name not given.
password_file (str) – Location of a file containing a password used to authenticate workers.
ssl (bool or tuple(str, str)) – Enable SSL encryption between manager and workers. If a tuple, it should be of the form (key, cert), where key and cert are paths to the files containing the key and certificate in PEM format. If True, a temporary self-signed key and certificate are generated for the session.
extra_input_files (list) – A list of files in the current working directory to send along with each task. Useful for small custom libraries and configuration files needed by the processor.
x509_proxy (str) – Path to the X509 user proxy. If None (the default), use the value of the environment variable X509_USER_PROXY, or fall back to the file /tmp/x509up_u${UID} if it exists. If False, disables the default behavior and no proxy is sent.
environment_file (optional, str) – Conda python environment tarball to use. If not given, assume that the python environment is already set up at the execution site.
wrapper (str) – Wrapper script to run/open python environment tarball. Defaults to python_package_run found in PATH.
treereduction (int) – Number of processed chunks per accumulation task. Default is 20.
verbose (bool) – If true, emit a message on each task submission and completion. Default is false.
print_stdout (bool) – If true, print the standard output of each Work Queue task on completion. Default is false, per the class signature.
debug_log (str) – Filename for debug output
stats_log (str) – Filename for tasks statistics output
transactions_log (str) – Filename for tasks lifetime reports output
tasks_accum_log (str) – Filename for the log of tasks that have been processed and accumulated.
filepath (str) – Path to the parent directory in which the staging directory is created. Default is “.” (current working directory).
custom_init (function, optional) – A function that takes as an argument the queue’s WorkQueue object. The function is called just before the first work unit is submitted to the queue.
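Several of the defaulting rules described above can be restated as small pure functions. The sketch below paraphrases the parameter descriptions for `port`, `manager_name`, `fast_terminate_workers`, and `x509_proxy`. It is not the coffea implementation, and the function names are hypothetical.

```python
import os

DEFAULT_PORT = 9123  # documented default when manager_name is not given


def resolve_port(manager_name=None, port=None):
    """Pick the manager port following the `manager_name` and `port` descriptions."""
    if port is not None:
        return port  # an int or an inclusive (low, high) range
    # Port 0 means "any available port".
    return 0 if manager_name is not None else DEFAULT_PORT


def fast_terminate_limit(average_runtime, fast_terminate_workers):
    """Time limit = average task runtime * multiplier; None when disabled."""
    if fast_terminate_workers is None or fast_terminate_workers < 1:
        return None
    return average_runtime * fast_terminate_workers


def resolve_x509_proxy(x509_proxy=None, environ=None):
    """Return the proxy path to send with tasks, or None if no proxy is sent."""
    if x509_proxy is False:
        return None  # default lookup explicitly disabled
    if x509_proxy is not None:
        return x509_proxy
    env = os.environ if environ is None else environ
    if env.get("X509_USER_PROXY"):
        return env["X509_USER_PROXY"]
    uid = os.getuid() if hasattr(os, "getuid") else None
    fallback = f"/tmp/x509up_u{uid}"
    return fallback if uid is not None and os.path.exists(fallback) else None
```

For example, `resolve_port(manager_name="my-manager")` gives 0 (any available port), while `resolve_port()` gives 9123.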
Attributes Summary
Methods Summary
__call__(items, function, accumulator) – Call self as a function.
Attributes Documentation
Methods Documentation
- __call__(items: Iterable, function: Callable, accumulator: Addable | MutableSet | MutableMapping)
Call self as a function.
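The calling convention can be illustrated without Work Queue. The stand-in below is not the coffea implementation: `local_execute` and `count_words` are hypothetical names, and the loop runs serially in one process. It only follows the descriptions of `items`, `function`, and `accumulator`, where `function` is called on each input and its output is collected into `accumulator`.

```python
from collections import Counter


def local_execute(items, function, accumulator):
    """Serial stand-in: call `function` on each item and merge its output."""
    for item in items:
        accumulator += function(item)
    return accumulator


def count_words(line):
    # Returns an addable accumulator (a Counter) for a single input item.
    return Counter(line.split())


result = local_execute(["a b", "b c"], count_words, Counter())
```

Here `result` holds the merged word counts from both inputs. What the real executor returns is not stated in this section, so the return value of the stand-in should not be read as the library's behavior.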