Refactor ObjectiveComponent

This commit is contained in:
2021-01-26 22:16:46 -06:00
parent 2e845058fc
commit 603902e608
5 changed files with 158 additions and 61 deletions

View File

@@ -3,7 +3,7 @@
# Released under the modified BSD license. See COPYING.md for more details.
import logging
from copy import deepcopy
from typing import List, Dict, Union, Callable, Optional, Any, TYPE_CHECKING
import numpy as np
from sklearn.linear_model import LinearRegression
@@ -17,7 +17,12 @@ from sklearn.metrics import (
from miplearn.classifiers import Regressor
from miplearn.components.component import Component
from miplearn.extractors import InstanceFeaturesExtractor, ObjectiveValueExtractor
from miplearn.extractors import InstanceIterator
from miplearn.instance import Instance
from miplearn.types import MIPSolveStats, TrainingSample, LearningSolveStats
if TYPE_CHECKING:
from miplearn.solvers.learning import LearningSolver
logger = logging.getLogger(__name__)
@@ -29,58 +34,102 @@ class ObjectiveValueComponent(Component):
def __init__(
self,
regressor: Regressor = LinearRegression(),
lb_regressor: Callable[[], Regressor] = LinearRegression,
ub_regressor: Callable[[], Regressor] = LinearRegression,
) -> None:
self.ub_regressor = None
self.lb_regressor = None
self.regressor_prototype = regressor
self.ub_regressor: Optional[Regressor] = None
self.lb_regressor: Optional[Regressor] = None
self.lb_regressor_factory = lb_regressor
self.ub_regressor_factory = ub_regressor
self._predicted_ub: Optional[float] = None
self._predicted_lb: Optional[float] = None
def before_solve(self, solver, instance, model):
def before_solve(
self,
solver: "LearningSolver",
instance: Instance,
model: Any,
) -> None:
if self.ub_regressor is not None:
logger.info("Predicting optimal value...")
lb, ub = self.predict([instance])[0]
instance.predicted_ub = ub
instance.predicted_lb = lb
logger.info("Predicted values: lb=%.2f, ub=%.2f" % (lb, ub))
pred = self.predict([instance])
self._predicted_lb = pred["Upper bound"][0]
self._predicted_ub = pred["Lower bound"][0]
logger.info(
"Predicted values: lb=%.2f, ub=%.2f"
% (
self._predicted_lb,
self._predicted_ub,
)
)
def after_solve(
self,
solver,
instance,
model,
stats,
training_data,
):
if self.ub_regressor is not None:
stats["Predicted UB"] = instance.predicted_ub
stats["Predicted LB"] = instance.predicted_lb
else:
stats["Predicted UB"] = None
stats["Predicted LB"] = None
solver: "LearningSolver",
instance: Instance,
model: Any,
stats: LearningSolveStats,
training_data: TrainingSample,
) -> None:
if self._predicted_ub is not None:
stats["Objective: predicted UB"] = self._predicted_ub
if self._predicted_lb is not None:
stats["Objective: predicted LB"] = self._predicted_lb
def fit(self, training_instances):
def fit(self, training_instances: Union[List[str], List[Instance]]) -> None:
self.lb_regressor = self.lb_regressor_factory()
self.ub_regressor = self.ub_regressor_factory()
logger.debug("Extracting features...")
features = InstanceFeaturesExtractor().extract(training_instances)
ub = ObjectiveValueExtractor(kind="upper bound").extract(training_instances)
lb = ObjectiveValueExtractor(kind="lower bound").extract(training_instances)
assert ub.shape == (len(training_instances), 1)
assert lb.shape == (len(training_instances), 1)
self.ub_regressor = deepcopy(self.regressor_prototype)
self.lb_regressor = deepcopy(self.regressor_prototype)
x_train = self.x(training_instances)
y_train = self.y(training_instances)
logger.debug("Fitting lb_regressor...")
self.lb_regressor.fit(x_train, y_train["Lower bound"])
logger.debug("Fitting ub_regressor...")
self.ub_regressor.fit(features, ub.ravel())
logger.debug("Fitting ub_regressor...")
self.lb_regressor.fit(features, lb.ravel())
self.ub_regressor.fit(x_train, y_train["Upper bound"])
def predict(self, instances):
features = InstanceFeaturesExtractor().extract(instances)
lb = self.lb_regressor.predict(features)
ub = self.ub_regressor.predict(features)
assert lb.shape == (len(instances),)
assert ub.shape == (len(instances),)
return np.array([lb, ub]).T
def predict(
self,
instances: Union[List[str], List[Instance]],
) -> Dict[str, List[float]]:
assert self.lb_regressor is not None
assert self.ub_regressor is not None
x_test = self.x(instances)
(n_samples, n_features) = x_test.shape
lb = self.lb_regressor.predict(x_test)
ub = self.ub_regressor.predict(x_test)
assert lb.shape == (n_samples, 1)
assert ub.shape == (n_samples, 1)
return {
"Lower bound": lb.ravel().tolist(),
"Upper bound": ub.ravel().tolist(),
}
def evaluate(self, instances):
@staticmethod
def x(instances: Union[List[str], List[Instance]]) -> np.ndarray:
result = []
for instance in InstanceIterator(instances):
for _ in instance.training_data:
instance_features = instance.get_instance_features()
result.append(instance_features)
return np.array(result)
@staticmethod
def y(instances: Union[List[str], List[Instance]]) -> Dict[str, np.ndarray]:
ub: List[List[float]] = []
lb: List[List[float]] = []
for instance in InstanceIterator(instances):
for sample in instance.training_data:
lb.append([sample["Lower bound"]])
ub.append([sample["Upper bound"]])
return {
"Lower bound": np.array(lb),
"Upper bound": np.array(ub),
}
def evaluate(
self,
instances: Union[List[str], List[Instance]],
) -> Dict[str, Dict[str, float]]:
y_pred = self.predict(instances)
y_true = np.array(
[
@@ -88,11 +137,12 @@ class ObjectiveValueComponent(Component):
inst.training_data[0]["Lower bound"],
inst.training_data[0]["Upper bound"],
]
for inst in instances
for inst in InstanceIterator(instances)
]
)
y_true_lb, y_true_ub = y_true[:, 0], y_true[:, 1]
y_pred_lb, y_pred_ub = y_pred[:, 1], y_pred[:, 1]
y_pred_lb = y_pred["Lower bound"]
y_pred_ub = y_pred["Upper bound"]
y_true_lb, y_true_ub = y_true[:, 1], y_true[:, 1]
ev = {
"Lower bound": {
"Mean squared error": mean_squared_error(y_true_lb, y_pred_lb),

View File

@@ -117,9 +117,6 @@ class MaxWeightStableSetInstance(Instance):
model.clique_eqs.add(sum(model.x[i] for i in clique) <= 1)
return model
def get_instance_features(self):
return np.ones(0)
def get_variable_features(self, var, index):
neighbor_weights = [0] * 15
neighbor_degrees = [100] * 15

View File

@@ -65,6 +65,8 @@ LearningSolveStats = TypedDict(
"Primal: free": int,
"Primal: zero": int,
"Primal: one": int,
"Objective: predicted LB": float,
"Objective: predicted UB": float,
},
total=False,
)