WorkQueueExecutor

class coffea.processor.WorkQueueExecutor(status: bool = True, unit: str = 'items', desc: str = 'Processing', compression: int | None = 9, function_name: str | None = None, retries: int = 2, manager_name: str | None = None, port: int | Tuple[int, int] | None = None, filepath: str = '.', events_total: int | None = None, x509_proxy: str | None = None, verbose: bool = False, print_stdout: bool = False, status_display_interval: int | None = 10, debug_log: str | None = None, stats_log: str | None = None, transactions_log: str | None = None, tasks_accum_log: str | None = None, password_file: str | None = None, ssl: bool | Tuple[str, str] = False, environment_file: str | None = None, extra_input_files: List = <factory>, wrapper: str | None = None, resource_monitor: str | None = 'off', resources_mode: str | None = 'max-seen', split_on_exhaustion: bool | None = True, fast_terminate_workers: int | None = None, cores: int | None = None, memory: int | None = None, disk: int | None = None, gpus: int | None = None, treereduction: int = 20, chunksize: int = 100000, dynamic_chunksize: Dict | None = None, custom_init: Callable | None = None, bar_format: str | None = None, chunks_accum_in_mem: int | None = None, master_name: str | None = None, chunks_per_accum: int | None = None)

Bases: ExecutorBase

Execute using Work Queue

For more information, see Work Queue Executor

Parameters:
  • items (sequence or generator) – Sequence of input arguments

  • function (callable) – A function to be called on each input, which returns an accumulator instance

  • accumulator (Accumulatable) – An accumulator to collect the output of the function

  • status (bool) – If true (default), enable progress bar

  • unit (str) – Label of progress bar unit

  • desc (str) – Label of progress bar description

  • compression (int, optional) – Compress accumulator outputs in flight with LZ4, at the level specified (default 9). None sets the level to 1 (minimal compression).

  • Work Queue specific options:

  • cores (int) – Maximum number of cores for work queue task. If unset, use a whole worker.

  • memory (int) – Maximum amount of memory (in MB) for work queue task. If unset, use a whole worker.

  • disk (int) – Maximum amount of disk space (in MB) for work queue task. If unset, use a whole worker.

  • gpus (int) – Number of GPUs to allocate to each task. If unset, use zero.

  • resource_monitor (str) – If given, one of ‘off’, ‘measure’, or ‘watchdog’. Default is ‘off’.

    • ‘off’: turns off resource monitoring. Overridden to ‘watchdog’ if resources_mode is not set to ‘fixed’.

    • ‘measure’: turns on resource monitoring for Work Queue. The resources used per task are measured.

    • ‘watchdog’: in addition to measuring resources, tasks are terminated if they go above the cores, memory, or disk specified.

  • resources_mode (str) – One of ‘fixed’, ‘max-seen’, or ‘max-throughput’. Default is ‘max-seen’. Sets the strategy used to automatically allocate resources to tasks.

    • ‘fixed’: allocate the cores, memory, and disk specified for each task.

    • ‘max-seen’ or ‘auto’: use the cores, memory, and disk given as maximum values to allocate, but first try each task by allocating the maximum values seen. Leads to a good compromise between parallelism and number of retries.

    • ‘max-throughput’: like ‘max-seen’, but first tries the task with an allocation that maximizes overall throughput.

    If resources_mode is other than ‘fixed’, preprocessing and accumulation tasks always use the ‘max-seen’ strategy: the former always use the same resources, while the latter have a distribution of resources that increases over time.

  • split_on_exhaustion (bool) – Whether to split a processing task in half according to its chunksize when it exhausts the cores, memory, or disk allocated to it. If False, a task that exhausts its resources fails permanently. Default is True.

  • fast_terminate_workers (int) – Terminate workers on which tasks have been running longer than average. The time limit is computed by multiplying the average runtime of tasks by the value of ‘fast_terminate_workers’. Since there are legitimately slow tasks, no task may trigger fast termination in two distinct workers. Less than 1 disables it.

  • manager_name (str) – Name to refer to this work queue manager. Sets port to 0 (any available port) if port not given.

  • port (int or tuple(int, int)) – Port number, or inclusive range of ports, for the work queue manager program. Defaults to 9123 if manager_name is not given.

  • password_file (str) – Location of a file containing a password used to authenticate workers.

  • ssl (bool or tuple(str, str)) – Enable SSL encryption between manager and workers. If a tuple, it should be of the form (key, cert), where key and cert are paths to the files containing the key and certificate in PEM format. If True, a temporary self-signed key and certificate are generated for the session.

  • extra_input_files (list) – A list of files in the current working directory to send along with each task. Useful for small custom libraries and configuration files needed by the processor.

  • x509_proxy (str) – Path to the X509 user proxy. If None (the default), use the value of the environment variable X509_USER_PROXY, or fall back to the file /tmp/x509up_u${UID} if it exists. If False, the default behavior is disabled and no proxy is sent.

  • environment_file (optional, str) – Conda Python environment tarball to use. If not given, assume that the Python environment is already set up at the execution site.

  • wrapper (str) – Wrapper script to run/open python environment tarball. Defaults to python_package_run found in PATH.

  • treereduction (int) – Number of processed chunks per accumulation task. Default is 20.

  • verbose (bool) – If true, emit a message on each task submission and completion. Default is false.

  • print_stdout (bool) – If true, print the standard output of each work queue task on completion. Default is false.

  • debug_log (str) – Filename for debug output

  • stats_log (str) – Filename for tasks statistics output

  • transactions_log (str) – Filename for tasks lifetime reports output

  • tasks_accum_log (str) – Filename for the log of tasks that have been processed and accumulated.

  • filepath (str) – Path to the parent directory where to create the staging directory. Default is “.” (current working directory).

  • custom_init (function, optional) – A function that takes as an argument the queue’s WorkQueue object. The function is called just before the first work unit is submitted to the queue.
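The parameters above are all passed as keyword arguments to the constructor. The following is a minimal sketch of one possible configuration, not a recommended setup: the manager name, port range, and resource values are illustrative assumptions, and build_executor is a hypothetical helper that imports coffea lazily and assumes a coffea release that still ships WorkQueueExecutor.

```python
# Illustrative settings only; every key is a parameter documented above.
executor_args = {
    "manager_name": "coffea-wq-example",  # hypothetical manager name
    "port": (9123, 9130),                 # inclusive port range
    "resources_mode": "fixed",            # allocate exactly what is requested
    "cores": 1,
    "memory": 2000,                       # MB
    "disk": 2000,                         # MB
    "resource_monitor": "measure",        # measure resources used per task
    "treereduction": 10,                  # processed chunks per accumulation task
    "retries": 3,
    "verbose": True,
}


def build_executor(**overrides):
    """Hypothetical helper: build the executor from executor_args plus overrides."""
    # Imported here so this module can be loaded without coffea installed.
    # Assumes a coffea release that still provides WorkQueueExecutor.
    from coffea.processor import WorkQueueExecutor

    return WorkQueueExecutor(**{**executor_args, **overrides})
```

With coffea installed, build_executor(cores=2) would return an executor that uses the settings above with cores overridden to 2.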

Attributes Summary

bar_format

chunks_accum_in_mem

chunks_per_accum

chunksize

compression

cores

custom_init

debug_log

disk

dynamic_chunksize

environment_file

events_total

fast_terminate_workers

filepath

gpus

manager_name

master_name

memory

password_file

port

print_stdout

resource_monitor

resources_mode

retries

split_on_exhaustion

ssl

stats_log

status_display_interval

tasks_accum_log

transactions_log

treereduction

verbose

wrapper

x509_proxy

Methods Summary

__call__(items, function, accumulator)

Call self as a function.

Attributes Documentation

bar_format: str | None = None
chunks_accum_in_mem: int | None = None
chunks_per_accum: int | None = None
chunksize: int = 100000
compression: int | None = 9
cores: int | None = None
custom_init: Callable | None = None
debug_log: str | None = None
disk: int | None = None
dynamic_chunksize: Dict | None = None
environment_file: str | None = None
events_total: int | None = None
fast_terminate_workers: int | None = None
filepath: str = '.'
gpus: int | None = None
manager_name: str | None = None
master_name: str | None = None
memory: int | None = None
password_file: str | None = None
port: int | Tuple[int, int] | None = None
print_stdout: bool = False
resource_monitor: str | None = 'off'
resources_mode: str | None = 'max-seen'
retries: int = 2
split_on_exhaustion: bool | None = True
ssl: bool | Tuple[str, str] = False
stats_log: str | None = None
status_display_interval: int | None = 10
tasks_accum_log: str | None = None
transactions_log: str | None = None
treereduction: int = 20
verbose: bool = False
wrapper: str | None = None
x509_proxy: str | None = None
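
To get a feel for how the chunksize and treereduction defaults interact, the arithmetic can be sketched as below. This is a simplified level-by-level model, assuming every chunk is full and that partial results are merged in rounds of at most treereduction inputs; the actual executor may schedule accumulation tasks differently, for example as results arrive. The helper names chunks_for and accumulation_rounds are hypothetical.

```python
import math


def chunks_for(events_total, chunksize=100000):
    """Number of chunks needed to cover events_total events."""
    return math.ceil(events_total / chunksize)


def accumulation_rounds(n_chunks, treereduction=20):
    """Merge rounds needed when each accumulation task merges up to
    `treereduction` partial results (simplified model)."""
    if n_chunks < 1:
        raise ValueError("n_chunks must be at least 1")
    if treereduction < 2:
        raise ValueError("treereduction must be at least 2")
    rounds = 0
    remaining = n_chunks
    while remaining > 1:
        # Each round groups the remaining results into tasks of size treereduction.
        remaining = math.ceil(remaining / treereduction)
        rounds += 1
    return rounds


n_chunks = chunks_for(50_000_000)        # 500 chunks of 100000 events
rounds = accumulation_rounds(n_chunks)   # 500 -> 25 -> 2 -> 1
```

Under this model, 50 million events with the defaults give 500 chunks, which are merged in three rounds.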

Methods Documentation

__call__(items: Iterable, function: Callable, accumulator: Addable | MutableSet | MutableMapping)

Call self as a function.
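
The contract described in the parameter list (function is called on each input and returns an accumulator instance, and accumulator collects the outputs) can be illustrated with a serial stand-in. This sketch is not how WorkQueueExecutor runs: the real executor distributes the calls as Work Queue tasks, and its return value is not shown here. The names run_locally and count_words are hypothetical.

```python
from collections import Counter


def run_locally(items, function, accumulator):
    """Serial stand-in for the call contract: apply function to every item
    and add each result into the accumulator."""
    for item in items:
        accumulator += function(item)
    return accumulator


def count_words(line):
    """Example work function: returns a Counter, which supports addition."""
    return Counter(line.split())


total = run_locally(["a b a", "b c"], count_words, Counter())
```

Here Counter plays the role of an addable accumulator; any type that supports in-place addition of the function's output would work in this stand-in.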