# MIPLearn: Extensible Framework for Learning-Enhanced Mixed-Integer Optimization # Copyright (C) 2020-2021, UChicago Argonne, LLC. All rights reserved. # Released under the modified BSD license. See COPYING.md for more details. import logging import re import sys from io import StringIO from typing import Any, List, Dict, Optional import numpy as np import pyomo from overrides import overrides from pyomo import environ as pe from pyomo.core import Var, Suffix, Objective from pyomo.core.base import _GeneralVarData from pyomo.core.base.constraint import ConstraintList from pyomo.core.expr.numeric_expr import SumExpression, MonomialTermExpression from pyomo.opt import TerminationCondition from pyomo.opt.base.solvers import SolverFactory from miplearn.features import Variable from miplearn.instance.base import Instance from miplearn.solvers import _RedirectOutput from miplearn.solvers.internal import ( InternalSolver, LPSolveStats, IterationCallback, LazyCallback, MIPSolveStats, Constraint, ) from miplearn.types import ( SolverParams, UserCutCallback, Solution, VariableName, Category, ) logger = logging.getLogger(__name__) class BasePyomoSolver(InternalSolver): """ Base class for all Pyomo solvers. """ def __init__( self, solver_factory: SolverFactory, params: SolverParams, ) -> None: self.instance: Optional[Instance] = None self.model: Optional[pe.ConcreteModel] = None self.params = params self._all_vars: List[pe.Var] = [] self._bin_vars: List[pe.Var] = [] self._is_warm_start_available: bool = False self._pyomo_solver: SolverFactory = solver_factory self._obj_sense: str = "min" self._varname_to_var: Dict[str, pe.Var] = {} self._cname_to_constr: Dict[str, pe.Constraint] = {} self._termination_condition: str = "" self._has_lp_solution = False self._has_mip_solution = False self._obj: Dict[str, float] = {} for (key, value) in params.items(): self._pyomo_solver.options[key] = value @overrides def solve_lp( self, tee: bool = False, ) -> LPSolveStats: self.relax() streams: List[Any] = [StringIO()] if tee: streams += [sys.stdout] with _RedirectOutput(streams): results = self._pyomo_solver.solve(tee=True) self._termination_condition = results["Solver"][0]["Termination condition"] self._restore_integrality() opt_value = None self._has_lp_solution = False self._has_mip_solution = False if not self.is_infeasible(): opt_value = results["Problem"][0]["Lower bound"] self._has_lp_solution = True return LPSolveStats( lp_value=opt_value, lp_log=streams[0].getvalue(), lp_wallclock_time=results["Solver"][0]["Wallclock time"], ) def _restore_integrality(self) -> None: for var in self._bin_vars: var.domain = pyomo.core.base.set_types.Binary self._pyomo_solver.update_var(var) @overrides def solve( self, tee: bool = False, iteration_cb: Optional[IterationCallback] = None, lazy_cb: Optional[LazyCallback] = None, user_cut_cb: Optional[UserCutCallback] = None, ) -> MIPSolveStats: assert lazy_cb is None, "callbacks are not currently supported" assert user_cut_cb is None, "callbacks are not currently supported" total_wallclock_time = 0 streams: List[Any] = [StringIO()] if tee: streams += [sys.stdout] if iteration_cb is None: iteration_cb = lambda: False while True: logger.debug("Solving MIP...") with _RedirectOutput(streams): results = self._pyomo_solver.solve( tee=True, warmstart=self._is_warm_start_available, ) total_wallclock_time += results["Solver"][0]["Wallclock time"] should_repeat = iteration_cb() if not should_repeat: break log = streams[0].getvalue() node_count = self._extract_node_count(log) ws_value = self._extract_warm_start_value(log) self._termination_condition = results["Solver"][0]["Termination condition"] lb, ub = None, None self._has_mip_solution = False self._has_lp_solution = False if not self.is_infeasible(): self._has_mip_solution = True lb = results["Problem"][0]["Lower bound"] ub = results["Problem"][0]["Upper bound"] return MIPSolveStats( mip_lower_bound=lb, mip_upper_bound=ub, mip_wallclock_time=total_wallclock_time, mip_sense=self._obj_sense, mip_log=log, mip_nodes=node_count, mip_warm_start_value=ws_value, ) @overrides def get_solution(self) -> Optional[Solution]: assert self.model is not None if self.is_infeasible(): return None solution: Solution = {} for var in self.model.component_objects(Var): for index in var: if var[index].fixed: continue solution[f"{var}[{index}]"] = var[index].value return solution @overrides def get_variable_names(self) -> List[VariableName]: assert self.model is not None variables: List[VariableName] = [] for var in self.model.component_objects(Var): for index in var: if var[index].fixed: continue variables += [f"{var}[{index}]"] return variables @overrides def set_warm_start(self, solution: Solution) -> None: self._clear_warm_start() count_fixed = 0 for (var_name, value) in solution.items(): if value is None: continue var = self._varname_to_var[var_name] var.value = solution[var_name] count_fixed += 1 if count_fixed > 0: self._is_warm_start_available = True @overrides def set_instance( self, instance: Instance, model: Any = None, ) -> None: if model is None: model = instance.to_model() assert isinstance(model, pe.ConcreteModel) self.instance = instance self.model = model self.model.extra_constraints = ConstraintList() self.model.dual = Suffix(direction=Suffix.IMPORT) self.model.rc = Suffix(direction=Suffix.IMPORT) self.model.slack = Suffix(direction=Suffix.IMPORT) self._pyomo_solver.set_instance(model) self._update_obj() self._update_vars() self._update_constrs() def _clear_warm_start(self) -> None: for var in self._all_vars: if not var.fixed: var.value = None self._is_warm_start_available = False def _update_obj(self) -> None: self._obj_sense = "max" if self._pyomo_solver._objective.sense == pyomo.core.kernel.objective.minimize: self._obj_sense = "min" def _update_vars(self) -> None: assert self.model is not None self._all_vars = [] self._bin_vars = [] self._varname_to_var = {} for var in self.model.component_objects(Var): for idx in var: self._varname_to_var[f"{var.name}[{idx}]"] = var[idx] self._all_vars += [var[idx]] if var[idx].domain == pyomo.core.base.set_types.Binary: self._bin_vars += [var[idx]] for obj in self.model.component_objects(Objective): self._obj = self._parse_pyomo_expr(obj.expr) break def _update_constrs(self) -> None: assert self.model is not None self._cname_to_constr.clear() for constr in self.model.component_objects(pyomo.core.Constraint): if isinstance(constr, pe.ConstraintList): for idx in constr: self._cname_to_constr[f"{constr.name}[{idx}]"] = constr[idx] else: self._cname_to_constr[constr.name] = constr @overrides def fix(self, solution: Solution) -> None: for (varname, value) in solution.items(): if value is None: continue var = self._varname_to_var[varname] var.fix(value) self._pyomo_solver.update_var(var) @overrides def add_constraint( self, constr: Any, name: str, ) -> None: assert self.model is not None if isinstance(constr, Constraint): lhs = 0.0 for (varname, coeff) in constr.lhs.items(): var = self._varname_to_var[varname] lhs += var * coeff if constr.sense == "=": expr = lhs == constr.rhs elif constr.sense == "<": expr = lhs <= constr.rhs else: expr = lhs >= constr.rhs cl = pe.Constraint(expr=expr, name=name) self.model.add_component(name, cl) self._pyomo_solver.add_constraint(cl) self._cname_to_constr[name] = cl else: self._pyomo_solver.add_constraint(constr) self._termination_condition = "" self._has_lp_solution = False self._has_mip_solution = False @overrides def remove_constraint(self, name: str) -> None: assert self.model is not None constr = self._cname_to_constr[name] del self._cname_to_constr[name] self.model.del_component(constr) self._pyomo_solver.remove_constraint(constr) @overrides def is_constraint_satisfied(self, constr: Constraint, tol: float = 1e-6) -> bool: lhs = 0.0 for (varname, coeff) in constr.lhs.items(): var = self._varname_to_var[varname] lhs += var.value * coeff if constr.sense == "<": return lhs <= constr.rhs + tol elif constr.sense == ">": return lhs >= constr.rhs - tol else: return abs(constr.rhs - lhs) < abs(tol) @staticmethod def __extract( log: str, regexp: Optional[str], default: Optional[str] = None, ) -> Optional[str]: if regexp is None: return default value = default for line in log.splitlines(): matches = re.findall(regexp, line) if len(matches) == 0: continue value = matches[0] return value def _extract_warm_start_value(self, log: str) -> Optional[float]: value = self.__extract(log, self._get_warm_start_regexp()) if value is None: return None return float(value) def _extract_node_count(self, log: str) -> Optional[int]: value = self.__extract(log, self._get_node_count_regexp()) if value is None: return None return int(value) def _get_warm_start_regexp(self) -> Optional[str]: return None def _get_node_count_regexp(self) -> Optional[str]: return None @overrides def relax(self) -> None: for var in self._bin_vars: lb, ub = var.bounds var.setlb(lb) var.setub(ub) var.domain = pyomo.core.base.set_types.Reals self._pyomo_solver.update_var(var) @overrides def is_infeasible(self) -> bool: return self._termination_condition == TerminationCondition.infeasible @overrides def get_dual(self, cid: str) -> float: constr = self._cname_to_constr[cid] return self._pyomo_solver.dual[constr] @overrides def get_sense(self) -> str: return self._obj_sense @overrides def build_test_instance_infeasible(self) -> Instance: return PyomoTestInstanceInfeasible() @overrides def build_test_instance_redundancy(self) -> Instance: return PyomoTestInstanceRedundancy() @overrides def build_test_instance_knapsack(self) -> Instance: return PyomoTestInstanceKnapsack( weights=[23.0, 26.0, 20.0, 18.0], prices=[505.0, 352.0, 458.0, 220.0], capacity=67.0, ) @overrides def get_variables(self) -> Dict[str, Variable]: assert self.model is not None variables = {} for var in self.model.component_objects(pyomo.core.Var): for idx in var: varname = f"{var}[{idx}]" if idx is None: varname = str(var) variables[varname] = self._parse_pyomo_variable(var[idx]) return variables def _parse_pyomo_variable(self, var: pyomo.core.Var) -> Variable: # Variable type vtype: Optional[str] = None if var.domain == pyomo.core.Binary: vtype = "B" elif var.domain in [ pyomo.core.Reals, pyomo.core.NonNegativeReals, pyomo.core.NonPositiveReals, pyomo.core.NegativeReals, pyomo.core.PositiveReals, ]: vtype = "C" if vtype is None: raise Exception(f"unknown variable domain: {var.domain}") # Bounds lb, ub = var.bounds # Reduced costs rc = None if var in self.model.rc: rc = self.model.rc[var] # Objective coefficient obj_coeff = 0.0 if var.name in self._obj: obj_coeff = self._obj[var.name] return Variable( value=var.value, type=vtype, lower_bound=float(lb), upper_bound=float(ub), obj_coeff=obj_coeff, reduced_cost=rc, ) @overrides def get_constraints(self) -> Dict[str, Constraint]: assert self.model is not None constraints = {} for constr in self.model.component_objects(pyomo.core.Constraint): if isinstance(constr, pe.ConstraintList): for idx in constr: name = f"{constr.name}[{idx}]" assert name not in constraints constraints[name] = self._parse_pyomo_constraint(constr[idx]) else: name = constr.name assert name not in constraints constraints[name] = self._parse_pyomo_constraint(constr) return constraints def _parse_pyomo_constraint( self, pyomo_constr: pyomo.core.Constraint, ) -> Constraint: assert self.model is not None constr = Constraint() # Extract RHS and sense has_ub = pyomo_constr.has_ub() has_lb = pyomo_constr.has_lb() assert ( (not has_lb) or (not has_ub) or pyomo_constr.upper() == pyomo_constr.lower() ), "range constraints not supported" if not has_ub: constr.sense = ">" constr.rhs = pyomo_constr.lower() elif not has_lb: constr.sense = "<" constr.rhs = pyomo_constr.upper() else: constr.sense = "=" constr.rhs = pyomo_constr.upper() # Extract LHS constr.lhs = self._parse_pyomo_expr(pyomo_constr.body) # Extract solution attributes if self._has_lp_solution: constr.dual_value = self.model.dual[pyomo_constr] if self._has_mip_solution or self._has_lp_solution: constr.slack = self.model.slack[pyomo_constr] # Build constraint return constr def _parse_pyomo_expr(self, expr): lhs = {} if isinstance(expr, SumExpression): for term in expr._args_: if isinstance(term, MonomialTermExpression): lhs[term._args_[1].name] = float(term._args_[0]) elif isinstance(term, _GeneralVarData): lhs[term.name] = 1.0 else: raise Exception(f"Unknown term type: {term.__class__.__name__}") elif isinstance(expr, _GeneralVarData): lhs[expr.name] = 1.0 else: raise Exception(f"Unknown expression type: {expr.__class__.__name__}") return lhs @overrides def are_callbacks_supported(self) -> bool: return False @overrides def get_constraint_attrs(self) -> List[str]: return [ "dual_value", "lazy", "lhs", "rhs", "sense", "slack", ] @overrides def get_variable_attrs(self) -> List[str]: return [ # "basis_status", "lower_bound", "obj_coeff", "reduced_cost", # "sa_lb_down", # "sa_lb_up", # "sa_obj_down", # "sa_obj_up", # "sa_ub_down", # "sa_ub_up", "type", "upper_bound", "value", ] class PyomoTestInstanceInfeasible(Instance): @overrides def to_model(self) -> pe.ConcreteModel: model = pe.ConcreteModel() model.x = pe.Var([0], domain=pe.Binary) model.OBJ = pe.Objective(expr=model.x[0], sense=pe.maximize) model.eq = pe.Constraint(expr=model.x[0] >= 2) return model class PyomoTestInstanceRedundancy(Instance): @overrides def to_model(self) -> pe.ConcreteModel: model = pe.ConcreteModel() model.x = pe.Var([0, 1], domain=pe.Binary) model.OBJ = pe.Objective(expr=model.x[0] + model.x[1], sense=pe.maximize) model.eq1 = pe.Constraint(expr=model.x[0] + model.x[1] <= 1) model.eq2 = pe.Constraint(expr=model.x[0] + model.x[1] <= 2) return model class PyomoTestInstanceKnapsack(Instance): """ Simpler (one-dimensional) Knapsack Problem, used for testing. """ def __init__( self, weights: List[float], prices: List[float], capacity: float, ) -> None: super().__init__() self.weights = weights self.prices = prices self.capacity = capacity self.varname_to_item: Dict[VariableName, int] = { f"x[{i}]": i for i in range(len(self.weights)) } @overrides def to_model(self) -> pe.ConcreteModel: model = pe.ConcreteModel() items = range(len(self.weights)) model.x = pe.Var(items, domain=pe.Binary) model.z = pe.Var(domain=pe.Reals, bounds=(0, self.capacity)) model.OBJ = pe.Objective( expr=sum(model.x[v] * self.prices[v] for v in items), sense=pe.maximize, ) model.eq_capacity = pe.Constraint( expr=sum(model.x[v] * self.weights[v] for v in items) == model.z ) return model @overrides def get_instance_features(self) -> List[float]: return [ self.capacity, np.average(self.weights), ] @overrides def get_variable_features(self, var_name: VariableName) -> List[Category]: item = self.varname_to_item[var_name] return [ self.weights[item], self.prices[item], ] @overrides def get_variable_category(self, var_name: VariableName) -> Optional[Category]: if var_name.startswith("x"): return "default" return None