from datetime import datetime
import cantera as ct
import pandas as pd
import numpy as np
from scipy.optimize import minimize
# noinspection PyPep8Naming
import xml.etree.ElementTree as ET
import seaborn as sns
import matplotlib.pyplot as plt
import shutil
import copy
from inspect import signature
import os
import re
import pkg_resources
from .utils import set_size
sns.set()
sns.set(font_scale=1.6)
[docs]class REEPS:
r"""
Rare earth elements (REE or RE) Takes in experimental data
Returns parameters for GEM
.. note::
The order in which the REEs appear in the csv file must be the same
order as they appear in the xml, complex_names and
rare_earth_ion_names.
For example, say in exp_csv_filename's csv, RE_1 is Nd RE_2 is Pr,
and
.. code-block:: python
aq_solvent_name = 'H2O(L)'
extractant_name = '(HA)2(org)'
diluent_name = 'dodecane'
Then:
The csvs column ordering must be:
[h_i, h_eq, z_i, z_eq, Nd_aq_i, Nd_aq_eq, Nd_d_eq,
Pr_aq_i, Pr_aq_eq, Pr_d_eq]
The aqueous speciesArray must be
"H2O(L) H+ OH- Cl- Nd+++ Pr+++"
The organic speciesArray must be
"(HA)2(org) dodecane Nd(H(A)2)3(org) Pr(H(A)2)3(org)"
.. code-block:: python
complex_names = ['Nd(H(A)2)3(org)', 'Pr(H(A)2)3(org)']
rare_earth_ion_names = ['Nd+++', 'Pr+++']
:param exp_data: (str) csv file name with experimental data
In the .csv file, the rows are different experiments and
columns are the measured quantities.
The ordering of the columns needs to be:
[h_i, h_eq, z_i, z_eq,
{RE_1}_aq_i, {RE_1}_aq_eq, {RE_1}_d_eq,
{RE_2}_aq_i, {RE_2}_aq_eq, {RE_2}_d_eq,...
{RE_N}_aq_i, {RE_N}_aq_eq, {RE_N}_d_eq]
Naming does not matter, just the order.
Where {RE_1}-{RE_N} are the rare earth element names of interest
i.e. Nd, Pr, La, etc.
Below is an explanation of the columns.
+-------+------------+------------------------------------------+
| Index | Column | Meaning |
+=======+============+==========================================+
| 0 | h_i | Initial Concentration of |
| | | H+ ions (mol/L) |
+-------+------------+------------------------------------------+
| 1 | h_eq | Equilibrium concentration of |
| | | H+ ions (mol/L) |
+-------+------------+------------------------------------------+
| 2 | z_i | Initial concentration of |
| | | extractant (mol/L) |
+-------+------------+------------------------------------------+
| 3 | z_eq | Equilibrium concentration of |
| | | extractant (mol/L) |
+-------+------------+------------------------------------------+
| 4 | {RE}_aq_i | Initial concentration of RE ions (mol/L) |
+-------+------------+------------------------------------------+
| 5 | {RE}_aq_eq | Equilibrium concentration of RE ions |
| | | in aqueous phase (mol/L) |
+-------+------------+------------------------------------------+
| 6 | {RE}_d_eq | Equilibrium Ratio between amount of |
| | | RE atoms in organic to aqueous |
+-------+------------+------------------------------------------+
:param phases_xml_filename: (str) xml file with parameters
for equilibrium calc
Would recommend copying and modifying xmls located in data/xmls
or in Cantera's "data" folder
speciesArray fields need specific ordering.
In aqueous phase: aq_solvent_name, H+, OH-, Cl-, RE_1, RE_2, ..., RE_N
For aqueous phase, RE_1-RE_N represent RE ion names i.e. Nd+++, Pr+++
In organic phase : extractant_name, diluant_name, RE_1, RE_2, ..., RE_N
For organic phase, RE_1-RE_N represent RE complex names
i.e. Nd(H(A)2)3(org), Pr(H(A)2)3(org)
:param phase_names: (list) names of phases in xml file
Found in the xml file under <phase ... id={phase_name}>
:param aq_solvent_name: (str) name of aqueous solvent in xml file
:param extractant_name: (str) name of extractant in xml file
:param diluant_name: (str) name of diluant in xml file
:param complex_names: (list) names of complexes in xml file.
Ensure the ordering is correct
:param rare_earth_ion_names: (list) names of rare earth ions in xml file
Ensure the ordering is correct
:param re_species_list: (list) names of rare earth elements.
If ``None``, re_species_list will be rare_earth_ion_names without '+'
i.e. 'Nd+++'->'Nd'
Ensure the ordering is correct
:param aq_solvent_rho: (float) density of solvent (g/L)
If ``None``, molar volume/molecular weight is used from xml
:param extractant_rho: (float) density of extractant (g/L)
If ``None``, molar volume/molecular weight is used from xml
:param diluant_rho: (float) density of diluant (g/L)
If ``None``, molar volume/molecular weight is used from xml
:param opt_dict: (dict) dictionary containing info about which
species parameters are updated to fit model to experimental data
Should have the format as below. Dictionary keys under user defined
parameter name must be named as shown below ('upper_element_name',
'upper_attrib_name', etc.). 'attrib_name's and 'attrib_value's can
be None. {} denotes areas for user to fill in.
.. code-block:: python
opt_dict = {"{user_defined_name_for_parameter_1}":
{'upper_element_name': {param_upper_element},
'upper_attrib_name': {param_upper_attrib_name},
'upper_attrib_value': {param_upper_attrib_value},
'lower_element_name': {param_lower_element},
'lower_attrib_name': {param_lower_attrib_name},
'lower_attrib_value': {param_lower_attrib_value},
'input_format': {str format to input input_value}
'input_value': {guess_value}},
"{user_defined_name_for_parameter_2}":
...
...
}
:param objective_function: (function or str) function to compute objective
By default, the objective function is log mean squared error
of distribution ratio
.. code-block:: python
np.sum((np.log10(d_pred)-np.log10(d_meas))^2)
Function needs to take inputs:
.. code-block:: python
objective_function(predicted_dict, measured_df, kwargs)
``kwargs`` is optional
Function needs to return: (float) value computed by objective function
Below is the guide for referencing predicted values
+---------------------------+--------------------------------+
| To access | Use |
+===========================+================================+
| hydrogen ion conc in aq | predicted_dict['h_eq'] |
+---------------------------+--------------------------------+
| extractant conc in org | predicted_dict['z_eq'] |
+---------------------------+--------------------------------+
| RE ion eq conc in aq | predicted_dict['{RE}_aq_eq'] |
+---------------------------+--------------------------------+
| RE complex eq conc in org | predicted_dict['{RE}_org_eq'] |
+---------------------------+--------------------------------+
| RE distribution ratio | predicted_dict['{RE}_d_eq'] |
+---------------------------+--------------------------------+
Replace "{RE}" with rare earth element i.e. Nd, La, etc.
For measured values, use the same names, but
replace ``predicted_dict`` with ``measured_df``
:param optimizer: (function or str) function to perform optimization
.. note::
The optimized variables are not directly the species parameters,
but instead are first multiplied by the initial guess before
sending becoming the species parameters.
For example, say
.. code-block:: python
opt_dict = {'Nd(H(A)2)3(org):'h0':-4.7e6}
If the bounds on h0 need to be [-4.7e7,-4.7e5], then
divide the bounds by the guess and get
.. code-block:: python
"bounds": [(1e-1, 1e1)]
By default, the optimizer is scipy's optimize function with
.. code-block:: python
default_kwargs= {"method": 'SLSQP',
"bounds": [(1e-1, 1e1)] * len(x_guess),
"constraints": (),
"options": {'disp': True,
'maxiter': 1000,
'ftol': 1e-6}}
Function needs to take inputs:
``optimizer(objective_function, x_guess, kwargs)``
``kwargs`` is optional
Function needs to return: ((np.ndarray, float)) Optimized parameters,
objective_function value
:param temp_xml_file_path: (str) path to temporary xml file.
This xml file is a duplicate of the phases_xml_file name and is
modified during the optimization process to avoid changing the original
xml file
default is local temp folder
:param dependant_params_dict: (dict) dictionary containing information
about parameters dependant on opt_dict
"""
def __init__(self,
exp_data,
phases_xml_filename,
phase_names,
aq_solvent_name,
extractant_name,
diluant_name,
complex_names,
rare_earth_ion_names,
re_species_list=None,
aq_solvent_rho=None,
extractant_rho=None,
diluant_rho=None,
opt_dict=None,
objective_function='Log-MSE',
optimizer='SLSQP',
temp_xml_file_path=None,
dependant_params_dict=None,
):
self._built_in_obj_list = ['Log-MSE']
self._built_in_opt_list = ['SLSQP']
self._exp_data = exp_data
self._phases_xml_filename = phases_xml_filename
self._opt_dict = opt_dict
self._phase_names = phase_names
self._aq_solvent_name = aq_solvent_name
self._extractant_name = extractant_name
self._diluant_name = diluant_name
self._complex_names = complex_names
self._rare_earth_ion_names = rare_earth_ion_names
self._aq_solvent_rho = aq_solvent_rho
self._extractant_rho = extractant_rho
self._diluant_rho = diluant_rho
self._objective_function = None
self.set_objective_function(objective_function)
self._optimizer = None
self._re_species_list = re_species_list
self.set_optimizer(optimizer)
if temp_xml_file_path is None:
temp_xml_file_path = r'{0}/temp.xml'.format(os.getenv('TEMP'))
self._temp_xml_file_path = temp_xml_file_path
self._dependant_params_dict = dependant_params_dict
# Try and except for adding package data to path.
# This only works for sdist, not bdist
# If bdist is needed, research "manifest.in" python setup files
try:
shutil.copyfile(self._phases_xml_filename,
self._temp_xml_file_path)
self._phases = ct.import_phases(self._phases_xml_filename,
phase_names)
except FileNotFoundError:
self._phases_xml_filename = \
pkg_resources.resource_filename('reeps',
r'..\data\xmls\{0}'.format(
phases_xml_filename))
shutil.copyfile(self._phases_xml_filename,
self._temp_xml_file_path)
self._phases = ct.import_phases(self._phases_xml_filename,
phase_names)
if isinstance(self._exp_data, str):
try:
self._exp_df = pd.read_csv(self._exp_data)
except FileNotFoundError:
self._exp_data = pkg_resources.resource_filename(
'reeps', r'..\data\csvs\{0}'.format(self._exp_data))
self._exp_df = pd.read_csv(self._exp_data)
else:
self._exp_df = self._exp_data.copy()
self._exp_df_columns = ['h_i', 'h_eq', 'z_i', 'z_eq']
if self._re_species_list is None:
self._re_species_list = []
for name in self._rare_earth_ion_names:
species = name.replace('+', '')
self._re_species_list.append(species)
for species in self._re_species_list:
self._exp_df_columns.append('{0}_aq_i'.format(species))
self._exp_df_columns.append('{0}_aq_eq'.format(species))
self._exp_df_columns.append('{0}_d_eq'.format(species))
self._exp_df.columns = self._exp_df_columns
for species in self._re_species_list:
self._exp_df['{0}_org_eq'.format(species)] = \
self._exp_df['{0}_aq_eq'.format(species)] \
* self._exp_df['{0}_d_eq'.format(species)]
self._in_moles = None
self._aq_ind = None
self._org_ind = None
self._re_charges = None
self.set_in_moles(feed_vol=1)
self._predicted_dict = None
self.update_predicted_dict()
[docs] @staticmethod
def scipy_minimize(objective, x_guess, optimizer_kwargs=None):
""" The default optimizer for REEPS
Uses scipy.minimize
By default, options are
.. code-block:: python
default_kwargs= {"method": 'SLSQP',
"bounds": [(1e-1, 1e1)]*len(x_guess),
"constraints": (),
"options": {'disp': True,
'maxiter': 1000,
'ftol': 1e-6}}
:param objective: (func) the objective function
:param x_guess: (np.ndarray) the initial guess (always 1)
:param optimizer_kwargs: (dict) dictionary of options for minimize
:returns: ((np.ndarray, float)) Optimized parameters,
objective_function value
"""
if optimizer_kwargs is None:
optimizer_kwargs = {"method": 'SLSQP',
"bounds": [(1e-1, 1e1)] * len(x_guess),
"constraints": (),
"options": {'disp': True,
'maxiter': 1000,
'ftol': 1e-6}}
res = minimize(objective, x_guess, **optimizer_kwargs)
est_parameters = res.x
return est_parameters, res.fun
[docs] def log_mean_squared_error(self, predicted_dict, meas_df):
"""Default objective function for REEPS
Returns the log mean squared error of
predicted distribution ratios (d=n_org/n_aq)
to measured d.
np.sum((np.log10(d_pred)-np.log10(d_meas))\**2)
:param predicted_dict: (dict) contains predicted data
:param meas_df: (pd.DataFrame) contains experimental data
:return: (float) log mean squared error between predicted and measured
"""
meas = np.concatenate([meas_df['{0}_d_eq'.format(species)].values
for species in self._re_species_list])
pred = np.concatenate([
predicted_dict['{0}_d_eq'.format(species)]
for species in self._re_species_list])
log_pred = np.log10(pred)
log_meas = np.log10(meas)
log_diff = (log_pred - log_meas) ** 2
obj = np.sum(log_diff)
return obj
[docs] def get_exp_df(self) -> pd.DataFrame:
"""Returns the experimental DataFrame
:return: (pd.DataFrame) Experimental data
"""
return self._exp_df
[docs] def set_exp_df(self, exp_data):
"""Changes the experimental DataFrame to input exp_csv_filename data
and renames columns to internal REEPS names
h_i, h_eq, z_i, z_eq, {RE}_aq_i, {RE}_aq_eq, {RE}_d
See class docstring on "exp_csv_filename" for further explanations.
:param exp_data: (str or pd.DataFrame)
file name/path or DataFrame for experimental data csv
"""
self._exp_data = exp_data
if isinstance(self._exp_data, str):
try:
self._exp_df = pd.read_csv(self._exp_data)
except FileNotFoundError:
self._exp_data = pkg_resources.resource_filename(
'reeps', r'..\data\csvs\{0}'.format(self._exp_data))
self._exp_df = pd.read_csv(self._exp_data)
else:
self._exp_df = exp_data.copy()
self._exp_df_columns = ['h_i', 'h_eq', 'z_i', 'z_eq']
if self._re_species_list is None:
self._re_species_list = []
for name in self._rare_earth_ion_names:
species = name.replace('+', '')
self._re_species_list.append(species)
for species in self._re_species_list:
self._exp_df_columns.append('{0}_aq_i'.format(species))
self._exp_df_columns.append('{0}_aq_eq'.format(species))
self._exp_df_columns.append('{0}_d_eq'.format(species))
self._exp_df.columns = self._exp_df_columns
for species in self._re_species_list:
self._exp_df['{0}_org_eq'.format(species)] = \
self._exp_df['{0}_aq_eq'.format(species)] \
* self._exp_df['{0}_d_eq'.format(species)]
self.set_in_moles(feed_vol=1)
self.update_predicted_dict()
return None
[docs] def get_phases(self) -> list:
"""
Returns the list of Cantera solutions
:return: (list) list of Cantera solutions/phases
"""
return self._phases
[docs] def set_phases(self, phases_xml_filename, phase_names):
"""Change list of Cantera solutions by inputting
new xml file name and phase names
Also runs set_in_moles to set feed volume to 1 L
:param phases_xml_filename: (str) xml file with parameters
for equilibrium calc
:param phase_names: (list) names of phases in xml file
"""
self._phases_xml_filename = phases_xml_filename
self._phase_names = phase_names
# Try and except for adding package data to path.
# This only works for sdist, not bdist
# If bdist is needed, research "manifest.in" python setup files
try:
shutil.copyfile(self._phases_xml_filename,
self._temp_xml_file_path)
self._phases = ct.import_phases(self._phases_xml_filename,
phase_names)
except FileNotFoundError:
self._phases_xml_filename = \
pkg_resources.resource_filename('reeps',
r'..\data\xmls\{0}'.format(
phases_xml_filename))
shutil.copyfile(self._phases_xml_filename,
self._temp_xml_file_path)
self._phases = ct.import_phases(self._phases_xml_filename,
phase_names)
self.set_in_moles(feed_vol=1)
self.update_predicted_dict()
return None
[docs] def get_opt_dict(self) -> dict:
"""
Returns the dictionary containing optimization information
:return: (dict) dictionary containing info about which
species parameters are updated to fit model to experimental data
"""
return self._opt_dict
[docs] def set_opt_dict(self, opt_dict):
"""
Change the dictionary to input opt_dict.
opt_dict specifies species parameters to be updated to
fit model to data
See class docstring on "opt_dict" for more information.
:param opt_dict: (dict) dictionary containing info about which
species parameters are updated to fit model to experimental data
"""
self._opt_dict = opt_dict
return None
[docs] def get_aq_solvent_name(self) -> str:
"""Returns aq_solvent_name
:return: aq_solvent_name: (str) name of aqueous solvent in xml file
"""
return self._aq_solvent_name
[docs] def set_aq_solvent_name(self, aq_solvent_name):
""" Change aq_solvent_name to input aq_solvent_name
:param aq_solvent_name: (str) name of aqueous solvent in xml file
"""
self._aq_solvent_name = aq_solvent_name
return None
[docs] def get_diluant_name(self) -> str:
""" Returns diluant name
:return: diluant_name: (str) name of diluant in xml file
"""
return self._diluant_name
[docs] def set_diluant_name(self, diluant_name):
"""
Change diluant_name to input diluant_name
:param diluant_name: (str) name of diluant in xml file
"""
self._diluant_name = diluant_name
return None
[docs] def get_complex_names(self) -> list:
"""Returns list of complex names
:return: complex_names: (list) names of complexes in xml file.
"""
return self._complex_names
[docs] def set_complex_names(self, complex_names):
"""Change complex names list to input complex_names
:param complex_names: (list) names of complexes in xml file.
"""
self._complex_names = complex_names
return None
[docs] def get_rare_earth_ion_names(self) -> list:
"""Returns list of rare earth ion names
:return: rare_earth_ion_names: (list) names of rare earth ions in
xml file
"""
return self._rare_earth_ion_names
[docs] def set_rare_earth_ion_names(self, rare_earth_ion_names):
"""Change list of rare earth ion names to input
rare_earth_ion_names
:param rare_earth_ion_names: (list) names of rare earth ions in
xml file
"""
self._rare_earth_ion_names = rare_earth_ion_names
return None
[docs] def get_re_species_list(self) -> list:
"""Returns list of rare earth element names
:return: re_species_list: (list) names of rare earth elements in
xml file
"""
return self._re_species_list
[docs] def set_re_species_list(self, re_species_list):
"""Change list of rare earth ion names to input
rare_earth_ion_names
:param re_species_list: (list) names of rare earth elements in
xml file
"""
self._re_species_list = re_species_list
return None
[docs] def get_aq_solvent_rho(self) -> str:
"""Returns aqueous solvent density (g/L)
:return: aq_solvent_rho: (float) density of aqueous solvent
"""
return self._aq_solvent_rho
[docs] def set_aq_solvent_rho(self, aq_solvent_rho):
"""Changes aqueous solvent density (g/L) to input aq_solvent_rho
:param aq_solvent_rho: (float) density of aqueous solvent
"""
self._aq_solvent_rho = aq_solvent_rho
return None
[docs] def get_diluant_rho(self) -> str:
"""Returns diluant density (g/L)
:return: diluant_rho: (float) density of diluant
"""
return self._diluant_rho
[docs] def set_diluant_rho(self, diluant_rho):
"""Changes diluant density (g/L) to input diluant_rho
:param diluant_rho: (float) density of diluant
"""
self._diluant_rho = diluant_rho
return None
[docs] def set_in_moles(self, feed_vol):
"""Function that initializes mole fractions to input feed_vol
This function is called at initialization
Sets in_moles to a pd.DataFrame containing initial mole fractions
Columns for species and rows for different experiments
This function also calls update_predicted_dict
:param feed_vol: (float) feed volume of mixture (L)
"""
phases_copy = self._phases.copy()
exp_df = self._exp_df.copy()
solvent_name = self._aq_solvent_name
extractant_name = self._extractant_name
diluant_name = self._diluant_name
solvent_rho = self._aq_solvent_rho
extractant_rho = self._extractant_rho
diluant_rho = self._diluant_rho
re_names = self._rare_earth_ion_names
re_species_list = self._re_species_list
mixed = ct.Mixture(phases_copy)
aq_ind = None
solvent_ind = None
for ind, phase in enumerate(phases_copy):
if solvent_name in phase.species_names:
aq_ind = ind
solvent_ind = phase.species_names.index(solvent_name)
if aq_ind is None:
raise Exception('Solvent "{0}" not found \
in xml file'.format(solvent_name))
if aq_ind == 0:
org_ind = 1
else:
org_ind = 0
self._aq_ind = aq_ind
self._org_ind = org_ind
extractant_ind = phases_copy[org_ind].species_names.index(
extractant_name)
diluant_ind = phases_copy[org_ind].species_names.index(diluant_name)
re_ind_list = [phases_copy[aq_ind].species_names.index(re_name)
for re_name in re_names]
re_charges = np.array([phases_copy[aq_ind].species(re_ind).charge
for re_ind in re_ind_list])
self._re_charges = re_charges
mix_aq = mixed.phase(aq_ind)
mix_org = mixed.phase(org_ind)
solvent_mw = mix_aq.molecular_weights[solvent_ind] # g/mol
extractant_mw = mix_org.molecular_weights[extractant_ind]
diluant_mw = mix_org.molecular_weights[diluant_ind]
if solvent_rho is None:
solvent_rho = mix_aq(aq_ind).partial_molar_volumes[
solvent_ind] / solvent_mw * 1e6 # g/L
self._aq_solvent_rho = solvent_rho
if extractant_rho is None:
extractant_rho = mix_org(org_ind).partial_molar_volumes[
extractant_ind] / extractant_mw * 1e6
self._extractant_rho = extractant_rho
if diluant_rho is None:
diluant_rho = mix_org(org_ind).partial_molar_volumes[
extractant_ind] / extractant_mw * 1e6
self._diluant_rho = diluant_rho
in_moles_data = []
aq_phase_solvent_moles = feed_vol * solvent_rho / solvent_mw
for index, row in exp_df.iterrows():
h_plus_moles = feed_vol * row['h_i']
hydroxide_ions = 0
rare_earth_moles = np.array([feed_vol * row[
'{0}_aq_i'.format(re_species)]
for re_species in re_species_list])
re_charge_sum = np.sum(re_charges * rare_earth_moles)
chlorine_moles = re_charge_sum + h_plus_moles
extractant_moles = feed_vol * row['z_i']
extractant_vol = extractant_moles * extractant_mw / extractant_rho
diluant_vol = feed_vol - extractant_vol
diluant_moles = diluant_vol * diluant_rho / diluant_mw
complex_moles = np.zeros(len(re_species_list))
species_moles_aq = [aq_phase_solvent_moles,
h_plus_moles,
hydroxide_ions,
chlorine_moles]
species_moles_aq.extend(list(rare_earth_moles))
species_moles_org = [extractant_moles, diluant_moles]
species_moles_org.extend(list(complex_moles))
if aq_ind == 0:
species_moles = species_moles_aq + species_moles_org
else:
species_moles = species_moles_org + species_moles_aq
in_moles_data.append(species_moles)
self._in_moles = pd.DataFrame(
in_moles_data, columns=mixed.species_names)
self.update_predicted_dict()
return None
[docs] def get_in_moles(self) -> pd.DataFrame:
"""Returns the in_moles DataFrame which contains the initial mole
fractions of each species for each experiment
:return: in_moles: (pd.DataFrame) DataFrame with initial mole fractions
"""
return self._in_moles
[docs] def set_objective_function(self, objective_function):
"""Change objective function to input objective_function.
See class docstring on "objective_function" for instructions
:param objective_function: (func) Objective function to quantify
error between model and experimental data
"""
if not callable(objective_function) \
and objective_function not in self._built_in_obj_list:
raise Exception(
"objective_function must be a function "
"or in this strings list: {0}".format(
self._built_in_obj_list))
if callable(objective_function):
if len(signature(objective_function).parameters) < 2:
raise Exception(
"objective_function must be a function "
"with at least 3 arguments:"
" f(predicted_dict, experimental_df, kwargs)")
if objective_function == 'Log-MSE':
objective_function = self.log_mean_squared_error
self._objective_function = objective_function
return None
[docs] def get_objective_function(self):
"""Returns objective function
:return: objective_function: (func) Objective function to quantify
error between model and experimental data
"""
return self._objective_function
[docs] def set_optimizer(self, optimizer):
"""Change optimizer function to input optimizer.
See class docstring on "optimizer" for instructions
:param optimizer: (func) Optimizer function to minimize objective
function
"""
if not callable(optimizer) \
and optimizer not in self._built_in_opt_list:
raise Exception(
"optimizer must be a function "
"or in this strings list: {0}".format(
self._built_in_opt_list))
if callable(optimizer):
if len(signature(optimizer).parameters) < 2:
raise Exception(
"optimizer must be a function "
"with at least 2 arguments: "
"f(objective_func,x_guess, kwargs)")
if optimizer == 'SLSQP':
optimizer = self.scipy_minimize
self._optimizer = optimizer
return None
[docs] def get_optimizer(self):
"""Returns objective function
:return: optimizer: (func) Optimizer function to minimize objective
function
"""
return self._optimizer
[docs] def get_temp_xml_file_path(self):
"""Returns path to temporary xml file.
This xml file is a duplicate of the phases_xml_file name and is
modified during the optimization process to avoid changing the original
xml file.
:return: temp_xml_file_path: (str) path to temporary xml file.
"""
return self._temp_xml_file_path
[docs] def set_temp_xml_file_path(self, temp_xml_file_path):
"""Changes temporary xml file path to input temp_xml_file_path.
This xml file is a duplicate of the phases_xml_file name and is
modified during the optimization process to avoid changing the original
xml file.
:param temp_xml_file_path: (str) path to temporary xml file.
"""
self._temp_xml_file_path = temp_xml_file_path
return None
[docs] def get_dependant_params_dict(self):
"""
Returns the dependant_params_dict
:return: dependant_params_dict: (dict) dictionary containing
information about parameters dependant on opt_dict
"""
return self._dependant_params_dict
[docs] def set_dependant_params_dict(self, dependant_params_dict):
"""
Sets the dependant_params_dict
:param dependant_params_dict: (dict) dictionary containing information
about parameters dependant on opt_dict
"""
self._dependant_params_dict = dependant_params_dict
return None
[docs] def update_predicted_dict(self,
phases_xml_filename=None,
phase_names=None):
"""Function that computes the predicted equilibrium concentrations
the fed phases_xml_filename parameters predicts given the initial
mole fractions set by in_moles()
:param phases_xml_filename: (str)xml file with parameters
for equilibrium calc. If ``None``, the
current phases_xml_filename is used.
:param phase_names: (list) names of phases in xml file.
If ``None``, the current phases_names is used.
"""
if phases_xml_filename is None:
phases_xml_filename = self._phases_xml_filename
if phase_names is None:
phase_names = self._phase_names
aq_ind = self._aq_ind
org_ind = self._org_ind
complex_names = self._complex_names
extractant_name = self._extractant_name
rare_earth_ion_names = self._rare_earth_ion_names
in_moles = self._in_moles
re_species_list = self._re_species_list
phases_copy = ct.import_phases(phases_xml_filename, phase_names)
mix = ct.Mixture(phases_copy)
key_names = ['h_eq', 'z_eq']
for re_species in re_species_list:
key_names.append('{0}_aq_eq'.format(re_species))
key_names.append('{0}_org_eq'.format(re_species))
key_names.append('{0}_d_eq'.format(re_species))
predicted_dict = {'{0}'.format(key_name): []
for key_name in key_names}
for row in in_moles.values:
mix.species_moles = row
mix.equilibrate('TP', log_level=0)
re_org_array = np.array([mix.species_moles[mix.species_index(
org_ind, complex_name)] for complex_name in complex_names])
re_aq_array = np.array([mix.species_moles[mix.species_index(
aq_ind, re_ion_name)] for re_ion_name in rare_earth_ion_names])
d_array = re_org_array / re_aq_array
hydrogen_ions = mix.species_moles[mix.species_index(aq_ind, 'H+')]
extractant = mix.species_moles[mix.species_index(
org_ind, extractant_name)]
for index, re_species in enumerate(re_species_list):
predicted_dict['{0}_aq_eq'.format(
re_species)].append(re_aq_array[index])
predicted_dict['{0}_org_eq'.format(
re_species)].append(re_org_array[index])
predicted_dict['{0}_d_eq'.format(
re_species)].append(d_array[index])
predicted_dict['h_eq'].append(hydrogen_ions)
predicted_dict['z_eq'].append(extractant)
for key, value in predicted_dict.items():
predicted_dict[key] = np.array(value)
self._predicted_dict = predicted_dict
return None
[docs] def get_predicted_dict(self):
"""Returns predicted dictionary of species concentrations
that xml parameters predicts given current in_moles
:return: predicted_dict: (dict) dictionary of species concentrations
"""
return self._predicted_dict
def _internal_objective(self, x, kwargs=None):
"""
Internal objective function. Uses objective function to compute value
If the optimizer requires vectorized variables ie pso, this function
takes care of it
:param x: (list) thermo properties varied to minimize objective func
:param kwargs: (list) arguments for objective_function
"""
temp_xml_file_path = self._temp_xml_file_path
exp_df = self._exp_df
objective_function = self._objective_function
opt_dict = copy.deepcopy(self._opt_dict)
dep_dict = copy.deepcopy(self._dependant_params_dict)
x = np.array(x)
if len(x.shape) == 1:
xs = np.array([x])
vectorized_x = False
else:
vectorized_x = True
xs = x
objective_values = []
for x in xs:
i = 0
for species_name in opt_dict.keys():
for thermo_prop in opt_dict[species_name].keys():
if not np.isnan(
x[i]): # if nan, do not update xml with nan
opt_dict[species_name][thermo_prop] *= x[i]
i += 1
self.update_xml(opt_dict,
temp_xml_file_path,
dependant_params_dict=dep_dict)
self.update_predicted_dict(temp_xml_file_path)
predicted_dict = self.get_predicted_dict()
self.update_predicted_dict()
if kwargs is None:
# noinspection PyCallingNonCallable
obj = objective_function(predicted_dict, exp_df)
else:
# noinspection PyCallingNonCallable
obj = objective_function(predicted_dict, exp_df, **kwargs)
objective_values.append(obj)
if vectorized_x:
objective_values = np.array(objective_values)
else:
objective_values = objective_values[0]
return objective_values
[docs] def fit(self,
objective_function=None,
optimizer=None,
objective_kwargs=None,
optimizer_kwargs=None) -> tuple:
"""Fits experimental to modeled data by minimizing objective function
with optimizer. Returns dictionary with opt_dict structure
:param objective_function: (function) function to compute objective
If 'None', last set objective or default function is used
:param optimizer: (function) function to perform optimization
If 'None', last set optimizer or default is used
:param optimizer_kwargs: (dict) optional arguments for optimizer
:param objective_kwargs: (dict) optional arguments
for objective function
:returns tuple: (opt_dict (dict), opt_value (float))
optimized opt_dict: Has identical structure as opt_dict
"""
if objective_function is not None:
self.set_objective_function(objective_function)
if optimizer is not None:
self.set_optimizer(optimizer)
def objective(x):
return self._internal_objective(x, objective_kwargs)
optimizer = self._optimizer
opt_dict = copy.deepcopy(self._opt_dict)
i = 0
for species_name in opt_dict.keys():
for _ in opt_dict[species_name].keys():
i += 1
x_guess = np.ones(i)
if optimizer_kwargs is None:
# noinspection PyCallingNonCallable
est_parameters, obj_value = optimizer(objective, x_guess)
else:
# noinspection PyCallingNonCallable
est_parameters, obj_value = optimizer(objective,
x_guess,
optimizer_kwargs)
i = 0
for species_name in opt_dict.keys():
for thermo_prop in opt_dict[species_name].keys():
opt_dict[species_name][thermo_prop] *= est_parameters[i]
i += 1
return opt_dict, obj_value
[docs] def update_xml(self,
info_dict,
phases_xml_filename=None,
dependant_params_dict=None):
"""updates xml file with info_dict
:param info_dict: (dict) info in {species_names:{thermo_prop:val}}
Requires an identical structure to opt_dict
:param phases_xml_filename: (str) xml filename if editing other xml
If ``None``, the current xml will be modified and the internal
Cantera phases will be refreshed to the new values.
:param dependant_params_dict: (dict) dictionary containing information
about parameters dependant on info_dict
"""
if phases_xml_filename is None:
phases_xml_filename = self._phases_xml_filename
new_dict = copy.deepcopy(info_dict)
dep_dict = dependant_params_dict
if dep_dict is not None:
for species_name in dep_dict.keys():
for thermo_prop in dep_dict[species_name]:
mod_func = \
dep_dict[species_name][thermo_prop]['function']
mod_kwargs = \
dep_dict[species_name][thermo_prop]['kwargs']
ind_vars = \
dep_dict[species_name][thermo_prop]['ind_vars']
ind_vals = [new_dict[ind_var[0]][ind_var[1]]
for ind_var in ind_vars]
new_dict[species_name] = {}
new_dict[species_name][thermo_prop] = {}
new_dict[species_name][thermo_prop] = \
mod_func(ind_vals, **mod_kwargs)
# print(mod_func(ind_vals, **mod_kwargs))
# print(new_dict)
tree = ET.parse(phases_xml_filename)
root = tree.getroot()
# Update xml file
for species_name in new_dict.keys():
for thermo_prop in new_dict[species_name].keys():
for species in root.iter('species'):
if species.attrib['name'] == species_name:
for changed_prop in species.iter(thermo_prop):
changed_prop.text = str(
new_dict[species_name][thermo_prop])
now = datetime.now()
changed_prop.set('updated',
'Updated at {0}:{1} {2}-{3}-{4}'
.format(now.hour, now.minute,
now.month, now.day,
now.year))
tree.write(phases_xml_filename)
if phases_xml_filename == self._phases_xml_filename:
self.set_phases(self._phases_xml_filename, self._phase_names)
return None
def _internal_objective_ver2(self, x, kwargs=None):
"""
ver2 generalizes to handle accessing parameters. ver1 assumes species
parameter is modified. ver2 assumes parameter is accessed by going
through two levels: upper and lower
Internal objective function. Uses objective function to compute value
If the optimizer requires vectorized variables ie pso, this function
takes care of it
:param x: (list) thermo properties varied to minimize objective func
:param kwargs: (list) arguments for objective_function
"""
temp_xml_file_path = self._temp_xml_file_path
exp_df = self._exp_df
objective_function = self._objective_function
opt_dict = copy.deepcopy(self._opt_dict)
dep_dict = copy.deepcopy(self._dependant_params_dict)
x = np.array(x)
if len(x.shape) == 1:
xs = np.array([x])
vectorized_x = False
else:
vectorized_x = True
xs = x
objective_values = []
for x in xs:
for ind, param_name in enumerate(opt_dict.keys()):
if not np.isnan(
x[ind]): # if nan, do not update xml with nan
opt_dict[param_name]['input_value'] *= x[ind]
self.update_xml_ver2(opt_dict,
temp_xml_file_path,
dependant_params_dict=dep_dict)
self.update_predicted_dict(temp_xml_file_path)
predicted_dict = self.get_predicted_dict()
self.update_predicted_dict()
if kwargs is None:
# noinspection PyCallingNonCallable
obj = objective_function(predicted_dict, exp_df)
else:
# noinspection PyCallingNonCallable
obj = objective_function(predicted_dict, exp_df, **kwargs)
objective_values.append(obj)
if vectorized_x:
objective_values = np.array(objective_values)
else:
objective_values = objective_values[0]
return objective_values
[docs] def fit_ver2(self,
objective_function=None,
optimizer=None,
objective_kwargs=None,
optimizer_kwargs=None) -> tuple:
"""Fits experimental to modeled data by minimizing objective function
with optimizer. Returns dictionary with opt_dict structure
:param objective_function: (function) function to compute objective
If 'None', last set objective or default function is used
:param optimizer: (function) function to perform optimization
If 'None', last set optimizer or default is used
:param optimizer_kwargs: (dict) optional arguments for optimizer
:param objective_kwargs: (dict) optional arguments
for objective function
:returns tuple: (opt_dict (dict), opt_value (float))
optimized opt_dict: Has identical structure as opt_dict
"""
if objective_function is not None:
self.set_objective_function(objective_function)
if optimizer is not None:
self.set_optimizer(optimizer)
def objective(x):
return self._internal_objective_ver2(x, objective_kwargs)
optimizer = self._optimizer
opt_dict = copy.deepcopy(self._opt_dict)
x_guess = np.ones(len(list(opt_dict.keys())))
if optimizer_kwargs is None:
# noinspection PyCallingNonCallable
est_parameters, obj_value = optimizer(objective, x_guess)
else:
# noinspection PyCallingNonCallable
est_parameters, obj_value = optimizer(objective,
x_guess,
optimizer_kwargs)
for ind, param_name in enumerate(opt_dict.keys()):
opt_dict[param_name]['input_value'] *= est_parameters[ind]
return opt_dict, obj_value
[docs] def update_xml_ver2(self,
info_dict,
phases_xml_filename=None,
dependant_params_dict=None):
"""updates xml file with info_dict
:param info_dict: (dict) info in {species_names:{thermo_prop:val}}
Requires an identical structure to opt_dict
:param phases_xml_filename: (str) xml filename if editing other xml
If ``None``, the current xml will be modified and the internal
Cantera phases will be refreshed to the new values.
:param dependant_params_dict: (dict) dictionary containing information
about parameters dependant on info_dict
"""
if phases_xml_filename is None:
phases_xml_filename = self._phases_xml_filename
new_dict = copy.deepcopy(info_dict)
dep_dict = dependant_params_dict
if dep_dict is not None:
new_dict.update(dep_dict)
for param_name in dep_dict.keys():
mod_func = \
dep_dict[param_name]['function']
mod_kwargs = \
dep_dict[param_name]['kwargs']
if isinstance(dep_dict[param_name]['independent_params'], str):
ind_param_names = [dep_dict[
param_name]['independent_params']]
else:
ind_param_names = \
dep_dict[param_name]['independent_params']
ind_vals = [new_dict[ind_param_name]['input_value']
for ind_param_name in ind_param_names]
if mod_kwargs is None:
new_dict[param_name]['input_value'] = mod_func(ind_vals)
else:
new_dict[param_name]['input_value'] = \
mod_func(ind_vals,
**mod_kwargs)
tree = ET.parse(phases_xml_filename)
root = tree.getroot()
# Update xml file
for key in list(new_dict.keys()):
d = new_dict[key]
now = datetime.now()
if (d['upper_attrib_name'] is not None
and d['lower_attrib_name'] is not None):
for child1 in root.iter(d['upper_element_name']):
if (child1.attrib[d['upper_attrib_name']]
== d['upper_attrib_value']):
for child2 in child1.iter(d['lower_element_name']):
if (child1.attrib[d['lower_attrib_name']]
== d['lower_attrib_value']):
child2.text = d['input_format'].format(
d['input_value'])
child2.set('updated',
'Updated at {0}:{1} {2}-{3}-{4}'
.format(now.hour, now.minute,
now.month, now.day,
now.year))
elif (d['upper_attrib_name'] is None
and d['lower_attrib_name'] is not None):
for child1 in root.iter(d['upper_element_name']):
for child2 in child1.iter(d['lower_element_name']):
if (child1.attrib[d['lower_attrib_name']]
== d['lower_attrib_value']):
child2.text = d['input_format'].format(
d['input_value'])
child2.set('updated',
'Updated at {0}:{1} {2}-{3}-{4}'
.format(now.hour, now.minute,
now.month, now.day,
now.year))
elif (d['upper_attrib_name'] is not None
and d['lower_attrib_name'] is None):
for child1 in root.iter(d['upper_element_name']):
if (child1.attrib[d['upper_attrib_name']]
== d['upper_attrib_value']):
for child2 in child1.iter(d['lower_element_name']):
child2.text = d['input_format'].format(
d['input_value'])
child2.set('updated',
'Updated at {0}:{1} {2}-{3}-{4}'
.format(now.hour, now.minute,
now.month, now.day,
now.year))
else:
for child1 in root.iter(d['upper_element_name']):
for child2 in child1.iter(d['lower_element_name']):
child2.text = d['input_format'].format(
d['input_value'])
child2.set('updated', 'Updated at {0}:{1} {2}-{3}-{4}'
.format(now.hour, now.minute,
now.month, now.day,
now.year))
tree.write(phases_xml_filename)
if phases_xml_filename == self._phases_xml_filename:
self.set_phases(self._phases_xml_filename, self._phase_names)
return None
[docs] def parity_plot(self,
compared_value=None,
c_data=None,
c_label=None,
plot_title=None,
save_path=None,
print_r_squared=False,
data_labels=None,
legend=True):
"""
Parity plot between measured and predicted compared_value.
Default compared value is {RE_1}_aq_eq
:param compared_value: (str) Quantity to compare predicted and
experimental data. Can be any column containing "eq" in exp_df i.e.
h_eq, z_eq, {RE}_d_eq, etc.
:param plot_title: (str or boolean)
If None (default): Plot title will be generated from compared_value
Recommend to just explore. If h_eq, plot_title is
"H^+ eq conc".
If str: Plot title will be plot_title string
If "False": No plot title
:param c_data: (list or np.ndarray) data for color axis
:param c_label: (str) label for color axis
:param save_path: (str) save path for parity plot
:param print_r_squared: (boolean) To plot or not to plot r-squared
value. Prints 2 places past decimal
:param data_labels: labels for the data such as paper's name where
experiment is pulled from.
:param legend: whether to display legend for data_labels. Has no
effect if data_labels is None
:return fig, ax: returns the figure and axes objects
"""
exp_df = self.get_exp_df()
predicted_dict = self.get_predicted_dict()
re_species_list = self._re_species_list
extractant_name = self.get_extractant_name()
re_charges = self._re_charges
if compared_value is None:
compared_value = '{0}_aq_eq'.format(re_species_list[0])
pred = pd.DataFrame(predicted_dict)[compared_value].fillna(0).values
meas = exp_df[compared_value].fillna(0).values
name_breakdown = re.findall('[^_\W]+', compared_value)
compared_species = name_breakdown[0]
if compared_species == 'h':
feed_molarity = exp_df['h_i'].fillna(0).values
elif compared_species == 'z':
feed_molarity = exp_df['z_i'].fillna(0).values
else:
feed_molarity = exp_df[
'{0}_aq_i'.format(compared_species)].fillna(0).values
if isinstance(data_labels, list):
combined_df = pd.DataFrame({'pred': pred,
'meas': meas,
'label': data_labels,
'feed_molarity': feed_molarity})
elif isinstance(c_data, str):
combined_df = pd.DataFrame({'pred': pred,
'meas': meas,
c_data: exp_df[c_data].values,
'feed_molarity': feed_molarity})
else:
combined_df = pd.DataFrame({'pred': pred,
'meas': meas,
'feed_molarity': feed_molarity})
combined_df = combined_df[(combined_df['feed_molarity'] != 0)]
meas = combined_df['meas'].values
pred = combined_df['pred'].values
min_data = np.min([pred, meas])
max_data = np.max([pred, meas])
min_max_data = np.array([min_data, max_data])
if compared_species == 'h':
default_title = '$H^+$ eq. conc. (mol/L)'
elif compared_species == 'z':
default_title = '{0} eq. conc. (mol/L)'.format(extractant_name)
else:
phase = name_breakdown[1]
if phase == 'aq':
re_charge = re_charges[re_species_list.index(compared_species)]
default_title = '$%s^{%d+}$ eq. conc. (mol/L)' \
% (compared_species, re_charge)
elif phase == 'd':
default_title = '{0} distribution ratio'.format(
compared_species)
else:
default_title = '{0} complex eq. conc. (mol/L)'.format(
compared_species)
fig, ax = plt.subplots()
if isinstance(data_labels, list):
unique_labels = list(set(data_labels))
for label in unique_labels:
filtered_data = combined_df[combined_df['label'] == label]
filtered_meas = filtered_data['meas']
filtered_pred = filtered_data['pred']
ax.scatter(filtered_meas, filtered_pred, label=label)
if legend:
ax.legend(loc='best')
elif c_data is not None:
if isinstance(c_data, str):
c_data = combined_df[c_data].values
p1 = ax.scatter(meas, pred, c=c_data, alpha=1, cmap='viridis')
c_bar = fig.colorbar(p1, format='%.2f')
if c_label is not None:
c_bar.set_label(c_label, rotation=270, labelpad=20)
else:
sns.scatterplot(meas, pred, color="r",
legend=False)
ax.plot(min_max_data, min_max_data, color="b", label="")
if print_r_squared:
ax.text(min_max_data[0],
min_max_data[1] * 0.9,
'$R^2$={0:.2f}'.format(self.r_squared(compared_value)))
# plt.legend(loc='lower right')
# else:
# plt.legend()
ax.set(xlabel='Measured', ylabel='Predicted')
if plot_title is None:
ax.set_title(default_title)
elif isinstance(plot_title, str):
ax.set_title(plot_title)
set_size(8, 6)
plt.tight_layout()
plt.show()
if save_path is not None:
plt.savefig(save_path, bbox_inches='tight')
return fig, ax
[docs] def r_squared(self, compared_value=None):
"""r-squared value comparing measured and predicted compared value
Closer to 1, the better the model's predictions.
:param compared_value: (str) Quantity to compare predicted and
experimental data. Can be any column containing "eq" in exp_df i.e.
h_eq, z_eq, {RE}_d_eq, etc. default is {RE}_aq_eq
"""
exp_df = self.get_exp_df()
predicted_dict = self.get_predicted_dict()
re_species_list = self._re_species_list
if compared_value is None:
compared_value = '{0}_aq_eq'.format(re_species_list[0])
pred = pd.DataFrame(predicted_dict)[compared_value].fillna(0).values
predicted_y = np.array(pred)
actual_y = exp_df[compared_value].fillna(0).values
name_breakdown = re.findall('[^_\W]+', compared_value)
compared_species = name_breakdown[0]
if compared_species == 'h':
feed_molarity = exp_df['h_i'].fillna(0).values
elif compared_species == 'z':
feed_molarity = exp_df['z_i'].fillna(0).values
else:
feed_molarity = exp_df[
'{0}_aq_i'.format(compared_species)].fillna(0).values
combined_df = pd.DataFrame({'pred': predicted_y,
'meas': actual_y,
'in_moles': feed_molarity})
combined_df = combined_df[(combined_df['in_moles'] != 0)]
actual_y = combined_df['meas'].values
predicted_y = combined_df['pred'].values
num = sum((actual_y - predicted_y) ** 2)
den = sum((actual_y - np.mean(actual_y)) ** 2)
if den == 0:
r_2 = 0
else:
r_2 = (1 - num / den)
return r_2
[docs] @staticmethod
def plot_3d_data(x_data,
y_data,
z_data,
c_data=None,
x_label=None,
y_label=None,
z_label=None,
c_label=None):
"""
:param x_data: (list) list of data for x axis
:param y_data: (list) list of data for y axis
:param z_data: (list) list of data for z axis
:param c_data: (list) list of data for color axis
:param x_label: (str) label for x axis
:param y_label: (str) label for y axis
:param z_label: (str) label for z axis
:param c_label: (str) label for color axis
:return:
"""
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
if c_data is None:
ax.plot(x_data, y_data, z_data, 'o')
else:
p1 = ax.scatter(x_data,
y_data,
z_data, 'o', c=c_data,
cmap='viridis', alpha=1)
c_bar = fig.colorbar(p1)
if c_label is not None:
c_bar.set_label(c_label, rotation=270, labelpad=20)
if x_label is None:
ax.set_xlabel('x', labelpad=15)
else:
ax.set_xlabel(x_label, labelpad=15)
if y_label is None:
ax.set_ylabel('y', labelpad=15)
else:
ax.set_ylabel(y_label, labelpad=15)
if z_label is None:
ax.set_zlabel('z', labelpad=15)
else:
ax.set_zlabel(z_label, labelpad=15)
plt.show()
return fig, ax