Runner

class coffea.processor.Runner(executor: ~coffea.processor.executor.ExecutorBase, pre_executor: ~coffea.processor.executor.ExecutorBase | None = None, chunksize: int = 100000, maxchunks: int | None = None, metadata_cache: ~collections.abc.MutableMapping | None = None, dynamic_chunksize: ~typing.Dict | None = None, skipbadfiles: bool = False, xrootdtimeout: int | None = 60, align_clusters: bool = False, savemetrics: bool = False, mmap: bool = False, schema: ~coffea.nanoevents.schemas.base.BaseSchema | None = <class 'coffea.nanoevents.schemas.base.BaseSchema'>, cachestrategy: ~typing.Literal['dask-worker'] | ~typing.Callable[[...], ~collections.abc.MutableMapping] | None = None, processor_compression: int = 1, use_skyhook: bool | None = False, skyhook_options: ~typing.Dict | None = <factory>, format: str = 'root')[source]

Bases: object

A tool to run a processor using uproot for data delivery

A convenience wrapper to submit jobs for a file set, which is a dictionary of dataset: [file list] entries. Supports only uproot TTree reading, via NanoEvents or LazyDataFrame. For more customized processing, e.g. to read other objects from the files and pass them into data frames, one can write a similar function in their user code.

Parameters:
  • executor (ExecutorBase instance) – Executor, which implements a callable with inputs items, function, and accumulator, and performs an action equivalent to: for item in items: accumulator += function(item) (see the usage sketch after this parameter list)

  • pre_executor (ExecutorBase instance) – Executor used to calculate fileset metadata. Defaults to executor.

  • chunksize (int, optional) – Maximum number of entries to process at a time in the data frame, default: 100k

  • maxchunks (int, optional) – Maximum number of chunks to process per dataset. Defaults to processing the whole dataset.

  • metadata_cache (mapping, optional) – A dict-like object to use as a cache for (file, tree) metadata that is used to determine chunking. Defaults to an in-memory LRU cache that holds 100k entries (about 1MB, depending on the length of filenames, etc.). If you edit an input file (please don’t) during a session, restart the session to clear the cache.

  • dynamic_chunksize (dict, optional) – Adapt the chunksize so that each unit of work runs within the given targets. Currently supported targets are 'wall_time' (in seconds) and 'memory' (in MB). E.g., with {"wall_time": 120, "memory": 2048}, the chunksize will be dynamically adapted so that each processing job runs in about two minutes and uses about two GB of memory. (Currently only supported by the WorkQueueExecutor.)
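A minimal usage sketch, assuming a recent coffea 0.7-style API. The dataset name, file path, and the MyProcessor class below are hypothetical illustrations, not part of the library:

    from coffea import processor
    from coffea.nanoevents import NanoAODSchema

    class MyProcessor(processor.ProcessorABC):
        # Stand-in analysis: count the events seen per dataset
        def process(self, events):
            return {events.metadata["dataset"]: {"entries": len(events)}}

        def postprocess(self, accumulator):
            return accumulator

    # Hypothetical fileset: one dataset mapped to a list of ROOT files
    fileset = {"ZJets": ["root://example.org//store/zjets_nano.root"]}

    run = processor.Runner(
        executor=processor.IterativeExecutor(),  # serial execution; FuturesExecutor, DaskExecutor, etc. also work
        schema=NanoAODSchema,                    # interpret branches as NanoAOD-style NanoEvents
        chunksize=100_000,
        skipbadfiles=True,
    )

    out = run(fileset, treename="Events", processor_instance=MyProcessor())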

Attributes Summary

align_clusters

cachestrategy

chunksize

dynamic_chunksize

format

maxchunks

metadata_cache

mmap

pre_executor

processor_compression

retries

savemetrics

skipbadfiles

use_dataframes

use_skyhook

xrootdtimeout

Methods Summary

__call__(fileset, treename, processor_instance)

Run the processor_instance on a given fileset

automatic_retries(retries, skipbadfiles, ...)

This should probably be defined at the Executor level.

get_cache(cachestrategy)

metadata_fetcher(xrootdtimeout, ...)

preprocess(fileset, treename)

Preprocess a given fileset, yielding the chunks of work to be processed.

read_coffea_config()

run(fileset, processor_instance[, treename])

Run the processor_instance on a given fileset

Attributes Documentation

align_clusters: bool = False
cachestrategy: Literal['dask-worker'] | Callable[[...], MutableMapping] | None = None
chunksize: int = 100000
dynamic_chunksize: Dict | None = None
format: str = 'root'
maxchunks: int | None = None
metadata_cache: MutableMapping | None = None
mmap: bool = False
pre_executor: ExecutorBase | None = None
processor_compression: int = 1
retries
savemetrics: bool = False
skipbadfiles: bool = False
use_dataframes
use_skyhook: bool | None = False
xrootdtimeout: int | None = 60

Methods Documentation

__call__(fileset: Dict, treename: str, processor_instance: ProcessorABC) → Addable | MutableSet | MutableMapping[source]

Run the processor_instance on a given fileset

Parameters:
  • fileset (dict) – A dictionary {dataset: [file, file], }. Optionally, if some files' tree names differ, the dictionary can be specified as: {dataset: {'treename': 'name', 'files': [file, file]}, } (see the example below)

  • treename (str) – Name of the tree inside each ROOT file; can be None. A treename can also be defined in the fileset, which will override the passed treename.

  • processor_instance (ProcessorABC) – An instance of a class deriving from ProcessorABC
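For illustration, a hypothetical fileset combining the plain file-list form with the per-dataset treename override, reusing the run and MyProcessor names from the sketch above (all dataset names and file paths are made up):

    fileset = {
        # Uses the treename passed to __call__ ("Events" below)
        "DYJets": ["dy_1.root", "dy_2.root"],
        # Overrides the passed treename for this dataset only
        "SingleMuon": {
            "treename": "OtherEvents",
            "files": ["data_1.root", "data_2.root"],
        },
    }

    output = run(fileset, treename="Events", processor_instance=MyProcessor())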

static automatic_retries(retries: int, skipbadfiles: bool, func, *args, **kwargs)[source]

This should probably be defined at the Executor level.

static get_cache(cachestrategy)[source]
static metadata_fetcher(xrootdtimeout: int, align_clusters: bool, item: FileMeta) → Addable | MutableSet | MutableMapping[source]
preprocess(fileset: Dict, treename: str) → Generator[source]

Preprocess a given fileset, yielding the chunks of work to be processed.

Parameters:
  • fileset (dict) – A dictionary {dataset: [file, file], }. Optionally, if some files' tree names differ, the dictionary can be specified as: {dataset: {'treename': 'name', 'files': [file, file]}, }

  • treename (str) – Name of the tree inside each ROOT file; can be None. A treename can also be defined in the fileset, which will override the passed treename.

static read_coffea_config()[source]
run(fileset: Dict | str | List[WorkItem] | Generator, processor_instance: ProcessorABC, treename: str = None) → Addable | MutableSet | MutableMapping[source]

Run the processor_instance on a given fileset

Parameters:
  • fileset (dict | str | List[WorkItem] | Generator) –

    • A dictionary {dataset: [file, file], }. Optionally, if some files' tree names differ, the dictionary can be specified as: {dataset: {'treename': 'name', 'files': [file, file]}, }

    • A single file name

    • File chunks for self.preprocess()

    • A chunk generator, e.g. as returned by preprocess (see the sketch after this parameter list)

  • treename (str, optional) – Name of the tree inside each ROOT file; can be None. A treename can also be defined in the fileset, which will override the passed treename. Not needed if processing premade chunks.

  • processor_instance (ProcessorABC) – An instance of a class deriving from ProcessorABC
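A sketch of the premade-chunks path, reusing the Runner instance (run), fileset, and MyProcessor from the earlier sketches: preprocess generates the work items once, and run then consumes them without needing a treename.

    # Generate the per-file work chunks up front (a generator of work items)
    chunks = run.preprocess(fileset, treename="Events")

    # Process the premade chunks; no treename is needed here
    out = run.run(chunks, processor_instance=MyProcessor())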