Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import mock
import pytest
import numpy as np
from emukit.core.interfaces import IModel, IDifferentiable
from emukit.experimental_design.model_based.acquisitions import ModelVariance
class MockModel(IModel, IDifferentiable):
    """Empty class combining ``IModel`` and ``IDifferentiable``; used as the spec for the autospec'd model."""
@pytest.fixture
def model():
    """
    Build an autospec'd ``MockModel`` with canned return values.

    ``predict`` returns a pair of (1 x 1) arrays filled with 0.1 and 0.5, and
    ``get_prediction_gradients`` returns a pair of (1 x 2) arrays filled with 1 and 2.

    :return: The configured mock object
    """
    canned_prediction = (0.1 * np.ones((1, 1)), 0.5 * np.ones((1, 1)))
    canned_gradients = (np.ones((1, 2)), 2 * np.ones((1, 2)))

    mocked_model = mock.create_autospec(MockModel)
    mocked_model.predict.return_value = canned_prediction
    mocked_model.get_prediction_gradients.return_value = canned_gradients
    return mocked_model
def test_model_variance(model):
    """Check that ``ModelVariance.evaluate`` at a single 2-d point is close to 0.5 for the mocked model."""
    query_point = np.zeros((1, 2))
    variance_acquisition = ModelVariance(model)
    result = variance_acquisition.evaluate(query_point)
    assert np.isclose(result, 0.5)
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Tuple
import numpy as np
import GPy
from ..core.interfaces import IModel, IDifferentiable
from ..experimental_design.interfaces import ICalculateVarianceReduction
from ..bayesian_optimization.interfaces import IEntropySearchModel
class GPyModelWrapper(IModel, IDifferentiable, ICalculateVarianceReduction, IEntropySearchModel):
"""
This is a thin wrapper around GPy models to allow users to plug GPy models into Emukit
"""
def __init__(self, gpy_model):
    """
    Keep a reference to the model being wrapped.

    :param gpy_model: Model object to wrap; stored as ``self.model``
    """
    self.model = gpy_model
def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Forward a prediction request to the wrapped model.

    :param X: (n_points x n_dimensions) array containing locations at which to get predictions
    :return: (mean, variance) Arrays of size n_points x 1 of the predictive distribution at each input location,
             exactly as returned by ``self.model.predict``
    """
    prediction = self.model.predict(X)
    return prediction
def get_prediction_gradients(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
:param X: (n_points x n_dimensions) array containing locations at which to get gradient of the predictions
def has_gradients(self):
    """Report whether ``self.model`` is an instance of ``IDifferentiable``."""
    model_is_differentiable = isinstance(self.model, IDifferentiable)
    return model_is_differentiable
@property
def X(self) -> np.ndarray:
    """
    Training inputs, read from the wrapped model's ``X`` attribute.

    :return: An array of shape n_points x n_dimensions containing training inputs
    """
    training_inputs = self.model.X
    return training_inputs
@property
def Y(self) -> np.ndarray:
    """
    Training outputs, read from the wrapped model's ``Y`` attribute.

    :return: An array of shape n_points x 1 containing training outputs
    """
    training_outputs = self.model.Y
    return training_outputs
class GPyMultiOutputWrapper(IModel, IDifferentiable, ICalculateVarianceReduction, IEntropySearchModel):
    """
    A wrapper around GPy multi-output models.

    X inputs should have the corresponding output index as the last column in the array
    """

    def __init__(self, gpy_model: GPy.core.GP, n_outputs: int, n_optimization_restarts: int,
                 verbose_optimization: bool = True):
        """
        :param gpy_model: GPy multi-output model
        :param n_outputs: Number of outputs in the problem
        :param n_optimization_restarts: Number of restarts from random starting points when optimizing
                                        hyper-parameters
        :param verbose_optimization: Stored unchanged as ``self.verbose_optimization``; the constructor itself does
                                     not act on it. Presumably toggles optimizer progress output -- confirm against
                                     the code that reads it.
        """
        super().__init__()
        self.gpy_model = gpy_model
        self.n_optimization_restarts = n_optimization_restarts
        self.n_outputs = n_outputs
        # This argument used to be accepted and then dropped, so a caller's choice was lost.
        # Keeping it on the instance makes the setting available to whatever runs the optimization.
        self.verbose_optimization = verbose_optimization
def has_gradients(self):
    """Return ``True`` when ``self.model`` is an ``IDifferentiable`` instance, ``False`` otherwise."""
    candidate = self.model
    return isinstance(candidate, IDifferentiable)
:param update_interval: Number of iterations between optimization of model hyper-parameters. Defaults to 1.
:param batch_size: How many points to evaluate in one iteration of the optimization loop. Defaults to 1.
"""
self.model = model
if acquisition is None:
acquisition = ExpectedImprovement(model)
model_updaters = FixedIntervalUpdater(model, update_interval)
acquisition_optimizer = AcquisitionOptimizer(space)
if batch_size == 1:
candidate_point_calculator = SequentialPointCalculator(acquisition, acquisition_optimizer)
else:
if not isinstance(model, IDifferentiable):
raise ValueError('Model must implement ' + str(IDifferentiable) +
' for use with Local Penalization batch method.')
log_acquisition = LogAcquisition(acquisition)
candidate_point_calculator = LocalPenalizationPointCalculator(log_acquisition, acquisition_optimizer, model,
space, batch_size)
loop_state = create_loop_state(model.X, model.Y)
super().__init__(candidate_point_calculator, model_updaters, loop_state)
def has_gradients(self) -> bool:
    """
    Report whether this acquisition has gradients.

    :return: ``True`` if ``self.model`` is an instance of ``IDifferentiable``, otherwise ``False``
    """
    differentiable = isinstance(self.model, IDifferentiable)
    return differentiable
:param batch_size: How many points to evaluate in one iteration of the optimization loop. Defaults to 1.
"""
self.model = model
if acquisition is None:
acquisition = ExpectedImprovement(model)
model_updaters = FixedIntervalUpdater(model, update_interval)
acquisition_optimizer = AcquisitionOptimizer(space)
if batch_size == 1:
candidate_point_calculator = SequentialPointCalculator(acquisition, acquisition_optimizer)
else:
if not isinstance(model, IDifferentiable):
raise ValueError('Model must implement ' + str(IDifferentiable) +
' for use with Local Penalization batch method.')
log_acquisition = LogAcquisition(acquisition)
candidate_point_calculator = LocalPenalizationPointCalculator(log_acquisition, acquisition_optimizer, model,
space, batch_size)
loop_state = create_loop_state(model.X, model.Y)
super().__init__(candidate_point_calculator, model_updaters, loop_state)
def has_gradients(self) -> bool:
    """
    Tell whether gradients of the cost function with respect to input location can be obtained.

    :return: ``True`` if ``self.cost_model`` is an instance of ``IDifferentiable``, otherwise ``False``
    """
    cost_model = self.cost_model
    return isinstance(cost_model, IDifferentiable)
def has_gradients(self):
    """Check ``self.model`` against the ``IDifferentiable`` interface and return the result."""
    supports_gradients = isinstance(self.model, IDifferentiable)
    return supports_gradients