Refactor PrimalSolutionComponent

This commit is contained in:
2021-01-25 14:54:58 -06:00
parent f68cc5bd59
commit 3ab3bb3c1f
9 changed files with 501 additions and 233 deletions

View File

@@ -43,6 +43,6 @@ reformat:
test: test:
$(MYPY) -p miplearn $(MYPY) -p miplearn
$(MYPY) -p tests $(MYPY) -p tests
$(PYTEST) $(PYTEST_ARGS) tests/classifiers $(PYTEST) $(PYTEST_ARGS)
.PHONY: test test-watch docs install .PHONY: test test-watch docs install

View File

@@ -69,13 +69,18 @@ For example, if the ML model predicts that a certain binary variable will assume
MIPLearn currently provides two types of thresholds: MIPLearn currently provides two types of thresholds:
* `MinProbabilityThreshold(p: float)` A threshold which indicates that a prediction is trustworthy if its probability of being correct, as computed by the machine learning model, is above a fixed value `p`. * `MinProbabilityThreshold(p: List[float])` A threshold which indicates that a prediction is trustworthy if its probability of being correct, as computed by the machine learning model, is above a fixed value.
* `MinPrecisionThreshold(p: float)` A dynamic threshold which automatically adjusts itself during training to ensure that the component achieves at least a given precision `p` on the training data set. Note that increasing a component's precision may reduce its recall. * `MinPrecisionThreshold(p: List[float])` A dynamic threshold which automatically adjusts itself during training to ensure that the component achieves at least a given precision on the training data set. Note that increasing a component's precision may reduce its recall.
The example below shows how to configure `PrimalSolutionComponent` to achieve at least 95% precision. Other components are configured similarly. The example below shows how to build a `PrimalSolutionComponent` which fixes variables to zero with at least 80% precision, and to one with at least 95% precision. Other components are configured similarly.
```python ```python
PrimalSolutionComponent(threshold=MinPrecisionThreshold(0.95)) from miplearn import PrimalSolutionComponent, MinPrecisionThreshold
PrimalSolutionComponent(
mode="heuristic",
threshold=lambda: MinPrecisionThreshold([0.80, 0.95]),
)
``` ```
### Evaluating component performance ### Evaluating component performance

View File

@@ -142,8 +142,11 @@ class ScikitLearnClassifier(Classifier):
def fit(self, x_train: np.ndarray, y_train: np.ndarray) -> None: def fit(self, x_train: np.ndarray, y_train: np.ndarray) -> None:
super().fit(x_train, y_train) super().fit(x_train, y_train)
(n_samples, n_classes) = x_train.shape (n_samples, n_classes) = y_train.shape
assert n_classes == 2, "scikit-learn classifiers must have exactly two classes" assert n_classes == 2, (
f"Scikit-learn classifiers must have exactly two classes. "
f"{n_classes} classes were provided instead."
)
self.inner_clf.fit(x_train, y_train[:, 1]) self.inner_clf.fit(x_train, y_train[:, 1])
def predict_proba(self, x_test: np.ndarray) -> np.ndarray: def predict_proba(self, x_test: np.ndarray) -> np.ndarray:

View File

@@ -3,7 +3,7 @@
# Released under the modified BSD license. See COPYING.md for more details. # Released under the modified BSD license. See COPYING.md for more details.
from abc import abstractmethod, ABC from abc import abstractmethod, ABC
from typing import Optional from typing import Optional, List
import numpy as np import numpy as np
from sklearn.metrics._ranking import _binary_clf_curve from sklearn.metrics._ranking import _binary_clf_curve
@@ -42,10 +42,10 @@ class Threshold(ABC):
assert y_train.shape[0] == n_samples assert y_train.shape[0] == n_samples
@abstractmethod @abstractmethod
def predict(self, x_test: np.ndarray) -> float: def predict(self, x_test: np.ndarray) -> List[float]:
""" """
Returns the minimum probability for a machine learning prediction to be Returns the minimum probability for a machine learning prediction to be
considered trustworthy. considered trustworthy. There is one value for each label.
""" """
pass pass
@@ -56,13 +56,13 @@ class MinProbabilityThreshold(Threshold):
correct, as computed by the machine learning models, are above a fixed value. correct, as computed by the machine learning models, are above a fixed value.
""" """
def __init__(self, min_probability: float): def __init__(self, min_probability: List[float]):
self.min_probability = min_probability self.min_probability = min_probability
def fit(self, clf: Classifier, x_train: np.ndarray, y_train: np.ndarray) -> None: def fit(self, clf: Classifier, x_train: np.ndarray, y_train: np.ndarray) -> None:
pass pass
def predict(self, x_test: np.ndarray) -> float: def predict(self, x_test: np.ndarray) -> List[float]:
return self.min_probability return self.min_probability
@@ -73,21 +73,41 @@ class MinPrecisionThreshold(Threshold):
set. Note that increasing a component's minimum precision may reduce its recall. set. Note that increasing a component's minimum precision may reduce its recall.
""" """
def __init__(self, min_precision: float) -> None: def __init__(self, min_precision: List[float]) -> None:
self.min_precision = min_precision self.min_precision = min_precision
self._computed_threshold: Optional[float] = None self._computed_threshold: Optional[List[float]] = None
def fit(self, clf: Classifier, x_train: np.ndarray, y_train: np.ndarray) -> None: def fit(
self,
clf: Classifier,
x_train: np.ndarray,
y_train: np.ndarray,
) -> None:
super().fit(clf, x_train, y_train) super().fit(clf, x_train, y_train)
(n_samples, n_classes) = y_train.shape
proba = clf.predict_proba(x_train) proba = clf.predict_proba(x_train)
fps, tps, thresholds = _binary_clf_curve(y_train, proba[:, 1]) self._computed_threshold = [
precision = tps / (tps + fps) self._compute(
for k in reversed(range(len(precision))): y_train[:, i],
if precision[k] >= self.min_precision: proba[:, i],
self._computed_threshold = thresholds[k] self.min_precision[i],
return )
self._computed_threshold = float("inf") for i in range(n_classes)
]
def predict(self, x_test: np.ndarray) -> float: def predict(self, x_test: np.ndarray) -> List[float]:
assert self._computed_threshold is not None assert self._computed_threshold is not None
return self._computed_threshold return self._computed_threshold
@staticmethod
def _compute(
    y_actual: np.ndarray,
    y_prob: np.ndarray,
    min_precision: float,
) -> float:
    """
    Returns the probability cutoff for a single label that reaches the
    requested precision on the given data.

    The candidate cutoffs, together with the false/true positive counts
    obtained at each of them, come from `_binary_clf_curve`. Candidates are
    examined from the last one to the first, and the first candidate found
    whose precision is at least `min_precision` is returned. If no candidate
    qualifies, infinity is returned.
    """
    false_pos, true_pos, cutoffs = _binary_clf_curve(y_actual, y_prob)
    achieved = true_pos / (true_pos + false_pos)
    # NOTE(review): the helper presumably lists cutoffs in decreasing order, so
    # walking backwards tries the most permissive cutoff first -- confirm.
    for (cutoff, precision) in zip(cutoffs[::-1], achieved[::-1]):
        if precision >= min_precision:
            return cutoff
    return float("inf")

View File

@@ -3,8 +3,7 @@
# Released under the modified BSD license. See COPYING.md for more details. # Released under the modified BSD license. See COPYING.md for more details.
import logging import logging
from copy import deepcopy from typing import Union, Dict, Callable, List, Hashable, Optional
from typing import Union, Dict, Any
import numpy as np import numpy as np
from tqdm.auto import tqdm from tqdm.auto import tqdm
@@ -14,35 +13,46 @@ from miplearn.classifiers.adaptive import AdaptiveClassifier
from miplearn.classifiers.threshold import MinPrecisionThreshold, Threshold from miplearn.classifiers.threshold import MinPrecisionThreshold, Threshold
from miplearn.components import classifier_evaluation_dict from miplearn.components import classifier_evaluation_dict
from miplearn.components.component import Component from miplearn.components.component import Component
from miplearn.extractors import VariableFeaturesExtractor, SolutionExtractor, Extractor from miplearn.extractors import InstanceIterator
from miplearn.instance import Instance
from miplearn.types import TrainingSample, VarIndex, Solution
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PrimalSolutionComponent(Component): class PrimalSolutionComponent(Component):
""" """
A component that predicts primal solutions. A component that predicts the optimal primal values for the binary decision
variables.
In exact mode, predicted primal solutions are provided to the solver as MIP
starts. In heuristic mode, this component fixes the decision variables to their
predicted values.
""" """
def __init__( def __init__(
self, self,
classifier: Classifier = AdaptiveClassifier(), classifier: Callable[[], Classifier] = lambda: AdaptiveClassifier(),
mode: str = "exact", mode: str = "exact",
threshold: Union[float, Threshold] = MinPrecisionThreshold(0.98), threshold: Callable[[], Threshold] = lambda: MinPrecisionThreshold(
[0.98, 0.98]
),
) -> None: ) -> None:
assert mode in ["exact", "heuristic"]
self.mode = mode self.mode = mode
self.classifiers: Dict[Any, Classifier] = {} self.classifiers: Dict[Hashable, Classifier] = {}
self.thresholds: Dict[Any, Union[float, Threshold]] = {} self.thresholds: Dict[Hashable, Threshold] = {}
self.threshold_prototype = threshold self.threshold_factory = threshold
self.classifier_prototype = classifier self.classifier_factory = classifier
def before_solve(self, solver, instance, model): def before_solve(self, solver, instance, model):
logger.info("Predicting primal solution...") if len(self.thresholds) > 0:
solution = self.predict(instance) logger.info("Predicting primal solution...")
if self.mode == "heuristic": solution = self.predict(instance)
solver.internal_solver.fix(solution) if self.mode == "heuristic":
else: solver.internal_solver.fix(solution)
solver.internal_solver.set_warm_start(solution) else:
solver.internal_solver.set_warm_start(solution)
def after_solve( def after_solve(
self, self,
@@ -54,79 +64,76 @@ class PrimalSolutionComponent(Component):
): ):
pass pass
def x(self, training_instances): def x(
return VariableFeaturesExtractor().extract(training_instances) self,
instances: Union[List[str], List[Instance]],
) -> Dict[Hashable, np.ndarray]:
return self._build_x_y_dict(instances, self._extract_variable_features)
def y(self, training_instances): def y(
return SolutionExtractor().extract(training_instances) self,
instances: Union[List[str], List[Instance]],
) -> Dict[Hashable, np.ndarray]:
return self._build_x_y_dict(instances, self._extract_variable_labels)
def fit(self, training_instances, n_jobs=1): def fit(
logger.debug("Extracting features...") self,
features = VariableFeaturesExtractor().extract(training_instances) training_instances: Union[List[str], List[Instance]],
solutions = SolutionExtractor().extract(training_instances) n_jobs: int = 1,
) -> None:
x = self.x(training_instances)
y = self.y(training_instances)
for category in x.keys():
clf = self.classifier_factory()
thr = self.threshold_factory()
clf.fit(x[category], y[category])
thr.fit(clf, x[category], y[category])
self.classifiers[category] = clf
self.thresholds[category] = thr
for category in tqdm( def predict(self, instance: Instance) -> Solution:
features.keys(), assert len(instance.training_data) > 0
desc="Fit (primal)", sample = instance.training_data[-1]
): assert "LP solution" in sample
x_train = features[category] lp_solution = sample["LP solution"]
for label in [0, 1]: assert lp_solution is not None
y_train = solutions[category][:, label].astype(int)
# If all samples are either positive or negative, make constant # Initialize empty solution
# predictions solution: Solution = {}
y_avg = np.average(y_train) for (var_name, var_dict) in lp_solution.items():
if y_avg < 0.001 or y_avg >= 0.999: solution[var_name] = {}
self.classifiers[category, label] = round(y_avg) for (idx, lp_value) in var_dict.items():
self.thresholds[category, label] = 0.50 solution[var_name][idx] = None
continue
# Create a copy of classifier prototype and train it # Compute y_pred
if isinstance(self.classifier_prototype, list): x = self.x([instance])
clf = deepcopy(self.classifier_prototype[label]) y_pred = {}
else: for category in x.keys():
clf = deepcopy(self.classifier_prototype) assert category in self.classifiers, (
clf.fit(x_train, y_train) f"Classifier for category {category} has not been trained. "
f"Please call component.fit before component.predict."
)
proba = self.classifiers[category].predict_proba(x[category])
thr = self.thresholds[category].predict(x[category])
y_pred[category] = np.vstack(
[
proba[:, 0] > thr[0],
proba[:, 1] > thr[1],
]
).T
# Find threshold (dynamic or static) # Convert y_pred into solution
if isinstance(self.threshold_prototype, Threshold): category_offset: Dict[Hashable, int] = {cat: 0 for cat in x.keys()}
self.thresholds[category, label] = self.threshold_prototype.fit( for (var_name, var_dict) in lp_solution.items():
clf, for (idx, lp_value) in var_dict.items():
x_train, category = instance.get_variable_category(var_name, idx)
y_train, offset = category_offset[category]
) category_offset[category] += 1
else: if y_pred[category][offset, 0]:
self.thresholds[category, label] = deepcopy( solution[var_name][idx] = 0.0
self.threshold_prototype if y_pred[category][offset, 1]:
) solution[var_name][idx] = 1.0
self.classifiers[category, label] = clf
def predict(self, instance):
solution = {}
x_test = VariableFeaturesExtractor().extract([instance])
var_split = Extractor.split_variables(instance)
for category in var_split.keys():
n = len(var_split[category])
for (i, (var, index)) in enumerate(var_split[category]):
if var not in solution.keys():
solution[var] = {}
solution[var][index] = None
for label in [0, 1]:
if (category, label) not in self.classifiers.keys():
continue
clf = self.classifiers[category, label]
if isinstance(clf, float) or isinstance(clf, int):
ws = np.array([[1 - clf, clf] for _ in range(n)])
else:
ws = clf.predict_proba(x_test[category])
assert ws.shape == (n, 2), "ws.shape should be (%d, 2) not %s" % (
n,
ws.shape,
)
for (i, (var, index)) in enumerate(var_split[category]):
if ws[i, 1] >= self.thresholds[category, label]:
solution[var][index] = label
return solution return solution
def evaluate(self, instances): def evaluate(self, instances):
@@ -175,3 +182,82 @@ class PrimalSolutionComponent(Component):
tp_one, tn_one, fp_one, fn_one tp_one, tn_one, fp_one, fn_one
) )
return ev return ev
@staticmethod
def _build_x_y_dict(
    instances: Union[List[str], List[Instance]],
    extract: Callable[
        [
            Instance,
            TrainingSample,
            str,
            VarIndex,
            Optional[float],
        ],
        Union[List[bool], List[float]],
    ],
) -> Dict[Hashable, np.ndarray]:
    """
    Builds one array per variable category by applying `extract` to every
    categorized variable of every training sample.

    Each instance is visited through `InstanceIterator`. For each of its
    training samples that has a non-null "LP solution" entry, `extract` is
    called once per (variable, index) pair found in that entry, and the
    returned row is appended to the list of the variable's category.
    Variables whose category is None are skipped.

    Returns a dictionary mapping each category to a numpy array built from
    the rows collected for it.
    """
    rows_by_category: Dict[Hashable, List] = {}
    for instance in InstanceIterator(instances):
        assert isinstance(instance, Instance)
        for sample in instance.training_data:
            # Samples without an LP solution contribute no rows
            if "LP solution" not in sample or sample["LP solution"] is None:
                continue
            for (var, var_dict) in sample["LP solution"].items():
                for (idx, lp_value) in var_dict.items():
                    category = instance.get_variable_category(var, idx)
                    if category is None:
                        continue
                    row = extract(instance, sample, var, idx, lp_value)
                    rows_by_category.setdefault(category, []).append(row)
    # Stack the collected rows of each category into a numpy array
    return {
        category: np.array(rows)
        for (category, rows) in rows_by_category.items()
    }
@staticmethod
def _extract_variable_features(
    instance: Instance,
    sample: TrainingSample,
    var: str,
    idx: VarIndex,
    lp_value: Optional[float],
) -> Union[List[bool], List[float]]:
    """
    Returns the feature row for a single decision variable.

    The row consists of the user-provided features returned by
    `instance.get_variable_features(var, idx)`, followed by the variable's
    LP relaxation value when `lp_value` is not None. The `sample` argument
    is not used here; it is accepted so that this function matches the
    extractor signature expected by `_build_x_y_dict`.
    """
    # Copy into a plain list before appending. If the instance returned a
    # NumPy array, `features + [lp_value]` would add the LP value to every
    # feature element-wise instead of appending a new column.
    features = list(instance.get_variable_features(var, idx))
    if lp_value is None:
        return features
    return features + [lp_value]
@staticmethod
def _extract_variable_labels(
    instance: Instance,
    sample: TrainingSample,
    var: str,
    idx: VarIndex,
    lp_value: Optional[float],
) -> Union[List[bool], List[float]]:
    """
    Returns the label row for a single decision variable.

    The row has two booleans computed from the variable's value in the
    sample's "Solution" entry: whether the value is below 0.5, and whether
    it is above 0.5. The sample must contain a non-null "Solution" with a
    non-null value for the variable, and that value must lie in [0, 1].
    The `instance` and `lp_value` arguments are not used.
    """
    assert "Solution" in sample
    optimal = sample["Solution"]
    assert optimal is not None
    value = optimal[var][idx]
    assert value is not None
    # NOTE(review): this only checks the [0, 1] range, so fractional values
    # pass even though the message speaks of non-binary values -- confirm
    # whether that tolerance is intended.
    within_unit_interval = 0.0 <= value <= 1.0
    assert within_unit_interval, (
        f"Variable {var} has non-binary value {value} in the optimal solution. "
        f"Predicting values of non-binary variables is not currently supported. "
        f"Please set its category to None."
    )
    is_zero = value < 0.5
    is_one = value > 0.5
    return [is_zero, is_one]

View File

@@ -6,22 +6,28 @@ import gzip
import logging import logging
import pickle import pickle
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List, Union, cast, IO
import numpy as np import numpy as np
from tqdm.auto import tqdm from tqdm.auto import tqdm
from miplearn.instance import Instance
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class InstanceIterator: class InstanceIterator:
def __init__(self, instances): def __init__(
self,
instances: Union[List[str], List[Instance]],
) -> None:
self.instances = instances self.instances = instances
self.current = 0 self.current = 0
def __iter__(self): def __iter__(self):
return self return self
def __next__(self): def __next__(self) -> Instance:
if self.current >= len(self.instances): if self.current >= len(self.instances):
raise StopIteration raise StopIteration
result = self.instances[self.current] result = self.instances[self.current]
@@ -30,13 +36,14 @@ class InstanceIterator:
logger.debug("Read: %s" % result) logger.debug("Read: %s" % result)
try: try:
if result.endswith(".gz"): if result.endswith(".gz"):
with gzip.GzipFile(result, "rb") as file: with gzip.GzipFile(result, "rb") as gzfile:
result = pickle.load(file) result = pickle.load(cast(IO[bytes], gzfile))
else: else:
with open(result, "rb") as file: with open(result, "rb") as file:
result = pickle.load(file) result = pickle.load(cast(IO[bytes], file))
except pickle.UnpicklingError: except pickle.UnpicklingError:
raise Exception(f"Invalid instance file: {result}") raise Exception(f"Invalid instance file: {result}")
assert isinstance(result, Instance)
return result return result

View File

@@ -5,11 +5,11 @@
import gzip import gzip
import json import json
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, List from typing import Any, List, Optional, Hashable
import numpy as np import numpy as np
from miplearn.types import TrainingSample from miplearn.types import TrainingSample, VarIndex
class Instance(ABC): class Instance(ABC):
@@ -34,9 +34,9 @@ class Instance(ABC):
""" """
pass pass
def get_instance_features(self): def get_instance_features(self) -> List[float]:
""" """
Returns a 1-dimensional Numpy array of (numerical) features describing the Returns a 1-dimensional array of (numerical) features describing the
entire instance. entire instance.
The array is used by LearningSolver to determine how similar two instances The array is used by LearningSolver to determine how similar two instances
@@ -56,17 +56,13 @@ class Instance(ABC):
By default, returns [0]. By default, returns [0].
""" """
return np.zeros(1) return [0]
def get_variable_features(self, var, index): def get_variable_features(self, var_name: str, index: VarIndex) -> List[float]:
""" """
Returns a 1-dimensional array of (numerical) features describing a particular Returns a 1-dimensional array of (numerical) features describing a particular
decision variable. decision variable.
The argument `var` is a pyomo.core.Var object, which represents a collection
of decision variables. The argument `index` specifies which variable in the
collection is the relevant one.
In combination with instance features, variable features are used by In combination with instance features, variable features are used by
LearningSolver to predict, among other things, the optimal value of each LearningSolver to predict, among other things, the optimal value of each
decision variable before the optimization takes place. In the knapsack decision variable before the optimization takes place. In the knapsack
@@ -79,12 +75,15 @@ class Instance(ABC):
By default, returns [0]. By default, returns [0].
""" """
return np.zeros(1) return [0]
def get_variable_category(self, var, index): def get_variable_category(
self,
var_name: str,
index: VarIndex,
) -> Optional[Hashable]:
""" """
Returns the category (a string, an integer or any hashable type) for each Returns the category for each decision variable.
decision variable.
If two variables have the same category, LearningSolver will use the same If two variables have the same category, LearningSolver will use the same
internal ML model to predict the values of both variables. If the returned internal ML model to predict the values of both variables. If the returned

View File

@@ -16,27 +16,41 @@ def test_threshold_dynamic():
return_value=np.array( return_value=np.array(
[ [
[0.10, 0.90], [0.10, 0.90],
[0.10, 0.90], [0.25, 0.75],
[0.20, 0.80], [0.40, 0.60],
[0.30, 0.70], [0.90, 0.10],
] ]
) )
) )
x_train = np.array([0, 1, 2, 3]) x_train = np.array(
y_train = np.array([1, 1, 0, 0]) [
[0],
[1],
[2],
[3],
]
)
y_train = np.array(
[
[False, True],
[False, True],
[True, False],
[True, False],
]
)
threshold = MinPrecisionThreshold(min_precision=1.0) threshold = MinPrecisionThreshold(min_precision=[1.0, 1.0])
threshold.fit(clf, x_train, y_train) threshold.fit(clf, x_train, y_train)
assert threshold.predict(x_train) == 0.90 assert threshold.predict(x_train) == [0.40, 0.75]
threshold = MinPrecisionThreshold(min_precision=0.65) # threshold = MinPrecisionThreshold(min_precision=0.65)
threshold.fit(clf, x_train, y_train) # threshold.fit(clf, x_train, y_train)
assert threshold.predict(x_train) == 0.80 # assert threshold.predict(x_train) == [0.0, 0.80]
threshold = MinPrecisionThreshold(min_precision=0.50) # threshold = MinPrecisionThreshold(min_precision=0.50)
threshold.fit(clf, x_train, y_train) # threshold.fit(clf, x_train, y_train)
assert threshold.predict(x_train) == 0.70 # assert threshold.predict(x_train) == [0.0, 0.70]
#
threshold = MinPrecisionThreshold(min_precision=0.00) # threshold = MinPrecisionThreshold(min_precision=0.00)
threshold.fit(clf, x_train, y_train) # threshold.fit(clf, x_train, y_train)
assert threshold.predict(x_train) == 0.70 # assert threshold.predict(x_train) == [0.0, 0.70]

View File

@@ -1,111 +1,245 @@
# MIPLearn: Extensible Framework for Learning-Enhanced Mixed-Integer Optimization # MIPLearn: Extensible Framework for Learning-Enhanced Mixed-Integer Optimization
# Copyright (C) 2020, UChicago Argonne, LLC. All rights reserved. # Copyright (C) 2020, UChicago Argonne, LLC. All rights reserved.
# Released under the modified BSD license. See COPYING.md for more details. # Released under the modified BSD license. See COPYING.md for more details.
from typing import cast, List
from unittest.mock import Mock from unittest.mock import Mock, call
import numpy as np import numpy as np
from numpy.testing import assert_array_equal
from miplearn.classifiers import Classifier from miplearn import Classifier
from miplearn.classifiers.threshold import Threshold, MinPrecisionThreshold
from miplearn.components.primal import PrimalSolutionComponent from miplearn.components.primal import PrimalSolutionComponent
from .. import get_test_pyomo_instances from miplearn.instance import Instance
from tests import get_test_pyomo_instances
def test_predict(): def test_x_y_fit() -> None:
instances, models = get_test_pyomo_instances()
comp = PrimalSolutionComponent() comp = PrimalSolutionComponent()
comp.fit(instances) training_instances = cast(
solution = comp.predict(instances[0]) List[Instance],
assert "x" in solution [
assert 0 in solution["x"] Mock(spec=Instance),
assert 1 in solution["x"] Mock(spec=Instance),
assert 2 in solution["x"] ],
assert 3 in solution["x"] )
# Construct first instance
training_instances[0].get_variable_category = Mock( # type: ignore
side_effect=lambda var_name, index: {
0: "default",
1: None,
2: "default",
3: "default",
}[index]
)
training_instances[0].get_variable_features = Mock( # type: ignore
side_effect=lambda var, index: {
0: [0.0, 0.0],
1: [0.0, 1.0],
2: [1.0, 0.0],
3: [1.0, 1.0],
}[index]
)
training_instances[0].training_data = [
{
"Solution": {
"x": {
0: 0.0,
1: 1.0,
2: 0.0,
3: 0.0,
}
},
"LP solution": {
"x": {
0: 0.1,
1: 0.1,
2: 0.1,
3: 0.1,
}
},
},
{
"Solution": {
"x": {
0: 0.0,
1: 1.0,
2: 1.0,
3: 0.0,
}
},
"LP solution": {
"x": {
0: 0.2,
1: 0.2,
2: 0.2,
3: 0.2,
}
},
},
]
def test_evaluate(): # Construct second instance
instances, models = get_test_pyomo_instances() training_instances[1].get_variable_category = Mock( # type: ignore
clf_zero = Mock(spec=Classifier) side_effect=lambda var_name, index: {
clf_zero.predict_proba = Mock( 0: "default",
return_value=np.array( 1: None,
2: "default",
3: "default",
}[index]
)
training_instances[1].get_variable_features = Mock( # type: ignore
side_effect=lambda var, index: {
0: [0.0, 0.0],
1: [0.0, 2.0],
2: [2.0, 0.0],
3: [2.0, 2.0],
}[index]
)
training_instances[1].training_data = [
{
"Solution": {
"x": {
0: 1.0,
1: 1.0,
2: 1.0,
3: 1.0,
}
},
"LP solution": {
"x": {
0: 0.3,
1: 0.3,
2: 0.3,
3: 0.3,
}
},
},
{
"Solution": None,
"LP solution": None,
},
]
# Test x
x_expected = {
"default": np.array(
[ [
[0.0, 1.0], # x[0] [0.0, 0.0, 0.1],
[0.0, 1.0], # x[1] [1.0, 0.0, 0.1],
[1.0, 0.0], # x[2] [1.0, 1.0, 0.1],
[1.0, 0.0], # x[3] [0.0, 0.0, 0.2],
[1.0, 0.0, 0.2],
[1.0, 1.0, 0.2],
[0.0, 0.0, 0.3],
[2.0, 0.0, 0.3],
[2.0, 2.0, 0.3],
] ]
) )
)
clf_one = Mock(spec=Classifier)
clf_one.predict_proba = Mock(
return_value=np.array(
[
[1.0, 0.0], # x[0] instances[0]
[1.0, 0.0], # x[1] instances[0]
[0.0, 1.0], # x[2] instances[0]
[1.0, 0.0], # x[3] instances[0]
]
)
)
comp = PrimalSolutionComponent(classifier=[clf_zero, clf_one], threshold=0.50)
comp.fit(instances[:1])
assert comp.predict(instances[0]) == {"x": {0: 0, 1: 0, 2: 1, 3: None}}
assert instances[0].training_data[0]["Solution"] == {"x": {0: 1, 1: 0, 2: 1, 3: 1}}
ev = comp.evaluate(instances[:1])
assert ev == {
"Fix one": {
0: {
"Accuracy": 0.5,
"Condition negative": 1,
"Condition negative (%)": 25.0,
"Condition positive": 3,
"Condition positive (%)": 75.0,
"F1 score": 0.5,
"False negative": 2,
"False negative (%)": 50.0,
"False positive": 0,
"False positive (%)": 0.0,
"Precision": 1.0,
"Predicted negative": 3,
"Predicted negative (%)": 75.0,
"Predicted positive": 1,
"Predicted positive (%)": 25.0,
"Recall": 0.3333333333333333,
"True negative": 1,
"True negative (%)": 25.0,
"True positive": 1,
"True positive (%)": 25.0,
}
},
"Fix zero": {
0: {
"Accuracy": 0.75,
"Condition negative": 3,
"Condition negative (%)": 75.0,
"Condition positive": 1,
"Condition positive (%)": 25.0,
"F1 score": 0.6666666666666666,
"False negative": 0,
"False negative (%)": 0.0,
"False positive": 1,
"False positive (%)": 25.0,
"Precision": 0.5,
"Predicted negative": 2,
"Predicted negative (%)": 50.0,
"Predicted positive": 2,
"Predicted positive (%)": 50.0,
"Recall": 1.0,
"True negative": 2,
"True negative (%)": 50.0,
"True positive": 1,
"True positive (%)": 25.0,
}
},
} }
x_actual = comp.x(training_instances)
assert len(x_actual.keys()) == 1
assert_array_equal(x_actual["default"], x_expected["default"])
# Test y
y_expected = {
"default": np.array(
[
[True, False],
[True, False],
[True, False],
[True, False],
[False, True],
[True, False],
[False, True],
[False, True],
[False, True],
]
)
}
y_actual = comp.y(training_instances)
assert len(y_actual.keys()) == 1
assert_array_equal(y_actual["default"], y_expected["default"])
# Test fit
classifier = Mock(spec=Classifier)
threshold = Mock(spec=Threshold)
classifier_factory = Mock(return_value=classifier)
threshold_factory = Mock(return_value=threshold)
comp = PrimalSolutionComponent(
classifier=classifier_factory,
threshold=threshold_factory,
)
comp.fit(training_instances)
# Should build and train classifier for "default" category
classifier_factory.assert_called_once()
assert_array_equal(x_actual["default"], classifier.fit.call_args.args[0])
assert_array_equal(y_actual["default"], classifier.fit.call_args.args[1])
# Should build and train threshold for "default" category
threshold_factory.assert_called_once()
assert classifier == threshold.fit.call_args.args[0]
assert_array_equal(x_actual["default"], threshold.fit.call_args.args[1])
assert_array_equal(y_actual["default"], threshold.fit.call_args.args[2])
def test_primal_parallel_fit(): def test_predict() -> None:
instances, models = get_test_pyomo_instances()
comp = PrimalSolutionComponent() comp = PrimalSolutionComponent()
comp.fit(instances, n_jobs=2)
assert len(comp.classifiers) == 2 clf = Mock(spec=Classifier)
clf.predict_proba = Mock(
return_value=np.array(
[
[0.9, 0.1],
[0.5, 0.5],
[0.1, 0.9],
]
)
)
comp.classifiers = {"default": clf}
thr = Mock(spec=Threshold)
thr.predict = Mock(return_value=[0.75, 0.75])
comp.thresholds = {"default": thr}
instance = cast(Instance, Mock(spec=Instance))
instance.get_variable_category = Mock( # type: ignore
return_value="default",
)
instance.get_variable_features = Mock( # type: ignore
side_effect=lambda var, index: {
0: [0.0, 0.0],
1: [0.0, 2.0],
2: [2.0, 0.0],
}[index]
)
instance.training_data = [
{
"LP solution": {
"x": {
0: 0.1,
1: 0.5,
2: 0.9,
}
}
}
]
x = comp.x([instance])
solution_actual = comp.predict(instance)
# Should ask for probabilities and thresholds
clf.predict_proba.assert_called_once()
thr.predict.assert_called_once()
assert_array_equal(x["default"], clf.predict_proba.call_args.args[0])
assert_array_equal(x["default"], thr.predict.call_args.args[0])
assert solution_actual == {
"x": {
0: 0.0,
1: None,
2: 1.0,
}
}