Coffea Processors

This is a rendered copy of processor.ipynb. You can optionally run it interactively on Binder at this link

Coffea relies mainly on uproot to provide access to ROOT files for analysis. Since a typical analysis involves processing tens to thousands of files, totalling gigabytes to terabytes of data, a certain amount of work is needed to build a parallelized framework that processes the data in a reasonable amount of time. Of course, one can work directly with uproot to achieve this, as we’ll show at the beginning, but coffea provides the coffea.processor module, which lets users worry only about the actual analysis code and not about how to implement efficient parallelization, assuming that the parallelization is a trivial map-reduce operation (e.g. filling histograms and adding them together). The module provides the following key features:

  • A ProcessorABC abstract base class that can be derived from to implement the analysis code;

  • A NanoEvents interface to the arrays being read from the TTree as inputs;

  • A generic accumulate() utility to reduce the outputs to a single result, as shown in the accumulators notebook tutorial (see the brief sketch after this list); and

  • A set of parallel executors to access multicore processing or distributed computing systems such as Dask, Parsl, Spark, WorkQueue, and others.
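
As a quick illustration of the accumulation step, accumulate() merges a list of per-chunk outputs into a single result by adding values key-by-key. A minimal sketch with made-up numbers (histograms work the same way, since they also define +):

from coffea.processor import accumulate

# two hypothetical per-chunk outputs
chunk1 = {"DoubleMuon": {"entries": 10, "sumw": 4.5}}
chunk2 = {"DoubleMuon": {"entries": 7, "sumw": 1.5}}

# nested dictionaries are merged recursively; addable values are summed
merged = accumulate([chunk1, chunk2])
# expected: {'DoubleMuon': {'entries': 17, 'sumw': 6.0}}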

Let’s start by writing a simple processor class that reads some CMS open data and plots a dimuon mass spectrum. We’ll start from the ProcessorABC skeleton (a rough sketch of it follows this list) and fill in some details:

  • Removing flag, as we won’t use it

  • Adding a new histogram for \(m_{\mu \mu}\)

  • Building a Candidate record for muons, since we will read it with BaseSchema interpretation (the files used here could be read with NanoAODSchema but we want to show how to build vector objects from other TTree formats)

  • Calculating the dimuon invariant mass
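
For orientation, the bare skeleton we are modifying looks roughly like this (a sketch only, with SkeletonProcessor as a placeholder name; the abstract base class requires process() and postprocess()):

class SkeletonProcessor(processor.ProcessorABC):
    def __init__(self, flag=False):
        # `flag` is the constructor option we drop in our version below
        self._flag = flag

    def process(self, events):
        # build accumulators (e.g. histograms), fill them from this chunk of events,
        # and return them keyed by dataset
        return {}

    def postprocess(self, accumulator):
        # optionally transform the merged result after all chunks are combined
        pass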

[1]:
import hist
import dask
import awkward as ak
import hist.dask as hda
import dask_awkward as dak

from coffea import processor
from coffea.nanoevents.methods import candidate
from coffea.dataset_tools import (
    apply_to_fileset,
    max_chunks,
    preprocess,
)
from distributed import Client


client = Client()


class MyProcessor(processor.ProcessorABC):
    def __init__(self):
        pass

    def process(self, events):
        dataset = events.metadata['dataset']
        muons = ak.zip(
            {
                "pt": events.Muon_pt,
                "eta": events.Muon_eta,
                "phi": events.Muon_phi,
                "mass": events.Muon_mass,
                "charge": events.Muon_charge,
            },
            with_name="PtEtaPhiMCandidate",
            behavior=candidate.behavior,
        )

        h_mass = (
            hda.Hist.new
            .StrCat(["opposite", "same"], name="sign")
            .Log(1000, 0.2, 200., name="mass", label=r"$m_{\mu\mu}$ [GeV]")
            .Int64()
        )

        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) == 0)
        # add first and second muon in every event together
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="opposite", mass=dimuon.mass)

        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) != 0)
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="same", mass=dimuon.mass)

        return {
            dataset: {
                "entries": ak.num(events, axis=0),
                "mass": h_mass,
            }
        }

    def postprocess(self, accumulator):
        pass
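
As an aside, the candidate.behavior attached above is what makes the + operator and the .mass property meaningful on these records. A minimal eager sketch with made-up kinematics (not part of the analysis):

import awkward as ak
from coffea.nanoevents.methods import candidate

# one event containing two muons, values invented for illustration
mu = ak.zip(
    {
        "pt": [[40.0, 35.0]],
        "eta": [[0.1, -0.3]],
        "phi": [[0.0, 2.9]],
        "mass": [[0.106, 0.106]],
        "charge": [[1, -1]],
    },
    with_name="PtEtaPhiMCandidate",
    behavior=candidate.behavior,
)
# adding two candidates sums their four-vectors (and charges);
# .mass of the sum is the invariant mass of the pair
print((mu[:, 0] + mu[:, 1]).mass)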

If we were to just use bare uproot to execute this processor, we could do so with the following example, which:

  • Opens a CMS open data file

  • Creates a NanoEvents object using BaseSchema (roughly equivalent to the output of uproot.dask)

  • Creates a MyProcessor instance

  • Runs the process() function, which returns our accumulators

[2]:
from coffea.nanoevents import NanoEventsFactory, BaseSchema

filename = "file://Run2012B_DoubleMuParked.root"
events = NanoEventsFactory.from_root(
    {filename: "Events"},
    steps_per_file=2_000,
    metadata={"dataset": "DoubleMuon"},
    schemaclass=BaseSchema,
).events()
p = MyProcessor()
out = p.process(events)
(computed,) = dask.compute(out)
print(computed)
/Users/saransh/Code/HEP/coffea/.env/lib/python3.11/site-packages/coffea/nanoevents/factory.py:299: RuntimeWarning: You have set steps_per_file to 2000, this should only be used for a
                small number of inputs (e.g. for early-stage/exploratory analysis) since it does not
                inform dask of each chunk lengths at creation time, which can cause unexpected
                slowdowns at scale. If you would like to process larger datasets please specify steps
                using the appropriate uproot "files" specification:
                    https://github.com/scikit-hep/uproot5/blob/v5.1.2/src/uproot/_dask.py#L109-L132.

  warnings.warn(
{'DoubleMuon': {'entries': 26084708, 'mass': Hist(
  StrCategory(['opposite', 'same'], name='sign'),
  Regular(1000, 0.2, 200, transform=log, name='mass', label='$m_{\\mu\\mu}$ [GeV]'),
  storage=Int64()) # Sum: 12819609.0 (12835141.0 with flow)}}
[3]:
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
computed["DoubleMuon"]["mass"].plot1d(ax=ax)
ax.set_xscale("log")
ax.legend(title="Dimuon charge")
[3]:
<matplotlib.legend.Legend at 0x29782ba10>
../_images/notebooks_processor_4_1.png

One could expand on this code to run over several chunks of the file, setting entry_start and entry_stop as appropriate (a rough sketch of this manual approach follows). Then, several datasets could be processed by iterating over several files. However, dask and the coffea.dataset_tools utilities can take care of this for us! We can preprocess multiple files and then use our custom MyProcessor class to generate the relevant dask task graph. Finally, the result is obtained by calling dask.compute. Since these files are very large, we limit ourselves to reading only the first few chunks of events from each dataset with max_chunks.
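
For reference, the manual approach mentioned above might look roughly like the following sketch with bare uproot (a hypothetical chunk size; we will not use this here):

import uproot

tree = uproot.open("Run2012B_DoubleMuParked.root:Events")
chunk_size = 100_000  # hypothetical choice
for start in range(0, tree.num_entries, chunk_size):
    arrays = tree.arrays(
        ["Muon_pt", "Muon_eta", "Muon_phi", "Muon_mass", "Muon_charge"],
        entry_start=start,
        entry_stop=min(start + chunk_size, tree.num_entries),
    )
    # ... build muon candidates, fill histograms, and accumulate the partial results ...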

[4]:
fileset = {
    'DoubleMuon': {
        "files": {
            'file://Run2012B_DoubleMuParked.root': "Events",
            'file://Run2012C_DoubleMuParked.root': "Events",
        }
    },
    'ZZ to 4mu': {
        "files": {
            'file://ZZTo4mu.root': "Events"
        }
    }
}


dataset_runnable, dataset_updated = preprocess(
    fileset,
    align_clusters=False,  # do not snap chunk boundaries to ROOT TBasket clusters
    step_size=100_000,     # target number of events per chunk
    files_per_batch=1,     # number of files handled per preprocessing task
    skip_bad_files=True,   # skip unreadable files instead of failing
    save_form=False,       # do not save the awkward form of the events in the output
)
[5]:
to_compute = apply_to_fileset(
    MyProcessor(),
    max_chunks(dataset_runnable, 300),
    schemaclass=BaseSchema,
)
(out,) = dask.compute(to_compute)
print(out)
{'DoubleMuon': {'DoubleMuon': {'entries': 56084708, 'mass': Hist(
  StrCategory(['opposite', 'same'], name='sign'),
  Regular(1000, 0.2, 200, transform=log, name='mass', label='$m_{\\mu\\mu}$ [GeV]'),
  storage=Int64()) # Sum: 28258324.0 (28290486.0 with flow)}}, 'ZZ to 4mu': {'ZZ to 4mu': {'entries': 1499064, 'mass': Hist(
  StrCategory(['opposite', 'same'], name='sign'),
  Regular(1000, 0.2, 200, transform=log, name='mass', label='$m_{\\mu\\mu}$ [GeV]'),
  storage=Int64()) # Sum: 288707.0 (289326.0 with flow)}}}

The run time will depend on how many cores are available on the machine running this notebook and, if the files are read remotely, on your connection to eospublic.cern.ch.

[6]:
fig, ax = plt.subplots(figsize=(10, 6))
out["DoubleMuon"]["DoubleMuon"]["mass"].plot1d(ax=ax)
ax.set_xscale("log")
ax.legend(title="Dimuon charge")
[6]:
<matplotlib.legend.Legend at 0x297ca0710>
../_images/notebooks_processor_9_1.png

Getting fancy

Let’s flesh out this analysis into a 4-muon analysis, searching for diboson events:

[7]:
from collections import defaultdict
import numba


@numba.njit
def find_4lep_kernel(events_leptons, builder):
    """Search for valid 4-lepton combinations from an array of events * leptons {charge, ...}

    A valid candidate has two pairs of leptons that each have balanced charge
    Outputs an array of events * candidates {indices 0..3} corresponding to all valid
    permutations of all valid combinations of unique leptons in each event
    (omitting permutations of the pairs)
    """
    for leptons in events_leptons:
        builder.begin_list()
        nlep = len(leptons)
        for i0 in range(nlep):
            for i1 in range(i0 + 1, nlep):
                if leptons[i0].charge + leptons[i1].charge != 0:
                    continue
                for i2 in range(nlep):
                    for i3 in range(i2 + 1, nlep):
                        if len({i0, i1, i2, i3}) < 4:
                            continue
                        if leptons[i2].charge + leptons[i3].charge != 0:
                            continue
                        builder.begin_tuple(4)
                        builder.index(0).integer(i0)
                        builder.index(1).integer(i1)
                        builder.index(2).integer(i2)
                        builder.index(3).integer(i3)
                        builder.end_tuple()
        builder.end_list()

    return builder


def find_4lep(events_leptons):
    if ak.backend(events_leptons) == "typetracer":
        # here we fake the output of find_4lep_kernel since
        # operating on length-zero data returns the wrong layout!
        ak.typetracer.length_zero_if_typetracer(events_leptons.charge) # force touching of the necessary data
        return ak.Array(ak.Array([[(0,0,0,0)]]).layout.to_typetracer(forget_length=True))
    return find_4lep_kernel(events_leptons, ak.ArrayBuilder()).snapshot()


class FancyDimuonProcessor(processor.ProcessorABC):
    def process(self, events):
        dataset_axis = hist.axis.StrCategory([], growth=True, name="dataset", label="Primary dataset")
        mass_axis = hist.axis.Regular(300, 0, 300, name="mass", label=r"$m_{\mu\mu}$ [GeV]")
        pt_axis = hist.axis.Regular(300, 0, 300, name="pt", label=r"$p_{T,\mu}$ [GeV]")

        h_nMuons = hda.Hist(
            dataset_axis,
            hist.axis.IntCategory(range(6), name="nMuons", label="Number of good muons"),
            storage="weight", label="Counts",
        )
        h_m4mu = hda.hist.Hist(dataset_axis, mass_axis, storage="weight", label="Counts")
        h_mZ1 = hda.hist.Hist(dataset_axis, mass_axis, storage="weight", label="Counts")
        h_mZ2 = hda.hist.Hist(dataset_axis, mass_axis, storage="weight", label="Counts")
        h_ptZ1mu1 = hda.hist.Hist(dataset_axis, pt_axis, storage="weight", label="Counts")
        h_ptZ1mu2 = hda.hist.Hist(dataset_axis, pt_axis, storage="weight", label="Counts")

        cutflow = defaultdict(int)

        dataset = events.metadata['dataset']
        muons = ak.zip({
            "pt": events.Muon_pt,
            "eta": events.Muon_eta,
            "phi": events.Muon_phi,
            "mass": events.Muon_mass,
            "charge": events.Muon_charge,
            "isolation": events.Muon_pfRelIso03_all,
        }, with_name="PtEtaPhiMCandidate", behavior=candidate.behavior)

        # make sure they are sorted by transverse momentum
        muons = muons[ak.argsort(muons.pt, axis=1)]

        cutflow['all events'] = ak.num(muons, axis=0)

        # impose some quality and minimum pt cuts on the muons
        muons = muons[
            (muons.pt > 5)
            & (muons.isolation < 0.2)
        ]
        cutflow['at least 4 good muons'] += ak.sum(ak.num(muons) >= 4)
        h_nMuons.fill(dataset=dataset, nMuons=ak.num(muons))

        # reduce first axis: skip events without enough muons
        muons = muons[ak.num(muons) >= 4]

        # find all candidates with helper function
        fourmuon = dak.map_partitions(find_4lep, muons)
        fourmuon = [muons[fourmuon[idx]] for idx in "0123"]

        fourmuon = ak.zip({
            "z1": ak.zip({
                "lep1": fourmuon[0],
                "lep2": fourmuon[1],
                "p4": fourmuon[0] + fourmuon[1],
            }),
            "z2": ak.zip({
                "lep1": fourmuon[2],
                "lep2": fourmuon[3],
                "p4": fourmuon[2] + fourmuon[3],
            }),
        })

        cutflow['at least one candidate'] += ak.sum(ak.num(fourmuon) > 0)

        # require minimum dimuon mass
        fourmuon = fourmuon[(fourmuon.z1.p4.mass > 60.) & (fourmuon.z2.p4.mass > 20.)]
        cutflow['minimum dimuon mass'] += ak.sum(ak.num(fourmuon) > 0)

        # choose permutation with z1 mass closest to nominal Z boson mass
        bestz1 = ak.singletons(ak.argmin(abs(fourmuon.z1.p4.mass - 91.1876), axis=1))
        fourmuon = ak.flatten(fourmuon[bestz1])

        h_m4mu.fill(
            dataset=dataset,
            mass=(fourmuon.z1.p4 + fourmuon.z2.p4).mass,
        )
        h_mZ1.fill(
            dataset=dataset,
            mass=fourmuon.z1.p4.mass,
        )
        h_mZ2.fill(
            dataset=dataset,
            mass=fourmuon.z2.p4.mass,
        )
        h_ptZ1mu1.fill(
            dataset=dataset,
            pt=fourmuon.z1.lep1.pt,
        )
        h_ptZ1mu2.fill(
            dataset=dataset,
            pt=fourmuon.z1.lep2.pt,
        )
        return {
            'nMuons': h_nMuons,
            'mass': h_m4mu,
            'mass_z1': h_mZ1,
            'mass_z2': h_mZ2,
            'pt_z1_mu1': h_ptZ1mu1,
            'pt_z1_mu2': h_ptZ1mu2,
            'cutflow': {dataset: cutflow},
        }

    def postprocess(self, accumulator):
        pass
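
Before wiring the kernel into dask with map_partitions, one can sanity-check the combinatorics eagerly on a toy array of charges (made-up values, independent of the files above):

# one event with two oppositely-charged pairs, one event with only three muons
toy = ak.zip({"charge": ak.Array([[1, -1, 1, -1], [1, 1, -1]])})
print(find_4lep_kernel(toy, ak.ArrayBuilder()).snapshot().to_list())
# the first event yields index 4-tuples pairing opposite charges; the second yields an empty list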
[8]:
import time

tstart = time.time()

to_compute = apply_to_fileset(
    FancyDimuonProcessor(),
    max_chunks(dataset_runnable, 300),
    schemaclass=BaseSchema,
)
(out,) = dask.compute(to_compute)
print(out)

elapsed = time.time() - tstart
print(elapsed)
{'DoubleMuon': {'nMuons': Hist(
  StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'),
  IntCategory([0, 1, 2, 3, 4, 5], name='nMuons', label='Number of good muons'),
  storage=Weight()) # Sum: WeightedSum(value=5.59547e+07, variance=5.59547e+07) (WeightedSum(value=5.60847e+07, variance=5.60847e+07) with flow), 'mass': Hist(
  StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'),
  Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'),
  storage=Weight()) # Sum: WeightedSum(value=54219, variance=54219) (WeightedSum(value=60748, variance=60748) with flow), 'mass_z1': Hist(
  StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'),
  Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'),
  storage=Weight()) # Sum: WeightedSum(value=60314, variance=60314) (WeightedSum(value=60748, variance=60748) with flow), 'mass_z2': Hist(
  StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'),
  Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'),
  storage=Weight()) # Sum: WeightedSum(value=60079, variance=60079) (WeightedSum(value=60748, variance=60748) with flow), 'pt_z1_mu1': Hist(
  StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'),
  Regular(300, 0, 300, name='pt', label='$p_{T,\\mu}$ [GeV]'),
  storage=Weight()) # Sum: WeightedSum(value=60736, variance=60736) (WeightedSum(value=60748, variance=60748) with flow), 'pt_z1_mu2': Hist(
  StrCategory(['DoubleMuon'], growth=True, name='dataset', label='Primary dataset'),
  Regular(300, 0, 300, name='pt', label='$p_{T,\\mu}$ [GeV]'),
  storage=Weight()) # Sum: WeightedSum(value=57002, variance=57002) (WeightedSum(value=60748, variance=60748) with flow), 'cutflow': {'DoubleMuon': defaultdict(<class 'int'>, {'all events': dask.awkward<numaxis0, type=Scalar, dtype=int64>, 'at least 4 good muons': dask.awkward<add, type=Scalar, dtype=int64>, 'at least one candidate': dask.awkward<add, type=Scalar, dtype=int64>, 'minimum dimuon mass': dask.awkward<add, type=Scalar, dtype=int64>})}}, 'ZZ to 4mu': {'nMuons': Hist(
  StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'),
  IntCategory([0, 1, 2, 3, 4, 5], name='nMuons', label='Number of good muons'),
  storage=Weight()) # Sum: WeightedSum(value=1.49776e+06, variance=1.49776e+06) (WeightedSum(value=1.49906e+06, variance=1.49906e+06) with flow), 'mass': Hist(
  StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'),
  Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'),
  storage=Weight()) # Sum: WeightedSum(value=79350, variance=79350) (WeightedSum(value=98261, variance=98261) with flow), 'mass_z1': Hist(
  StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'),
  Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'),
  storage=Weight()) # Sum: WeightedSum(value=98198, variance=98198) (WeightedSum(value=98261, variance=98261) with flow), 'mass_z2': Hist(
  StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'),
  Regular(300, 0, 300, name='mass', label='$m_{\\mu\\mu}$ [GeV]'),
  storage=Weight()) # Sum: WeightedSum(value=97699, variance=97699) (WeightedSum(value=98261, variance=98261) with flow), 'pt_z1_mu1': Hist(
  StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'),
  Regular(300, 0, 300, name='pt', label='$p_{T,\\mu}$ [GeV]'),
  storage=Weight()) # Sum: WeightedSum(value=98259, variance=98259) (WeightedSum(value=98261, variance=98261) with flow), 'pt_z1_mu2': Hist(
  StrCategory(['ZZ to 4mu'], growth=True, name='dataset', label='Primary dataset'),
  Regular(300, 0, 300, name='pt', label='$p_{T,\\mu}$ [GeV]'),
  storage=Weight()) # Sum: WeightedSum(value=97998, variance=97998) (WeightedSum(value=98261, variance=98261) with flow), 'cutflow': {'ZZ to 4mu': defaultdict(<class 'int'>, {'all events': dask.awkward<numaxis0, type=Scalar, dtype=int64>, 'at least 4 good muons': dask.awkward<add, type=Scalar, dtype=int64>, 'at least one candidate': dask.awkward<add, type=Scalar, dtype=int64>, 'minimum dimuon mass': dask.awkward<add, type=Scalar, dtype=int64>})}}}
109.41525983810425
[9]:
nevt = out['ZZ to 4mu']['cutflow']['ZZ to 4mu']['all events'] + out['DoubleMuon']['cutflow']['DoubleMuon']['all events']
print("Events/s:", (nevt / elapsed).compute())
Events/s: 526286.4803794603

What follows is just us looking at the output; you can execute it if you wish.

[10]:
# scale ZZ simulation to expected yield
lumi = 11.6  # integrated luminosity, 1/fb
zzxs = 7200 * 0.0336**2  # approximate 8 TeV ZZ cross section [fb] times BR(Z->mumu)^2
nzz = out['ZZ to 4mu']['cutflow']['ZZ to 4mu']['all events']

scaled = {}
for (name1, h1), (name2, h2) in zip(out['ZZ to 4mu'].items(), out['DoubleMuon'].items()):
    if isinstance(h1, hist.Hist) and isinstance(h2, hist.Hist):
        scaled[name1] = h1.copy() + h2.copy()
        # 'ZZ to 4mu' is the first category along the dataset axis, so scale only that slice
        scaled[name1].view()[0, :] *= lumi * zzxs / nzz.compute()
[11]:
fig, ax = plt.subplots()
scaled['nMuons'].plot1d(ax=ax, overlay='dataset')
ax.set_yscale('log')
ax.set_ylim(1, None)
[11]:
(1, 54961323.59635242)
../_images/notebooks_processor_16_1.png
[12]:
fig, ax = plt.subplots()

scaled['mass'][:, ::hist.rebin(4)].plot1d(ax=ax, overlay='dataset');
../_images/notebooks_processor_17_0.png
[13]:
fig, ax = plt.subplots()

scaled['mass_z1'].plot1d(ax=ax, overlay='dataset');
../_images/notebooks_processor_18_0.png
[14]:
fig, ax = plt.subplots()

scaled['mass_z2'].plot1d(ax=ax, overlay='dataset')
ax.set_xlim(2, 300)
[14]:
(2.0, 300.0)
../_images/notebooks_processor_19_1.png
[15]:
fig, ax = plt.subplots()

scaled['pt_z1_mu1'].plot1d(ax=ax, overlay='dataset');
../_images/notebooks_processor_20_0.png
[16]:
fig, ax = plt.subplots()

scaled['pt_z1_mu2'].plot1d(ax=ax, overlay='dataset');
../_images/notebooks_processor_21_0.png