Source code for prepshot._model.heat_rate

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Piecewise-linear heat rates (convex, no UC required).

Standard CEM/PCM addition for thermal generators: their fuel-burn-
per-MWh isn't constant across the operating range. Above the design
point, marginal heat rate increases (each additional MWh costs more
fuel than the previous one). LP-friendly approximation: split the
operating range into K segments with monotone-non-decreasing
multipliers, force the dispatch to use the cheapest segment first.

The convexity matters: with non-decreasing multipliers across
ascending output ranges, the LP optimum naturally fills segments in
order (cheapest first) without any binary ordering constraints.
Real heat-rate curves below the design point (where part-load
losses dominate) are concave -- those can only be modelled with no-
load + startup costs in the UC overlay (v1.15+).

Inputs
======

``tech_heat_rate.csv`` (table format) -- one row per (tech,
segment), with columns:

* ``tech`` -- technology name
* ``segment`` -- integer index (1, 2, 3, ...)
* ``frac_max`` -- cumulative cap fraction at the upper edge of this
  segment (e.g., 0.5 for segment 1 means "0-50 % of cap").
  Segments must form a partition: ``frac_max`` strictly increasing,
  last value = 1.0.
* ``multiplier`` -- relative heat rate within this segment, 1.0 ==
  baseline (whatever ``tech_fuel_price`` already implies). The
  multiplier sequence MUST be non-decreasing for LP convexity.

Techs not listed in the CSV keep the v1.18 single-rate fuel cost
(``fuel_price * gen``). Mixing piecewise and flat-rate techs is
fine.

Constraints
===========

Per ``(h, m, y, z, te)`` in the heat-rate set:

* sum-to-gen:
  ``sum_s gen_segment[h, m, y, z, te, s] == gen[h, m, y, z, te]``
* per-segment width:
  ``gen_segment[..., s] <= width[s] * cap_existing[y, z, te] * dt``

where ``width[s] = frac_max[s] - frac_max[s-1]``.

Fuel cost replaces the flat ``fuel_price * gen`` with:

  ``fuel_price[te, y] * sum_s multiplier[s] * gen_segment[..., s]``

NPV-discounted via ``var_factor[y, z]`` and divided by ``weight``,
identical to the rest of ``cost_var``.

Config
======

* ``cost_parameters.is_piecewise_heat_rate`` (bool, default False).
  When False, the module is a no-op and PREP-SHOT keeps the
  v1.18 flat-rate behaviour byte-for-byte.
"""
import pyoptinterface as poi

from prepshot.utils import sparse_tupledict


[docs]class AddHeatRateConstraints: """Piecewise-linear heat rate, opt-in.""" def __init__(self, model: object) -> None: self.model = model if not model.params.get('is_piecewise_heat_rate', False): return # ``heat_rate`` arrives as a DataFrame (table format). Build # per-tech ordered segment lists so the rules can iterate. df = model.params.get('heat_rate') if df is None or not hasattr(df, 'iterrows') or df.empty: return # Per-tech: list of (frac_max, multiplier) tuples, sorted by # frac_max ascending. Validation: multipliers must be # non-decreasing for LP convexity. self.segments_by_tech: dict = {} for tech, grp in df.groupby('tech'): segs = grp.sort_values('frac_max')[ ['frac_max', 'multiplier'] ].itertuples(index=False, name=None) segs = list(segs) if not segs: continue mults = [m for _, m in segs] if any(b < a - 1e-9 for a, b in zip(mults[:-1], mults[1:])): raise ValueError( f"tech_heat_rate.csv: multipliers for tech={tech!r} " f"are not non-decreasing ({mults}). LP-convex " f"piecewise-linear heat rates require monotone " f"slopes; resort the CSV or fix the multipliers." ) self.segments_by_tech[tech] = segs if not self.segments_by_tech: return # Index set for segments. We size to the max-K seen in the # CSV and let unused (tech, seg) cells be locked to zero by # the segment-bound rule. max_segments = max( len(segs) for segs in self.segments_by_tech.values() ) segment_idx = list(range(1, max_segments + 1)) model.heat_rate_segments = segment_idx # Sparse indices: only for (z, te) pairs in active_zt where # the tech actually has a heat-rate curve. With 168 thermal # techs each at one bus and 3 segments, this is ~500 (z, te, # s) tuples per timestep, not 472*212*3 = 300k. heat_rate_techs = set(self.segments_by_tech.keys()) zt_with_curve = [ (z, te) for (z, te) in model.active_zt if te in heat_rate_techs ] # Per-(z, te) the actual segment count varies; build a flat # (h, m, y, z, te, s) list that includes only real segments. active_hmyztes = [ (h, m, y, z, te, s) for h in model.hour for m in model.month for y in model.year for (z, te) in zt_with_curve for s in range(1, len(self.segments_by_tech[te]) + 1) ] active_hmyzte = [ (h, m, y, z, te) for h in model.hour for m in model.month for y in model.year for (z, te) in zt_with_curve ] _new_var = lambda *_: model.add_variable(lb=0) model.gen_segment = sparse_tupledict(active_hmyztes, _new_var) # gen_segment is now only defined where it can be non-zero; # the segment-zero constraint becomes vestigial. Keep an # empty tupledict so other code that touches this attribute # doesn't NoneType-error. model.gen_segment_zero_cons = sparse_tupledict([], lambda *a: None) # Per-segment width bound: # gen_segment[s] <= (frac_max[s] - frac_max[s-1]) * cap * dt model.gen_segment_width_cons = sparse_tupledict( active_hmyztes, self.segment_width_rule ) # Sum-to-gen: per (h, m, y, z, te) in the heat-rate set, # sum_s gen_segment[..., s] == gen[h, m, y, z, te]. model.gen_sum_to_total_cons = sparse_tupledict( active_hmyzte, self.sum_to_gen_rule ) # ------------------------------------------------------------------ # Rules
[docs] def segment_zero_rule(self, h, m, y, z, te, s): """Force ``gen_segment[..., s] = 0`` for (te, s) not defined.""" segs = self.segments_by_tech.get(te) if segs is not None and s <= len(segs): return None return self.model.add_linear_constraint( self.model.gen_segment[h, m, y, z, te, s], poi.Eq, 0 )
[docs] def segment_width_rule(self, h, m, y, z, te, s): """``gen_segment[s] <= (width[s] * cap_existing) * dt``.""" segs = self.segments_by_tech.get(te) if segs is None or s > len(segs): return None prev = 0.0 if s == 1 else float(segs[s - 2][0]) width = float(segs[s - 1][0]) - prev if width <= 0: return self.model.add_linear_constraint( self.model.gen_segment[h, m, y, z, te, s], poi.Eq, 0 ) model = self.model dt = model.params['dt'] lhs = ( model.gen_segment[h, m, y, z, te, s] - width * model.cap_existing[y, z, te] * dt ) return model.add_linear_constraint(lhs, poi.Leq, 0)
[docs] def sum_to_gen_rule(self, h, m, y, z, te): """``sum_s gen_segment[..., s] == gen[h, m, y, z, te]``.""" segs = self.segments_by_tech.get(te) if segs is None: return None # tech keeps the flat-rate fuel cost model = self.model lhs = poi.quicksum( model.gen_segment[h, m, y, z, te, s] for s in range(1, len(segs) + 1) ) - model.gen[h, m, y, z, te] return model.add_linear_constraint(lhs, poi.Eq, 0)
[docs]def add_heat_rate_fuel_cost(model) -> "poi.ExprBuilder": """Per-segment fuel cost contribution, NPV-discounted. For each tech with a heat-rate curve, the fuel cost becomes: fuel_price[te, y] * sum_s multiplier[s] * gen_segment[..., s] summed over (h, m, y, z), discounted by ``var_factor[y, z]``, and divided by ``weight``. Returned as an ExprBuilder. Returns a zero ExprBuilder when the module is disabled or no heat-rate curves are loaded. The CALLER (``cost.var_cost_rule``) must remove the corresponding techs' contribution from the existing flat-rate ``fuel_cost_breakdown`` to avoid double-counting -- handled by skipping those techs in the original ``fuel_cost_breakdown`` loop when ``hasattr(model, 'gen_segment')`` and the tech is in ``segments_by_tech``. """ cost = poi.ExprBuilder() if not model.params.get('is_piecewise_heat_rate', False): return cost if not hasattr(model, 'gen_segment'): return cost df = model.params.get('heat_rate') if df is None or not hasattr(df, 'iterrows') or df.empty: return cost fuel_price = model.params.get('fuel_price') or {} vf = model.params['var_factor'] w = model.params['weight'] # Re-build per-tech segment lists (mirrors AddHeatRateConstraints # __init__; cheap relative to the LP build cost). segs_by_tech = {} for tech, grp in df.groupby('tech'): segs_by_tech[tech] = list( grp.sort_values('frac_max')[ ['frac_max', 'multiplier'] ].itertuples(index=False, name=None) ) tech_set = set(model.tech) for te, segs in segs_by_tech.items(): if te not in tech_set: continue # Iterate only the zones where this tech actually exists -- # gen_segment[..., te, s] only has variables at those zones # (sparse creation in __init__). zones_for_te = model.tech_zones.get(te, []) if not zones_for_te: continue for s, (_, mult) in enumerate(segs, start=1): for y in model.year: fp = float(fuel_price.get((te, y), 0.0)) if fp == 0.0: continue for z in zones_for_te: factor = vf[y, z] / w cost += ( fp * float(mult) * factor * poi.quicksum( model.gen_segment[h, m, y, z, te, s] for h in model.hour for m in model.month ) ) return cost
[docs]def techs_with_heat_rate_curve(model) -> set: """Return the set of techs that have a heat-rate curve loaded. Used by ``cost.fuel_cost_breakdown`` to skip techs that should be priced through ``add_heat_rate_fuel_cost`` instead of the flat-rate fallback. """ if not model.params.get('is_piecewise_heat_rate', False): return set() if not hasattr(model, 'gen_segment'): return set() df = model.params.get('heat_rate') if df is None or not hasattr(df, 'iterrows') or df.empty: return set() return set(df['tech'].astype(str))