Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_array_pool(ma2):
pool = ArrayPool(['MA2', 'S1'])
N = 100
bs = 100
total = 1000
rej_pool = elfi.Rejection(ma2['d'], batch_size=bs, pool=pool)
means = rej_pool.sample(N, n_sim=total).sample_means_array
assert len(pool.stores['MA2']) == total/bs
assert len(pool.stores['S1']) == total/bs
assert len(pool) == total/bs
assert not 't1' in pool.stores
batch2 = pool[2]
# Test against in memory pool with using batches
pool2 = OutputPool(['MA2', 'S1'])
rej = elfi.Rejection(ma2['d'], batch_size=bs, pool=pool2, seed=pool.seed)
rej.sample(N, n_sim=total)
for bi in range(int(total/bs)):
assert np.array_equal(pool.stores['S1'][bi], pool2.stores['S1'][bi])
def test_multi_parameter_linear_adjustment():
"""A regression test against values obtained in the notebook."""
seed = 20170511
threshold = 0.2
batch_size = 1000
n_samples = 500
m = ma2.get_model(true_params=[0.6, 0.2], seed_obs=seed)
summary_names = ['S1', 'S2']
parameter_names = ['t1', 't2']
linear_adjustment = LinearAdjustment()
res = elfi.Rejection(
m['d'],
batch_size=batch_size,
output_names=['S1', 'S2'],
# output_names=summary_names, # fails ?!?!?
seed=seed).sample(
n_samples, threshold=threshold)
adjusted = adjust_posterior(
model=m,
sample=res,
parameter_names=parameter_names,
summary_names=summary_names,
adjustment=linear_adjustment)
t1 = adjusted.outputs['t1']
t2 = adjusted.outputs['t2']
t1_mean, t1_var = (0.51606048286584782, 0.017253007645871756)
def test_rejection_with_quantile():
m, true_params = setup_ma2_with_informative_data()
quantile = 0.01
N = 1000
batch_size = 20000
rej = elfi.Rejection(m['d'], batch_size=batch_size)
res = rej.sample(N, quantile=quantile)
check_inference_with_informative_data(res.samples, N, true_params)
# Check that there are no repeating values indicating a seeding problem
assert len(np.unique(res.discrepancies)) == N
assert res.accept_rate == quantile
assert res.n_sim == int(N / quantile)
def test_become_with_priors(self, ma2):
parameters = ma2.parameter_names.copy()
parent_names = ma2.get_parents('t1')
ma2['t1'].become(elfi.Prior('uniform', 0, model=ma2))
# Test that parameters are preserved
assert parameters == ma2.parameter_names
# Test that hidden nodes are removed
for name in parent_names:
assert not ma2.has_node(name)
# Test that inference still works
r = elfi.Rejection(ma2, 'd')
r.sample(10)
def test_multivariate(multivariate_model):
n_samples = 10
rej = elfi.Rejection(multivariate_model['d'], batch_size=5)
sample = rej.sample(n_samples)
assert sample.outputs['t1'].shape == (n_samples, 3)
assert sample.outputs['d'].shape == (n_samples,)
assert sample.is_multivariate
def test_pool_restarts(ma2):
pool = elfi.ArrayPool(['t1', 'd'], name='test')
rej = elfi.Rejection(ma2, 'd', batch_size=10, pool=pool, seed=123)
rej.sample(1, n_sim=30)
pool.save()
# Do not save the pool...
rej = elfi.Rejection(ma2, 'd', batch_size=10, pool=pool)
rej.set_objective(3, n_sim=60)
while not rej.finished:
rej.iterate()
# ...but just flush the array content
pool.get_store('t1').array.fs.flush()
pool.get_store('d').array.fs.flush()
assert(len(pool)==6)
assert(len(pool.stores['t1'].array)==60)
def test_compare_models():
m = gauss.get_model()
res1 = elfi.Rejection(m['d']).sample(100)
# use less informative prior
m['mu'].become(elfi.Prior('uniform', -10, 50))
res2 = elfi.Rejection(m['d']).sample(100)
# use different simulator
m['gauss'].become(elfi.Simulator(ma2.MA2, m['mu'], m['sigma'], observed=m.observed['gauss']))
res3 = elfi.Rejection(m['d']).sample(100)
p = elfi.compare_models([res1, res2, res3])
assert p[0] > p[1]
assert p[1] > p[2]
def test_multiprocessing_kwargs(simple_model):
pre = elfi.get_client()
m = simple_model
num_proc = 2
elfi.set_client('multiprocessing', num_processes=num_proc)
rej = elfi.Rejection(m['k1'])
assert rej.client.num_cores == num_proc
elfi.set_client('multiprocessing', processes=num_proc)
rej = elfi.Rejection(m['k1'])
assert rej.client.num_cores == num_proc
elfi.set_client(pre)
N = 100
bs = 100
total = 1000
rej_pool = elfi.Rejection(ma2['d'], batch_size=bs, pool=pool)
means = rej_pool.sample(N, n_sim=total).sample_means_array
assert len(pool.stores['MA2']) == total/bs
assert len(pool.stores['S1']) == total/bs
assert len(pool) == total/bs
assert not 't1' in pool.stores
batch2 = pool[2]
# Test against in memory pool with using batches
pool2 = OutputPool(['MA2', 'S1'])
rej = elfi.Rejection(ma2['d'], batch_size=bs, pool=pool2, seed=pool.seed)
rej.sample(N, n_sim=total)
for bi in range(int(total/bs)):
assert np.array_equal(pool.stores['S1'][bi], pool2.stores['S1'][bi])
# Test running the inference again
rej_pool.sample(N, n_sim=total)
# Test using the same pool with another sampler
rej_pool_new = elfi.Rejection(ma2['d'], batch_size=bs, pool=pool)
assert len(pool) == total/bs
assert np.array_equal(means, rej_pool_new.sample(N, n_sim=total).sample_means_array)
# Test closing and opening the pool
pool.close()
pool = ArrayPool.open(pool.name)
assert len(pool) == total/bs
def test_sample_object_to_dict():
data_rej = OrderedDict()
data_smc = OrderedDict()
m = get_model(n_obs=100, true_params=[.6, .2])
batch_size, n = 1, 2
schedule = [0.7, 0.2, 0.05]
rej = elfi.Rejection(m['d'], batch_size=batch_size)
res_rej = rej.sample(n, threshold=0.1)
smc = elfi.SMC(m['d'], batch_size=batch_size)
res_smc = smc.sample(n, schedule)
sample_object_to_dict(data_rej, res_rej)
sample_object_to_dict(data_smc, res_smc, skip='populations')
assert any(x not in data_rej for x in ['meta', 'output']) is True
assert any(x not in data_smc for x in ['meta', 'output', 'populations']) is True