Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def make_loop(loop_state):
    """Build a ``BayesianOptimizationLoop`` from the data held in ``loop_state``.

    A ``GPy.models.GPRegression`` is constructed from ``loop_state.X`` and
    ``loop_state.Y`` and wrapped in a ``GPyModelWrapper``.  ``space`` is not
    defined here; it is resolved from an enclosing scope.
    """
    regression = GPy.models.GPRegression(loop_state.X, loop_state.Y)
    return BayesianOptimizationLoop(space, GPyModelWrapper(regression))
def test_log_acquisition_gradients():
    """Check ``LogAcquisition`` gradients against a finite-difference estimate.

    A GP regression model is built on random data, wrapped in an
    ``ExpectedImprovement`` acquisition and then in ``LogAcquisition``.  The
    analytic gradient returned by ``evaluate_with_gradients`` is compared with
    the numerical gradient via ``scipy.optimize.check_grad`` at a random point.
    """
    x_init = np.random.rand(5, 2)
    y_init = np.random.rand(5, 1)
    gpy_model = GPy.models.GPRegression(x_init, y_init)
    model = GPyModelWrapper(gpy_model)
    ei = ExpectedImprovement(model)
    log = LogAcquisition(ei)

    # check_grad hands over a 1-D point and expects a scalar value and a 1-D
    # gradient back.  The acquisition works on 2-D batches, so the point is
    # lifted to a one-row batch and the results are flattened again.
    def value(x):
        return np.asarray(log.evaluate_with_gradients(x[None, :])[0]).item()

    def gradient(x):
        return np.asarray(log.evaluate_with_gradients(x[None, :])[1]).ravel()

    x0 = np.random.rand(2)
    # check_grad returns a single float (norm of the gradient mismatch).
    error = scipy.optimize.check_grad(value, gradient, x0)
    assert error < 1e-6
def test_multi_source_batch_experimental_design():
    """Run a batch experimental-design loop on the multi-fidelity Forrester function.

    Starts from 10 randomly sampled points, then runs an ``OuterLoop`` for 10
    iterations with a greedy batch calculator of batch size 5, and checks that
    the loop state ends up holding 60 input points.
    """
    objective, space = multi_fidelity_forrester_function()

    # Initial data: sample the space and evaluate the objective there.
    initial_inputs = RandomDesign(space).get_samples(10)
    initial_results = objective.evaluate(initial_inputs)
    initial_outputs = np.array([result.Y for result in initial_results])

    # Gradient-based optimizer wrapped so it can handle multiple sources.
    gradient_optimizer = GradientAcquisitionOptimizer(space)
    source_optimizer = MultiSourceAcquisitionOptimizer(gradient_optimizer, space)

    # GP model on the initial data.
    regression = GPy.models.GPRegression(initial_inputs, initial_outputs)
    model = GPyModelWrapper(regression)

    # Model variance is the acquisition driving the design.
    variance_acquisition = ModelVariance(model)

    # Batch candidate point calculator.
    batch_calculator = GreedyBatchPointCalculator(
        model,
        variance_acquisition,
        source_optimizer,
        batch_size=5,
    )

    starting_state = LoopState(initial_results)
    loop = OuterLoop(batch_calculator, FixedIntervalUpdater(model, 1), starting_state)
    loop.run_loop(objective, 10)

    # 10 initial points + 10 iterations * 5 points per batch.
    assert loop.loop_state.X.shape[0] == 60
def test_local_penalization():
    """Check that the local-penalization calculator returns a full batch of points.

    Uses a one-parameter space, a GP model on 5 random observations and an
    ``ExpectedImprovement`` acquisition; the result of ``compute_next_points``
    must have shape ``(batch_size, 1)``.
    """
    batch_size = 5
    parameter_space = ParameterSpace([ContinuousParameter('x', 0, 1)])
    optimizer = AcquisitionOptimizer(parameter_space)

    inputs = np.random.rand(5, 1)
    outputs = np.random.rand(5, 1)
    model = GPyModelWrapper(GPy.models.GPRegression(inputs, outputs))
    expected_improvement = ExpectedImprovement(model)

    calculator = LocalPenalizationPointCalculator(
        expected_improvement, optimizer, model, parameter_space, batch_size
    )
    state = create_loop_state(inputs, outputs)
    candidates = calculator.compute_next_points(state)

    assert candidates.shape == (batch_size, 1)
# NOTE(review): the enclosing `def` line for this test body is not visible here;
# the statements are documented as they appear.
# Search space with a single continuous parameter 'x' (constructor arguments 0 and 1).
space = ParameterSpace([ContinuousParameter('x', 0, 1)])
# 10 random initial inputs, one column.
x_init = np.random.rand(10, 1)
# User function returning two outputs: sin(x) and x itself.  The second output
# is unpacked as the cost below.
def function_with_cost(x):
return np.sin(x), x
user_fcn = UserFunctionWrapper(function_with_cost)
# Initial objective values and costs come from calling the raw function directly.
y_init, cost_init = function_with_cost(x_init)
# Separate GP regression models for the objective and for the cost.
gpy_model_objective = GPy.models.GPRegression(x_init, y_init)
gpy_model_cost = GPy.models.GPRegression(x_init, cost_init)
model_objective = GPyModelWrapper(gpy_model_objective)
model_cost = GPyModelWrapper(gpy_model_cost)
loop = CostSensitiveBayesianOptimizationLoop(space, model_objective, model_cost)
loop.run_loop(user_fcn, 10)
# Expect 20 rows for both inputs and costs: 10 initial points plus the 10
# requested from run_loop.
assert loop.loop_state.X.shape[0] == 20
assert loop.loop_state.cost.shape[0] == 20
def test_log_acquisition_shapes():
    """Check the output shapes of ``LogAcquisition`` for a batch of 5 two-dimensional points.

    ``evaluate_with_gradients`` must return values of shape ``(5, 1)`` and
    gradients of shape ``(5, 2)``; ``evaluate`` must return shape ``(5, 1)``.
    """
    train_inputs = np.random.rand(5, 2)
    train_outputs = np.random.rand(5, 1)
    wrapped_model = GPyModelWrapper(GPy.models.GPRegression(train_inputs, train_outputs))
    log_acquisition = LogAcquisition(ExpectedImprovement(wrapped_model))

    points = np.ones((5, 2))

    values, gradients = log_acquisition.evaluate_with_gradients(points)
    assert values.shape == (5, 1)
    assert gradients.shape == (5, 2)

    values_only = log_acquisition.evaluate(points)
    assert values_only.shape == (5, 1)
def cost_model():
    """Return a ``GPyModelWrapper`` around a GP regression on random data.

    Inputs are 10 random two-dimensional points; targets are uniform random
    values shifted by 1, so they lie in ``[1, 2)``.
    """
    inputs = np.random.rand(10, 2)
    targets = np.random.rand(10, 1) + 1
    return GPyModelWrapper(GPy.models.GPRegression(inputs, targets))
# NOTE(review): the enclosing `def` line and the definitions of `objective` and
# `carol_spirits` are not visible here; the statements are documented as they appear.
encoding = OneHotEncoding(carol_spirits)
# Mixed space: one continuous parameter and one categorical parameter using the
# encoding above.
parameter_space = ParameterSpace([
ContinuousParameter('real_param', 0.0, 1.0),
CategoricalParameter('categorical_param', encoding)
])
x_init = parameter_space.sample_uniform(10)
# 10 samples with 4 columns each.
assert x_init.shape == (10, 4)
# The checked columns must hold only 0.0 or 1.0.
# NOTE(review): the slice 1:3 covers columns 1 and 2 only, while the shape
# assert above implies a column 3 as well -- confirm whether 1:4 was intended.
assert np.all(np.logical_or(x_init[:, 1:3] == 0.0, x_init[:, 1:3] == 1.0))
y_init = objective(x_init)
gpy_model = GPy.models.GPRegression(x_init, y_init)
# Presumably pins the Gaussian noise parameter at 1 so it is not optimized -- TODO confirm.
gpy_model.Gaussian_noise.fix(1)
model = GPyModelWrapper(gpy_model)
acquisition = ExpectedImprovement(model)
loop = BayesianOptimizationLoop(parameter_space, model, acquisition)
loop.run_loop(objective, 5)
# 10 initial observations plus 5 from the loop.
assert len(loop.loop_state.Y) == 15
# Same 0.0/1.0 check on the points collected by the loop (same 1:3 slice as above).
assert np.all(np.logical_or(loop.loop_state.X[:, 1:3] == 0.0, loop.loop_state.X[:, 1:3] == 1.0))
def test_loop():
    """Run an ``ExperimentalDesignLoop`` for a fixed number of iterations.

    Starts from 5 random one-dimensional points, runs 5 iterations driven by a
    ``ModelVariance`` acquisition, and checks that the loop state then holds
    10 input points.  The user function ``f`` comes from an outer scope.
    """
    n_iterations = 5
    initial_inputs = np.random.rand(5, 1)
    initial_outputs = np.random.rand(5, 1)

    # GP model on the initial data.
    model = GPyModelWrapper(GPy.models.GPRegression(initial_inputs, initial_outputs))

    space = ParameterSpace([ContinuousParameter('x', 0, 1)])
    variance_acquisition = ModelVariance(model)

    # Build the loop and let it collect points.
    design_loop = ExperimentalDesignLoop(space, model, variance_acquisition)
    stopping_condition = FixedIterationsStoppingCondition(n_iterations)
    design_loop.run_loop(UserFunctionWrapper(f), stopping_condition)

    # 5 initial points + 5 collected points.
    expected_points = initial_inputs.shape[0] + n_iterations
    assert design_loop.loop_state.X.shape[0] == expected_points
def test_iteration_end_event():
    """Check that handlers on ``iteration_end_event`` run during ``run_loop``.

    An ``OuterLoop`` is built on a GP model of the identity function.  A handler
    that records the model's mean squared error on a test grid is appended to
    ``loop.iteration_end_event``, and the loop is run for 5 iterations.  The
    test then asserts that the handler recorded one value per iteration.
    """
    n_iterations = 5
    space = ParameterSpace([ContinuousParameter('x', 0, 1)])

    def user_function(x):
        return x

    # Test grid used by the handler, and the initial training data.
    x_test = np.linspace(0, 1)[:, None]
    y_test = user_function(x_test)

    x_init = np.linspace(0, 1, 5)[:, None]
    y_init = user_function(x_init)

    gpy_model = GPy.models.GPRegression(x_init, y_init)
    model = GPyModelWrapper(gpy_model)

    mse = []

    def compute_mse(self, loop_state):
        # Handler signature takes two arguments; neither is needed here because
        # the model and the test data are captured from the enclosing scope.
        mse.append(np.mean(np.square(model.predict(x_test)[0] - y_test)))

    loop_state = create_loop_state(x_init, y_init)

    acquisition = ModelVariance(model)
    acquisition_optimizer = AcquisitionOptimizer(space)
    candidate_point_calculator = SequentialPointCalculator(acquisition, acquisition_optimizer)
    model_updater = FixedIntervalUpdater(model)

    loop = OuterLoop(candidate_point_calculator, model_updater, loop_state)
    loop.iteration_end_event.append(compute_mse)
    loop.run_loop(user_function, n_iterations)

    # Without this check the test would pass even if the event never fired.
    assert len(mse) == n_iterations