Work Queue Executor
Work Queue is a distributed computing framework for building large-scale manager-worker applications, developed by the Cooperative Computing Lab (CCL) at the University of Notre Dame. This executor functions as the manager program, dividing a Coffea data analysis workload into discrete tasks that are executed by a large number of worker processes running on cluster or cloud systems.
To set up Coffea and Work Queue together, you may need to create a Conda environment, install the software, and then create a tarball containing the environment. The tarball is sent to each worker so that workers run in the same environment as the manager machine.
# Create a new environment
conda create --yes --name coffea-env -c conda-forge python coffea xrootd ndcctools conda conda-pack
conda activate coffea-env
# Pack the environment into a portable tarball.
conda-pack --output coffea-env.tar.gz
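On the worker side this unpacking happens automatically, but if you want to inspect or reuse the tarball by hand, the standard conda-pack workflow looks like the sketch below (the directory name is arbitrary):

```shell
# Extract the packed environment into a fresh directory.
mkdir -p coffea-env-unpacked
tar -xzf coffea-env.tar.gz -C coffea-env-unpacked

# Activate the environment and rewrite its hard-coded prefix paths
# so it works from its new location.
source coffea-env-unpacked/bin/activate
conda-unpack
```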
To run an analysis, you must set up a WorkQueueExecutor with appropriate arguments. Here is a complete example:
###############################################################################
# Example of Coffea with the Work Queue executor.
#
# To execute, start this application, and then start workers that will connect
# to it and execute tasks.
#
# Note that, as written, this example only processes 4 data chunks and should
# complete in a short time. For a real run, set maxchunks=None in the main
# program below.
#
# For simple testing, this script automatically starts local workers through
# a factory. To scale up, see the wq.Factory configuration below and change
# the batch type to your favorite batch system.
###############################################################################
import work_queue as wq
from coffea.processor import Runner
from coffea.processor import WorkQueueExecutor
###############################################################################
# Collect and display setup info.
###############################################################################
print("------------------------------------------------")
print("Example Coffea Analysis with Work Queue Executor")
print("------------------------------------------------")
import getpass
wq_manager_name = "coffea-wq-{}".format(getpass.getuser())
wq_port = 9123
print("Manager Name: -M " + wq_manager_name)
print("------------------------------------------------")
###############################################################################
# Define a custom Coffea processor
###############################################################################
from coffea import processor
from coffea.nanoevents.methods import candidate
import hist
from collections import defaultdict
import awkward as ak
# register our candidate behaviors
ak.behavior.update(candidate.behavior)
class MyProcessor(processor.ProcessorABC):
    @property
    def accumulator(self):
        return {
            "sumw": defaultdict(float),
            "mass": hist.Hist(
                hist.axis.StrCategory([], name="dataset", label="Dataset"),
                hist.axis.Regular(
                    60, 60, 120, name="mass", label=r"$m_{\mu\mu}$ [GeV]"
                ),
                name="Events",
            ),
        }

    def process(self, events):
        # Note: This is required to ensure that behaviors are registered
        # when running this code in a remote task.
        ak.behavior.update(candidate.behavior)
        output = self.accumulator
        dataset = events.metadata["dataset"]
        muons = ak.zip(
            {
                "pt": events.Muon_pt,
                "eta": events.Muon_eta,
                "phi": events.Muon_phi,
                "mass": events.Muon_mass,
                "charge": events.Muon_charge,
            },
            with_name="PtEtaPhiMCandidate",
        )
        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge) == 0)
        # add first and second muon in every event together
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        output["sumw"][dataset] += len(events)
        output["mass"].fill(
            dataset=dataset,
            mass=dimuon.mass,
        )
        return output

    def postprocess(self, accumulator):
        return accumulator
###############################################################################
# Sample data sources come from CERN opendata.
###############################################################################
fileset = {
    "DoubleMuon": [
        "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
        "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012C_DoubleMuParked.root",
    ],
}
###############################################################################
# Configuration of the Work Queue Executor
###############################################################################
# secret passed between manager and workers for authentication
my_password_file = "password.txt"
with open(my_password_file, "w") as f:
    f.write("my_secret_password")
work_queue_executor_args = {
    # Automatically allocate cores, memory, and disk to tasks, adjusting to
    # the maximum values measured. Initially, tasks use whole workers.
    "resources_mode": "auto",
    # Split a processing task in half according to its chunksize when it
    # exhausts the resources allocated to it.
    "split_on_exhaustion": True,
    # Options to control how workers find this manager.
    "master_name": wq_manager_name,
    # Port for the manager to listen on; if zero, one is chosen automatically.
    "port": wq_port,
    # Secret passed between manager and workers.
    "password_file": my_password_file,
    # The named conda environment tarball will be transferred to and activated
    # on each worker. This is useful when coffea is not installed on the
    # remote machines. Conda environments are created with conda-pack, and
    # should at least include coffea and ndcctools (both from the conda-forge
    # channel) and their dependencies.
    #
    # "environment_file": "coffea-env.tar.gz",
    # Debugging: Display notes about each task submitted/completed.
    "verbose": True,
    # Debugging: Display the output of a task if it is not empty.
    "print_stdout": False,
    # Debugging: Write a debug log of manager-side activity.
    "debug_log": "coffea-wq.log",
}
executor = WorkQueueExecutor(**work_queue_executor_args)
###############################################################################
# Run the analysis using local Work Queue workers
###############################################################################
import time
tstart = time.time()
workers = wq.Factory(
    # local runs:
    batch_type="local",
    manager_host_port="localhost:{}".format(wq_port),
    # with a batch system, e.g., condor:
    # (If coffea is not installed at the execution site, then a conda
    # environment_file should be defined in the work_queue_executor_args.)
    # batch_type="condor", manager_name=wq_manager_name,
)
workers.max_workers = 2
workers.min_workers = 1
workers.cores = 2
workers.memory = 1000 # MB
workers.disk = 2000 # MB
workers.password = my_password_file
# Instead of declaring the python environment per task, you can set it in
# the factory directly. This is useful if you are going to run a workflow
# several times using the same set of workers. It also ensures that the
# worker itself executes in a friendly environment.
# workers.python_package = "coffea-env.tar.gz"
#
# The factory tries to write temporary files to $TMPDIR (usually /tmp). When
# this is not available, or causes errors, this scratch directory can be
# manually set.
# workers.scratch_dir = "./my-scratch-dir"
with workers:
    # define the Runner instance
    run_fn = Runner(
        executor=executor,
        chunksize=100000,
        maxchunks=4,  # change this to None for a large run
    )
    # execute the analysis on the given dataset
    hists = run_fn(fileset, "Events", MyProcessor())
elapsed = time.time() - tstart
print(hists)
print(hists["mass"])
# (assert only valid when using maxchunks=4)
assert hists["sumw"]["DoubleMuon"] == 400224
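The `+` on the `PtEtaPhiMCandidate` objects in `process` performs four-vector addition. As a cross-check, the same dimuon mass can be computed by hand from (pt, eta, phi, mass) components; here is a minimal pure-Python sketch with made-up kinematics (the helper names and values are illustrative, not part of the example above):

```python
import math

def four_vector(pt, eta, phi, mass):
    # Convert (pt, eta, phi, mass) to Cartesian components (E, px, py, pz).
    px = pt * math.cos(phi)
    py = pt * math.sin(phi)
    pz = pt * math.sinh(eta)
    e = math.sqrt(px**2 + py**2 + pz**2 + mass**2)
    return e, px, py, pz

def invariant_mass(p1, p2):
    # m^2 = (E1 + E2)^2 - |p1 + p2|^2
    e = p1[0] + p2[0]
    px, py, pz = (p1[i] + p2[i] for i in (1, 2, 3))
    return math.sqrt(max(e**2 - px**2 - py**2 - pz**2, 0.0))

# Two back-to-back 45 GeV muons reconstruct to a dimuon mass near 90 GeV.
mu1 = four_vector(45.0, 0.0, 0.0, 0.105658)
mu2 = four_vector(45.0, 0.0, math.pi, 0.105658)
print(invariant_mass(mu1, mu2))  # a value very close to 90 GeV
```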
When executing this example, you should see Coffea begin to run, with a progress bar showing the creation of tasks. Workers are created locally by the factory declared in the script.
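The result returned by the Runner is an ordinary Python dictionary of accumulators, so it can be saved for later inspection. A minimal sketch using pickle from the standard library (the file name is arbitrary, and a stand-in dictionary is used here in place of the real histograms):

```python
import pickle

# Stand-in for the accumulator returned by run_fn(...) in the example.
hists = {"sumw": {"DoubleMuon": 400224.0}}

# Persist the result to disk, then read it back.
with open("wq-output.pkl", "wb") as f:
    pickle.dump(hists, f)

with open("wq-output.pkl", "rb") as f:
    restored = pickle.load(f)

print(restored["sumw"]["DoubleMuon"])
```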
You can also launch workers outside Python. For testing purposes, you can start a single worker on the same machine and direct it to connect to your manager process, like this:
work_queue_worker -P password.txt <hostname> 9123
Or:
work_queue_worker -P password.txt -M coffea-wq-${USER}
With a single worker, progress will be gradual as it completes one task (or a few tasks) at a time. The output will be similar to this:
------------------------------------------------
Example Coffea Analysis with Work Queue Executor
------------------------------------------------
Manager Name: -M coffea-wq-btovar
------------------------------------------------
Listening for work queue workers on port 9123.
submitted preprocessing task id 1 item pre_0, with 1 file
submitted preprocessing task id 2 item pre_1, with 1 file
preprocessing task id 2 item pre_1 with 1 events on localhost. return code 0 (success)
allocated cores: 2.0, memory: 1000 MB, disk 2000 MB, gpus: 0.0
measured cores: 0.3, memory: 120 MB, disk 6 MB, gpus: 0.0, runtime 3.1 s
preprocessing task id 1 item pre_0 with 1 events on localhost. return code 0 (success)
allocated cores: 2.0, memory: 1000 MB, disk 2000 MB, gpus: 0.0
measured cores: 0.3, memory: 120 MB, disk 6 MB, gpus: 0.0, runtime 2.9 s
submitted processing task id 3 item p_2, with 100056 event
submitted processing task id 4 item p_3, with 100056 event
submitted processing task id 5 item p_4, with 100056 event
submitted processing task id 6 item p_5, with 100056 event
Preprocessing 100% ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2/2 [ 0:00:06 < 0:00:00 | 0.3 file/s ]
Submitted 0% 0/400224 [ 0:00:00 < -:--:-- | ? event/s ]
Processed 0% 0/400224 [ 0:00:00 < -:--:-- | ? event/s ]
Accumulated 0% 0/1 [ 0:00:00 < -:--:-- | ? tasks/s ]
To run at larger scale, you will need to run a large number of workers on a cluster or other infrastructure. For example, to submit 32 workers to an HTCondor pool:
condor_submit_workers -M coffea-wq-${USER} -P password.txt 32
Similarly, you can run the worker factory outside the manager, so that the manager and the factory run on different machines:
work_queue_factory -T condor -M coffea-wq-${USER} -P password.txt --max-workers 10 --cores 8 --python-env=env.tar.gz
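The factory can also read its options from a configuration file instead of command-line flags; to my understanding of the CCTools documentation, this is a JSON file passed with -C whose keys mirror the long option names, and which the factory re-reads while running so worker limits can be adjusted on the fly. A sketch of such a file matching the options above (key names are an assumption to verify against your CCTools version):

```json
{
    "master-name": "coffea-wq-myuser",
    "max-workers": 10,
    "min-workers": 1,
    "cores": 8
}
```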
For more information on starting and managing workers on various batch systems and clusters, see the Work Queue documentation.