Source code for dominosee.grid.grid

"""Equidistant points on a sphere.

Fibbonachi Spiral:
https://bduvenhage.me/geometry/2019/07/31/generating-equidistant-vectors.html

Fekete points:
https://arxiv.org/pdf/0808.1202.pdf

Geodesic grid: (sec. 3.2)
https://arxiv.org/pdf/1711.05618.pdf

Review on geodesic grids:
http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.113.997&rep=rep1&type=pdf

"""
# Original work Copyright (C) 2022 Felix Strnad <felix.strnad@uni-tuebingen.de>
# From: https://github.com/mlcs/climnet/blob/main/climnet/grid/grid.py
# Modifications Copyright (C) 2025 Hui-Min Wang <wanghuimin@u.nus.edu>
# 
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# -----------------------------------------------------------------------------

import math
import os
import pickle
import sys
import time
import warnings
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple, Union, Any

import numpy as np
from scipy.spatial import SphericalVoronoi
from scipy.spatial.transform import Rotation as Rot

from .fekete import bendito, points_on_sphere

class BaseGrid(ABC):
    """Abstract base class for geographical grids.
    
    This class provides the common interface and functionality for different
    types of geographical grids.
    
    Attributes:
        grid: Dictionary containing grid coordinates with 'lat' and 'lon' keys
    """
    def __init__(self) -> None:
        self.grid: Optional[Dict[str, np.ndarray]] = None

    @abstractmethod
    def get_distance_equator(self) -> float:
        """Return distance between points at the equator in km."""
        pass

    @abstractmethod
    def create_grid(self) -> Dict[str, np.ndarray]:
        """Create and return the grid as a dictionary with 'lat' and 'lon' keys."""
        pass

    def to_pickle(self, filepath: str) -> None:
        """Save grid to pickle file.
        
        Args:
            filepath: Path where to save the pickled grid
        """
        with open(filepath, 'wb') as fp:
            pickle.dump(self, fp, protocol=pickle.HIGHEST_PROTOCOL)
    
    @classmethod
    def from_pickle(cls, filepath: str):
        """Load grid from pickle file.
        
        Args:
            filepath: Path to the pickled grid file
            
        Returns:
            Grid instance loaded from file
        """
        with open(filepath, 'rb') as fp:
            return pickle.load(fp)

    def cut_grid(self, lat_range: Tuple[float, float], lon_range: Tuple[float, float]) -> Dict[str, np.ndarray]:
        """Extract grid points within specified latitude and longitude ranges.

        TODO: allow taking regions around the date line (longitude wrap-around)
        Note: Ranges crossing the date line (longitude wrap-around) are not yet supported.

        Args:
            lat_range: (min_lat, max_lat) in degrees
            lon_range: (min_lon, max_lon) in degrees
            
        Returns:
            Dictionary with 'lat' and 'lon' keys containing filtered coordinates
            
        Raises:
            ValueError: If longitude range crosses the date line
            RuntimeError: If no grid has been created yet
        """
        if self.grid is None:
            raise RuntimeError("No grid exists. Call create_grid() first.")
            
        if lon_range[0] > lon_range[1]:
            raise ValueError("Ranges around the date line are not yet supported.")
            
        print(f"Cutting grid to lat: {lat_range}, lon: {lon_range}")

        mask = ((self.grid['lat'] >= lat_range[0]) & 
                (self.grid['lat'] <= lat_range[1]) & 
                (self.grid['lon'] >= lon_range[0]) & 
                (self.grid['lon'] <= lon_range[1]))
        cut_grid = {'lat': self.grid['lat'][mask], 'lon': self.grid['lon'][mask]}
        
        return cut_grid


class RegularGrid(BaseGrid):
    """Gaussian Grid of the earth which is the classical grid type.

    [WIP] This class is under development and not fully tested.

    Args:
    ----
    grid_step_lon: float 
        Grid step in longitudinal direction in degree
    grid_step_lat: float
        Grid step in longitudinal direction in degree

    """

    def __init__(self, grid_step_lon, grid_step_lat, grid=None):
        self.grid_step_lon = grid_step_lon
        self.grid_step_lat = grid_step_lat
        self.grid = grid
        if grid is None:
            self.create_grid() 

    def create_grid(self) -> Dict[str, np.ndarray]:
        """Create regular grid with specified longitude and latitude steps.
        
        Returns:
            Dictionary with 'lat' and 'lon' keys containing grid coordinates
        """
        init_lon, init_lat = self._regular_lon_lat_step(self.grid_step_lon, self.grid_step_lat)

        lon_mesh, lat_mesh = np.meshgrid(init_lon, init_lat)
        
        self.grid = {'lat': lat_mesh.flatten(), 'lon': lon_mesh.flatten()}

        return self.grid
        
    def get_distance_equator(self):
        """Return distance between points at the equator."""
        d_lon = deg_to_equatorial_distance(self.grid_step_lon, radius=6371)
        return d_lon
    
    @staticmethod
    def _regular_lon_lat_step(lon_step: float, lat_step: float) -> Tuple[np.ndarray, np.ndarray]:
        """Create longitude and latitude arrays with specified steps, centered within bounds.
        
        Args:
            lon_step: Longitude step in degrees
            lat_step: Latitude step in degrees
            
        Returns:
            Tuple of (longitude_array, latitude_array)
        """
        num_lon = int(np.ceil(360/lon_step))
        lon_border = (360 - (num_lon-1) * lon_step)/2
        num_lat = int(np.ceil(180/lat_step))
        lat_border = (180 - (num_lat-1) * lat_step)/2
        lon = np.linspace(-180 + lon_border, 180 - lon_border, num_lon)
        lat = np.linspace(-90 + lat_border, 90 - lat_border, num_lat)
        return lon, lat


class GaussianGrid(BaseGrid):
    """Gaussian Grid of the earth which is the classical grid type.

    [WIP] This class is under development and not fully tested.

    Args:
    ----
    grid_step_lon: float 
        Grid step in longitudinal direction in degree
    grid_step_lat: float
        Grid step in longitudinal direction in degree

    """

    def __init__(self, grid_step_lon, grid_step_lat, grid=None):
        self.grid_step_lon = grid_step_lon
        self.grid_step_lat = grid_step_lat
        self.grid = grid
        if grid is None:
            self.create_grid()



    def create_grid(self): # TODO: 确认高斯网格的起止点
        init_lat = np.arange(-89.5, 90.5, self.grid_step_lat)
        init_lon = np.arange(-179.5, 180.5, self.grid_step_lon)

        lon_mesh, lat_mesh = np.meshgrid(init_lon, init_lat) # lats as repeat, lons as tile
        
        self.grid = {'lat': lat_mesh.flatten(), 'lon': lon_mesh.flatten()}

        return self.grid
        


    def get_distance_equator(self):
        """Return distance between points at the equator."""
        d_lon = deg_to_equatorial_distance(self.grid_step_lon, radius=6371)
        return d_lon


class FibonacciGrid(BaseGrid):
    """Fibonacci sphere creates a equidistance grid on a sphere.
    
    [WIP] This class is under development and not fully tested.

    Parameters:
    -----------
    distance_between_points: float
        Distance between the equidistance grid points in km.
    grid: dict (or 'old' makes old version of fib grid, 'maxmin' to maximize min. min-distance)

        If grid is already computed, e.g. {'lon': [], 'lat': []}. Default: None
    """

    def __init__(self, distance_between_points, grid=None, epsilon = 0.36, save = True):
        self.distance = distance_between_points
        self.num_points = distance_to_grid_num(self.distance)
        self.epsilon = None
        self.grid = None
        if grid is None: # maxavg is standard
            self.create_grid(self.num_points, epsilon, save = save)
            self.epsilon = epsilon
        elif grid == 'old':
            self.create_grid('old')
        elif grid == 'maxmin':
            eps = self._maxmin_epsilon(self.num_points)
            self.create_grid(self.num_points, eps, save = save)
            self.epsilon = eps
        else:
            self.grid = grid
        self.reduced_grid = None
    

    def create_grid(self, num_points = 1, epsilon = 0.36, save =True):
        if num_points == 'old':
            """Create Fibonacci grid."""
            print(f'Create fibonacci grid with {self.num_points} points.')
            cartesian_grid = self.fibonacci_sphere(self.num_points)
            lon, lat = cart_to_geo(cartesian_grid[:,0],
                                        cartesian_grid[:,1],
                                        cartesian_grid[:,2])
        else:
            filepath = f'fibonaccigrid_{num_points}_{epsilon}.p'
            if os.path.exists(filepath):
                print(f'\nLoad Fibonacci grid with {self.num_points} points and epsilon = {epsilon}.')
                with open(filepath, 'rb') as fp:
                    self.grid = pickle.load(fp)
                    return self.grid
            else:
                print(f'\nCreate refined fibonacci grid with {num_points} points and epsilon = {epsilon}.')
                goldenRatio = (1 + 5**0.5)/2
                i = np.arange(0, num_points) 
                theta = 2 *np.pi * i / goldenRatio
                phi = np.arccos(1 - 2*(i+epsilon)/(num_points-1+2*epsilon))
                x, y, z = np.cos(theta) * np.sin(phi), np.sin(theta) * np.sin(phi), np.cos(phi)
                lon, lat = cart_to_geo(x,y,z)
                self.grid = {'lon': lon, 'lat': lat}
                if save:
                    with open(filepath, 'wb') as fp:
                        pickle.dump(self.grid, fp, protocol=pickle.HIGHEST_PROTOCOL)
        self.grid = {'lon': lon, 'lat': lat}
        return self.grid

    def fibonacci_sphere(self, num_points=1):
        """Creates the fibonacci sphere points on a unit sphere.
        Code inspired by:
        https://stackoverflow.com/questions/9600801/evenly-distributing-n-points-on-a-sphere
        """
        points = []
        phi = math.pi * (3. - math.sqrt(5.))  # golden angle in radians

        for i in range(num_points):
            y = 1 - (i / float(num_points - 1)) * 2  # y goes from 1 to -1
            radius = math.sqrt(1 - y * y)  # radius at y

            theta = phi * i  # golden angle increment

            x = math.cos(theta) * radius
            z = math.sin(theta) * radius

            points.append([x, y, z])

        return np.array(points)


    def fibonacci_refined(self, num_points = 1, epsilon = 0.36): # epsilon = 0.36 for optimal average distance
            
        print(f'Create refined fibonacci grid with {self.num_points} points and epsilon={epsilon}.')

        goldenRatio = (1 + 5**0.5)/2
        i = np.arange(0, num_points) 
        theta = 2 *np.pi * i / goldenRatio
        phi = np.arccos(1 - 2*(i+epsilon)/(num_points-1+2*epsilon))
        x, y, z = np.cos(theta) * np.sin(phi), np.sin(theta) * np.sin(phi), np.cos(phi)
        lon, lat = cartesian2spherical(x,y,z)
        self.grid = {'lon': lon, 'lat': lat}
        return self.grid

    def get_num_points(self):
        """Get number of points for this grid's distance setting.
        
        Returns:
            Number of points required for the current distance
        """
        return distance_to_grid_num(self.distance)


    def fit_numPoints_distance(self):
        """Fit functional relationship between next-nearest-neighbor distance of
           fibonacci points and number of points."""
        num_points = np.linspace(200, 10000, 20, dtype=int)
        main_distance = []
        for n in num_points:
            points = self.fibonacci_sphere(n)
            lon, lat = cart_to_geo(points[:,0], points[:,1], points[:,2])

            distance = neighbor_distance(lon, lat)
            hist, bin_edge = np.histogram(distance.flatten(), bins=100)

            main_distance.append(bin_edge[np.argmax(hist)])

        # fit function
        logx = np.log(main_distance)
        logy = np.log(num_points)
        coeffs = np.polyfit(logx, logy, deg=1)

        def func(x):
            return np.exp(coeffs[1]) * x**(coeffs[0])

        dist_array = np.linspace(100, 1400, 20)
        y_fit = func(dist_array)

        # # Plot fit and data
        # fig, ax = plt.subplots()
        # ax.plot(main_distance, num_points)
        # ax.plot(dist_array, y_fit )
        # ax.set_xscale('linear')
        # ax.set_yscale('linear')

        return coeffs

    def get_distance_equator(self):
        """Return distance between points at the equator."""
        return  self.distance
    
    @staticmethod
    def _maxmin_epsilon(num_points):
        """Determine optimal epsilon value for maximizing minimum distance between points.
        
        Args:
            num_points: Number of points in the grid
            
        Returns:
            float: Optimal epsilon value for the given number of points
        """
        if num_points >= 600000:
            epsilon = 214
        elif num_points>= 400000:
            epsilon = 75
        elif num_points>= 11000:
            epsilon = 27
        elif num_points>= 890:
            epsilon = 10
        elif num_points>= 177:
            epsilon = 3.33
        elif num_points>= 24:
            epsilon = 1.33
        else:
            epsilon = 0.33
        return epsilon
    
    def keep_original_points(self, orig_grid, regular = True):
        if self.grid is None:
            self.create_grid()
        if regular:
            new_lon, new_lat, dists = [], [], []
            lons = np.sort(np.unique(orig_grid['lon']))
            lats = np.sort(np.unique(orig_grid['lat']))
            for i in range(len(self.grid['lat'])):
                lo = self.grid['lon'][i]
                la = self.grid['lat'][i]
                pm_lon = np.array([(lons[j]-lo)*(lons[j+1]-lo) for j in range(len(lons)-1)])
                pm_lat = np.array([(lats[j]-la)*(lats[j+1]-la) for j in range(len(lats)-1)])
                if np.where(pm_lon<0)[0].shape[0] == 0:
                    rel_lon = [lons[0], lons[-1]]
                else:
                    lon_idx = np.where(pm_lon<0)[0][0]
                    rel_lon = [lons[lon_idx], lons[lon_idx+1]]
                if np.where(pm_lat<0)[0].shape[0] == 0:
                    rel_lat = [lats[0], lats[-1]]
                else:
                    lat_idx = np.where(pm_lat<0)[0][0]
                    rel_lat = [lats[lat_idx], lats[lat_idx+1]]
                min_dist = 99999
                for l1 in rel_lon:
                    for l2 in rel_lat:
                        this_dist = geo_distance(l1, l2, lo, la)
                        if this_dist < min_dist:
                            min_lon, min_lat = l1, l2
                            min_dist = this_dist
                new_lon.append(min_lon)
                new_lat.append(min_lat)
                dists.append(min_dist)
            self.reduced_grid = {'lon': np.array(new_lon), 'lat': np.array(new_lat)}
            return np.array(dists)
        else:
            raise KeyError('Only regular grids!')

    def min_dists(self, grid2 = None):
        if grid2 is None:
            lon1, lon2 = self.grid['lon'], self.grid['lon']
            lat1, lat2 = self.grid['lat'], self.grid['lat']
            d = 9999 * np.ones((len(lon1),len(lon2)))
            for i in range(len(lon1)):
                for j in range((len(lon1))):
                    if i < j:
                        d[i,j] = geo_distance(lon1[i], lat1[i], lon2[j], lat2[j])
                    elif i>j:
                        d[i,j] = d[j,i]
            return d.min(axis=1)
        else: # min dist from self.grid point to other grid
            lon1, lon2 = self.grid['lon'], grid2['lon']
            lat1, lat2 = self.grid['lat'], grid2['lat']
            d = 9999 * np.ones((len(lon1),len(lon2)))
            for i in range(len(lon1)):
                for j in range(len(lon2)):
                    d[i,j] = geo_distance(lon1[i], lat1[i], lon2[j], lat2[j])
            return d.min(axis=1)


[docs] class FeketeGrid(BaseGrid): """FeketeGrid creates a equidistance grid on a sphere. Parameters: ----------- num_points: int Number of points to generate on the sphere num_iter: int, optional Number of iterations for grid optimization. Default: 1000 grid: None, dict, or tuple, optional Grid initialization method: - None: Create new grid from scratch (default) - dict: Initialize from {'lat': [...], 'lon': [...]} dictionary - tuple: Initialize from fekete.bendito output (X, dq) parallel: bool or None, optional Whether to use parallel execution. None for auto-detect based on memory """
[docs] def __init__(self, num_points: int, num_iter: int = 0, grid: Optional[Union[Dict[str, np.ndarray], Tuple]] = None, parallel: Optional[bool] = None) -> None: # Validate inputs if not isinstance(num_points, int) or num_points <= 0: raise ValueError("num_points must be a positive integer") if not isinstance(num_iter, int) or num_iter < 0: raise ValueError("num_iter must be a non-negative integer") # Initialize basic parameters self.num_points = num_points self.num_iter = num_iter self.grid: Optional[Dict[str, np.ndarray]] = None self.dq: List[float] = [] self._plys_coords: Optional[np.ndarray] = None # Create or load grid based on input type if grid is None: # Case 1: Create new grid from scratch self.create_grid(num_iter=num_iter, parallel=parallel) elif isinstance(grid, dict): # Case 2: Initialize from lat/lon dictionary self._create_from_dict(grid) elif isinstance(grid, tuple): # Case 3: Initialize from fekete.py output tuple self.grid = self.create_grid_from_tuple(grid) else: raise TypeError("Grid must be None, dict with 'lat'/'lon' keys, or tuple from fekete.py output")
""" properties and parameters """ @property def distance(self) -> float: """Get the average distance between points in km.""" return grid_num_to_distance(self.num_points)
[docs] def get_distance_equator(self) -> float: """Return distance between points at the equator in km.""" return self.distance
def _determine_parallel_mode(self, parallel: Optional[bool], num_points: int) -> bool: """Determine whether to use parallel execution based on memory constraints. Args: parallel: User preference for parallel execution (None for auto-detect) num_points: Number of points in the grid Returns: True if parallel execution should be used, False otherwise """ if parallel is None: try: # Test if we can allocate the required memory for serial execution test_array = np.zeros((num_points, num_points), dtype=np.float64) del test_array # Clean up immediately return False except MemoryError: print("Not enough memory to run serially. Running in parallel ...") return True return parallel """ different ways to create FeketeGrid """ def _create_from_dict(self, grid_dict: Dict[str, np.ndarray]) -> None: """Create grid from lat/lon dictionary. Args: grid_dict: Dictionary with 'lat' and 'lon' keys containing coordinate arrays Raises: ValueError: If dictionary format is invalid or arrays have different lengths """ if not isinstance(grid_dict, dict) or 'lat' not in grid_dict or 'lon' not in grid_dict: raise ValueError("Grid dict must contain 'lat' and 'lon' keys") # Validate that lat and lon arrays have the same length if len(grid_dict['lat']) != len(grid_dict['lon']): raise ValueError("Latitude and longitude arrays must have the same length") # Check if grid length matches num_points actual_points = len(grid_dict['lat']) if actual_points != self.num_points: warnings.warn(f"Grid has {actual_points} points but num_points was set to {self.num_points}. " f"Updating num_points to {actual_points}.") self.num_points = actual_points # Store the grid and initialize dq as empty list self.grid = {'lat': np.array(grid_dict['lat']), 'lon': np.array(grid_dict['lon'])} self.dq = []
[docs] def create_grid_from_tuple(self, grid_tuple: Tuple[np.ndarray, List[float]]) -> Dict[str, np.ndarray]: """Create grid from fekete.bendito output tuple. Args: grid_tuple: Tuple containing (X, dq) where X is 3D Cartesian coordinates and dq is the optimization history Returns: Dictionary with 'lat' and 'lon' keys containing grid coordinates Raises: ValueError: If tuple format is invalid """ try: X, self.dq = grid_tuple except (ValueError, TypeError): raise ValueError('Grid tuple should contain (coordinates_array, optimization_history)') if not isinstance(X, np.ndarray) or X.shape[1] != 3: raise ValueError('Coordinates array should be Nx3 numpy array') lon, lat = cart_to_geo(X[:,0], X[:,1], X[:,2]) grid = {'lon': lon, 'lat': lat} # Check if grid length matches num_points actual_points = len(grid['lat']) if actual_points != self.num_points: warnings.warn(f"Grid has {actual_points} points but num_points was set to {self.num_points}. " f"Updating num_points to {actual_points}.") self.num_points = actual_points return grid
[docs] def create_grid(self, num_iter: int = 1000, parallel: Optional[bool] = None) -> None: """Create new Fekete grid from scratch. Args: num_iter: Number of iterations for grid optimization parallel: Whether to use parallel execution (None for auto-detect) """ # Initialize with random configuration self.initialize_grid(save=False) # TODO: save is kept for future development of `save_epoch` # Improve the grid if iterations are requested if num_iter > 0: self.improve_grid(num_iter=num_iter, save=False, parallel=parallel)
""" save and load """
[docs] def to_pickle(self, filepath: Optional[str] = None) -> None: """Save FeketeGrid to pickle file. Args: filepath: Path to save file. If None, generates default name. """ if filepath is None: filepath = f'feketegrid_n{self.num_points}_i{self.num_iter}.p' return super().to_pickle(filepath)
[docs] def save_grid(self, filepath: Optional[str] = None) -> None: """Save grid to pickle file. .. deprecated:: Use to_pickle() instead. This method will be removed in a future version. """ warnings.warn( "save_grid is deprecated and will be removed in a future version. Use to_pickle instead.", DeprecationWarning, stacklevel=2 ) if filepath is None: filepath = f'feketegrid_{self.num_points}_{self.num_iter}.p' with open(filepath, 'wb') as fp: pickle.dump((self.grid, self.dq), fp, protocol=pickle.HIGHEST_PROTOCOL)
[docs] @classmethod def load_grid(cls, filepath: Optional[str] = None): """Load grid from pickle file. .. deprecated:: Use from_pickle() instead. This method will be removed in a future version. """ warnings.warn( "load_grid is deprecated and will be removed in a future version. Use from_pickle instead.", DeprecationWarning, stacklevel=2 ) if filepath is None: filepath = f'feketegrid_{self.num_points}_{self.num_iter}.p' with open(filepath, 'rb') as fp: self.grid, self.dq = pickle.load(fp)
""" Core: Initialize => Improve logic """
[docs] def initialize_grid(self, save: bool = True) -> Dict[str, np.ndarray]: print("Epochwise generation launched. Generating random initial configuration ...") X = points_on_sphere(self.num_points) # initial random configuration lon, lat = cart_to_geo(X[:,0], X[:,1], X[:,2]) self.num_iter = 0 self.grid = {'lon': lon, 'lat': lat} self.dq = [] if save: filepath = f'feketegrid_{self.num_points}_{0}.p' self.save_grid(filepath) return self.grid
[docs] def improve_grid(self, num_iter: int = 1000, save: bool = True, parallel: Optional[bool] = None) -> Dict[str, np.ndarray]: """Improve existing grid through optimization iterations. Args: num_iter: Number of optimization iterations to perform save: Whether to save the improved grid to file parallel: Whether to use parallel execution (None for auto-detect) Returns: Updated grid dictionary with 'lat' and 'lon' keys Raises: RuntimeError: If no grid exists to improve """ if self.grid is None: raise RuntimeError("No grid exists to improve. Call initialize_grid() first.") X0 = np.array(geo_to_cart(self.grid['lon'], self.grid['lat'])).T parallel_mode = self._determine_parallel_mode(parallel, self.num_points) if parallel is not None else False X, dq = bendito(N=self.num_points, maxiter=num_iter, X=X0, parallel=parallel_mode) self.dq.append(dq) lon, lat = cart_to_geo(X[:,0], X[:,1], X[:,2]) self.grid = {'lon': lon, 'lat': lat} self.num_iter = self.num_iter + num_iter if save: filepath = f'feketegrid_{self.num_points}_{self.num_iter}.p' self.save_grid(filepath) return self.grid
""" Boundaries calculation """
[docs] def create_SphericalVoronoi(self): import scipy.spatial as sp spVoronoi = sp.SphericalVoronoi(np.array(geo_to_cart(self.grid["lon"], self.grid["lat"])).T, threshold=1e-8) spVoronoi.sort_vertices_of_regions() self.regions_vertidx = spVoronoi.regions self.vertices = np.array(cart_to_geo(*spVoronoi.vertices.T)).T self.regions_vert = [self.vertices[idx, :] for idx in self.regions_vertidx] # self.spVoronoi = spVoronoi # TODO: remove after testing return self.regions_vert
def _harmonize_SphericalVoronoi(self): import shapely as sh # 先harmonize个数 harmonTarget = np.array([len(x) for x in self.regions_vert]).max() harmonized_regions_vertidx = self.regions_vertidx harmonized_regions_vertidx = [np.insert(np.array(r), np.zeros(harmonTarget-len(r), dtype="int"), r[0]) for r in harmonized_regions_vertidx] # 再harmonize方向(根据ccw) harmonized_regions_vertidx = [r if sh.LinearRing(self.vertices[r, :]).is_ccw else r[::-1] for r in harmonized_regions_vertidx] # np.array([sh.LinearRing(self.vertices[r, :]).is_ccw for r in harmonized_regions_vertidx]).all() # TEST for ccw # 再生成_plys_coords self._plys_coords = np.array([self.vertices[r, :] for r in harmonized_regions_vertidx]) return self._plys_coords """ API for output """
[docs] def to_cdo_gridfile(self, filepath = None, out_vertex = False): self.to_gridfile(filepath, out_vertex)
[docs] def to_gridfile(self, filepath = None, out_vertex = False): if filepath is None: filepath = f'feketegrid_n{self.num_points}_it{self.num_iter}.txt' with open(filepath, 'w') as fp: fp.write("gridtype\t= unstructured\n") fp.write(f"gridsize\t= {self.num_points}\n") fp.write(f"xsize\t= {self.num_points}\n") fp.write(f"ysize\t= {self.num_points}\n") xvals = str.join(" ", self.grid["lon"].astype(str)) fp.write(f"xvals\t= {xvals}\n") yvals = str.join(" ", self.grid["lat"].astype(str)) fp.write(f"yvals\t= {yvals}\n") if out_vertex: if self._plys_coords is None: self.create_SphericalVoronoi() # _ = self.harmonize_nvertex() _ = self._harmonize_SphericalVoronoi() fp.write(f"nvertex\t= {self._plys_coords.shape[1]}\n") xbounds = str.join(" ", self._plys_coords[:, :, 0].ravel().astype(str)) fp.write(f"xbounds\t= {xbounds}\n") ybounds = str.join(" ", self._plys_coords[:, :, 1].ravel().astype(str)) fp.write(f"ybounds\t= {ybounds}\n") fp.write("xunits\t= degrees\n") fp.write("yunits\t= degrees\n") return filepath
# TODO: API to uxarray here! """ Utilities """
[docs] def align_to_target_grid(self, target_coords, method='L-BFGS-B', initial_guess=None): """ Align Fekete grid to target coordinates by minimizing spherical distances. This function uses numerical optimization to find the rotation that minimizes the sum of great circle distances between grid points and their targets. Unlike the Wahba solution which minimizes Euclidean distances, this directly optimizes spherical distances. Parameters ---------- target_coords : dict Target coordinates with 'lon' and 'lat' keys containing arrays of target positions. Must have same length as current grid. method : str, default='L-BFGS-B' Optimization method. Options: 'L-BFGS-B', 'BFGS', 'Powell', 'Nelder-Mead' initial_guess : array-like, optional Initial rotation vector (3,) for optimization. If None, uses Wahba solution as starting point. Returns ------- dict New grid coordinates {'lon': array, 'lat': array} after optimal spherical alignment. Also contains optimization info: 'success', 'message', 'nfev', 'fun' Notes ----- Optimization method: **Numerical optimization** (not theoretical solution) - Uses rotation vector parameterization to avoid matrix constraints - Minimizes: Σ arccos(clamp((R @ current_i) · target_i, -1, 1)) - No closed-form solution exists for this objective function - Convergence depends on initial guess and grid configuration The rotation vector r represents rotation by ||r|| radians around axis r/||r||. """ from scipy.optimize import minimize # Validate inputs (same as previous function) if self.grid is None: raise ValueError("No Fekete grid exists. Call create_grid() first.") if not isinstance(target_coords, dict) or 'lat' not in target_coords or 'lon' not in target_coords: raise ValueError("target_coords must be dict with 'lat' and 'lon' keys") if len(target_coords['lat']) != len(self.grid['lat']): raise ValueError("target_coords must have same length as current grid") # Convert to 3D Cartesian coordinates curr_x, curr_y, curr_z = geo_to_cart(self.grid['lon'], self.grid['lat']) current_points = np.array([curr_x, curr_y, curr_z]).T current_points = current_points / np.linalg.norm(current_points, axis=1, keepdims=True) target_x, target_y, target_z = geo_to_cart(target_coords['lon'], target_coords['lat']) target_points = np.array([target_x, target_y, target_z]).T target_points = target_points / np.linalg.norm(target_points, axis=1, keepdims=True) def spherical_distance_objective(rotation_vector): """Objective function: sum of spherical distances after rotation.""" # Convert rotation vector to rotation matrix if np.linalg.norm(rotation_vector) < 1e-10: R = np.eye(3) else: R = Rot.from_rotvec(rotation_vector).as_matrix() # Apply rotation rotated_points = (R @ current_points.T).T rotated_points = rotated_points / np.linalg.norm(rotated_points, axis=1, keepdims=True) # Compute spherical distances dot_products = np.sum(rotated_points * target_points, axis=1) # Clamp to [-1, 1] to handle numerical errors dot_products = np.clip(dot_products, -1.0, 1.0) spherical_distances = np.arccos(dot_products) return np.sum(spherical_distances) # Get initial guess if initial_guess is None: # Use Wahba solution as starting point wahba_result = self.align_to_target_grid(target_coords) # Convert to rotation vector by comparing original and Wahba result wahba_x, wahba_y, wahba_z = geo_to_cart(wahba_result['lon'], wahba_result['lat']) wahba_points = np.array([wahba_x, wahba_y, wahba_z]).T wahba_points = wahba_points / np.linalg.norm(wahba_points, axis=1, keepdims=True) # Find rotation from current to Wahba result H = current_points.T @ wahba_points U, S, Vt = np.linalg.svd(H) R_init = Vt.T @ U.T if np.linalg.det(R_init) < 0: Vt_corrected = Vt.copy() Vt_corrected[-1, :] *= -1 R_init = Vt_corrected.T @ U.T initial_guess = Rot.from_matrix(R_init).as_rotvec() # Optimize using numerical methods result = minimize( spherical_distance_objective, initial_guess, method=method, options={'ftol': 1e-9, 'gtol': 1e-9} ) # Apply optimal rotation if np.linalg.norm(result.x) < 1e-10: optimal_R = np.eye(3) else: optimal_R = Rot.from_rotvec(result.x).as_matrix() final_points = (optimal_R @ current_points.T).T final_points = final_points / np.linalg.norm(final_points, axis=1, keepdims=True) # Convert back to spherical coordinates new_lon, new_lat = cart_to_geo(final_points[:, 0], final_points[:, 1], final_points[:, 2]) return { 'lon': new_lon, 'lat': new_lat, 'success': result.success, 'message': result.message, 'nfev': result.nfev, 'fun': result.fun # Final spherical distance sum }
[docs] def map_to_regular_grid(self, target_grid): """ Creates a one-to-one mapping between Fekete grid points and regular grid points. Each Fekete point is mapped to its nearest available regular grid point, ensuring no regular grid point is used twice. Results maintain original Fekete grid order. Parameters ---------- target_grid : dict Dictionary with 'lon' and 'lat' keys containing arrays of regular grid coordinates Returns ------- dict Dictionary containing: - 'regular_coords': corresponding regular grid coordinates (lon, lat pairs) - 'distances': distances between mapped points - 'unmapped_indices': indices of Fekete points that couldn't be mapped Raises ------ ValueError If no grid has been created yet """ if self.grid is None: raise ValueError("No Fekete grid exists. Call create_grid() first.") # Extract and sort unique coordinates from target regular grid target_lons = np.sort(np.unique(target_grid['lon'])) target_lats = np.sort(np.unique(target_grid['lat'])) num_fekete_points = len(self.grid['lat']) candidate_distances = [] candidate_coordinates = [] # PHASE 1: Find candidate regular grid points for each Fekete point for fekete_idx in range(num_fekete_points): fekete_lon = self.grid['lon'][fekete_idx] fekete_lat = self.grid['lat'][fekete_idx] # Find longitude/latitude intervals (same logic as original) lon_products = (target_lons[:-1] - fekete_lon) * (target_lons[1:] - fekete_lon) lat_products = (target_lats[:-1] - fekete_lat) * (target_lats[1:] - fekete_lat) # Determine bounds lon_negative_indices = np.where(lon_products < 0)[0] if len(lon_negative_indices) == 0: bounding_lons = [target_lons[0], target_lons[-1]] else: lon_idx = lon_negative_indices[0] bounding_lons = [target_lons[lon_idx], target_lons[lon_idx + 1]] lat_negative_indices = np.where(lat_products < 0)[0] if len(lat_negative_indices) == 0: bounding_lats = [target_lats[0], target_lats[-1]] else: lat_idx = lat_negative_indices[0] bounding_lats = [target_lats[lat_idx], target_lats[lat_idx + 1]] # Calculate distances (same as original) distances = [] coordinates = [] for lon in bounding_lons: for lat in bounding_lats: distance = geo_distance(lon, lat, fekete_lon, fekete_lat) distances.append(distance) coordinates.append((lon, lat)) # Sort by distance distance_order = np.argsort(distances) candidate_distances.append(np.array(distances)[distance_order]) candidate_coordinates.append(np.array(coordinates)[distance_order]) # PHASE 2: Assignment - using original algorithm but with pre-allocated arrays # Pre-allocate result arrays for better performance assigned_coords = [] assigned_distances = [] unmapped_indices = [] used_coord_set = set() # Fast O(1) lookup instead of O(n) list search # Process in order of nearest distance (like original) nearest_distances = np.array([distances[0] for distances in candidate_distances]) processing_order = np.argsort(nearest_distances) for fekete_idx in processing_order: assigned = False # Try each candidate coordinate for candidate_idx in range(len(candidate_coordinates[fekete_idx])): coord = tuple(candidate_coordinates[fekete_idx][candidate_idx]) # Convert to tuple for set if coord not in used_coord_set: # Assign this coordinate assigned_coords.append(coord) assigned_distances.append(candidate_distances[fekete_idx][candidate_idx]) used_coord_set.add(coord) assigned = True break if not assigned: unmapped_indices.append(fekete_idx) warnings.warn( f'No available regular grid points for Fekete point at ' f'({self.grid["lon"][fekete_idx]:.3f}, {self.grid["lat"][fekete_idx]:.3f}). ' f'Point will be unmapped.' ) # PHASE 3: Restore original Fekete grid order (SIMPLIFIED) mapped_fekete_indices = [idx for idx in processing_order if idx not in unmapped_indices] if len(mapped_fekete_indices) > 0: # Create inverse mapping to restore original order order_map = {fekete_idx: i for i, fekete_idx in enumerate(mapped_fekete_indices)} restore_order = [order_map[idx] for idx in sorted(mapped_fekete_indices)] final_regular_coords = np.array(assigned_coords)[restore_order] final_distances = np.array(assigned_distances)[restore_order] mapped_fekete_indices = np.array(sorted(mapped_fekete_indices)) else: final_regular_coords = np.array([]).reshape(0, 2) final_distances = np.array([]) mapped_fekete_indices = np.array([]) return { # 'fekete_indices': mapped_fekete_indices, 'regular_coords': final_regular_coords, 'distances': final_distances, 'unmapped_indices': np.array(unmapped_indices) }
def grid_num_to_distance(num_points: int) -> float: """Convert number of grid points to distance between points. Calculates the corresponding distance between points (in km) for a given number of grid points on an equidistant Earth grid. Args: num_points: Number of points on the grid Returns: Distance between adjacent points in km Raises: ValueError: If num_points is not positive Example: >>> distance = grid_num_to_distance(1000) >>> print(f"Distance: {distance:.2f} km") """ if hasattr(num_points, 'item'): # Handle numpy array elements num_points = num_points.item() if not isinstance(num_points, int) or num_points <= 0: raise ValueError("num_points must be a positive integer") k = -2.01155176 # Same k as in distance_to_grid_num a = np.exp(20.0165958) # Same a as in distance_to_grid_num return (num_points/a)**(1/k) # Inverse formula: distance = (num_points/a)^(1/k) def distance_to_grid_num(distance: float) -> int: """Convert distance between points to required number of grid points. Calculates how many grid points are needed to achieve a specific distance between adjacent points (in km) on an equidistant Earth grid. Args: distance: Target distance between grid points in km Returns: Number of grid points required Raises: ValueError: If distance is not positive Example: >>> # For 0.25 degree spacing (≈ 27.75 km at equator) >>> num_points = distance_to_grid_num(111*0.25) # 111 km per degree at equator >>> print(f"Points needed: {num_points}") """ if hasattr(distance, 'item'): # Handle numpy array elements distance = distance.item() if not isinstance(distance, (int, float)) or distance <= 0: raise ValueError("distance must be a positive number") # Constants from log-log fit of grid data k = -2.01155176 a = np.exp(20.0165958) return int(a * distance**k) # def regular_lon_lat(num_lon, num_lat): # creates regular grid with borders half the distance of one step at each border # lon = np.linspace(-180+360/(2*num_lon),180-360/(2*num_lon),num_lon) # lat = np.linspace(-90 + 180/(2*num_lat), 90 - 180/(2*num_lat), num_lat) # return lon, lat def cart_to_geo(x: Union[float, np.ndarray], y: Union[float, np.ndarray], z: Union[float, np.ndarray]) -> Tuple[Union[float, np.ndarray], Union[float, np.ndarray]]: """Convert 3D Cartesian coordinates to geographic coordinates (longitude and latitude). Args: x: X coordinate in 3D Cartesian space (unit sphere) y: Y coordinate in 3D Cartesian space (unit sphere) z: Z coordinate in 3D Cartesian space (unit sphere) Returns: Tuple of (longitude, latitude) in degrees - longitude: [-180, 180] degrees - latitude: [-90, 90] degrees Note: Input coordinates should be on the unit sphere (x² + y² + z² = 1) """ lat = np.degrees(np.arcsin(z)) lon = np.degrees(np.arctan2(y, x)) return lon, lat def geo_to_cart(lon: Union[float, np.ndarray], lat: Union[float, np.ndarray], radius: float = 1) -> Tuple[Union[float, np.ndarray], Union[float, np.ndarray], Union[float, np.ndarray]]: """Convert geographic coordinates (longitude and latitude) to 3D Cartesian coordinates. Args: lon: Longitude in degrees [-180, 180] lat: Latitude in degrees [-90, 90] radius: Radius of the sphere, default is 1 (unit sphere) Returns: Tuple of (x, y, z) coordinates in 3D Cartesian space """ if radius <= 0: raise ValueError("radius must be positive") lon_rad = np.deg2rad(lon) lat_rad = np.deg2rad(lat) x = radius * np.cos(lon_rad) * np.cos(lat_rad) y = radius * np.sin(lon_rad) * np.cos(lat_rad) z = radius * np.sin(lat_rad) return x, y, z @np.vectorize def geo_distance(lon1: Union[float, np.ndarray], lat1: Union[float, np.ndarray], lon2: Union[float, np.ndarray], lat2: Union[float, np.ndarray], radius: float = 6371) -> Union[float, np.ndarray]: """Calculate the great-circle distance between two points on a sphere. Uses the haversine formula to compute the shortest distance over the earth's surface. This implementation uses arctan2 instead of arcsin for better numerical stability, especially for very small distances. Args: lon1: Longitude of first point in degrees lat1: Latitude of first point in degrees lon2: Longitude of second point in degrees lat2: Latitude of second point in degrees radius: Radius of the sphere in km, default is Earth's radius (6371 km) Returns: Distance between points in the same units as radius Raises: ValueError: If radius is not positive Note: This function is automatically vectorized for numpy arrays. """ if radius <= 0: raise ValueError("radius must be positive") # Convert to radians lon1_rad, lat1_rad, lon2_rad, lat2_rad = map(np.radians, [lon1, lat1, lon2, lat2]) dlon = lon2_rad - lon1_rad dlat = lat2_rad - lat1_rad # Haversine formula a = np.sin(dlat / 2) ** 2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon / 2) ** 2 c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a)) return radius * c def deg_to_equatorial_distance(grid_step: float, radius: float = 6371) -> float: """Convert angular grid resolution (degrees) to equivalent distance at the equator (km). Args: grid_step: Angular resolution/spacing in degrees radius: Radius of the sphere in km, default is Earth's radius (6371 km) Returns: Equivalent spatial distance/spacing in kilometers at the equator Raises: ValueError: If grid_step is not positive """ if grid_step <= 0: raise ValueError("grid_step must be positive") return geo_distance(0, 0, grid_step, 0, radius=radius) def equatorial_distance_to_deg(distance: float, radius: float = 6371) -> float: """Convert equatorial distance (km) to equivalent angular grid resolution (degrees). Args: distance: Spatial distance/spacing in kilometers at the equator radius: Radius of the sphere in km, default is Earth's radius (6371 km) Returns: Equivalent angular resolution/spacing in degrees Raises: ValueError: If distance is not positive """ if distance <= 0: raise ValueError("distance must be positive") return distance * 360 / (2 * np.pi * radius) # def min_dists(grid1, grid2 = None): TODO: implement grid2 but I dont know why need it # if grid2 is None: # lon1, lon2 = grid1['lon'], grid1['lon'] # lat1, lat2 = grid1['lat'], grid1['lat'] # d = 9999 * np.ones((len(lon1),len(lon2))) # for i in range(len(lon1)): # for j in range((len(lon1))): # if i < j: # d[i,j] = geo_distance(lon1[i], lat1[i], lon2[j], lat2[j]) # elif i>j: # d[i,j] = d[j,i] # return d.min(axis=1) # else: # min dist from self.grid point to other grid # lon1, lon2 = grid1['lon'], grid2['lon'] # lat1, lat2 = grid1['lat'], grid2['lat'] # d = 9999 * np.ones((len(lon1),len(lon2))) # for i in range(len(lon1)): # for j in range(len(lon2)): # d[i,j] = geo_distance(lon1[i], lat1[i], lon2[j], lat2[j]) # return d.min(axis=1) def neighbour_distance(grid: Dict[str, np.ndarray]) -> np.ndarray: """Calculate the distance to the nearest neighboring point for each point in the grid. This function computes the nearest neighbor distance for each point, which is useful for analyzing grid uniformity and identifying spatial outliers. Args: grid: Grid dictionary with 'lon' and 'lat' keys containing coordinates in degrees Returns: Array of minimum distances (in km) to the nearest neighbor for each point """ lon = np.asarray(grid['lon']) lat = np.asarray(grid['lat']) n_points = len(lon) # Initialize distance matrix with NaN values d = np.full((n_points, n_points), np.nan) # Calculate distances between all pairs of points (only upper triangle for efficiency) for i in range(n_points): for j in range(i+1, n_points): distance = geo_distance(lon[i], lat[i], lon[j], lat[j]) d[i, j] = distance d[j, i] = distance # Mirror the distance matrix # Set diagonal to infinity to exclude self-distances np.fill_diagonal(d, np.inf) # Find minimum distance for each point return np.nanmin(d, axis=1)