Work Queue Executor

Work Queue is a distributed computing framework used to build large scale manager-worker applications, developed by the Cooperative Computing Lab (CCL) at the University of Notre Dame. This executor functions as the manager program which divides up a Coffea data analysis workload into discrete tasks. A large number of worker processes running on cluster or cloud systems will execute the tasks.

To set up Coffea and Work Queue together, you may need to create a Conda environment, install the software, and then create a tarball containing the environment. The tarball is sent to each worker in order to provide the same environment as the manager machine.

# Create a new environment
conda create --yes --name coffea-env -c conda-forge python coffea xrootd ndcctools conda conda-pack
conda activate coffea-env

# Pack the environment into a portable tarball.
conda-pack --output coffea-env.tar.gz

To run an analysis, you must set up a work queue executor with appropriate arguments. Here is a complete example:

###############################################################################
# Example of Coffea with the Work Queue executor.
#
# To execute, start this application, and then start workers that will connect
# to it and execute tasks.
#
# Note that, as written, this only processes 4 data chunks and should complete
# in a short time.  For a real run, change maxchunks=None in the main program
# below.
#
# For simple testing this script will automatically use one local worker. To
# scale this up, see the wq.Factory configuration below to change to your
# favorite batch system.
###############################################################################

###############################################################################
# Sample processor class given in the Coffea manual.
###############################################################################
import work_queue as wq

from coffea.processor import Runner
from coffea.processor import WorkQueueExecutor

###############################################################################
# Collect and display setup info.
###############################################################################

print("------------------------------------------------")
print("Example Coffea Analysis with Work Queue Executor")
print("------------------------------------------------")

import getpass

wq_manager_name = "coffea-wq-{}".format(getpass.getuser())
wq_port = 9123

print("Manager Name: -M " + wq_manager_name)
print("------------------------------------------------")


###############################################################################
# Define a custom Coffea processor
###############################################################################

from coffea import processor
from coffea.nanoevents.methods import candidate
import hist
from collections import defaultdict
import awkward as ak

# register our candidate behaviors
ak.behavior.update(candidate.behavior)


class MyProcessor(processor.ProcessorABC):
    @property
    def accumulator(self):
        return {
            "sumw": defaultdict(float),
            "mass": hist.Hist(
                hist.axis.StrCategory([], name="dataset", label="Dataset"),
                hist.axis.Regular(
                    60, 60, 120, name="mass", label=r"$m_{\mu\mu}$ [GeV]"
                ),
                name="Events",
            ),
        }

    def process(self, events):
        # Note: This is required to ensure that behaviors are registered
        # when running this code in a remote task.
        ak.behavior.update(candidate.behavior)

        output = self.accumulator

        dataset = events.metadata["dataset"]
        muons = ak.zip(
            {
                "pt": events.Muon_pt,
                "eta": events.Muon_eta,
                "phi": events.Muon_phi,
                "mass": events.Muon_mass,
                "charge": events.Muon_charge,
            },
            with_name="PtEtaPhiMCandidate",
        )

        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge) == 0)
        # add first and second muon in every event together
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]

        output["sumw"][dataset] += len(events)
        output["mass"].fill(
            dataset=dataset,
            mass=dimuon.mass,
        )

        return output

    def postprocess(self, accumulator):
        return accumulator


###############################################################################
# Sample data sources come from CERN opendata.
###############################################################################

fileset = {
    "DoubleMuon": [
        "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root",
        "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012C_DoubleMuParked.root",
    ],
}


###############################################################################
# Configuration of the Work Queue Executor
###############################################################################

# secret passed between manager and workers for authentication
my_password_file = "password.txt"
with open(my_password_file, "w") as f:
    f.write("my_secret_password")

work_queue_executor_args = {
    # Automatically allocate cores, memory and disk to tasks. Adjusts to
    # maximum values measured. Initially, tasks use whole workers.
    "resources_mode": "auto",
    # Split a processing task in half according to its chunksize when it
    # exhausts the resources allocated to it.
    "split_on_exhaustion": True,
    # Options to control how workers find this manager.
    "master_name": wq_manager_name,
    # Port for manager to listen on: if zero, will choose automatically.
    "port": wq_port,
    # Secret passed between manager and workers
    "password_file": my_password_file,
    # The named conda environment tarball will be transferred to each worker,
    # and activated. This is useful when coffea is not installed in the remote
    # machines. conda enviroments are created with conda-pack, and should at
    # least include coffea, ndcctools (both from conda-forge channel)
    # and their dependencies.
    #
    # "environment_file": "coffea-env.tar.gz",
    # Debugging: Display notes about each task submitted/complete.
    "verbose": True,
    # Debugging: Display output of task if not empty.
    "print_stdout": False,
    # Debugging: Produce a lot at the manager side of things.
    "debug_log": "coffea-wq.log",
}

executor = WorkQueueExecutor(**work_queue_executor_args)


###############################################################################
# Run the analysis using local Work Queue workers
###############################################################################

import time

tstart = time.time()

workers = wq.Factory(
    # local runs:
    batch_type="local",
    manager_host_port="localhost:{}".format(wq_port)
    # with a batch system, e.g., condor.
    # (If coffea not at the installation site, then a conda
    # environment_file should be defined in the work_queue_executor_args.)
    # batch_type="condor", manager_name=wq_manager_name
)

workers.max_workers = 2
workers.min_workers = 1
workers.cores = 2
workers.memory = 1000  # MB
workers.disk = 2000  # MB
workers.password = my_password_file

# Instead of declaring the python environment per task, you can set it in
# the factory directly. This is useful if you are going to run a workflow
# several times using the same set of workers. It also ensures that the worker
# itself executes in a friendly environment.
# workers.python_package = "coffea-env.tar.gz"
#
# The factory tries to write temporary files to $TMPDIR (usually /tmp). When
# this is not available, or causes errors, this scracth directory can be
# manually set.
# workers.scratch_dir = "./my-scratch-dir"

with workers:
    # define the Runner instance
    run_fn = Runner(
        executor=executor,
        chunksize=100000,
        maxchunks=4,  # change this to None for a large run
    )
    # execute the analysis on the given dataset
    hists = run_fn(fileset, "Events", MyProcessor())

elapsed = time.time() - tstart


print(hists)
print(hists["mass"])

# (assert only valid when using maxchunks=4)
assert hists["sumw"]["DoubleMuon"] == 400224

When executing this example, you should see that Coffea begins to run, and a progress bar shows the creation of tasks. Workers are created locally using the factory declared.

You can also launch workers outside python. For testing purposes, you can start a single worker on the same machine, and direct it to connect to your manager process, like this:

work_queue_worker -P password.txt <hostname> 9123

Or:

work_queue_worker -P password.txt -M coffea-wq-${USER}

With a single worker, the process will be gradual as it completes one task (or a few tasks) at a time. The output will be similar to this:

------------------------------------------------
Example Coffea Analysis with Work Queue Executor
------------------------------------------------
Manager Name: -M coffea-wq-btovar
------------------------------------------------
Listening for work queue workers on port 9123.
submitted preprocessing task id 1 item pre_0, with 1 file
submitted preprocessing task id 2 item pre_1, with 1 file
preprocessing task id 2 item pre_1 with 1 events on localhost. return code 0 (success)
allocated cores: 2.0, memory: 1000 MB, disk 2000 MB, gpus: 0.0
measured cores: 0.3, memory: 120 MB, disk 6 MB, gpus: 0.0, runtime 3.1 s
preprocessing task id 1 item pre_0 with 1 events on localhost. return code 0 (success)
allocated cores: 2.0, memory: 1000 MB, disk 2000 MB, gpus: 0.0
measured cores: 0.3, memory: 120 MB, disk 6 MB, gpus: 0.0, runtime 2.9 s
submitted processing task id 3 item p_2, with 100056 event
submitted processing task id 4 item p_3, with 100056 event
submitted processing task id 5 item p_4, with 100056 event
submitted processing task id 6 item p_5, with 100056 event
Preprocessing 100% ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━      2/2 [ 0:00:06 < 0:00:00 | 0.3  file/s ]
    Submitted   0%                                  0/400224 [ 0:00:00 < -:--:-- | ?   event/s ]
    Processed   0%                                  0/400224 [ 0:00:00 < -:--:-- | ?   event/s ]
    Accumulated 0%                                       0/1 [ 0:00:00 < -:--:-- | ?   tasks/s ]

To run at larger scale, you will need to run a large number of workers on a cluster or other infrastructure. For example, to submit 32 workers to an HTCondor pool:

condor_submit_workers -M coffea-wq-${USER} -P password.txt 1

Similarly, you can run the worker’s factory outside the manager. In that way, you can have the manager and the factory running on different machines:

work_queue_factory -T condor -M coffea-wq-${USER} -P password.txt --max-workers 10 --cores 8 --python-env=env.tar.gz

For more information on starting and managing workers on various batch systems and clusters, see the Work Queue documentation