Source code for prepshot.pcm

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Production-cost-model (PCM) mode with rolling horizon.

Companion mode to the default capacity-expansion (CEM) flow in
``run.py``. PCM takes a fixed fleet (from a prior CEM solve, or a
user-supplied capacity CSV) and solves *only* dispatch over a chosen
year, hour-by-hour, in overlapping windows -- the same machinery that
PowNet and PyPSA's ``optimize_with_rolling_horizon`` use.

.. note::

   **v1.14.1 status (alpha).** The rolling driver is wired end-to-end
   with single-window solves working cleanly. Multi-window rolling
   has TWO known limitations:

   1. **Cascading hydro cross-window state**: at the start of any
      non-first window, downstream stations see only their natural
      (incremental) inflow because the upstream's outflow during the
      lookback period (``hour[0] - delay .. hour[0]``) is in the
      previous window's solve and is not yet carried forward. When
      the downstream's ``min_outflow`` exceeds its natural inflow,
      water balance can drain storage below ``storage_min`` -> the
      sub-problem is infeasible. v1.15+ will add cross-window cascade
      state (carry the upstream's last ``max_delay`` outflow values
      into the next window's ``inflow_rule`` lookup).
   2. **Carbon cap rescaling**: the cap from
      ``policy_carbon_emission_limit.csv`` is annual (full-year
      tonne CO2) but each window only covers a fraction of the year.
      The naive filter applies the full annual cap to a 48-hour
      window -- not binding for our small examples but wrong on
      principle. Will rescale by ``window_hours / hours_in_year``
      in v1.15.

   For now, we recommend ``--horizon == --step == period_length``
   (single-window PCM = fixed-capacity dispatch validation).
   Multi-window rolling works on scenarios *without* binding
   min-outflow constraints on cascaded hydro.

Why rolling horizon?
====================

Solving dispatch on the full 8760-hour annual MILP/LP at once is
intractable for realistic networks. Rolling horizon decomposes the
problem into a sequence of short, overlapping subproblems::

  while t < 8760:
      build LP for hours [t, t + horizon)
      pin initial state (storage SOC, hydro reservoir level) from the
        previous window's terminal solution
      solve
      persist dispatch decisions for [t, t + step)
      advance t by step

Each window's lookahead (``horizon - step``) absorbs the end-of-window
distortion. PowNet uses ``horizon = 48 h, step = 24 h``; we default to
the same.
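
With those defaults, the window schedule looks like this (hour indices
relative to ``hour[0]``; the last window is truncated at the end of the
simulated range)::

    window 0: build hours [0, 48)    -> commit hours [0, 24)
    window 1: build hours [24, 72)   -> commit hours [24, 48)
    window 2: build hours [48, 96)   -> commit hours [48, 72)
    ...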

Capacity source
===============

Two formats are accepted via ``--cap-source PATH``:

* ``.nc`` -- a CEM ``baseline.nc`` written by ``run.py``. The
  ``install`` data array (dims ``year x zone x tech``) is selected
  for the chosen year.
* ``.csv`` -- a tidy table with columns ``zone, tech, year, capacity``
  (MW). Useful when running PCM standalone, no CEM needed.

If neither is supplied, the existing fleet from
``tech_existing.csv`` is used as-is (no expansion). This is the
"validate-the-existing-build" mode.

Config
======

Add a ``pcm_parameters`` block to ``config.json``::

    "pcm_parameters": {
        "horizon_h": 48,
        "step_h": 24,
        "year": 2030,
        "total_h": 168
    }

Or pass as CLI flags. CLI overrides config.

``total_h`` (optional) caps how many hours from ``hour[0]`` are
simulated -- useful as a smoke test on large nodal models, where a
single 48 h window can take tens of minutes to build. Omit (or set
``null``) to run the full year.

Output
======

PCM dispatch lands in ``output/baseline_pcm/`` -- one Parquet
sidecar per output variable (``gen.parquet``, ``trans_export.parquet``,
``lns.parquet``, ...), keyed in long format so the file size scales
with non-zero entries (not the dense (h, m, y, z, ...) product).
A ``manifest.json`` next to the Parquets records the schema. The
companion CEM ``baseline.nc`` is left untouched.
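
Any Parquet reader can load the dispatch back; a minimal pandas sketch
(only variables populated in your run get a sidecar)::

    import pandas as pd

    gen = pd.read_parquet('output/baseline_pcm/gen.parquet')
    # long format: hour, month, year, zone, tech, value
    annual_by_tech = gen.groupby('tech')['value'].sum()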

CLI
===

::

    cd examples/three_zone
    python -m prepshot.pcm --year 2025 --horizon 48 --step 24

    # smoke test: only the first 48 h, one window
    python -m prepshot.pcm --year 2025 --horizon 48 --step 48 --total-h 48

Or programmatically::

    from prepshot.pcm import run_pcm
    run_pcm('examples/three_zone', year=2025)
    run_pcm('examples/three_zone', year=2025, total_h=48)
"""
import argparse
import json
import logging
from pathlib import Path
from typing import Optional

import pandas as pd
import xarray as xr

from prepshot.set_up import initialize_environment
from prepshot.solver import solve_model
from prepshot.model import create_model
from prepshot.logs import setup_logging, timer


# ---------- capacity loaders ---------------------------------------------


def load_fixed_capacity(
    source: Path, target_year: int, scenario_dir: Path
) -> dict:
    """Return ``{(zone, tech): capacity_MW}`` for ``target_year``.

    ``source`` may be a path to:

    * a CEM ``baseline.nc`` (xarray dataset with an ``install`` array
      indexed by ``year x zone x tech``), or
    * a tidy CSV with columns ``zone, tech, year, capacity``.

    Relative paths are resolved against ``scenario_dir``.
    """
    path = Path(source)
    if not path.is_absolute():
        path = scenario_dir / path
    if not path.exists():
        raise FileNotFoundError(f"Capacity source not found: {path}")

    if path.suffix == '.nc':
        ds = None
        last_err = None
        for engine in ('netcdf4', 'h5netcdf'):
            try:
                ds = xr.open_dataset(path, engine=engine)
                break
            except (OSError, ImportError, RuntimeError, ValueError) as exc:
                # netCDF backends fail in a variety of ways across
                # platforms (missing native lib, format mismatch, etc.);
                # we just want to fall through to the next engine.
                last_err = exc
        if ds is None:
            raise RuntimeError(f"Could not open {path}: {last_err}")
        if 'install' not in ds.data_vars:
            raise ValueError(
                f"{path} has no 'install' data var; not a CEM baseline."
            )
        arr = ds['install'].sel(year=target_year)
        cap = {}
        for z in arr['zone'].values:
            for t in arr['tech'].values:
                cap[(str(z), str(t))] = float(arr.sel(zone=z, tech=t).item())
        return cap

    if path.suffix == '.csv':
        df = pd.read_csv(path)
        df_yr = df[df['year'].astype(int) == int(target_year)]
        if df_yr.empty:
            raise ValueError(
                f"{path} has no rows for year={target_year}; "
                f"available years: {sorted(df['year'].unique())}"
            )
        return {
            (str(r['zone']), str(r['tech'])): float(r['capacity'])
            for _, r in df_yr.iterrows()
        }

    raise ValueError(
        f"Unrecognized capacity source extension: {path.suffix}. "
        f"Use .nc or .csv."
    )
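

# Example usage (a sketch; paths are illustrative, not shipped files):
#
#     load_fixed_capacity(Path('output/baseline.nc'), 2030, scenario_dir)
#     load_fixed_capacity(Path('capacity_2030.csv'), 2030, scenario_dir)
#
# Both return the same ``{(zone, tech): capacity_MW}`` mapping for 2030.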


# ---------- per-window plumbing ------------------------------------------


def _lock_no_expansion(model) -> None:
    """Pin ``cap_newtech`` to zero so the model is dispatch-only.

    Must be called *after* ``create_model`` -- it sets the upper bound
    on each ``cap_newtech`` variable to 0, which presolve folds away.
    """
    import pyoptinterface as poi
    for y in model.year:
        for z in model.zone:
            for te in model.tech:
                v = model.cap_newtech[y, z, te]
                model.set_variable_attribute(
                    v, poi.VariableAttribute.UpperBound, 0.0
                )


def _override_existing_fleet(params: dict, cap_lookup: dict) -> None:
    """Replace ``params['existing_fleet']`` so ``cap_existing`` for the
    chosen year collapses to the user-supplied capacity.

    investment.py reads ``params['existing_fleet']`` (dict keyed
    ``(tech, zone, commission_year) -> MW``) during model construction;
    each entry contributes capacity for years where
    ``commission_year <= y < commission_year + lifetime[tech, commission_year]``.
    By rewriting the dict to a single entry per (zone, tech) commissioned
    at the first model year, we guarantee the chosen year sees exactly
    the user-supplied capacity (and no other).

    Also clears the precomputed ``active_zt`` set so it's recomputed
    against the overridden fleet on the next ``create_model`` call.
    """
    cy = params['year'][0]  # commission at the first model year so
                            # the lifetime lookup is defined.
    new_fleet = {}
    for (z, te), cap_mw in cap_lookup.items():
        if cap_mw > 0:
            new_fleet[(te, z, cy)] = float(cap_mw)
    params['existing_fleet'] = new_fleet
    # Drop precomputed sparse index so create_model recomputes it
    # against the overridden fleet.
    for k in ('active_zt', 'tech_zones', 'zone_techs', 'active_zt_storage'):
        params.pop(k, None)


def _build_window_params(
    full_params: dict,
    year: int,
    window_hours: list,
    state: dict,
) -> dict:
    """Slice ``full_params`` down to a single year + window of hours.

    A SHALLOW copy of ``full_params`` -- the big dicts (demand,
    max_gen_profile, inflow, ...) are reused by reference because
    they're never mutated per window. Only the small set of keys we
    actually overwrite below get fresh values; the dicts we update in
    place (e.g. ``initial_energy_storage_level``) are explicitly
    one-level copied so we don't bleed state into ``full_params``.

    Replaces the previous full ``deepcopy(full_params)`` which on Thai
    PCM (4 M demand entries + 0.3 M VRE-CF entries) cost ~10 s per
    window -- ~20x more than ``create_model`` itself.
    """
    p = dict(full_params)  # shallow copy
    p['hour'] = list(window_hours)
    p['year'] = [int(year)]
    p['skip_end_storage'] = True  # interior windows: terminal SOC is free
    # Switch hydro from cyclic-wrap to non-cyclic-rolling. With cyclic
    # off, the cascade upstream's outflow at hours before the window
    # start is dropped from the downstream's inflow expression -- the
    # approximation that lets each window stand alone, instead of
    # implicitly looping its end back into its beginning.
    p['cyclic_hydro'] = False
    # Recompute weight for the smaller window so cost / income terms
    # scale correctly (weight is in the income denominator).
    if 'hours_in_year' in p:
        p['weight'] = (
            len(p['month']) * len(window_hours) * p['dt']
        ) / p['hours_in_year']
    # Filter year-keyed lookups down to {year}. The CEM model iterates
    # over rows of the policy / fleet tables and indexes year-keyed
    # variables; keeping rows for years outside the window triggers
    # KeyError when those variables don't exist in this subproblem.
    cel = p.get('carbon_emission_limit')
    if cel is not None and hasattr(cel, 'iterrows'):
        p['carbon_emission_limit'] = cel[cel['year'] == int(year)].copy()
    # Set per-station initial storage from carried-over state.
    if state.get('hydro_storage'):
        p['initial_reservoir_storage_level'] = dict(state['hydro_storage'])
    if state.get('prior_outflow'):
        # Cross-window cascade state, consumed by hydro.inflow_rule
        # when the delayed hour falls before this window's hour[0].
        p['prior_outflow'] = dict(state['prior_outflow'])
    if state.get('battery_storage'):
        # Merge: keep the dataset's default SOC for any (te, z) that
        # the previous window didn't write (e.g. zero-capacity slots).
        merged = dict(p.get('initial_energy_storage_level') or {})
        merged.update(state['battery_storage'])
        p['initial_energy_storage_level'] = merged
    # Force the head-iteration off in PCM windows -- per-window head
    # iteration would multiply the solve count by 3x with no extra
    # accuracy on a single window.
    p['iteration_number'] = 1
    return p


def _extract_window_state(
    model, full_params: dict, terminal_hour: int
) -> dict:
    """Read storage levels + cascade outflows at end of the committed
    window to seed the next window.

    Hydro storage is clamped a small fraction inside
    ``[storage_min, storage_max]`` so floating-point boundary values
    from the LP optimum don't accidentally violate the next window's
    bounds.

    Battery SOC is converted from absolute MWh (the form held by
    ``model.storage``) to the per-unit-of-capacity fraction that
    ``params['initial_energy_storage_level']`` expects, using the same
    ``esl * cap * epr * dt`` relation as the ``init_energy_storage_rule``.

    **Prior outflow state (v1.17+).** For each upstream station with a
    delay ``D``, the next window's first ``D`` hours of downstream
    inflow refer back to the upstream's outflow over hours
    ``[terminal_hour - D + 1, terminal_hour]``. We extract those values
    now and stash them as a flat
    ``{(station, hour, month, year) -> m**3/s}`` lookup that the next
    window's ``hydro.inflow_rule`` will consult when ``t < hour[0]``.
    Without this, downstream stations would see only natural inflow at
    window boundaries -- often infeasible when
    ``min_outflow > natural_inflow``.
    """
    state = {
        'hydro_storage': {},
        'battery_storage': {},
        'prior_outflow': {},
    }
    if full_params.get('isinflow') and hasattr(model, 'storage_reservoir'):
        smin = full_params.get('reservoir_storage_min') or {}
        smax = full_params.get('reservoir_storage_max') or {}
        for s in model.station:
            # storage_*[s, m, h] -- pick m=h=1 as a per-station bound
            # (these CSVs in shipped examples are flat across the
            # representative period). PCM v1.14.1 will resolve per-hour
            # bounds properly.
            lo = float(smin.get((s, 1, 1), 0.0))
            hi = float(smax.get((s, 1, 1), float('inf')))
            margin = max(0.0, (hi - lo) * 1e-3)  # 0.1 % buffer
            for m in model.month:
                for y in model.year:
                    val = float(model.get_value(
                        model.storage_reservoir[s, terminal_hour, m, y]
                    ))
                    val = min(max(val, lo + margin), hi - margin)
                    state['hydro_storage'][s] = val
        # Prior outflow lookup: for every upstream-of-anyone station
        # ``s`` and every absolute hour ``h_abs`` in
        # ``[terminal_hour - max_delay + 1, terminal_hour]``, stash
        # ``outflow[s, h_abs, m, y]``. The next window's
        # ``hydro.inflow_rule`` keys this dict when ``t < hour[0]``.
        wdt_df = full_params.get('water_delay_time')
        if (
            wdt_df is not None
            and hasattr(wdt_df, 'iterrows')
            and not wdt_df.empty
            and not wdt_df['delay'].isna().all()
        ):
            upstream_stations = set(wdt_df['upstream_tech'].astype(str))
            max_delay = int(wdt_df['delay'].max())
        else:
            upstream_stations = set()
            max_delay = 0
        for s in upstream_stations:
            if s not in set(model.station):
                continue
            for offset in range(max_delay):
                h_abs = terminal_hour - offset
                if h_abs not in set(model.hour):
                    continue
                for m in model.month:
                    for y in model.year:
                        try:
                            v = float(model.get_value(
                                model.outflow[s, h_abs, m, y]
                            ))
                        except (KeyError, TypeError):
                            continue
                        state['prior_outflow'][(s, h_abs, m, y)] = v

    if hasattr(model, 'storage') and getattr(model, 'storage_tech', None):
        epr_lookup = full_params.get('energy_to_power_ratio') or {}
        dt = full_params['dt']
        for z in model.zone:
            for te in model.storage_tech:
                # Compose cap_existing for this (year, zone, tech). Use
                # the FIRST year in the window (PCM only models one).
                year = model.year[0]
                cap_mw = float(model.get_value(
                    model.cap_existing[year, z, te]
                )) if hasattr(model.cap_existing, '__getitem__') else 0.0
                epr = float(epr_lookup.get(te, 0.0))
                denom = cap_mw * epr * dt
                if denom <= 0:
                    # No installed storage in this (z, te) slot --
                    # skip; the next window's init lookup defaults to 0.
                    continue
                for m in model.month:
                    val_mwh = float(model.get_value(
                        model.storage[terminal_hour, m, year, z, te]
                    ))
                    esl = val_mwh / denom
                    # Clamp into [0, 1] -- LP feasibility guarantees
                    # storage stays in [0, cap*epr*dt], but
                    # floating-point noise can push the ratio just
                    # outside.
                    esl = min(max(esl, 0.0), 1.0)
                    state['battery_storage'][te, z] = esl
    return state


def _extract_window_dispatch(
    model, commit_hours: list, year: int
) -> dict:
    """Pull the dispatch decisions for ``commit_hours`` out of a solved
    window model. Returns a dict of arrays keyed like the CEM output."""
    out = {'gen': [], 'charge': [], 'trans_export': [], 'genflow': [],
           'spillflow': [], 'reserve_up': [], 'reserve_down': [],
           'lns': [], 'lmp': []}
    months = list(model.month)
    zones = list(model.zone)
    techs = list(model.tech)
    stations = list(model.station) if hasattr(model, 'station') else []
    storage_set = set(getattr(model, 'storage_tech', []) or [])
    zone_techs = getattr(model, 'zone_techs', None) or {
        z: list(techs) for z in zones
    }
    out_neighbours = getattr(model, 'out_neighbours', None) or {
        z: list(zones) for z in zones
    }
    for h in commit_hours:
        for m in months:
            for z in zones:
                for te in zone_techs.get(z, []):
                    out['gen'].append({
                        'hour': h, 'month': m, 'year': year, 'zone': z,
                        'tech': te,
                        'value': float(model.get_value(
                            model.gen[h, m, year, z, te]
                        )),
                    })
                    if hasattr(model, 'charge') and te in storage_set:
                        out['charge'].append({
                            'hour': h, 'month': m, 'year': year, 'zone': z,
                            'tech': te,
                            'value': float(model.get_value(
                                model.charge[h, m, year, z, te]
                            )),
                        })
                    if hasattr(model, 'reserve_up'):
                        out['reserve_up'].append({
                            'hour': h, 'month': m, 'year': year, 'zone': z,
                            'tech': te,
                            'value': float(model.get_value(
                                model.reserve_up[h, m, year, z, te]
                            )),
                        })
                        out['reserve_down'].append({
                            'hour': h, 'month': m, 'year': year, 'zone': z,
                            'tech': te,
                            'value': float(model.get_value(
                                model.reserve_down[h, m, year, z, te]
                            )),
                        })
                # Load-not-served: per-(h, m, y, z), populated only
                # when allow_load_shedding is on.
                if hasattr(model, 'lns'):
                    out['lns'].append({
                        'hour': h, 'month': m, 'year': year, 'zone': z,
                        'value': float(model.get_value(
                            model.lns[h, m, year, z]
                        )),
                    })
            for z in zones:
                for z2 in out_neighbours.get(z, []):
                    out['trans_export'].append({
                        'hour': h, 'month': m, 'year': year,
                        'zone1': z, 'zone2': z2,
                        'value': float(model.get_value(
                            model.trans_export[h, m, year, z, z2]
                        )),
                    })
            for s in stations:
                out['genflow'].append({
                    'hour': h, 'month': m, 'year': year, 'station': s,
                    'value': float(model.get_value(
                        model.genflow[s, h, m, year]
                    )),
                })
                out['spillflow'].append({
                    'hour': h, 'month': m, 'year': year, 'station': s,
                    'value': float(model.get_value(
                        model.spillflow[s, h, m, year]
                    )),
                })

    # LMP: dual of the nodal power-balance constraint, scaled to
    # real-year USD/MWh. The constraint is posed as
    # ``demand - supply == 0`` so the raw dual is the negative of the
    # marginal cost of one more MWh of demand -- flip the sign to
    # follow the convention. The objective discounts every cost term
    # by ``var_factor[y, z] / weight``; multiplying the dual by
    # ``weight / var_factor`` undoes that and gives a real-year
    # marginal price. Skipped silently if the solver did not return
    # duals (e.g. infeasible).
    # Pick the right per-backend dual-extractor once. HiGHS exposes
    # the dual through the typed ``ConstraintAttribute.Dual`` API, but
    # PoI 0.4's Gurobi backend raises ``AttributeError: Quadratric``
    # (sic) on the same call for linear constraints, so we have to
    # also keep a raw-name fallback (``Pi`` for Gurobi).
    def _make_dual_extractor():
        import pyoptinterface as poi
        probe = next(iter(model.power_balance_cons.values()), None)
        if probe is None:
            return None
        try:
            model.get_constraint_attribute(
                probe, poi.ConstraintAttribute.Dual
            )
            return lambda c: float(
                model.get_constraint_attribute(
                    c, poi.ConstraintAttribute.Dual
                )
            )
        except Exception:
            pass
        for raw_name in ('Pi', 'dual'):
            try:
                model.get_constraint_raw_attribute_double(probe, raw_name)
                return lambda c, n=raw_name: float(
                    model.get_constraint_raw_attribute_double(c, n)
                )
            except Exception:
                continue
        return None

    extract_dual = _make_dual_extractor()
    if extract_dual is None:
        logging.warning(
            'could not extract LMP duals: backend exposes no Pi / dual'
        )
    else:
        weight = model.params['weight']
        vf = model.params['var_factor']
        for h in commit_hours:
            for m in months:
                for z in zones:
                    c = model.power_balance_cons[h, m, year, z]
                    dual = extract_dual(c)
                    out['lmp'].append({
                        'hour': h, 'month': m, 'year': year, 'zone': z,
                        'value': -dual * weight / float(vf[year, z]),
                    })
    return out


# ---------- aggregation + write ------------------------------------------


def _aggregate(window_outs: list) -> dict:
    """Concatenate per-window dispatch dicts into one per-key list."""
    merged = {k: [] for k in window_outs[0].keys()}
    for w in window_outs:
        for k, rows in w.items():
            merged[k].extend(rows)
    return merged


def _save_pcm_netcdf(merged: dict, out_path: Path) -> None:
    """Save per-variable result tables.

    For sparse-by-construction outputs (``gen`` / ``charge`` /
    ``trans_export``: only ``active_zt`` or ``active_lines`` cells
    populated), pivoting to a dense ``(hour, month, year, zone, ...)``
    xarray creates a NaN-padded array that's orders of magnitude
    larger than the data. On a full-year Thai PCM run that's
    8760 x 472 x 472 = 1.95 B cells just for ``trans_export``
    (~16 GB), which overflows the NetCDF3 32-bit size field.

    Strategy: write each variable as a long-format Parquet sidecar
    next to ``out_path`` (in ``out_path`` with the ``.nc`` suffix
    swapped for a directory). Parquet is compressed, columnar, and
    natively sparse. A small ``manifest.json`` records the schema.
    """
    out_dir = out_path.with_suffix('')
    out_dir.mkdir(parents=True, exist_ok=True)
    schema = {}
    for key, rows in merged.items():
        if not rows:
            continue
        df = pd.DataFrame(rows)
        df.to_parquet(out_dir / f'{key}.parquet', compression='zstd')
        schema[key] = list(df.columns)
    (out_dir / 'manifest.json').write_text(
        json.dumps({'variables': schema, 'format': 'parquet'}, indent=2)
    )
    logging.info(
        'PCM dispatch written to %s/ (%d Parquet variables)',
        out_dir, len(schema),
    )


# ---------- main entry ---------------------------------------------------


@timer
def run_pcm(
    scenario_dir,
    *,
    year: Optional[int] = None,
    horizon_h: int = 48,
    step_h: int = 24,
    cap_source: Optional[str] = None,
    total_h: Optional[int] = None,
    allow_load_shedding: Optional[bool] = None,
    voll: Optional[float] = None,
) -> None:
    """Solve PCM dispatch over one year with rolling horizon.

    Parameters
    ----------
    scenario_dir : path-like
        Directory containing ``config.json``, ``params.json``, ``input/``.
    year : int, optional
        Which model year to dispatch. Defaults to the first year of the
        config's planning horizon.
    horizon_h : int
        Window length in hours. Defaults to 48.
    step_h : int
        Window advance in hours (committed segment). Defaults to 24.
    cap_source : str, optional
        Path to a CEM ``baseline.nc`` or capacity CSV. If omitted, the
        CEM-shipped existing fleet is used unchanged.
    total_h : int, optional
        Cap on the number of hours simulated from ``hour[0]``. Useful
        as a smoke test on large nodal models where the full year is
        intractable. Defaults to ``None`` (run all hours).
    allow_load_shedding : bool, optional
        Allow non-negative slack (``lns``) on every nodal power
        balance, priced at ``voll`` in the objective. Lets PCM windows
        complete even when the dispatch can't physically meet demand
        at some (hour, zone). Overrides
        ``reliability_parameters.allow_load_shedding`` from
        ``config.json`` when set.
    voll : float, optional
        Value of lost load (USD/MWh) for the load-shedding penalty.
        Overrides ``reliability_parameters.voll`` from ``config.json``
        when set. Defaults to 10000.
    """
    setup_logging()
    scenario_dir = Path(scenario_dir).resolve()
    cfg = {
        'filepath': str(scenario_dir),
        'config_filename': str(scenario_dir / 'config.json'),
        'params_filename': str(scenario_dir / 'params.json'),
    }
    # initialize_environment registers an argparse parser with one flag
    # per scenario parameter (carbon_tax, demand, ...). Our CLI flags
    # (--year, --horizon, ...) would collide with that parser when
    # invoked via "python -m prepshot.pcm ...". Strip our flags by
    # presenting initialize_environment a clean argv -- we've already
    # captured what we need from the user.
    import sys as _sys
    _saved_argv = _sys.argv
    _sys.argv = [_saved_argv[0]]
    try:
        full_params = initialize_environment(cfg)
    finally:
        _sys.argv = _saved_argv

    pcm_block = json.loads(
        (scenario_dir / 'config.json').read_text()
    ).get('pcm_parameters') or {}
    if year is None:
        year = pcm_block.get('year', full_params['year'][0])
    horizon_h = horizon_h or pcm_block.get('horizon_h', 48)
    step_h = step_h or pcm_block.get('step_h', 24)
    cap_source = cap_source or pcm_block.get('cap_source')
    if total_h is None:
        total_h = pcm_block.get('total_h')

    # Load-shedding overrides: CLI / kwarg wins over config; config
    # wins over the default. We stamp the merged value back into
    # full_params so every per-window params copy picks it up.
    if allow_load_shedding is not None:
        full_params['allow_load_shedding'] = bool(allow_load_shedding)
    if voll is not None:
        full_params['voll'] = float(voll)
    if full_params.get('allow_load_shedding'):
        logging.info(
            'PCM: load shedding enabled (VOLL=%.0f $/MWh)',
            full_params.get('voll', 10000.0),
        )

    if cap_source:
        cap_lookup = load_fixed_capacity(
            Path(cap_source), int(year), scenario_dir
        )
        logging.info(
            'PCM: loaded fixed capacity for year %s from %s (%d entries)',
            year, cap_source, len(cap_lookup),
        )
    else:
        cap_lookup = None
        logging.info(
            'PCM: no cap-source given; using existing fleet from CEM inputs'
        )

    # Initial state: per-station / per-battery starting storage.
    state = {
        'hydro_storage': dict(
            full_params.get('initial_reservoir_storage_level') or {}
        ),
        'battery_storage': {},
    }

    full_hours = list(full_params['hour'])
    n_hours = len(full_hours)
    if total_h is not None:
        n_hours = min(n_hours, int(total_h))
        logging.info(
            'PCM: total_h=%d -- simulating only hours [%d..%d] '
            '(of %d available)',
            total_h, full_hours[0], full_hours[n_hours - 1],
            len(full_hours),
        )

    window_outs = []
    t = 0
    while t < n_hours:
        wh_end_idx = min(t + horizon_h, n_hours)
        commit_end_idx = min(t + step_h, n_hours)
        window_hours = full_hours[t:wh_end_idx]
        commit_hours = full_hours[t:commit_end_idx]

        win_params = _build_window_params(
            full_params, int(year), window_hours, state
        )
        if cap_lookup is not None:
            _override_existing_fleet(win_params, cap_lookup)

        m = create_model(win_params)
        _lock_no_expansion(m)
        ok = solve_model(m, win_params)
        if not ok:
            raise RuntimeError(
                f'PCM window hours {window_hours[0]}..{window_hours[-1]} '
                f'failed to solve to optimality.'
            )

        window_outs.append(
            _extract_window_dispatch(m, commit_hours, int(year))
        )
        state = _extract_window_state(m, win_params, commit_hours[-1])
        logging.info(
            'PCM window [%d..%d] solved; committed [%d..%d]; advancing.',
            window_hours[0], window_hours[-1],
            commit_hours[0], commit_hours[-1],
        )
        t += step_h

    merged = _aggregate(window_outs)
    out_path = scenario_dir / 'output' / 'baseline_pcm.nc'
    _save_pcm_netcdf(merged, out_path)


def main() -> None:
    """CLI entry point: ``python -m prepshot.pcm``."""
    p = argparse.ArgumentParser(prog='python -m prepshot.pcm')
    p.add_argument(
        'scenario', type=Path, default=Path.cwd(), nargs='?',
        help='Scenario directory (defaults to cwd).',
    )
    p.add_argument('--year', type=int, default=None)
    p.add_argument('--horizon', type=int, default=48,
                   help='Window length in hours.')
    p.add_argument('--step', type=int, default=24,
                   help='Window advance in hours.')
    p.add_argument('--cap-source', default=None,
                   help='Path to a CEM baseline.nc or capacity CSV.')
    p.add_argument('--total-h', type=int, default=None,
                   help='Cap simulated hours from hour[0]; smoke-test '
                        'knob for large nodal models.')
    p.add_argument('--allow-load-shedding', action='store_true',
                   default=None,
                   help='Allow non-negative load-not-served slack on '
                        'every (h, m, y, z) priced at VOLL.')
    p.add_argument('--voll', type=float, default=None,
                   help='Value of lost load in $/MWh (default 10000).')
    args = p.parse_args()
    run_pcm(
        args.scenario,
        year=args.year,
        horizon_h=args.horizon,
        step_h=args.step,
        cap_source=args.cap_source,
        total_h=args.total_h,
        allow_load_shedding=args.allow_load_shedding,
        voll=args.voll,
    )


if __name__ == '__main__':
    main()