# MIPLearn: Extensible Framework for Learning-Enhanced Mixed-Integer Optimization
# Copyright (C) 2020-2021, UChicago Argonne, LLC. All rights reserved.
# Released under the modified BSD license. See COPYING.md for more details.

import logging
import re
import sys
from io import StringIO
from typing import Any, List, Dict, Optional, Hashable

import pyomo
from overrides import overrides
from pyomo import environ as pe
from pyomo.core import Var
from pyomo.core.base import _GeneralVarData
from pyomo.core.base.constraint import SimpleConstraint
from pyomo.core.expr.numeric_expr import SumExpression, MonomialTermExpression
from pyomo.opt import TerminationCondition
from pyomo.opt.base.solvers import SolverFactory

from miplearn.instance.base import Instance
from miplearn.solvers import _RedirectOutput
from miplearn.solvers.internal import (
    InternalSolver,
    LPSolveStats,
    IterationCallback,
    LazyCallback,
    MIPSolveStats,
    Constraint,
)
from miplearn.types import (
    SolverParams,
    UserCutCallback,
    Solution,
    VariableName,
    Category,
)
import numpy as np

logger = logging.getLogger(__name__)


class BasePyomoSolver(InternalSolver):
    """
    Base class for all Pyomo solvers.

    Wraps a Pyomo solver object and keeps lookup tables (variable name ->
    variable, constraint name -> constraint) for the model currently loaded
    through `set_instance`.
    """

    def __init__(
        self,
        solver_factory: SolverFactory,
        params: SolverParams,
    ) -> None:
        """
        Store the Pyomo solver and copy every entry of `params` into its
        `options` dictionary.

        Parameters
        ----------
        solver_factory
            The Pyomo solver object that all solve calls are delegated to.
        params
            Solver options, copied key by key into `solver_factory.options`.
        """
        self.instance: Optional[Instance] = None
        self.model: Optional[pe.ConcreteModel] = None
        self.params = params
        self._all_vars: List[pe.Var] = []
        self._bin_vars: List[pe.Var] = []
        self._is_warm_start_available: bool = False
        self._pyomo_solver: SolverFactory = solver_factory
        self._obj_sense: str = "min"
        self._varname_to_var: Dict[str, pe.Var] = {}
        self._cname_to_constr: Dict[str, pe.Constraint] = {}
        self._termination_condition: str = ""
        for (key, value) in params.items():
            self._pyomo_solver.options[key] = value

    @overrides
    def solve_lp(
        self,
        tee: bool = False,
    ) -> LPSolveStats:
        """
        Solve the relaxation of the current model and restore integrality.

        Parameters
        ----------
        tee
            If true, the solver log is also echoed to `sys.stdout`.

        Returns
        -------
        Dictionary with the keys "LP value" (None if the relaxation is
        infeasible) and "LP log" (the captured solver output).
        """
        self.relax()
        streams: List[Any] = [StringIO()]
        if tee:
            streams += [sys.stdout]
        try:
            with _RedirectOutput(streams):
                results = self._pyomo_solver.solve(tee=True)
        finally:
            # Put the binary domains back even if the solver call fails, so
            # the model is never left in its relaxed state.
            self._restore_integrality()
        # Record the outcome of *this* solve; otherwise is_infeasible() below
        # would answer based on whatever solve happened to run before.
        self._termination_condition = results["Solver"][0]["Termination condition"]
        opt_value = None
        if not self.is_infeasible():
            opt_value = results["Problem"][0]["Lower bound"]
        return {
            "LP value": opt_value,
            "LP log": streams[0].getvalue(),
        }

    def _restore_integrality(self) -> None:
        """Set the domain of every tracked binary variable back to Binary."""
        for var in self._bin_vars:
            var.domain = pyomo.core.base.set_types.Binary
            self._pyomo_solver.update_var(var)

    @overrides
    def solve(
        self,
        tee: bool = False,
        iteration_cb: Optional[IterationCallback] = None,
        lazy_cb: Optional[LazyCallback] = None,
        user_cut_cb: Optional[UserCutCallback] = None,
    ) -> MIPSolveStats:
        """
        Solve the current model, repeating while `iteration_cb` returns true.

        Parameters
        ----------
        tee
            If true, the solver log is also echoed to `sys.stdout`.
        iteration_cb
            Called after every solve; the model is solved again as long as it
            returns a truthy value. Defaults to a callback that never repeats.
        lazy_cb
            Not supported; must be None.
        user_cut_cb
            Not supported; must be None.

        Returns
        -------
        Dictionary with bounds (None if infeasible), accumulated wallclock
        time, objective sense, the captured log, and the node count and warm
        start value parsed from that log.

        Raises
        ------
        Exception
            If `lazy_cb` or `user_cut_cb` is provided.
        """
        if lazy_cb is not None:
            raise Exception("lazy callback not currently supported")
        if user_cut_cb is not None:
            raise Exception("user cut callback not currently supported")
        total_wallclock_time = 0
        streams: List[Any] = [StringIO()]
        if tee:
            streams += [sys.stdout]
        if iteration_cb is None:
            iteration_cb = lambda: False
        while True:
            logger.debug("Solving MIP...")
            with _RedirectOutput(streams):
                results = self._pyomo_solver.solve(
                    tee=True,
                    warmstart=self._is_warm_start_available,
                )
            total_wallclock_time += results["Solver"][0]["Wallclock time"]
            should_repeat = iteration_cb()
            if not should_repeat:
                break
        # The same stream is reused across iterations, so the log contains the
        # output of every solve performed above.
        log = streams[0].getvalue()
        node_count = self._extract_node_count(log)
        ws_value = self._extract_warm_start_value(log)
        self._termination_condition = results["Solver"][0]["Termination condition"]
        lb, ub = None, None
        if not self.is_infeasible():
            lb = results["Problem"][0]["Lower bound"]
            ub = results["Problem"][0]["Upper bound"]
        stats: MIPSolveStats = {
            "Lower bound": lb,
            "Upper bound": ub,
            "Wallclock time": total_wallclock_time,
            "Sense": self._obj_sense,
            "MIP log": log,
            "Nodes": node_count,
            "Warm start value": ws_value,
        }
        return stats

    @overrides
    def get_solution(self) -> Optional[Solution]:
        """
        Return the current value of every non-fixed variable, keyed by
        "<var>[<index>]", or None if the last solve was infeasible.
        """
        assert self.model is not None
        if self.is_infeasible():
            return None
        solution: Solution = {}
        for var in self.model.component_objects(Var):
            for index in var:
                if var[index].fixed:
                    continue
                solution[f"{var}[{index}]"] = var[index].value
        return solution

    @overrides
    def get_variable_names(self) -> List[VariableName]:
        """Return the names ("<var>[<index>]") of all non-fixed variables."""
        assert self.model is not None
        variables: List[VariableName] = []
        for var in self.model.component_objects(Var):
            for index in var:
                if var[index].fixed:
                    continue
                variables.append(f"{var}[{index}]")
        return variables

    @overrides
    def set_warm_start(self, solution: Solution) -> None:
        """
        Replace the current warm start with the values in `solution`.

        Entries whose value is None are skipped. The warm start is flagged as
        available only if at least one value was assigned.
        """
        self._clear_warm_start()
        count_fixed = 0
        for (var_name, value) in solution.items():
            if value is None:
                continue
            var = self._varname_to_var[var_name]
            var.value = value
            count_fixed += 1
        if count_fixed > 0:
            self._is_warm_start_available = True

    @overrides
    def set_instance(
        self,
        instance: Instance,
        model: Any = None,
    ) -> None:
        """
        Load `instance` into the solver.

        If `model` is None, it is built with `instance.to_model()`. The model
        must be a `pe.ConcreteModel`. The cached objective sense, variable
        table and constraint table are rebuilt afterwards.
        """
        if model is None:
            model = instance.to_model()
        assert isinstance(model, pe.ConcreteModel)
        self.instance = instance
        self.model = model
        self._pyomo_solver.set_instance(model)
        self._update_obj()
        self._update_vars()
        self._update_constrs()

    def _clear_warm_start(self) -> None:
        """Erase the value of every non-fixed variable and drop the warm start flag."""
        for var in self._all_vars:
            if not var.fixed:
                var.value = None
        self._is_warm_start_available = False

    def _update_obj(self) -> None:
        """Cache the objective sense ("min" or "max") of the loaded model."""
        self._obj_sense = "max"
        if self._pyomo_solver._objective.sense == pyomo.core.kernel.objective.minimize:
            self._obj_sense = "min"

    def _update_vars(self) -> None:
        """Rebuild the variable lookup table and the lists of all/binary variables."""
        assert self.model is not None
        self._all_vars = []
        self._bin_vars = []
        self._varname_to_var = {}
        for var in self.model.component_objects(Var):
            for idx in var:
                self._varname_to_var[f"{var.name}[{idx}]"] = var[idx]
                self._all_vars.append(var[idx])
                if var[idx].domain == pyomo.core.base.set_types.Binary:
                    self._bin_vars.append(var[idx])

    def _update_constrs(self) -> None:
        """
        Rebuild the constraint lookup table.

        Members of a `pe.ConstraintList` are registered one by one as
        "<name>[<index>]"; any other constraint component is registered as a
        whole under its own name.
        """
        assert self.model is not None
        self._cname_to_constr = {}
        for constr in self.model.component_objects(pyomo.core.Constraint):
            if isinstance(constr, pe.ConstraintList):
                for idx in constr:
                    self._cname_to_constr[f"{constr.name}[{idx}]"] = constr[idx]
            else:
                self._cname_to_constr[constr.name] = constr

    @overrides
    def fix(self, solution: Solution) -> None:
        """Fix each variable in `solution` to its value, skipping None values."""
        for (varname, value) in solution.items():
            if value is None:
                continue
            var = self._varname_to_var[varname]
            var.fix(value)
            self._pyomo_solver.update_var(var)

    @overrides
    def add_constraint(self, constraint: Any, name: str = "") -> Any:
        """
        Add `constraint` to the solver and refresh the constraint table.

        The `name` argument is accepted but not used here.
        """
        self._pyomo_solver.add_constraint(constraint)
        self._update_constrs()

    @staticmethod
    def __extract(
        log: str,
        regexp: Optional[str],
        default: Optional[str] = None,
    ) -> Optional[str]:
        """
        Scan `log` line by line and return the first match of `regexp` found
        on the LAST matching line, or `default` if no line matches or
        `regexp` is None.
        """
        if regexp is None:
            return default
        pattern = re.compile(regexp)  # compiled once, not once per line
        value = default
        for line in log.splitlines():
            matches = pattern.findall(line)
            if len(matches) == 0:
                continue
            value = matches[0]
        return value

    def _extract_warm_start_value(self, log: str) -> Optional[float]:
        """Parse the warm start value from `log`; None if it cannot be found."""
        value = self.__extract(log, self._get_warm_start_regexp())
        if value is None:
            return None
        return float(value)

    def _extract_node_count(self, log: str) -> Optional[int]:
        """Parse the node count from `log`; None if it cannot be found."""
        value = self.__extract(log, self._get_node_count_regexp())
        if value is None:
            return None
        return int(value)

    def _get_warm_start_regexp(self) -> Optional[str]:
        """Regular expression locating the warm start value in the log (none here)."""
        return None

    def _get_node_count_regexp(self) -> Optional[str]:
        """Regular expression locating the node count in the log (none here)."""
        return None

    @overrides
    def relax(self) -> None:
        """Change the domain of every tracked binary variable to Reals."""
        for var in self._bin_vars:
            # Write the current bounds onto the variable before switching the
            # domain (presumably so the bounds implied by the Binary domain
            # survive the change).
            lb, ub = var.bounds
            var.setlb(lb)
            var.setub(ub)
            var.domain = pyomo.core.base.set_types.Reals
            self._pyomo_solver.update_var(var)

    @overrides
    def get_inequality_slacks(self) -> Dict[str, float]:
        """Return the slack of every registered non-equality constraint, by name."""
        result: Dict[str, float] = {}
        for (cname, cobj) in self._cname_to_constr.items():
            if cobj.equality:
                continue
            result[cname] = cobj.slack()
        return result

    @overrides
    def extract_constraint(self, cid: str) -> Constraint:
        raise NotImplementedError()

    @overrides
    def is_constraint_satisfied(self, cobj: Constraint, tol: float = 1e-6) -> bool:
        raise NotImplementedError()

    @overrides
    def is_infeasible(self) -> bool:
        """Return true if the most recent solve terminated as infeasible."""
        return self._termination_condition == TerminationCondition.infeasible

    @overrides
    def get_dual(self, cid: str) -> float:
        raise NotImplementedError()

    @overrides
    def get_sense(self) -> str:
        """Return the cached objective sense, "min" or "max"."""
        return self._obj_sense

    @overrides
    def build_test_instance_infeasible(self) -> Instance:
        return PyomoTestInstanceInfeasible()

    @overrides
    def build_test_instance_redundancy(self) -> Instance:
        return PyomoTestInstanceRedundancy()

    @overrides
    def build_test_instance_knapsack(self) -> Instance:
        return PyomoTestInstanceKnapsack(
            weights=[23.0, 26.0, 20.0, 18.0],
            prices=[505.0, 352.0, 458.0, 220.0],
            capacity=67.0,
        )

    @overrides
    def get_constraints(self) -> Dict[str, Constraint]:
        """
        Convert every constraint of the model into a `Constraint` object.

        Members of a `pe.ConstraintList` are keyed as "<name>[<index>]"; other
        constraints are keyed by their own name.

        Raises
        ------
        Exception
            If a constraint body is neither a single variable nor a sum of
            variables and monomial terms.
        """
        assert self.model is not None

        def _get(c: pyomo.core.Constraint, name: str) -> Constraint:
            # Extract RHS and sense
            has_ub = c.has_ub()
            has_lb = c.has_lb()
            assert (
                (not has_lb) or (not has_ub) or c.upper() == c.lower()
            ), "range constraints not supported"
            # An equality has BOTH bounds set (and equal, per the assertion
            # above), so the one-sided cases must be ruled out first;
            # otherwise equalities would be reported as ">".
            if not has_ub:
                sense = ">"
                rhs = c.lower()
            elif not has_lb:
                sense = "<"
                rhs = c.upper()
            else:
                sense = "="
                rhs = c.upper()

            # Extract LHS
            lhs = {}
            if isinstance(c.body, SumExpression):
                for term in c.body._args_:
                    if isinstance(term, MonomialTermExpression):
                        # Monomial arguments are (coefficient, variable).
                        lhs[term._args_[1].name] = term._args_[0]
                    elif isinstance(term, _GeneralVarData):
                        lhs[term.name] = 1.0
                    else:
                        raise Exception(f"Unknown term type: {term.__class__.__name__}")
            elif isinstance(c.body, _GeneralVarData):
                lhs[c.body.name] = 1.0
            else:
                raise Exception(f"Unknown expression type: {c.body.__class__.__name__}")

            # Build constraint
            return Constraint(
                lhs=lhs,
                rhs=rhs,
                sense=sense,
            )

        constraints = {}
        for constr in self.model.component_objects(pyomo.core.Constraint):
            if isinstance(constr, pe.ConstraintList):
                for idx in constr:
                    name = f"{constr.name}[{idx}]"
                    assert name not in constraints
                    constraints[name] = _get(constr[idx], name=name)
            else:
                name = constr.name
                assert name not in constraints
                constraints[name] = _get(constr, name=name)

        return constraints


class PyomoTestInstanceInfeasible(Instance):
    """Test instance with one binary variable x[0] constrained by x[0] >= 2."""

    @overrides
    def to_model(self) -> pe.ConcreteModel:
        model = pe.ConcreteModel()
        model.x = pe.Var([0], domain=pe.Binary)
        model.OBJ = pe.Objective(expr=model.x[0], sense=pe.maximize)
        model.eq = pe.Constraint(expr=model.x[0] >= 2)
        return model


class PyomoTestInstanceRedundancy(Instance):
    """
    Test instance with two binary variables and two constraints on their sum
    (<= 1 and <= 2), the second being implied by the first.
    """

    @overrides
    def to_model(self) -> pe.ConcreteModel:
        model = pe.ConcreteModel()
        model.x = pe.Var([0, 1], domain=pe.Binary)
        model.OBJ = pe.Objective(expr=model.x[0] + model.x[1], sense=pe.maximize)
        model.eq1 = pe.Constraint(expr=model.x[0] + model.x[1] <= 1)
        model.eq2 = pe.Constraint(expr=model.x[0] + model.x[1] <= 2)
        return model


class PyomoTestInstanceKnapsack(Instance):
    """
    Simpler (one-dimensional) Knapsack Problem, used for testing.
    """

    def __init__(
        self,
        weights: List[float],
        prices: List[float],
        capacity: float,
    ) -> None:
        """
        Parameters
        ----------
        weights
            Weight of each item.
        prices
            Price of each item, in the same order as `weights`.
        capacity
            Right-hand side of the single capacity constraint.
        """
        super().__init__()
        self.weights = weights
        self.prices = prices
        self.capacity = capacity
        self.varname_to_item: Dict[VariableName, int] = {
            f"x[{i}]": i for i in range(len(self.weights))
        }

    @overrides
    def to_model(self) -> pe.ConcreteModel:
        model = pe.ConcreteModel()
        items = range(len(self.weights))
        model.x = pe.Var(items, domain=pe.Binary)
        model.OBJ = pe.Objective(
            expr=sum(model.x[v] * self.prices[v] for v in items),
            sense=pe.maximize,
        )
        model.eq_capacity = pe.Constraint(
            expr=sum(model.x[v] * self.weights[v] for v in items) <= self.capacity
        )
        return model

    @overrides
    def get_instance_features(self) -> List[float]:
        """Return [capacity, average item weight]."""
        return [
            self.capacity,
            np.average(self.weights),
        ]

    @overrides
    def get_variable_features(self, var_name: VariableName) -> List[Category]:
        """Return [weight, price] of the item associated with `var_name`."""
        item = self.varname_to_item[var_name]
        return [
            self.weights[item],
            self.prices[item],
        ]

    @overrides
    def build_lazy_constraint(self, model: Any, violation: Hashable) -> Any:
        """Attach the constraint x[0] <= 0 to `model` as `model.cut` and return it."""
        model.cut = pe.Constraint(expr=model.x[0] <= 0.0, name="cut")
        return model.cut