Disallow untyped calls and incomplete defs

master
Alinson S. Xavier 5 years ago
parent 7555f561f8
commit 947189f25f

@ -1,2 +1,5 @@
[mypy] [mypy]
ignore_missing_imports = True ignore_missing_imports = True
#disallow_untyped_defs = True
disallow_untyped_calls = True
disallow_incomplete_defs = True

@ -5,6 +5,7 @@
import gzip import gzip
import json import json
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any
import numpy as np import numpy as np
@ -20,7 +21,7 @@ class Instance(ABC):
""" """
@abstractmethod @abstractmethod
def to_model(self): def to_model(self) -> Any:
""" """
Returns a concrete Pyomo model corresponding to this instance. Returns a concrete Pyomo model corresponding to this instance.
""" """

@ -4,19 +4,20 @@
import logging import logging
import sys import sys
from typing import Any, List
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class RedirectOutput: class RedirectOutput:
def __init__(self, streams): def __init__(self, streams: List[Any]):
self.streams = streams self.streams = streams
def write(self, data): def write(self, data: Any) -> None:
for stream in self.streams: for stream in self.streams:
stream.write(data) stream.write(data)
def flush(self): def flush(self) -> None:
for stream in self.streams: for stream in self.streams:
stream.flush() stream.flush()

@ -6,7 +6,7 @@ import re
import sys import sys
from io import StringIO from io import StringIO
from random import randint from random import randint
from typing import List, Any, Dict, Union from typing import List, Any, Dict, Union, Tuple, Optional
from . import RedirectOutput from . import RedirectOutput
from .internal import ( from .internal import (
@ -73,13 +73,14 @@ class GurobiSolver(InternalSolver):
self.model.update() self.model.update()
self._update_vars() self._update_vars()
def _raise_if_callback(self): def _raise_if_callback(self) -> None:
if self.cb_where is not None: if self.cb_where is not None:
raise Exception("method cannot be called from a callback") raise Exception("method cannot be called from a callback")
def _update_vars(self): def _update_vars(self) -> None:
self._all_vars = {} self._all_vars = {}
self._bin_vars = {} self._bin_vars = {}
idx: Union[Tuple, List[int], int]
for var in self.model.getVars(): for var in self.model.getVars():
m = re.search(r"([^[]*)\[(.*)]", var.varName) m = re.search(r"([^[]*)\[(.*)]", var.varName)
if m is None: if m is None:
@ -100,7 +101,7 @@ class GurobiSolver(InternalSolver):
self._bin_vars[name] = {} self._bin_vars[name] = {}
self._bin_vars[name][idx] = var self._bin_vars[name][idx] = var
def _apply_params(self, streams): def _apply_params(self, streams: List[Any]) -> None:
with RedirectOutput(streams): with RedirectOutput(streams):
for (name, value) in self.params.items(): for (name, value) in self.params.items():
self.model.setParam(name, value) self.model.setParam(name, value)
@ -271,7 +272,7 @@ class GurobiSolver(InternalSolver):
else: else:
self.model.addConstr(constraint, name=name) self.model.addConstr(constraint, name=name)
def _clear_warm_start(self): def _clear_warm_start(self) -> None:
for (varname, vardict) in self._all_vars.items(): for (varname, vardict) in self._all_vars.items():
for (idx, var) in vardict.items(): for (idx, var) in vardict.items():
var.start = self.GRB.UNDEFINED var.start = self.GRB.UNDEFINED
@ -338,14 +339,18 @@ class GurobiSolver(InternalSolver):
self.model = self.model.relax() self.model = self.model.relax()
self._update_vars() self._update_vars()
def _extract_warm_start_value(self, log): def _extract_warm_start_value(self, log: str) -> Optional[float]:
ws = self.__extract(log, "MIP start with objective ([0-9.e+-]*)") ws = self.__extract(log, "MIP start with objective ([0-9.e+-]*)")
if ws is not None: if ws is None:
ws = float(ws) return None
return ws return float(ws)
@staticmethod @staticmethod
def __extract(log, regexp, default=None): def __extract(
log: str,
regexp: str,
default: Optional[str] = None,
) -> Optional[str]:
value = default value = default
for line in log.splitlines(): for line in log.splitlines():
matches = re.findall(regexp, line) matches = re.findall(regexp, line)

@ -192,7 +192,7 @@ class InternalSolver(ABC):
pass pass
@abstractmethod @abstractmethod
def add_constraint(self, cobj: Constraint): def add_constraint(self, cobj: Constraint) -> None:
""" """
Adds a single constraint to the model. Adds a single constraint to the model.
""" """
@ -209,7 +209,7 @@ class InternalSolver(ABC):
pass pass
@abstractmethod @abstractmethod
def is_constraint_satisfied(self, cobj: Constraint): def is_constraint_satisfied(self, cobj: Constraint) -> bool:
""" """
Returns True if the current solution satisfies the given constraint. Returns True if the current solution satisfies the given constraint.
""" """

@ -6,7 +6,7 @@ import logging
import re import re
import sys import sys
from io import StringIO from io import StringIO
from typing import Any, List, Dict from typing import Any, List, Dict, Optional
import pyomo import pyomo
from pyomo import environ as pe from pyomo import environ as pe
@ -169,18 +169,18 @@ class BasePyomoSolver(InternalSolver):
variables[str(var)] += [index] variables[str(var)] += [index]
return variables return variables
def _clear_warm_start(self): def _clear_warm_start(self) -> None:
for var in self._all_vars: for var in self._all_vars:
if not var.fixed: if not var.fixed:
var.value = None var.value = None
self._is_warm_start_available = False self._is_warm_start_available = False
def _update_obj(self): def _update_obj(self) -> None:
self._obj_sense = "max" self._obj_sense = "max"
if self._pyomo_solver._objective.sense == pyomo.core.kernel.objective.minimize: if self._pyomo_solver._objective.sense == pyomo.core.kernel.objective.minimize:
self._obj_sense = "min" self._obj_sense = "min"
def _update_vars(self): def _update_vars(self) -> None:
self._all_vars = [] self._all_vars = []
self._bin_vars = [] self._bin_vars = []
self._varname_to_var = {} self._varname_to_var = {}
@ -191,7 +191,7 @@ class BasePyomoSolver(InternalSolver):
if var[idx].domain == pyomo.core.base.set_types.Binary: if var[idx].domain == pyomo.core.base.set_types.Binary:
self._bin_vars += [var[idx]] self._bin_vars += [var[idx]]
def _update_constrs(self): def _update_constrs(self) -> None:
self._cname_to_constr = {} self._cname_to_constr = {}
for constr in self.model.component_objects(Constraint): for constr in self.model.component_objects(Constraint):
self._cname_to_constr[constr.name] = constr self._cname_to_constr[constr.name] = constr
@ -220,7 +220,11 @@ class BasePyomoSolver(InternalSolver):
self._update_constrs() self._update_constrs()
@staticmethod @staticmethod
def __extract(log, regexp, default=None): def __extract(
log: str,
regexp: Optional[str],
default: Optional[str] = None,
) -> Optional[str]:
if regexp is None: if regexp is None:
return default return default
value = default value = default
@ -231,22 +235,25 @@ class BasePyomoSolver(InternalSolver):
value = matches[0] value = matches[0]
return value return value
def _extract_warm_start_value(self, log): def _extract_warm_start_value(self, log: str) -> Optional[float]:
value = self.__extract(log, self._get_warm_start_regexp()) value = self.__extract(log, self._get_warm_start_regexp())
if value is not None: if value is None:
value = float(value) return None
return value return float(value)
def _extract_node_count(self, log): def _extract_node_count(self, log: str) -> Optional[int]:
return self.__extract(log, self._get_node_count_regexp()) value = self.__extract(log, self._get_node_count_regexp())
if value is None:
return None
return int(value)
def get_constraint_ids(self): def get_constraint_ids(self):
return list(self._cname_to_constr.keys()) return list(self._cname_to_constr.keys())
def _get_warm_start_regexp(self): def _get_warm_start_regexp(self) -> Optional[str]:
return None return None
def _get_node_count_regexp(self): def _get_node_count_regexp(self) -> Optional[str]:
return None return None
def extract_constraint(self, cid): def extract_constraint(self, cid):

Loading…
Cancel
Save