Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_weights():
from coffea.processor import Weights
counts, test_eta, test_pt = dummy_jagged_eta_pt()
scale_central = np.random.normal(loc=1.0, scale=0.01, size=counts.size)
scale_up = scale_central * 1.10
scale_down = scale_central * 0.95
scale_up_shift = 0.10 * scale_central
scale_down_shift = 0.05 * scale_central
weight = Weights(counts.size)
weight.add('test', scale_central, weightUp=scale_up, weightDown=scale_down)
weight.add('testShift', scale_central, weightUp=scale_up_shift,
weightDown=scale_down_shift, shift=True)
var_names = weight.variations
expected_names = ['testShiftUp', 'testShiftDown', 'testUp', 'testDown']
for name in expected_names:
assert(name in var_names)
test_central = weight.weight()
import pandas as pd
import pickle as pkl
import lz4.frame as lz4f
from coffea.util import numpy as np
from coffea.processor.spark.spark_executor import agg_histos_raw, reduce_histos_raw
from coffea.processor.test_items import NanoTestProcessor
proc = NanoTestProcessor()
one = proc.accumulator.identity()
two = proc.accumulator.identity()
hlist1 = [lz4f.compress(pkl.dumps(one))]
hlist2 = [lz4f.compress(pkl.dumps(one)),lz4f.compress(pkl.dumps(two))]
harray1 = np.array(hlist1, dtype='O')
harray2 = np.array(hlist2, dtype='O')
series1 = pd.Series(harray1)
series2 = pd.Series(harray2)
df = pd.DataFrame({'histos': harray2})
# correctness of these functions is checked in test_spark_executor
agg1 = agg_histos_raw(series1, proc, 1)
agg2 = agg_histos_raw(series2, proc, 1)
red = reduce_histos_raw(df, proc, 1)
def test_clopper_pearson_interval():
from coffea.hist.plot import clopper_pearson_interval
# Reference values for CL=0.6800 calculated with ROOT's TEfficiency
num = np.array([1., 5., 10., 10.])
denom = np.array([10., 10., 10., 437.])
ref_hi = np.array([0.293313782248242, 0.6944224231766912, 1.0, 0.032438865381336446])
ref_lo = np.array([0.01728422272382846, 0.3055775768233088, 0.8325532074018731, 0.015839046981153772])
interval = clopper_pearson_interval(num, denom, coverage=0.68)
threshold = 1e-6
assert(all((interval[1, :] / ref_hi) - 1 < threshold))
assert(all((interval[0, :] / ref_lo) - 1 < threshold))
def test_clopper_pearson_interval():
from coffea.hist.plot import clopper_pearson_interval
# Reference values for CL=0.6800 calculated with ROOT's TEfficiency
num = np.array([1., 5., 10., 10.])
denom = np.array([10., 10., 10., 437.])
ref_hi = np.array([0.293313782248242, 0.6944224231766912, 1.0, 0.032438865381336446])
ref_lo = np.array([0.01728422272382846, 0.3055775768233088, 0.8325532074018731, 0.015839046981153772])
interval = clopper_pearson_interval(num, denom, coverage=0.68)
threshold = 1e-6
assert(all((interval[1, :] / ref_hi) - 1 < threshold))
assert(all((interval[0, :] / ref_lo) - 1 < threshold))
def test_hist_serdes():
import pickle
h_regular_bins = hist.Hist("regular joe",
hist.Bin("x", "x", 20, 0, 200),
hist.Bin("y", "why", 20, -3, 3))
h_regular_bins.fill(x=np.array([1.,2.,3.,4.,5.]),y=np.array([-2.,1.,0.,1.,2.]))
h_regular_bins.sum('x').identifiers('y')
spkl = pickle.dumps(h_regular_bins)
hnew = pickle.loads(spkl)
hnew.sum('x').identifiers('y')
assert(h_regular_bins._dense_shape == hnew._dense_shape)
assert(h_regular_bins._axes == hnew._axes)
def test_normal_interval():
from coffea.hist.plot import normal_interval
# Reference weighted efficiency and error from ROOTs TEfficiency
denom = np.array([ 89.01457591590004, 2177.066076428943 , 6122.5256890981855 ,
0. , 100.27757990710668])
num = np.array([ 75.14287743709515, 2177.066076428943 , 5193.454723043864 ,
0. , 84.97723540536361])
denom_sumw2 = np.array([ 94.37919737476827, 10000. , 6463.46795877633 ,
0. , 105.90898005417333])
num_sumw2 = np.array([ 67.2202147680005 , 10000. , 4647.983931785646 ,
0. , 76.01275761253757])
ref_hi = np.array([0.0514643476600107, 0. , 0.0061403263960343,
np.nan, 0.0480731185500146])
ref_lo = np.array([0.0514643476600107, 0. , 0.0061403263960343,
np.nan, 0.0480731185500146])
interval = normal_interval(num, denom, num_sumw2, denom_sumw2)
threshold = 1e-6
lo, hi = interval
assert len(ref_hi) == len(hi)
def test_lumimask():
lumimask = LumiMask("tests/samples/Cert_294927-306462_13TeV_EOY2017ReReco_Collisions17_JSON.txt")
runs = np.array([303825, 123], dtype=np.uint32)
lumis = np.array([115, 123], dtype=np.uint32)
mask = lumimask(runs, lumis)
print("mask:", mask)
assert(mask[0] == True)
assert(mask[1] == False)
# test underlying py_func
py_mask = np.zeros(dtype='bool', shape=runs.shape)
LumiMask._apply_run_lumi_mask_kernel.py_func(lumimask._masks,
runs, lumis,
py_mask)
assert(np.all(mask == py_mask))
eta2 = jca2.p4.eta
eta1 = jca1.p4.eta
print (np.sum(eta1.counts),np.sum(eta2.counts))
diffeta_temp = np.abs(eta1 - eta2)
diffeta = np.abs(jca1.p4.eta - jca2.p4.eta)
assert( (jca1.offsets == jca2.offsets).all() )
assert (diffm < 1e-8).flatten().all()
assert (diffpt < 1e-8).flatten().all()
assert (diffeta < 1e-8).flatten().all()
#test fast functions
fastfs = ['pt','eta','phi','mass']
for func in fastfs:
func1 = getattr(jca1,func)
func2 = getattr(jca1.p4,func)
dfunc = np.abs(func1 - func2)
assert (dfunc < 1e-8).flatten().all()
adistinct = jca1.distincts()
apair = jca1.pairs()
across = jca1.cross(jca2)
acrossn = jca1.cross(jca2, nested=True)
achoose2 = jca1.choose(2)
achoose3 = jca1.choose(3)
assert 'p4' in adistinct.columns
assert 'p4' in apair.columns
assert 'p4' in across.columns
assert 'p4' in acrossn.columns
assert 'p4' in achoose2.columns
assert 'p4' in achoose3.columns
def __init__(self):
self.p4 = thep4
self.px = px
self.py = py
self.pz = pz
self.en = energy
self.pt = np.hypot(px,py)
self.phi = np.arctan2(py,px)
self.eta = np.arctanh(pz/np.sqrt(px*px + py*py + pz*pz))
self.mass = np.sqrt(np.abs(energy*energy - (px*px + py*py + pz*pz)))
self.blah = energy*px
self.count = counts
addon1 = jca1.zeros_like()
addon2 = jca2.ones_like()
jca1['addon'] = addon1
jca2['addon'] = addon2
jca1.add_attributes(addonFlat=addon1.flatten(),addonJagged=addon1)
diffm = np.abs(jca1.p4.mass - jca2.p4.mass)
assert( (jca1.offsets == jca2.offsets).all() )
diffpt = np.abs(jca1.p4.pt - jca2.p4.pt)
assert( (jca1.offsets == jca2.offsets).all() )
eta2 = jca2.p4.eta
eta1 = jca1.p4.eta
print (np.sum(eta1.counts),np.sum(eta2.counts))
diffeta_temp = np.abs(eta1 - eta2)
diffeta = np.abs(jca1.p4.eta - jca2.p4.eta)
assert( (jca1.offsets == jca2.offsets).all() )
assert (diffm < 1e-8).flatten().all()
assert (diffpt < 1e-8).flatten().all()
assert (diffeta < 1e-8).flatten().all()
#test fast functions
fastfs = ['pt','eta','phi','mass']
for func in fastfs:
func1 = getattr(jca1,func)
func2 = getattr(jca1.p4,func)
dfunc = np.abs(func1 - func2)
assert (dfunc < 1e-8).flatten().all()
adistinct = jca1.distincts()
apair = jca1.pairs()
across = jca1.cross(jca2)