StaticLazy: Refactor

master
Alinson S. Xavier 5 years ago
parent 168f56c296
commit 6e614264b5
No known key found for this signature in database
GPG Key ID: DCA0DAD4D2F58624

@ -151,8 +151,8 @@ class Component:
def fit_xy(
self,
x: Dict[str, np.ndarray],
y: Dict[str, np.ndarray],
x: Dict[Hashable, np.ndarray],
y: Dict[Hashable, np.ndarray],
) -> None:
"""
Given two dictionaries x and y, mapping the name of the category to matrices

@ -4,7 +4,7 @@
import logging
import sys
from typing import Dict, Tuple, Optional
from typing import Dict, Tuple, Optional, List, Hashable, Any, TYPE_CHECKING, Set
import numpy as np
from tqdm.auto import tqdm
@ -12,203 +12,163 @@ from tqdm.auto import tqdm
from miplearn import Classifier
from miplearn.classifiers.counting import CountingClassifier
from miplearn.components.component import Component
from miplearn.types import TrainingSample, Features
from miplearn.types import TrainingSample, Features, LearningSolveStats
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from miplearn.solvers.learning import LearningSolver, Instance
class LazyConstraint:
def __init__(self, cid, obj):
def __init__(self, cid: str, obj: Any) -> None:
self.cid = cid
self.obj = obj
class StaticLazyConstraintsComponent(Component):
"""
Component that decides which of the constraints tagged as lazy should
be kept in the formulation, and which should be removed.
"""
def __init__(
self,
classifier=CountingClassifier(),
threshold=0.05,
use_two_phase_gap=True,
large_gap=1e-2,
violation_tolerance=-0.5,
):
classifier: Classifier = CountingClassifier(),
threshold: float = 0.05,
violation_tolerance: float = -0.5,
) -> None:
assert isinstance(classifier, Classifier)
self.threshold = threshold
self.classifier_prototype = classifier
self.classifiers = {}
self.pool = []
self.original_gap = None
self.large_gap = large_gap
self.is_gap_large = False
self.use_two_phase_gap = use_two_phase_gap
self.violation_tolerance = violation_tolerance
self.threshold: float = threshold
self.classifier_prototype: Classifier = classifier
self.classifiers: Dict[Hashable, Classifier] = {}
self.pool: Dict[str, LazyConstraint] = {}
self.violation_tolerance: float = violation_tolerance
self.enforced_cids: Set[str] = set()
self.n_restored: int = 0
self.n_iterations: int = 0
def before_solve_mip(
self,
solver,
instance,
model,
stats,
features,
training_data,
):
self.pool = []
if not solver.use_lazy_cb and self.use_two_phase_gap:
logger.info("Increasing gap tolerance to %f", self.large_gap)
self.original_gap = solver.gap_tolerance
self.is_gap_large = True
solver.internal_solver.set_gap_tolerance(self.large_gap)
instance.found_violated_lazy_constraints = []
if instance.has_static_lazy_constraints():
self._extract_and_predict_static(solver, instance)
def iteration_cb(self, solver, instance, model):
solver: "LearningSolver",
instance: "Instance",
model: Any,
stats: LearningSolveStats,
features: Features,
training_data: TrainingSample,
) -> None:
assert solver.internal_solver is not None
if not features["Instance"]["Lazy constraint count"] == 0:
logger.info("Instance does not have static lazy constraints. Skipping.")
logger.info("Predicting required lazy constraints...")
self.enforced_cids = set(self.sample_predict(features, training_data))
logger.info("Moving lazy constraints to the pool...")
self.pool = {}
for (cid, cdict) in features["Constraints"].items():
if cdict["Lazy"] and cid not in self.enforced_cids:
self.pool[cid] = LazyConstraint(
cid=cid,
obj=solver.internal_solver.extract_constraint(cid),
)
logger.info(
f"{len(self.enforced_cids)} lazy constraints kept; "
f"{len(self.pool)} moved to the pool"
)
stats["LazyStatic: Removed"] = len(self.pool)
stats["LazyStatic: Kept"] = len(self.enforced_cids)
stats["LazyStatic: Restored"] = 0
self.n_restored = 0
self.n_iterations = 0
def after_solve_mip(
self,
solver: "LearningSolver",
instance: "Instance",
model: Any,
stats: LearningSolveStats,
features: Features,
training_data: TrainingSample,
) -> None:
training_data["LazyStatic: Enforced"] = self.enforced_cids
stats["LazyStatic: Restored"] = self.n_restored
stats["LazyStatic: Iterations"] = self.n_iterations
def iteration_cb(
self,
solver: "LearningSolver",
instance: "Instance",
model: Any,
) -> bool:
if solver.use_lazy_cb:
return False
else:
should_repeat = self._check_and_add(instance, solver)
if should_repeat:
return True
else:
if self.is_gap_large:
logger.info("Restoring gap tolerance to %f", self.original_gap)
solver.internal_solver.set_gap_tolerance(self.original_gap)
self.is_gap_large = False
return True
else:
return False
def lazy_cb(self, solver, instance, model):
self._check_and_add(instance, solver)
return self._check_and_add(solver)
def _check_and_add(self, instance, solver):
logger.debug("Finding violated lazy constraints...")
constraints_to_add = []
for c in self.pool:
def lazy_cb(
self,
solver: "LearningSolver",
instance: "Instance",
model: Any,
) -> None:
self._check_and_add(solver)
def _check_and_add(self, solver: "LearningSolver") -> bool:
assert solver.internal_solver is not None
logger.info("Finding violated lazy constraints...")
enforced: List[LazyConstraint] = []
for (cid, c) in self.pool.items():
if not solver.internal_solver.is_constraint_satisfied(
c.obj, tol=self.violation_tolerance
c.obj,
tol=self.violation_tolerance,
):
constraints_to_add.append(c)
for c in constraints_to_add:
self.pool.remove(c)
enforced.append(c)
logger.info(f"{len(enforced)} violations found")
for c in enforced:
del self.pool[c.cid]
solver.internal_solver.add_constraint(c.obj)
instance.found_violated_lazy_constraints += [c.cid]
if len(constraints_to_add) > 0:
logger.info(
"%8d lazy constraints added %8d in the pool"
% (len(constraints_to_add), len(self.pool))
)
self.enforced_cids.add(c.cid)
self.n_restored += 1
logger.info(
f"{len(enforced)} constraints restored; {len(self.pool)} in the pool"
)
if len(enforced) > 0:
self.n_iterations += 1
return True
else:
return False
def fit(self, training_instances):
training_instances = [
t
for t in training_instances
if hasattr(t, "found_violated_lazy_constraints")
]
logger.debug("Extracting x and y...")
x = self.x(training_instances)
y = self.y(training_instances)
logger.debug("Fitting...")
for category in tqdm(
x.keys(), desc="Fit (lazy)", disable=not sys.stdout.isatty()
):
if category not in self.classifiers:
self.classifiers[category] = self.classifier_prototype.clone()
self.classifiers[category].fit(x[category], y[category])
def predict(self, instance):
pass
def evaluate(self, instances):
pass
def _extract_and_predict_static(self, solver, instance):
x = {}
constraints = {}
logger.info("Extracting lazy constraints...")
for cid in solver.internal_solver.get_constraint_ids():
if instance.is_constraint_lazy(cid):
category = instance.get_constraint_category(cid)
if category not in x:
x[category] = []
constraints[category] = []
x[category] += [instance.get_constraint_features(cid)]
c = LazyConstraint(
cid=cid,
obj=solver.internal_solver.extract_constraint(cid),
)
constraints[category] += [c]
self.pool.append(c)
logger.info("%8d lazy constraints extracted" % len(self.pool))
logger.info("Predicting required lazy constraints...")
n_added = 0
for (category, x_values) in x.items():
def sample_predict(
self,
features: Features,
sample: TrainingSample,
) -> List[str]:
x, y = self.sample_xy(features, sample)
category_to_cids: Dict[Hashable, List[str]] = {}
for (cid, cdict) in features["Constraints"].items():
if "Category" not in cdict or cdict["Category"] is None:
continue
category = cdict["Category"]
if category not in category_to_cids:
category_to_cids[category] = []
category_to_cids[category] += [cid]
enforced_cids: List[str] = []
for category in x.keys():
if category not in self.classifiers:
continue
if isinstance(x_values[0], np.ndarray):
x[category] = np.array(x_values)
proba = self.classifiers[category].predict_proba(x[category])
for i in range(len(proba)):
if proba[i][1] > self.threshold:
n_added += 1
c = constraints[category][i]
self.pool.remove(c)
solver.internal_solver.add_constraint(c.obj)
instance.found_violated_lazy_constraints += [c.cid]
logger.info(
"%8d lazy constraints added %8d in the pool"
% (
n_added,
len(self.pool),
)
)
def _collect_constraints(self, train_instances):
constraints = {}
for instance in train_instances:
for cid in instance.found_violated_lazy_constraints:
category = instance.get_constraint_category(cid)
if category not in constraints:
constraints[category] = set()
constraints[category].add(cid)
for (category, cids) in constraints.items():
constraints[category] = sorted(list(cids))
return constraints
def x(self, train_instances):
result = {}
constraints = self._collect_constraints(train_instances)
for (category, cids) in constraints.items():
result[category] = []
for instance in train_instances:
for cid in cids:
result[category].append(instance.get_constraint_features(cid))
return result
def y(self, train_instances):
result = {}
constraints = self._collect_constraints(train_instances)
for (category, cids) in constraints.items():
result[category] = []
for instance in train_instances:
for cid in cids:
if cid in instance.found_violated_lazy_constraints:
result[category].append([0, 1])
else:
result[category].append([1, 0])
return result
clf = self.classifiers[category]
proba = clf.predict_proba(np.array(x[category]))
pred = list(proba[:, 1] > self.threshold)
for (i, is_selected) in enumerate(pred):
if is_selected:
enforced_cids += [category_to_cids[category][i]]
return enforced_cids
@staticmethod
def sample_xy(
features: Features,
sample: TrainingSample,
) -> Tuple[Dict, Dict]:
) -> Tuple[Dict[Hashable, List[List[float]]], Dict[Hashable, List[List[float]]]]:
x: Dict = {}
y: Dict = {}
for (cid, cfeatures) in features["Constraints"].items():
@ -227,3 +187,13 @@ class StaticLazyConstraintsComponent(Component):
else:
y[category] += [[True, False]]
return x, y
def fit_xy(
self,
x: Dict[Hashable, np.ndarray],
y: Dict[Hashable, np.ndarray],
) -> None:
for c in y.keys():
assert c in x
self.classifiers[c] = self.classifier_prototype.clone()
self.classifiers[c].fit(x[c], y[c])

@ -58,8 +58,8 @@ class ObjectiveValueComponent(Component):
def fit_xy(
self,
x: Dict[str, np.ndarray],
y: Dict[str, np.ndarray],
x: Dict[Hashable, np.ndarray],
y: Dict[Hashable, np.ndarray],
) -> None:
for c in ["Upper bound", "Lower bound"]:
if c in y:
@ -84,9 +84,9 @@ class ObjectiveValueComponent(Component):
def sample_xy(
features: Features,
sample: TrainingSample,
) -> Tuple[Dict[str, List[List[float]]], Dict[str, List[List[float]]]]:
x: Dict[str, List[List[float]]] = {}
y: Dict[str, List[List[float]]] = {}
) -> Tuple[Dict[Hashable, List[List[float]]], Dict[Hashable, List[List[float]]]]:
x: Dict[Hashable, List[List[float]]] = {}
y: Dict[Hashable, List[List[float]]] = {}
f = list(features["Instance"]["User features"])
if "LP value" in sample and sample["LP value"] is not None:
f += [sample["LP value"]]

@ -148,7 +148,7 @@ class PrimalSolutionComponent(Component):
def sample_xy(
features: Features,
sample: TrainingSample,
) -> Tuple[Dict, Dict]:
) -> Tuple[Dict[Hashable, List[List[float]]], Dict[Hashable, List[List[float]]]]:
x: Dict = {}
y: Dict = {}
solution: Optional[Solution] = None
@ -227,8 +227,8 @@ class PrimalSolutionComponent(Component):
def fit_xy(
self,
x: Dict[str, np.ndarray],
y: Dict[str, np.ndarray],
x: Dict[Hashable, np.ndarray],
y: Dict[Hashable, np.ndarray],
) -> None:
for category in x.keys():
clf = self.classifier_prototype.clone()

@ -20,11 +20,12 @@ class FeaturesExtractor:
self.solver = internal_solver
def extract(self, instance: "Instance") -> Features:
return {
"Instance": self._extract_instance(instance),
"Constraints": self._extract_constraints(instance),
features: Features = {
"Variables": self._extract_variables(instance),
"Constraints": self._extract_constraints(instance),
}
features["Instance"] = self._extract_instance(instance, features)
return features
def _extract_variables(self, instance: "Instance") -> Dict:
variables = self.solver.get_empty_solution()
@ -92,7 +93,10 @@ class FeaturesExtractor:
return constraints
@staticmethod
def _extract_instance(instance: "Instance") -> InstanceFeatures:
def _extract_instance(
instance: "Instance",
features: Features,
) -> InstanceFeatures:
user_features = instance.get_instance_features()
assert isinstance(user_features, list), (
f"Instance features must be a list. "
@ -103,4 +107,11 @@ class FeaturesExtractor:
f"Instance features must be a list of numbers. "
f"Found {type(v).__name__} instead."
)
return {"User features": user_features}
lazy_count = 0
for (cid, cdict) in features["Constraints"].items():
if cdict["Lazy"]:
lazy_count += 1
return {
"User features": user_features,
"Lazy constraint count": lazy_count,
}

@ -69,6 +69,10 @@ LearningSolveStats = TypedDict(
"Upper bound": Optional[float],
"Wallclock time": float,
"Warm start value": Optional[float],
"LazyStatic: Removed": int,
"LazyStatic: Kept": int,
"LazyStatic: Restored": int,
"LazyStatic: Iterations": int,
},
total=False,
)
@ -77,6 +81,7 @@ InstanceFeatures = TypedDict(
"InstanceFeatures",
{
"User features": List[float],
"Lazy constraint count": int,
},
total=False,
)

@ -1,144 +1,119 @@
# MIPLearn: Extensible Framework for Learning-Enhanced Mixed-Integer Optimization
# Copyright (C) 2020, UChicago Argonne, LLC. All rights reserved.
# Released under the modified BSD license. See COPYING.md for more details.
from typing import Dict, cast, Hashable
from unittest.mock import Mock, call
import numpy as np
import pytest
from numpy.testing import assert_array_equal
from miplearn import LearningSolver, InternalSolver, Instance
from miplearn.classifiers import Classifier
from miplearn.components.lazy_static import StaticLazyConstraintsComponent
from miplearn.instance import Instance
from miplearn.solvers.internal import InternalSolver
from miplearn.solvers.learning import LearningSolver
from miplearn.types import TrainingSample, Features
from miplearn.types import TrainingSample, Features, LearningSolveStats
@pytest.fixture
def sample() -> TrainingSample:
return {
"LazyStatic: Enforced": {"c1", "c2", "c4"},
}
@pytest.fixture
def features() -> Features:
return {
"Instance": {
"Lazy constraint count": 4,
},
"Constraints": {
"c1": {
"Category": "type-a",
"User features": [1.0, 1.0],
"Lazy": True,
},
"c2": {
"Category": "type-a",
"User features": [1.0, 2.0],
"Lazy": True,
},
"c3": {
"Category": "type-a",
"User features": [1.0, 3.0],
"Lazy": True,
},
"c4": {
"Category": "type-b",
"User features": [1.0, 4.0, 0.0],
"Lazy": True,
},
"c5": {
"Category": "type-b",
"User features": [1.0, 5.0, 0.0],
"Lazy": False,
},
},
}
def test_usage_with_solver():
def test_usage_with_solver(features: Features) -> None:
solver = Mock(spec=LearningSolver)
solver.use_lazy_cb = False
solver.gap_tolerance = 1e-4
internal = solver.internal_solver = Mock(spec=InternalSolver)
internal.get_constraint_ids = Mock(return_value=["c1", "c2", "c3", "c4"])
internal.extract_constraint = Mock(side_effect=lambda cid: "<%s>" % cid)
internal.is_constraint_satisfied = Mock(return_value=False)
instance = Mock(spec=Instance)
instance.has_static_lazy_constraints = Mock(return_value=True)
instance.is_constraint_lazy = Mock(
side_effect=lambda cid: {
"c1": False,
"c2": True,
"c3": True,
"c4": True,
}[cid]
)
instance.get_constraint_features = Mock(
side_effect=lambda cid: {
"c2": [1.0, 0.0],
"c3": [0.5, 0.5],
"c4": [1.0],
}[cid]
)
instance.get_constraint_category = Mock(
side_effect=lambda cid: {
"c2": "type-a",
"c3": "type-a",
"c4": "type-b",
}[cid]
)
component = StaticLazyConstraintsComponent(
threshold=0.90,
use_two_phase_gap=False,
threshold=0.50,
violation_tolerance=1.0,
)
component.classifiers = {
"type-a": Mock(spec=Classifier),
"type-b": Mock(spec=Classifier),
}
component.classifiers["type-a"].predict_proba = Mock(
return_value=[
[0.20, 0.80],
[0.05, 0.95],
]
)
component.classifiers["type-b"].predict_proba = Mock(
return_value=[
[0.02, 0.98],
]
)
# LearningSolver calls before_solve
component.classifiers["type-a"].predict_proba = Mock( # type: ignore
return_value=np.array(
[
[0.00, 1.00], # c1
[0.20, 0.80], # c2
[0.99, 0.01], # c3
]
)
)
component.classifiers["type-b"].predict_proba = Mock( # type: ignore
return_value=np.array(
[
[0.02, 0.98], # c4
]
)
)
sample: TrainingSample = {}
stats: LearningSolveStats = {}
# LearningSolver calls before_solve_mip
component.before_solve_mip(
solver=solver,
instance=instance,
model=None,
stats=None,
features=None,
training_data=None,
)
# Should ask if instance has static lazy constraints
instance.has_static_lazy_constraints.assert_called_once()
# Should ask internal solver for a list of constraints in the model
internal.get_constraint_ids.assert_called_once()
# Should ask if each constraint in the model is lazy
instance.is_constraint_lazy.assert_has_calls(
[
call("c1"),
call("c2"),
call("c3"),
call("c4"),
]
)
# For the lazy ones, should ask for features
instance.get_constraint_features.assert_has_calls(
[
call("c2"),
call("c3"),
call("c4"),
]
)
# Should also ask for categories
assert instance.get_constraint_category.call_count == 3
instance.get_constraint_category.assert_has_calls(
[
call("c2"),
call("c3"),
call("c4"),
]
)
# Should ask internal solver to remove constraints identified as lazy
assert internal.extract_constraint.call_count == 3
internal.extract_constraint.assert_has_calls(
[
call("c2"),
call("c3"),
call("c4"),
]
stats=stats,
features=features,
training_data=sample,
)
# Should ask ML to predict whether each lazy constraint should be enforced
component.classifiers["type-a"].predict_proba.assert_called_once_with(
[[1.0, 0.0], [0.5, 0.5]]
)
component.classifiers["type-b"].predict_proba.assert_called_once_with([[1.0]])
component.classifiers["type-a"].predict_proba.assert_called_once()
component.classifiers["type-b"].predict_proba.assert_called_once()
# For the ones that should be enforced, should ask solver to re-add them
# to the formulation. The remaining ones should remain in the pool.
assert internal.add_constraint.call_count == 2
internal.add_constraint.assert_has_calls(
[
call("<c3>"),
call("<c4>"),
]
)
internal.add_constraint.reset_mock()
# Should ask internal solver to remove some constraints
assert internal.extract_constraint.call_count == 1
internal.extract_constraint.assert_has_calls([call("c3")])
# LearningSolver calls after_iteration (first time)
should_repeat = component.iteration_cb(solver, instance, None)
@ -146,9 +121,9 @@ def test_usage_with_solver():
# Should ask internal solver to verify if constraints in the pool are
# satisfied and add the ones that are not
internal.is_constraint_satisfied.assert_called_once_with("<c2>", tol=1.0)
internal.is_constraint_satisfied.assert_called_once_with("<c3>", tol=1.0)
internal.is_constraint_satisfied.reset_mock()
internal.add_constraint.assert_called_once_with("<c2>")
internal.add_constraint.assert_called_once_with("<c3>")
internal.add_constraint.reset_mock()
# LearningSolver calls after_iteration (second time)
@ -159,139 +134,88 @@ def test_usage_with_solver():
internal.is_constraint_satisfied.assert_not_called()
internal.add_constraint.assert_not_called()
# Should update instance object
assert instance.found_violated_lazy_constraints == ["c3", "c4", "c2"]
def test_fit():
instance_1 = Mock(spec=Instance)
instance_1.found_violated_lazy_constraints = ["c1", "c2", "c4", "c5"]
instance_1.get_constraint_category = Mock(
side_effect=lambda cid: {
"c1": "type-a",
"c2": "type-a",
"c3": "type-a",
"c4": "type-b",
"c5": "type-b",
}[cid]
)
instance_1.get_constraint_features = Mock(
side_effect=lambda cid: {
"c1": [1, 1],
"c2": [1, 2],
"c3": [1, 3],
"c4": [1, 4, 0],
"c5": [1, 5, 0],
}[cid]
# LearningSolver calls after_solve_mip
component.after_solve_mip(
solver=solver,
instance=instance,
model=None,
stats=stats,
features=features,
training_data=sample,
)
instance_2 = Mock(spec=Instance)
instance_2.found_violated_lazy_constraints = ["c2", "c3", "c4"]
instance_2.get_constraint_category = Mock(
side_effect=lambda cid: {
"c1": "type-a",
"c2": "type-a",
"c3": "type-a",
"c4": "type-b",
"c5": "type-b",
}[cid]
)
instance_2.get_constraint_features = Mock(
side_effect=lambda cid: {
"c1": [2, 1],
"c2": [2, 2],
"c3": [2, 3],
"c4": [2, 4, 0],
"c5": [2, 5, 0],
}[cid]
)
# Should update training sample
assert sample["LazyStatic: Enforced"] == {"c1", "c2", "c3", "c4"}
instances = [instance_1, instance_2]
component = StaticLazyConstraintsComponent()
component.classifiers = {
"type-a": Mock(spec=Classifier),
"type-b": Mock(spec=Classifier),
}
# Should update stats
assert stats["LazyStatic: Removed"] == 1
assert stats["LazyStatic: Kept"] == 3
assert stats["LazyStatic: Restored"] == 1
assert stats["LazyStatic: Iterations"] == 1
expected_constraints = {
"type-a": ["c1", "c2", "c3"],
"type-b": ["c4", "c5"],
}
expected_x = {
"type-a": [[1, 1], [1, 2], [1, 3], [2, 1], [2, 2], [2, 3]],
"type-b": [[1, 4, 0], [1, 5, 0], [2, 4, 0], [2, 5, 0]],
}
expected_y = {
"type-a": [[0, 1], [0, 1], [1, 0], [1, 0], [0, 1], [0, 1]],
"type-b": [[0, 1], [0, 1], [0, 1], [1, 0]],
}
assert component._collect_constraints(instances) == expected_constraints
assert component.x(instances) == expected_x
assert component.y(instances) == expected_y
component.fit(instances)
component.classifiers["type-a"].fit.assert_called_once_with(
expected_x["type-a"],
expected_y["type-a"],
def test_sample_predict(
features: Features,
sample: TrainingSample,
) -> None:
comp = StaticLazyConstraintsComponent(threshold=0.5)
comp.classifiers["type-a"] = Mock(spec=Classifier)
comp.classifiers["type-a"].predict_proba = lambda _: np.array( # type:ignore
[
[0.0, 1.0], # c1
[0.0, 0.9], # c2
[0.9, 0.1], # c3
]
)
component.classifiers["type-b"].fit.assert_called_once_with(
expected_x["type-b"],
expected_y["type-b"],
comp.classifiers["type-b"] = Mock(spec=Classifier)
comp.classifiers["type-b"].predict_proba = lambda _: np.array( # type:ignore
[
[0.0, 1.0], # c4
]
)
def test_xy_sample() -> None:
sample: TrainingSample = {
"LazyStatic: Enforced": {"c1", "c2", "c4"},
}
features: Features = {
"Constraints": {
"c1": {
"Category": "type-a",
"User features": [1.0, 1.0],
"Lazy": True,
},
"c2": {
"Category": "type-a",
"User features": [1.0, 2.0],
"Lazy": True,
},
"c3": {
"Category": "type-a",
"User features": [1.0, 3.0],
"Lazy": True,
},
"c4": {
"Category": "type-b",
"User features": [1.0, 4.0, 0.0],
"Lazy": True,
},
"c5": {
"Category": "type-b",
"User features": [1.0, 5.0, 0.0],
"Lazy": False,
},
}
}
pred = comp.sample_predict(features, sample)
assert pred == ["c1", "c2", "c4"]
def test_fit_xy() -> None:
x = cast(
Dict[Hashable, np.ndarray],
{
"type-a": np.array([[1.0, 1.0], [1.0, 2.0], [1.0, 3.0]]),
"type-b": np.array([[1.0, 4.0, 0.0]]),
},
)
y = cast(
Dict[Hashable, np.ndarray],
{
"type-a": np.array([[False, True], [False, True], [True, False]]),
"type-b": np.array([[False, True]]),
},
)
clf = Mock(spec=Classifier)
clf.clone = Mock(side_effect=lambda: Mock(spec=Classifier))
comp = StaticLazyConstraintsComponent(classifier=clf)
comp.fit_xy(x, y)
assert clf.clone.call_count == 2
clf_a = comp.classifiers["type-a"]
clf_b = comp.classifiers["type-b"]
assert clf_a.fit.call_count == 1 # type: ignore
assert clf_b.fit.call_count == 1 # type: ignore
assert_array_equal(clf_a.fit.call_args[0][0], x["type-a"]) # type: ignore
assert_array_equal(clf_b.fit.call_args[0][0], x["type-b"]) # type: ignore
def test_sample_xy(
features: Features,
sample: TrainingSample,
) -> None:
x_expected = {
"type-a": [
[1.0, 1.0],
[1.0, 2.0],
[1.0, 3.0],
],
"type-b": [
[1.0, 4.0, 0.0],
],
"type-a": [[1.0, 1.0], [1.0, 2.0], [1.0, 3.0]],
"type-b": [[1.0, 4.0, 0.0]],
}
y_expected = {
"type-a": [
[False, True],
[False, True],
[True, False],
],
"type-b": [
[False, True],
],
"type-a": [[False, True], [False, True], [True, False]],
"type-b": [[False, True]],
}
xy = StaticLazyConstraintsComponent.sample_xy(features, sample)
assert xy is not None

@ -50,4 +50,5 @@ def test_knapsack() -> None:
}
assert features["Instance"] == {
"User features": [67.0, 21.75],
"Lazy constraint count": 0,
}

Loading…
Cancel
Save