run_spark_job

coffea.processor.run_spark_job(fileset, processor_instance, executor, executor_args={}, spark=None, partitionsize=200000, thread_workers=16)

A wrapper to submit Spark jobs

A convenience wrapper to submit jobs for Spark datasets, where fileset is a dictionary of {dataset: [file list]} entries. Presently supports reading parquet files converted from ROOT. For more customized processing, e.g. to read other objects from the files and pass them into data frames, one can write a similar function in user code.
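A minimal usage sketch is shown below. The fileset contents are hypothetical, MyProcessor stands for a user-defined ProcessorABC subclass (a sketch follows the parameter list), and the exact import path of spark_executor may differ between coffea versions.

    from coffea import processor
    from coffea.processor.spark.spark_executor import spark_executor  # import path may vary by coffea version

    # Hypothetical dataset -> parquet file lists (converted from ROOT)
    fileset = {
        "ZJets": ["ZJets_part0.parquet", "ZJets_part1.parquet"],
        "Data": ["Run2018A_part0.parquet"],
    }

    # MyProcessor is assumed to be a ProcessorABC subclass declaring the columns it reads
    output = processor.run_spark_job(
        fileset,
        processor_instance=MyProcessor(),
        executor=spark_executor,
        executor_args={},       # forwarded to the Spark session setup
        partitionsize=200000,   # target partition size (coalesce only)
        thread_workers=16,      # parallel Spark jobs during processing
    )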

Parameters:
  • fileset (dict) – dictionary of the form {dataset: [file, file, ...], ...}

  • processor_instance (ProcessorABC) –

    An instance of a class deriving from ProcessorABC

    Note

    The processor instance must declare every column it reads, in both data and MC, via its .columns attribute (see the minimal processor sketch after this parameter list)

  • executor

    anything that inherits from SparkExecutor, such as spark_executor

    In general, a function that takes 3 arguments (items, function, accumulator) and performs an action equivalent to: for item in items: accumulator += function(item)

  • executor_args – arguments to pass to the creation of the Spark session

  • spark

    an optional, already-created Spark instance

    if None, an ephemeral Spark instance is created using a config

  • partitionsize – partition size to aim for (coalesce only; repartition is too expensive)

  • thread_workers – how many Spark jobs to run in parallel during the processing steps
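The note on processor_instance above can be illustrated with a minimal sketch. The class name, column names, and accumulator layout are hypothetical and assume coffea's dict_accumulator/value_accumulator helpers; they must be adapted to the columns actually present in the parquet inputs.

    from coffea import processor

    class MyProcessor(processor.ProcessorABC):
        @property
        def columns(self):
            # every column read in process() must be listed here
            return ["dataset", "Muon_pt", "Muon_eta"]

        @property
        def accumulator(self):
            return processor.dict_accumulator(
                {"entries": processor.value_accumulator(int)}
            )

        def process(self, df):
            # df is the chunk of input handed to the processor; its exact
            # container type depends on the executor
            out = self.accumulator.identity()
            out["entries"] += len(df["Muon_pt"])
            return out

        def postprocess(self, accumulator):
            return accumulator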