# Source code for prepshot.output_data
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""This module contains functions to save solving results of models.
"""
import logging
import argparse
import numpy as np
import xarray as xr
import pandas as pd
from prepshot.logs import timer
from prepshot.utils import cartesian_product
def create_data_array(
    data : dict, dims : list, unit : str, model : object,
    extractor=None,
) -> xr.DataArray:
    """Build an ``xr.DataArray`` from model results over given dimensions.

    Parameters
    ----------
    data : dict
        Tupledict-like container of model entries; must support ``.map``.
    dims : list
        Names of the dimensions; each must be an attribute of ``model``
        holding that dimension's coordinate values.
    unit : str
        Unit string stored in the array's ``attrs``.
    model : object
        The model object supplying coordinates and value accessors.
    extractor : callable, optional
        Maps one tupledict entry to a scalar. Defaults to
        ``model.get_value`` (primal values). Pass
        ``model.get_constraint_dual`` to pull shadow prices instead.

    Returns
    -------
    xr.DataArray
        Array of extracted scalars with the requested dims, coords, unit.
    """
    extract = model.get_value if extractor is None else extractor
    coords = {dim: getattr(model, dim) for dim in dims}
    indices = cartesian_product(*coords.values())
    if len(dims) == 1:
        # One-dimensional case: unwrap the 1-tuples produced by the product.
        indices = [idx[0] for idx in indices]
    values = data.map(extract)
    shape = [len(axis) for axis in coords.values()]
    flat = np.array([values[idx] for idx in indices])
    return xr.DataArray(
        data=flat.reshape(shape),
        dims=dims,
        coords=coords,
        attrs={'unit': unit},
    )
@timer
def extract_results_non_hydro(model : object) -> xr.Dataset:
    """Collect results of a non-hydro model into an ``xr.Dataset``.

    Parameters
    ----------
    model : object
        Model object solved already.

    Returns
    -------
    xr.Dataset
        A Dataset containing DataArrays for each attribute of the model.
    """
    # Aliases so transmission variables can use zone twice as two dims.
    model.zone1 = model.zone
    model.zone2 = model.zone
    data_vars = {}
    tech_dims = ['year', 'zone', 'tech']
    hourly_dims = ['hour', 'month', 'year', 'zone', 'tech']
    # (result key, model attribute, dimensions, unit)
    array_specs = [
        ('trans_export', model.trans_export,
         ['hour', 'month', 'year', 'zone1', 'zone2'], 'TWh'),
        ('gen', model.gen, hourly_dims, 'TWh'),
        ('install', model.cap_existing, tech_dims, 'MW'),
        ('carbon', model.carbon, ['year'], 'Ton'),
        ('charge', model.charge, hourly_dims, 'MWh'),
        ('cost_var_breakdown', model.cost_var_tech_breakdown,
         tech_dims, 'dollar'),
        ('cost_fix_breakdown', model.cost_fix_tech_breakdown,
         tech_dims, 'dollar'),
        ('cost_newtech_breakdown', model.cost_newtech_breakdown,
         tech_dims, 'dollar'),
    ]
    for key, var, dims, unit in array_specs:
        data_vars[key] = create_data_array(var, dims, unit, model)
    # Only present on models formulated with public-debt financing.
    if hasattr(model, 'public_debt_newtech'):
        data_vars['public_debt_newtech'] = create_data_array(
            model.public_debt_newtech, tech_dims, 'dollar', model
        )
    data_vars['cost_newline_breakdown'] = create_data_array(
        model.cost_newline_breakdown, ['year', 'zone1', 'zone2'],
        'dollar', model
    )
    data_vars['carbon_breakdown'] = create_data_array(
        model.carbon_breakdown, tech_dims, 'Ton', model
    )
    # Shadow price (dual) of the nodal power-balance constraint: the
    # locational marginal price of one extra MWh of demand at each
    # (hour, month, year, zone). The constraint is 'demand - supply == 0',
    # so the raw dual is the negative of the LMP; the sign is flipped
    # here to follow the convention of a positive marginal cost of one
    # more MWh of demand. Values are discounted (NPV); divide by
    # var_factor[y, z] for undiscounted real-year LMPs. Skipped with a
    # warning when the solver returned no duals (e.g. MIP, infeasible).
    try:
        data_vars['shadow_price_demand'] = create_data_array(
            model.power_balance_cons,
            ['hour', 'month', 'year', 'zone'],
            'dollar/MWh (NPV)', model,
            extractor=lambda c: -model.get_constraint_dual(c),
        )
    except Exception as err:
        logging.warning(
            "Could not extract demand-balance shadow prices: %s", err
        )
    # Scalar objective components become 0-d DataArrays.
    scalar_specs = (
        ('cost_var', model.cost_var),
        ('cost_fix', model.cost_fix),
        ('cost_newtech', model.cost_newtech),
        ('cost_newline', model.cost_newline),
        ('income', model.income),
        ('cost', model.cost),
    )
    for key, expr in scalar_specs:
        data_vars[key] = xr.DataArray(data=model.get_value(expr))
    return xr.Dataset(data_vars)
@timer
def extract_results_hydro(model : object) -> xr.Dataset:
    """Collect results of a hydro model into an ``xr.Dataset``.

    Builds on the non-hydro extraction and adds the station flow results.

    Parameters
    ----------
    model : object
        Model solved already.

    Returns
    -------
    xr.Dataset
        A Dataset containing DataArrays for each attribute of the model.
    """
    base = extract_results_non_hydro(model)
    station_dims = ['station', 'hour', 'month', 'year']
    hydro_vars = {
        name: create_data_array(
            getattr(model, name), station_dims, 'm**3s**-1', model
        )
        for name in ('genflow', 'spillflow')
    }
    return base.assign(hydro_vars)
@timer
def save_to_excel(
    ds : xr.Dataset,
    output_filename : str
) -> None:
    """Save the results to an Excel file, one sheet per variable.

    Scalar (0-d) variables are written as a single-value, one-column
    frame. Variables longer than ``max_rows`` are split across numbered
    sheets (``<key>_1``, ``<key>_2``, ...), since a single xlsx sheet
    cannot hold arbitrarily many rows.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset containing the results.
    output_filename : str
        The name of the output file (written as ``<output_filename>.xlsx``).
    """
    max_rows = 1_000_000
    # pylint: disable=abstract-class-instantiated
    with pd.ExcelWriter(
        f'{output_filename}.xlsx', engine='xlsxwriter'
    ) as writer:
        for key in ds.data_vars:
            if len(ds[key].shape) == 0:
                # 0-d DataArray: .max() extracts the single scalar value.
                df = pd.DataFrame([ds[key].values.max()], columns=[key])
            else:
                df = ds[key].to_dataframe()
            n_rows = len(df)
            if n_rows <= max_rows:
                df.to_excel(writer, sheet_name=key, merge_cells=False)
                continue
            # Exact integer ceiling division; computed once instead of
            # twice (the original re-evaluated np.ceil for the log call).
            n_sheets = -(-n_rows // max_rows)
            logging.info(
                "%s has %d rows, split into %d sheets", key, n_rows, n_sheets
            )
            for i in range(n_sheets):
                start_row = i * max_rows
                # iloc clamps the end of the slice, so no min() is needed.
                df.iloc[start_row:start_row + max_rows].to_excel(
                    writer, sheet_name=f'{key}_{i+1}', merge_cells=False
                )
def save_result(model : object) -> None:
    """Extract results from the model and write netCDF + Excel outputs.

    Parameters
    ----------
    model : object
        The model object to extract results from and save.
    """
    params = model.params
    filename = update_output_filename(
        params['output_filename'], params['command_line_args']
    )
    # Hydro models carry extra station-flow variables.
    extract = (
        extract_results_hydro if params['isinflow']
        else extract_results_non_hydro
    )
    ds = extract(model)
    ds.to_netcdf(f'{filename}.nc')
    logging.info(
        "Results are written to %s.nc", filename
    )
    save_to_excel(ds, filename)
    logging.info("Results are written to separate excel files")
def update_output_filename(
    output_filename : str, args : argparse.Namespace
) -> str:
    """Append the command-line arguments to the output filename.

    Every attribute of ``args`` whose value is not ``None`` contributes
    a ``<key>_<value>`` token; the tokens are joined with underscores
    and appended directly (no separator) to ``output_filename``.

    Parameters
    ----------
    output_filename : str
        The name of the output file.
    args : argparse.Namespace
        Arguments parsed by argparse.

    Returns
    -------
    str
        The updated output filename.
    """
    tokens = [
        f'{key}_{value}'
        for key, value in vars(args).items()
        if value is not None
    ]
    return output_filename + '_'.join(tokens)