You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
MIPLearn/miplearn/solvers/pyomo/base.py

587 lines
19 KiB

# MIPLearn: Extensible Framework for Learning-Enhanced Mixed-Integer Optimization
# Copyright (C) 2020-2021, UChicago Argonne, LLC. All rights reserved.
# Released under the modified BSD license. See COPYING.md for more details.
import logging
import re
import sys
from io import StringIO
from typing import Any, List, Dict, Optional
import numpy as np
import pyomo
from overrides import overrides
from pyomo import environ as pe
from pyomo.core import Var, Suffix, Objective
from pyomo.core.base import _GeneralVarData
from pyomo.core.base.constraint import ConstraintList
from pyomo.core.expr.numeric_expr import SumExpression, MonomialTermExpression
from pyomo.opt import TerminationCondition
from pyomo.opt.base.solvers import SolverFactory
from miplearn.features import Variable
from miplearn.instance.base import Instance
from miplearn.solvers import _RedirectOutput
from miplearn.solvers.internal import (
InternalSolver,
LPSolveStats,
IterationCallback,
LazyCallback,
MIPSolveStats,
Constraint,
)
from miplearn.types import (
SolverParams,
UserCutCallback,
Solution,
VariableName,
Category,
)
logger = logging.getLogger(__name__)
class BasePyomoSolver(InternalSolver):
"""
Base class for all Pyomo solvers.
"""
def __init__(
self,
solver_factory: SolverFactory,
params: SolverParams,
) -> None:
self.instance: Optional[Instance] = None
self.model: Optional[pe.ConcreteModel] = None
self.params = params
self._all_vars: List[pe.Var] = []
self._bin_vars: List[pe.Var] = []
self._is_warm_start_available: bool = False
self._pyomo_solver: SolverFactory = solver_factory
self._obj_sense: str = "min"
self._varname_to_var: Dict[str, pe.Var] = {}
self._cname_to_constr: Dict[str, pe.Constraint] = {}
self._termination_condition: str = ""
self._has_lp_solution = False
self._has_mip_solution = False
self._obj: Dict[str, float] = {}
for (key, value) in params.items():
self._pyomo_solver.options[key] = value
@overrides
def add_constraint(
self,
constr: Any,
name: str,
) -> None:
assert self.model is not None
if isinstance(constr, Constraint):
lhs = 0.0
for (varname, coeff) in constr.lhs.items():
var = self._varname_to_var[varname]
lhs += var * coeff
if constr.sense == "=":
expr = lhs == constr.rhs
elif constr.sense == "<":
expr = lhs <= constr.rhs
else:
expr = lhs >= constr.rhs
cl = pe.Constraint(expr=expr, name=name)
self.model.add_component(name, cl)
self._pyomo_solver.add_constraint(cl)
self._cname_to_constr[name] = cl
else:
self._pyomo_solver.add_constraint(constr)
self._termination_condition = ""
self._has_lp_solution = False
self._has_mip_solution = False
@overrides
def are_callbacks_supported(self) -> bool:
return False
@overrides
def build_test_instance_infeasible(self) -> Instance:
return PyomoTestInstanceInfeasible()
@overrides
def build_test_instance_redundancy(self) -> Instance:
return PyomoTestInstanceRedundancy()
@overrides
def build_test_instance_knapsack(self) -> Instance:
return PyomoTestInstanceKnapsack(
weights=[23.0, 26.0, 20.0, 18.0],
prices=[505.0, 352.0, 458.0, 220.0],
capacity=67.0,
)
@overrides
def fix(self, solution: Solution) -> None:
for (varname, value) in solution.items():
if value is None:
continue
var = self._varname_to_var[varname]
var.fix(value)
self._pyomo_solver.update_var(var)
@overrides
def get_constraints(self) -> Dict[str, Constraint]:
assert self.model is not None
constraints = {}
for constr in self.model.component_objects(pyomo.core.Constraint):
if isinstance(constr, pe.ConstraintList):
for idx in constr:
name = f"{constr.name}[{idx}]"
assert name not in constraints
constraints[name] = self._parse_pyomo_constraint(constr[idx])
else:
name = constr.name
assert name not in constraints
constraints[name] = self._parse_pyomo_constraint(constr)
return constraints
@overrides
def get_constraint_attrs(self) -> List[str]:
return [
"dual_value",
"lazy",
"lhs",
"rhs",
"sense",
"slack",
]
@overrides
def get_solution(self) -> Optional[Solution]:
assert self.model is not None
if self.is_infeasible():
return None
solution: Solution = {}
for var in self.model.component_objects(Var):
for index in var:
if var[index].fixed:
continue
solution[f"{var}[{index}]"] = var[index].value
return solution
@overrides
def get_variables(self) -> Dict[str, Variable]:
assert self.model is not None
variables = {}
for var in self.model.component_objects(pyomo.core.Var):
for idx in var:
varname = f"{var}[{idx}]"
if idx is None:
varname = str(var)
variables[varname] = self._parse_pyomo_variable(var[idx])
return variables
@overrides
def get_variable_attrs(self) -> List[str]:
return [
# "basis_status",
"lower_bound",
"obj_coeff",
"reduced_cost",
# "sa_lb_down",
# "sa_lb_up",
# "sa_obj_down",
# "sa_obj_up",
# "sa_ub_down",
# "sa_ub_up",
"type",
"upper_bound",
"value",
]
@overrides
def is_constraint_satisfied(self, constr: Constraint, tol: float = 1e-6) -> bool:
lhs = 0.0
for (varname, coeff) in constr.lhs.items():
var = self._varname_to_var[varname]
lhs += var.value * coeff
if constr.sense == "<":
return lhs <= constr.rhs + tol
elif constr.sense == ">":
return lhs >= constr.rhs - tol
else:
return abs(constr.rhs - lhs) < abs(tol)
@overrides
def is_infeasible(self) -> bool:
return self._termination_condition == TerminationCondition.infeasible
@overrides
def remove_constraint(self, name: str) -> None:
assert self.model is not None
constr = self._cname_to_constr[name]
del self._cname_to_constr[name]
self.model.del_component(constr)
self._pyomo_solver.remove_constraint(constr)
@overrides
def set_instance(
self,
instance: Instance,
model: Any = None,
) -> None:
if model is None:
model = instance.to_model()
assert isinstance(model, pe.ConcreteModel)
self.instance = instance
self.model = model
self.model.extra_constraints = ConstraintList()
self.model.dual = Suffix(direction=Suffix.IMPORT)
self.model.rc = Suffix(direction=Suffix.IMPORT)
self.model.slack = Suffix(direction=Suffix.IMPORT)
self._pyomo_solver.set_instance(model)
self._update_obj()
self._update_vars()
self._update_constrs()
@overrides
def set_warm_start(self, solution: Solution) -> None:
self._clear_warm_start()
count_fixed = 0
for (var_name, value) in solution.items():
if value is None:
continue
var = self._varname_to_var[var_name]
var.value = solution[var_name]
count_fixed += 1
if count_fixed > 0:
self._is_warm_start_available = True
@overrides
def solve(
self,
tee: bool = False,
iteration_cb: Optional[IterationCallback] = None,
lazy_cb: Optional[LazyCallback] = None,
user_cut_cb: Optional[UserCutCallback] = None,
) -> MIPSolveStats:
assert lazy_cb is None, "callbacks are not currently supported"
assert user_cut_cb is None, "callbacks are not currently supported"
total_wallclock_time = 0
streams: List[Any] = [StringIO()]
if tee:
streams += [sys.stdout]
if iteration_cb is None:
iteration_cb = lambda: False
while True:
logger.debug("Solving MIP...")
with _RedirectOutput(streams):
results = self._pyomo_solver.solve(
tee=True,
warmstart=self._is_warm_start_available,
)
total_wallclock_time += results["Solver"][0]["Wallclock time"]
should_repeat = iteration_cb()
if not should_repeat:
break
log = streams[0].getvalue()
node_count = self._extract_node_count(log)
ws_value = self._extract_warm_start_value(log)
self._termination_condition = results["Solver"][0]["Termination condition"]
lb, ub = None, None
self._has_mip_solution = False
self._has_lp_solution = False
if not self.is_infeasible():
self._has_mip_solution = True
lb = results["Problem"][0]["Lower bound"]
ub = results["Problem"][0]["Upper bound"]
return MIPSolveStats(
mip_lower_bound=lb,
mip_upper_bound=ub,
mip_wallclock_time=total_wallclock_time,
mip_sense=self._obj_sense,
mip_log=log,
mip_nodes=node_count,
mip_warm_start_value=ws_value,
)
@overrides
def solve_lp(
self,
tee: bool = False,
) -> LPSolveStats:
self.relax()
streams: List[Any] = [StringIO()]
if tee:
streams += [sys.stdout]
with _RedirectOutput(streams):
results = self._pyomo_solver.solve(tee=True)
self._termination_condition = results["Solver"][0]["Termination condition"]
self._restore_integrality()
opt_value = None
self._has_lp_solution = False
self._has_mip_solution = False
if not self.is_infeasible():
opt_value = results["Problem"][0]["Lower bound"]
self._has_lp_solution = True
return LPSolveStats(
lp_value=opt_value,
lp_log=streams[0].getvalue(),
lp_wallclock_time=results["Solver"][0]["Wallclock time"],
)
@overrides
def relax(self) -> None:
for var in self._bin_vars:
lb, ub = var.bounds
var.setlb(lb)
var.setub(ub)
var.domain = pyomo.core.base.set_types.Reals
self._pyomo_solver.update_var(var)
def _clear_warm_start(self) -> None:
for var in self._all_vars:
if not var.fixed:
var.value = None
self._is_warm_start_available = False
@staticmethod
def _extract(
log: str,
regexp: Optional[str],
default: Optional[str] = None,
) -> Optional[str]:
if regexp is None:
return default
value = default
for line in log.splitlines():
matches = re.findall(regexp, line)
if len(matches) == 0:
continue
value = matches[0]
return value
def _extract_node_count(self, log: str) -> Optional[int]:
value = self._extract(log, self._get_node_count_regexp())
if value is None:
return None
return int(value)
def _extract_warm_start_value(self, log: str) -> Optional[float]:
value = self._extract(log, self._get_warm_start_regexp())
if value is None:
return None
return float(value)
def _get_node_count_regexp(self) -> Optional[str]:
return None
def _get_warm_start_regexp(self) -> Optional[str]:
return None
def _parse_pyomo_variable(self, var: pyomo.core.Var) -> Variable:
assert self.model is not None
# Variable type
vtype: Optional[str] = None
if var.domain == pyomo.core.Binary:
vtype = "B"
elif var.domain in [
pyomo.core.Reals,
pyomo.core.NonNegativeReals,
pyomo.core.NonPositiveReals,
pyomo.core.NegativeReals,
pyomo.core.PositiveReals,
]:
vtype = "C"
if vtype is None:
raise Exception(f"unknown variable domain: {var.domain}")
# Bounds
lb, ub = var.bounds
# Reduced costs
rc = None
if var in self.model.rc:
rc = self.model.rc[var]
# Objective coefficient
obj_coeff = 0.0
if var.name in self._obj:
obj_coeff = self._obj[var.name]
return Variable(
value=var.value,
type=vtype,
lower_bound=float(lb),
upper_bound=float(ub),
obj_coeff=obj_coeff,
reduced_cost=rc,
)
def _parse_pyomo_constraint(
self,
pyomo_constr: pyomo.core.Constraint,
) -> Constraint:
assert self.model is not None
constr = Constraint()
# Extract RHS and sense
has_ub = pyomo_constr.has_ub()
has_lb = pyomo_constr.has_lb()
assert (
(not has_lb) or (not has_ub) or pyomo_constr.upper() == pyomo_constr.lower()
), "range constraints not supported"
if not has_ub:
constr.sense = ">"
constr.rhs = pyomo_constr.lower()
elif not has_lb:
constr.sense = "<"
constr.rhs = pyomo_constr.upper()
else:
constr.sense = "="
constr.rhs = pyomo_constr.upper()
# Extract LHS
constr.lhs = self._parse_pyomo_expr(pyomo_constr.body)
# Extract solution attributes
if self._has_lp_solution:
constr.dual_value = self.model.dual[pyomo_constr]
if self._has_mip_solution or self._has_lp_solution:
constr.slack = self.model.slack[pyomo_constr]
# Build constraint
return constr
def _parse_pyomo_expr(self, expr: Any) -> Dict[str, float]:
lhs = {}
if isinstance(expr, SumExpression):
for term in expr._args_:
if isinstance(term, MonomialTermExpression):
lhs[term._args_[1].name] = float(term._args_[0])
elif isinstance(term, _GeneralVarData):
lhs[term.name] = 1.0
else:
raise Exception(f"Unknown term type: {term.__class__.__name__}")
elif isinstance(expr, _GeneralVarData):
lhs[expr.name] = 1.0
else:
raise Exception(f"Unknown expression type: {expr.__class__.__name__}")
return lhs
def _restore_integrality(self) -> None:
for var in self._bin_vars:
var.domain = pyomo.core.base.set_types.Binary
self._pyomo_solver.update_var(var)
def _update_obj(self) -> None:
self._obj_sense = "max"
if self._pyomo_solver._objective.sense == pyomo.core.kernel.objective.minimize:
self._obj_sense = "min"
def _update_vars(self) -> None:
assert self.model is not None
self._all_vars = []
self._bin_vars = []
self._varname_to_var = {}
for var in self.model.component_objects(Var):
for idx in var:
varname = f"{var.name}[{idx}]"
if idx is None:
varname = var.name
self._varname_to_var[varname] = var[idx]
self._all_vars += [var[idx]]
if var[idx].domain == pyomo.core.base.set_types.Binary:
self._bin_vars += [var[idx]]
for obj in self.model.component_objects(Objective):
self._obj = self._parse_pyomo_expr(obj.expr)
break
def _update_constrs(self) -> None:
assert self.model is not None
self._cname_to_constr.clear()
for constr in self.model.component_objects(pyomo.core.Constraint):
if isinstance(constr, pe.ConstraintList):
for idx in constr:
self._cname_to_constr[f"{constr.name}[{idx}]"] = constr[idx]
else:
self._cname_to_constr[constr.name] = constr
class PyomoTestInstanceInfeasible(Instance):
@overrides
def to_model(self) -> pe.ConcreteModel:
model = pe.ConcreteModel()
model.x = pe.Var([0], domain=pe.Binary)
model.OBJ = pe.Objective(expr=model.x[0], sense=pe.maximize)
model.eq = pe.Constraint(expr=model.x[0] >= 2)
return model
class PyomoTestInstanceRedundancy(Instance):
@overrides
def to_model(self) -> pe.ConcreteModel:
model = pe.ConcreteModel()
model.x = pe.Var([0, 1], domain=pe.Binary)
model.OBJ = pe.Objective(expr=model.x[0] + model.x[1], sense=pe.maximize)
model.eq1 = pe.Constraint(expr=model.x[0] + model.x[1] <= 1)
model.eq2 = pe.Constraint(expr=model.x[0] + model.x[1] <= 2)
return model
class PyomoTestInstanceKnapsack(Instance):
"""
Simpler (one-dimensional) Knapsack Problem, used for testing.
"""
def __init__(
self,
weights: List[float],
prices: List[float],
capacity: float,
) -> None:
super().__init__()
self.weights = weights
self.prices = prices
self.capacity = capacity
self.varname_to_item: Dict[VariableName, int] = {
f"x[{i}]": i for i in range(len(self.weights))
}
@overrides
def to_model(self) -> pe.ConcreteModel:
model = pe.ConcreteModel()
items = range(len(self.weights))
model.x = pe.Var(items, domain=pe.Binary)
model.z = pe.Var(domain=pe.Reals, bounds=(0, self.capacity))
model.OBJ = pe.Objective(
expr=sum(model.x[v] * self.prices[v] for v in items),
sense=pe.maximize,
)
model.eq_capacity = pe.Constraint(
expr=sum(model.x[v] * self.weights[v] for v in items) == model.z
)
return model
@overrides
def get_instance_features(self) -> List[float]:
return [
self.capacity,
np.average(self.weights),
]
@overrides
def get_variable_features(self, var_name: VariableName) -> List[Category]:
item = self.varname_to_item[var_name]
return [
self.weights[item],
self.prices[item],
]
@overrides
def get_variable_category(self, var_name: VariableName) -> Optional[Category]:
if var_name.startswith("x"):
return "default"
return None