Runner
- class coffea.processor.Runner(executor: coffea.processor.executor.ExecutorBase, pre_executor: coffea.processor.executor.ExecutorBase | None = None, chunksize: int = 100000, maxchunks: int | None = None, metadata_cache: collections.abc.MutableMapping | None = None, dynamic_chunksize: typing.Dict | None = None, skipbadfiles: bool = False, xrootdtimeout: int | None = 60, align_clusters: bool = False, savemetrics: bool = False, mmap: bool = False, schema: coffea.nanoevents.schemas.base.BaseSchema | None = <class 'coffea.nanoevents.schemas.base.BaseSchema'>, cachestrategy: typing.Literal['dask-worker'] | typing.Callable[[...], collections.abc.MutableMapping] | None = None, processor_compression: int = 1, use_skyhook: bool | None = False, skyhook_options: typing.Dict | None = <factory>, format: str = 'root')[source]
Bases: object
A tool to run a processor using uproot for data delivery
A convenience wrapper to submit jobs for a file set, which is a dictionary of dataset: [file list] entries. Supports only uproot TTree reading, via NanoEvents or LazyDataFrame. For more customized processing, e.g. to read other objects from the files and pass them into data frames, you can write a similar function in your own code.
- Parameters:
executor (ExecutorBase instance) – Executor, which implements a callable with inputs: items, function, accumulator and performs some action equivalent to:
for item in items: accumulator += function(item)
pre_executor (ExecutorBase instance) – Executor used to calculate fileset metadata. Defaults to executor.
chunksize (int, optional) – Maximum number of entries to process at a time in the data frame, default: 100k
maxchunks (int, optional) – Maximum number of chunks to process per dataset. Defaults to processing the whole dataset.
metadata_cache (mapping, optional) – A dict-like object to use as a cache for (file, tree) metadata that is used to determine chunking. Defaults to an in-memory LRU cache that holds 100k entries (about 1 MB, depending on the length of filenames, etc.). If you edit an input file (please don’t) during a session, restart the session to clear the cache.
dynamic_chunksize (dict, optional) – Adapt the chunksize so that each unit of work runs within the given targets. Currently supported targets are ‘wall_time’ (in seconds) and ‘memory’ (in MB). E.g., with {“wall_time”: 120, “memory”: 2048}, the chunksize will be dynamically adapted so that each processing job runs in about two minutes while using about 2 GB of memory. (Currently only supported by the WorkQueueExecutor.)
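For orientation, a minimal sketch of constructing a Runner. The MyProcessor class, its body, and all parameter values here are illustrative assumptions, not part of the API above:

```python
from coffea import processor
from coffea.nanoevents import NanoAODSchema

class MyProcessor(processor.ProcessorABC):
    """Toy processor: count entries per dataset (hypothetical example)."""

    def process(self, events):
        return {events.metadata["dataset"]: {"entries": len(events)}}

    def postprocess(self, accumulator):
        return accumulator

# IterativeExecutor runs chunks serially in-process, matching the
# `for item in items: accumulator += function(item)` loop described above;
# FuturesExecutor(workers=4) would parallelize via concurrent.futures instead.
run = processor.Runner(
    executor=processor.IterativeExecutor(),
    schema=NanoAODSchema,  # interpret inputs as NanoAOD rather than the default BaseSchema
    chunksize=50_000,      # cap each work item at 50k entries
    skipbadfiles=True,     # skip unreadable files instead of raising
)
```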
Attributes Summary
cachestrategy, metadata_cache, retries, use_dataframes
Methods Summary
__call__(fileset, treename, processor_instance) – Run the processor_instance on a given fileset
automatic_retries(retries, skipbadfiles, ...) – This should probably be defined at the Executor level.
get_cache(cachestrategy)
metadata_fetcher(xrootdtimeout, ...)
preprocess(fileset, treename) – Preprocess a given fileset, generating work-item chunks
run(fileset, processor_instance[, treename]) – Run the processor_instance on a given fileset
Attributes Documentation
- cachestrategy: Literal['dask-worker'] | Callable[[...], MutableMapping] | None = None
- metadata_cache: MutableMapping | None = None
- retries
- use_dataframes
Methods Documentation
- __call__(fileset: Dict, treename: str, processor_instance: ProcessorABC) → Addable | MutableSet | MutableMapping [source]
Run the processor_instance on a given fileset
- Parameters:
fileset (dict) – A dictionary {dataset: [file, file], }. Optionally, if some files’ tree names differ, the dictionary can be specified: {dataset: {'treename': 'name', 'files': [file, file]}, }
treename (str) – Name of the tree inside each ROOT file; can be None. A treename can also be defined in fileset, which will override the passed treename.
processor_instance (ProcessorABC) – An instance of a class deriving from ProcessorABC
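To make the two fileset layouts concrete, a hypothetical invocation reusing the run and MyProcessor names from the construction sketch above; dataset labels and file paths are placeholders:

```python
# Plain layout: every file in each dataset is read via the tree passed as `treename`.
fileset = {
    "ZJets": ["zjets_1.root", "zjets_2.root"],
    "Data": ["data_2018.root"],
}

# Mixed layout: a per-dataset 'treename' overrides the one passed to __call__.
fileset = {
    "ZJets": {"treename": "Events", "files": ["zjets_1.root", "zjets_2.root"]},
    "Data": ["data_2018.root"],
}

out = run(fileset, treename="Events", processor_instance=MyProcessor())
```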
- static automatic_retries(retries: int, skipbadfiles: bool, func, *args, **kwargs)[source]
This should probably be defined at the Executor level.
- static metadata_fetcher(xrootdtimeout: int, align_clusters: bool, item: FileMeta) → Addable | MutableSet | MutableMapping [source]
- preprocess(fileset: Dict, treename: str) → Generator [source]
Preprocess a given fileset, generating work-item chunks
- Parameters:
fileset (dict) – A dictionary {dataset: [file, file], }. Optionally, if some files’ tree names differ, the dictionary can be specified: {dataset: {'treename': 'name', 'files': [file, file]}, }
treename (str) – Name of the tree inside each ROOT file; can be None. A treename can also be defined in fileset, which will override the passed treename.
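A short sketch of calling preprocess directly to inspect the generated chunks, again reusing the hypothetical run and fileset names from above:

```python
# preprocess only fetches (file, tree) metadata and yields chunked work items;
# the processor itself is not run at this stage.
chunks = list(run.preprocess(fileset, treename="Events"))
print(f"{len(chunks)} chunks; first: {chunks[0]}")
```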
- run(fileset: Dict | str | List[WorkItem] | Generator, processor_instance: ProcessorABC, treename: str = None) → Addable | MutableSet | MutableMapping [source]
Run the processor_instance on a given fileset
- Parameters:
fileset (dict | str | List[WorkItem] | Generator) – One of the following:
- A dictionary {dataset: [file, file], }. Optionally, if some files’ tree names differ, the dictionary can be specified: {dataset: {'treename': 'name', 'files': [file, file]}, }
- A single file name
- File chunks for self.preprocess()
- A chunk generator
treename (str, optional) – Name of the tree inside each ROOT file; can be None. A treename can also be defined in fileset, which will override the passed treename. Not needed if processing premade chunks.
processor_instance (ProcessorABC) – An instance of a class deriving from ProcessorABC
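Since run also accepts premade chunks, preprocessing can be decoupled from processing, e.g. to reuse one chunking across several processors. A hedged sketch with the hypothetical names from above:

```python
# Chunk once, then run; treename is omitted in the second call because each
# premade chunk already records which file and tree it covers.
chunks = list(run.preprocess(fileset, treename="Events"))
out = run.run(chunks, MyProcessor())
```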