prepshot._model.unit_commitment 源代码

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Unit-commitment (UC) overlay -- clustered MILP formulation.

Standard "clustered UC" representation, used by GenX, PowNet, and
similar planning models that need the MILP fidelity without modelling
each plant individually:

* Each UC-eligible tech (typically thermal: coal, gas, oil) is a
  cluster of identical units of size ``tech_unit_size[te]`` MW. The
  total cluster capacity is ``cap_existing[y, z, te]``, so the
  *integer* number of units is ``cap / unit_size``.

* New decision variables per ``(h, m, y, z, te)``:

  - ``online[h]`` -- number of units online (integer, 0..N_units).
  - ``startup[h]`` -- number of units started this hour (integer >=0).
  - ``shutdown[h]`` -- number of units shut down this hour (integer
    >=0).

* New constraints:

  - **State evolution** (h > hour[0]):
    ``online[h] - online[h-1] = startup[h] - shutdown[h]``
  - **Capacity bound on dispatch**:
    ``gen[h] <= online[h] * unit_size * p_max_pu * dt``
    ``gen[h] >= online[h] * unit_size * p_min_pu * dt``

    These REPLACE the standard ``gen <= cap_existing * p_max_pu * dt``
    bound for UC-eligible techs (the standard one is now redundant
    because ``online <= N_units = cap/unit_size``).
  - **Minimum up time** (``online >= sum of recent startups``):
    ``online[h] >= sum_{i=0..MinUp-1} startup[h-i]``
  - **Minimum down time** (``offline >= sum of recent shutdowns``):
    ``(N_units - online[h]) >= sum_{i=0..MinDown-1} shutdown[h-i]``

* New cost terms (added to the objective):

  - ``startup_cost[te] * unit_size[te] * startup[h]`` per hour
  - ``no_load_cost[te] * unit_size[te] * online[h] * dt`` per hour

  Both NPV-discounted via ``var_factor[y, z]`` like the rest of the
  variable cost.

This module turns PREP-SHOT into a **MILP** when enabled. HiGHS solves
it; runtime grows several-fold relative to the LP. Recommended only
for small-to-medium scenarios -- ``three_zone`` (48 hours) takes a
few seconds; full-year 8760-hour PCM mode with UC is feasible only
in rolling-horizon windows (see ``prepshot.pcm``).

Configuration
=============

Add to ``config.json``::

    "uc_parameters": {
        "is_uc": true,
        "uc_relaxation": "integer"   // or "continuous" for LP relaxation
    }

When ``is_uc`` is missing or ``false``, the module is a no-op and
PREP-SHOT stays LP. When ``uc_relaxation`` is ``"continuous"``, the
binaries become continuous in their natural ranges -- useful for
scaling tests and for warm-starting an integer solve.

Inputs
======

All optional (rows missing or files missing -> safe defaults):

* ``tech_uc_eligible.csv`` -- ``tech, eligible``. Default ``0``.
* ``tech_unit_size.csv`` -- ``tech, value`` (MW). Default ``1`` (one
  cluster = one unit at full cap, equivalent to LP behaviour).
* ``tech_min_up_time.csv`` -- ``tech, value`` (hours). Default ``1``.
* ``tech_min_down_time.csv`` -- ``tech, value`` (hours). Default ``1``.
* ``tech_startup_cost.csv`` -- ``tech, value`` ($/MW). Default ``0``.
* ``tech_no_load_cost.csv`` -- ``tech, value`` ($/MW-h). Default ``0``.
"""
import pyoptinterface as poi

from prepshot.utils import sparse_tupledict


[文档]class AddUnitCommitmentConstraints: """Clustered unit-commitment constraints + cost terms.""" def __init__(self, model: object) -> None: self.model = model if not model.params.get('is_uc', False): return # Eligibility: techs missing from tech_uc_eligible.csv default # to ``not eligible`` and pass through this module untouched. elig = model.params.get('uc_eligible') or {} self.is_eligible = { t: bool(elig.get(t, False)) for t in model.tech } if not any(self.is_eligible.values()): return # nothing to do; module is a no-op # Tech parameters with safe defaults. self.unit_size = model.params.get('uc_unit_size') or {} self.min_up = model.params.get('uc_min_up_time') or {} self.min_down = model.params.get('uc_min_down_time') or {} self.startup_cost = model.params.get('uc_startup_cost') or {} self.no_load_cost = model.params.get('uc_no_load_cost') or {} # Domain: integer for true clustered MILP, continuous for LP # relaxation. ``set_variable_attribute`` flips the domain # post-construction; for integer we set during creation. relax = model.params.get('uc_relaxation', 'integer') domain = ( poi.VariableDomain.Integer if relax == 'integer' else poi.VariableDomain.Continuous ) # Per-(zone, tech) integer count of units in the existing cluster. # cap_existing is an ExprBuilder; we use the *params* ['existing_ # fleet'] dict directly so this stays a Python integer. self.n_units = {} fleet = model.params.get('existing_fleet') or {} for z in model.zone: for te in model.tech: if not self.is_eligible[te]: continue u = float(self.unit_size.get(te, 1.0)) if u <= 0: self.n_units[(z, te)] = 0 continue # Sum capacity across commission years for this (zone, tech) cap_mw = sum( float(cap) for (t, zz, _cy), cap in fleet.items() if t == te and zz == z ) self.n_units[(z, te)] = int(cap_mw / u + 1e-6) # p_max_pu / p_min_pu lookups (same source as generation.py). # Must be computed BEFORE make_tupledict invokes the rules. self._p_max_pu = dict(model.params.get('max_gen_profile') or {}) self._p_min_pu = dict(model.params.get('min_gen_profile') or {}) # Sparse: only (z, te) where the tech is both UC-eligible AND # actually deployed at the zone. Drops ~99% of dense at full- # nodal scale; on three_zone (all techs at all zones) it's # equivalent to the original. active_zt_uc = [ (z, te) for (z, te) in model.active_zt if self.is_eligible.get(te, False) ] active_hmyzte_uc = [ (h, m, y, z, te) for h in model.hour for m in model.month for y in model.year for (z, te) in active_zt_uc ] self._active_zt_uc = active_zt_uc self._active_hmyzte_uc = active_hmyzte_uc _new_var = lambda *_: model.add_variable(lb=0, domain=domain) model.online = sparse_tupledict(active_hmyzte_uc, _new_var) model.startup = sparse_tupledict(active_hmyzte_uc, _new_var) model.shutdown = sparse_tupledict(active_hmyzte_uc, _new_var) # Constraints. Each rule already early-returns for non-eligible # techs; sparsifying just skips the rule call for those cells. model.uc_n_unit_bound_cons = sparse_tupledict( active_hmyzte_uc, self.n_unit_bound_rule ) model.uc_state_evolution_cons = sparse_tupledict( active_hmyzte_uc, self.state_evolution_rule ) model.uc_gen_up_cons = sparse_tupledict( active_hmyzte_uc, self.gen_up_uc_rule ) model.uc_gen_low_cons = sparse_tupledict( active_hmyzte_uc, self.gen_low_uc_rule ) model.uc_min_up_cons = sparse_tupledict( active_hmyzte_uc, self.min_up_rule ) model.uc_min_down_cons = sparse_tupledict( active_hmyzte_uc, self.min_down_rule ) # ------------------------------------------------------------------ # Rules
[文档] def n_unit_bound_rule(self, h, m, y, z, te): """``online[h] <= N_units`` and force online=startup=shutdown=0 for non-eligible (z, te) cells.""" model = self.model if not self.is_eligible[te]: # Lock all three to 0 so they don't drift in the LP relax. model.add_linear_constraint(model.online[h, m, y, z, te], poi.Eq, 0) model.add_linear_constraint(model.startup[h, m, y, z, te], poi.Eq, 0) return model.add_linear_constraint( model.shutdown[h, m, y, z, te], poi.Eq, 0 ) n = self.n_units.get((z, te), 0) return model.add_linear_constraint( model.online[h, m, y, z, te] - n, poi.Leq, 0 )
[文档] def state_evolution_rule(self, h, m, y, z, te): """``online[h] - online[h-1] = startup[h] - shutdown[h]`` for every hour after the first in the modelled set. The first hour's online state is free (initialised by the LP optimum).""" model = self.model if not self.is_eligible[te] or h == model.hour[0]: return None lhs = ( model.online[h, m, y, z, te] - model.online[h - 1, m, y, z, te] - model.startup[h, m, y, z, te] + model.shutdown[h, m, y, z, te] ) return model.add_linear_constraint(lhs, poi.Eq, 0)
[文档] def gen_up_uc_rule(self, h, m, y, z, te): """``gen[h] <= online[h] * unit_size * p_max_pu * dt``.""" model = self.model if not self.is_eligible[te]: return None u = float(self.unit_size.get(te, 1.0)) p_max_pu = self._p_max_pu.get((te, z, y, m, h), 1) dt = model.params['dt'] lhs = ( model.gen[h, m, y, z, te] - model.online[h, m, y, z, te] * u * p_max_pu * dt ) return model.add_linear_constraint(lhs, poi.Leq, 0)
[文档] def gen_low_uc_rule(self, h, m, y, z, te): """``gen[h] >= online[h] * unit_size * p_min_pu * dt``.""" model = self.model if not self.is_eligible[te]: return None u = float(self.unit_size.get(te, 1.0)) p_min_pu = self._p_min_pu.get((te, z, y, m, h), 0) if p_min_pu == 0: return None # trivial constraint; gen >= 0 already enforced dt = model.params['dt'] lhs = ( model.gen[h, m, y, z, te] - model.online[h, m, y, z, te] * u * p_min_pu * dt ) return model.add_linear_constraint(lhs, poi.Geq, 0)
[文档] def min_up_rule(self, h, m, y, z, te): """``online[h] >= sum_{i=0..MinUp-1} startup[h-i]`` -- if a unit started in any of the last MinUp hours, it must be online now.""" model = self.model if not self.is_eligible[te]: return None mu = int(self.min_up.get(te, 1)) if mu <= 1: return None # 1-hour min-up = no real constraint h0 = model.hour[0] lookback = [hh for hh in range(h - mu + 1, h + 1) if hh >= h0] if not lookback: return None lhs = ( poi.quicksum( model.startup[hh, m, y, z, te] for hh in lookback ) - model.online[h, m, y, z, te] ) return model.add_linear_constraint(lhs, poi.Leq, 0)
[文档] def min_down_rule(self, h, m, y, z, te): """``(N_units - online[h]) >= sum_{i=0..MinDown-1} shutdown[h-i]``.""" model = self.model if not self.is_eligible[te]: return None md = int(self.min_down.get(te, 1)) if md <= 1: return None h0 = model.hour[0] lookback = [hh for hh in range(h - md + 1, h + 1) if hh >= h0] if not lookback: return None n = self.n_units.get((z, te), 0) lhs = ( model.online[h, m, y, z, te] + poi.quicksum( model.shutdown[hh, m, y, z, te] for hh in lookback ) - n ) return model.add_linear_constraint(lhs, poi.Leq, 0)
[文档]def add_uc_cost_terms(model) -> "poi.ExprBuilder": """Return the UC cost contribution (startup + no-load) as an ExprBuilder, NPV-discounted by ``var_factor`` and divided by ``weight`` to align with the rest of ``cost_var``. Returns a zero ExprBuilder when UC is disabled, so callers can add it unconditionally. """ cost = poi.ExprBuilder() if not model.params.get('is_uc', False): return cost if not hasattr(model, 'online'): return cost elig = model.params.get('uc_eligible') or {} is_eligible = {t: bool(elig.get(t, False)) for t in model.tech} if not any(is_eligible.values()): return cost unit_size = model.params.get('uc_unit_size') or {} startup_cost = model.params.get('uc_startup_cost') or {} no_load_cost = model.params.get('uc_no_load_cost') or {} vf = model.params['var_factor'] w = model.params['weight'] dt = model.params['dt'] # Sparse: walk only (z, te) where the tech is eligible AND # actually deployed at the zone (online/startup variables only # exist there). active_zt_uc = [ (z, te) for (z, te) in model.active_zt if is_eligible.get(te, False) ] for h in model.hour: for m in model.month: for y in model.year: for (z, te) in active_zt_uc: u = float(unit_size.get(te, 1.0)) sc = float(startup_cost.get(te, 0.0)) nl = float(no_load_cost.get(te, 0.0)) if sc == 0 and nl == 0: continue factor = vf[y, z] / w if sc != 0: cost += ( sc * u * model.startup[h, m, y, z, te] * factor ) if nl != 0: cost += ( nl * u * dt * model.online[h, m, y, z, te] * factor ) return cost