Source code for dominosee.eca

import json
import numpy as np
from scipy.stats import binom
from numba import njit, prange, vectorize, guvectorize
import xarray as xr

@guvectorize(["void(boolean[:], boolean[:], uint16[:])"], "(n),(n)->()", nopython=True)
def _eca_precursor(b1w, b2, KRprec):
    KRprec[0] = np.sum(b1w & b2)

@guvectorize(["void(boolean[:], boolean[:], uint16[:])"], "(n),(n)->()", nopython=True)
def _eca_trigger(b1, b2wr, KRtrig):
    KRtrig[0] = np.sum(b1 & b2wr)

@njit(parallel=True)
def _eca_precursors_pair_njit(b1w, b2):
    # Pairwise precursor counts: rows follow the window array (A locations),
    # columns follow the event array (B locations), matching the output_core_dims
    # declared in get_eca_precursor.
    result = np.zeros((b1w.shape[0], b2.shape[0]), dtype=np.uint16)
    m = b1w.shape[0]    # first dimension of the window array (A locations)
    l = b2.shape[0]     # first dimension of the event array (B locations)

    for i in prange(m):
        for j in range(l):
            result[i, j] = np.sum(b1w[i, :] & b2[j, :])
    return result

@njit(parallel=True)
def _eca_triggers_pair_njit(b1, b2wr):
    # Pairwise trigger counts, laid out as (A locations, B locations).
    result = np.zeros((b1.shape[0], b2wr.shape[0]), dtype=np.uint16)
    m = b1.shape[0]      # first dimension of the first array (A locations)
    l = b2wr.shape[0]    # first dimension of the second array (B locations)

    for i in prange(m):
        for j in range(l):
            result[i, j] = np.sum(b1[i, :] & b2wr[j, :])
    return result
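# Illustrative check of the pairwise kernels above (a toy sketch, not part of the
# module API; the array contents are made up):
#
#     b1w = np.array([[True, True, False],
#                     [False, True, True]])     # windows at two A locations
#     b2 = np.array([[True, False, False],
#                    [False, False, True]])     # events at two B locations
#     _eca_precursors_pair_njit(b1w, b2)
#     # -> array([[1, 0],
#     #           [0, 1]], dtype=uint16)        # rows: A locations, columns: B locations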


def _forward_window(time_series, delt=2, sym=True, tau=0):
    """
    Create a forward (precursor) window for Event Coincidence Analysis.
    
    Parameters
    ----------
    time_series : ndarray
        1D array representing a binary event series (0s and 1s)
    delt : int, default=2
        Size of the ECA window
    sym : bool, default=True
        Whether the ECA window is symmetric
    tau : int, default=0
        Delay parameter for the window (must be non-negative)
        
    Returns
    -------
    ndarray
        Binary array indicating presence of events in forward window
        
    Notes
    -----
    This function creates a window used for calculating precursors in ECA.
    It performs a convolution operation followed by thresholding.
    """
    # Input validation
    time_series = np.asarray(time_series)
    if time_series.ndim != 1:
        raise ValueError("time_series must be a 1D array")
    if tau < 0:
        raise ValueError("delay must be non-negative")
    if delt < 0:
        raise ValueError("delt must be non-negative")
        
    # Create window
    window = np.ones((1 + 1 * sym) * delt + 1)
    
    # Apply window
    if delt == 0:
        result = time_series.copy()
    else:
        result = (np.convolve(time_series, window)[sym*delt:-delt] >= 0.5)
    
    # Apply delay
    if tau > 0:
        result = np.roll(result, tau)
        result[:tau] = False
        
    return result
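# Worked example (illustrative): with delt=1, sym=False and tau=0, the forward
# window marks each event step and the delt steps that follow it:
#
#     _forward_window(np.array([0, 0, 1, 0, 0, 0]), delt=1, sym=False, tau=0)
#     # -> array([False, False,  True,  True, False, False])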

def _backward_window(time_series, delt=2, sym=True, tau=0):
    """
    Create a backward (trigger) window for Event Coincidence Analysis.
    
    Parameters
    ----------
    time_series : ndarray
        1D array representing a binary event series (0s and 1s)
    delt : int, default=2
        Size of the ECA window
    sym : bool, default=True
        Whether the ECA window is symmetric
    tau : int, default=0
        Delay parameter for the window (must be non-negative)
        
    Returns
    -------
    ndarray
        Binary array indicating presence of events in backward window
        
    Notes
    -----
    This function creates a window used for calculating triggers in ECA.
    If sym=True, it reuses the forward window; otherwise it uses a
    different slice of the convolution.
    """
    # Input validation
    time_series = np.asarray(time_series)
    if time_series.ndim != 1:
        raise ValueError("time_series must be a 1D array")
    if tau < 0:
        raise ValueError("tau must be non-negative")
    if delt < 0:
        raise ValueError("delt must be non-negative")
    
    # If symmetric, we can reuse the forward window logic
    if sym:
        result = _forward_window(time_series, delt, sym, tau)
    else:
        # Create window
        window = np.ones(delt + 1)
        
        # Apply window with different slicing
        result = (np.convolve(time_series, window)[delt:] >= 0.5)
    
    # Apply delay in opposite direction for backward window
    if tau > 0:
        result = np.roll(result, -tau)
        result[-tau:] = False
        
    return result

# Keep the original function for backward compatibility
def eca_window_legacy(b, delt=2, sym=True, tau=0):
    """Legacy version of eca_window function for backward compatibility."""
    return _forward_window(b, delt, sym, tau), _backward_window(b, delt, sym, tau)


def get_eca_precursor_window(da: xr.DataArray, delt: int=2, sym: bool=True, tau: int=0) -> xr.DataArray:
    """Build the precursor window for each time series in ``da`` and record the ECA parameters in the result's attributes."""
    da_precursor_window = xr.apply_ufunc(_forward_window, da, input_core_dims=[["time"]], output_core_dims=[["time"]],
                                         vectorize=True, dask="parallelized", kwargs={"delt": delt, "sym": sym, "tau": tau})
    da_precursor_window.attrs = {'long_name': 'Precursor Window', 'units': '1',
                                 'description': 'Window for precursor event identification',
                                 "eca_params": json.dumps({"delt": delt, "sym": sym, "tau": tau})}
    return da_precursor_window

def get_eca_trigger_window(da: xr.DataArray, delt: int=2, sym: bool=True, tau: int=0) -> xr.DataArray:
    """Build the trigger window for each time series in ``da`` and record the ECA parameters in the result's attributes."""
    da_trigger_window = xr.apply_ufunc(_backward_window, da, input_core_dims=[["time"]], output_core_dims=[["time"]],
                                       vectorize=True, dask="parallelized", kwargs={"delt": delt, "sym": sym, "tau": tau})
    da_trigger_window.attrs = {'long_name': 'Trigger Window', 'units': '1',
                               'description': 'Window for trigger event identification',
                               "eca_params": json.dumps({"delt": delt, "sym": sym, "tau": tau})}
    return da_trigger_window

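# Illustrative usage of the window getters (a sketch; the dimension names, sizes
# and event threshold below are assumptions, not part of the module):
#
#     rng = np.random.default_rng(0)
#     events = xr.DataArray(rng.random((4, 365)) > 0.9,
#                           dims=("location", "time"), name="event")
#     prec_win = get_eca_precursor_window(events, delt=2, sym=True, tau=0)
#     trig_win = get_eca_trigger_window(events, delt=2, sym=True, tau=0)
#     # both keep the ("location", "time") layout and store the ECA parameters
#     # in .attrs["eca_params"] for the downstream functions
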
def get_eca_precursor(eventA_precursor_window: xr.DataArray, eventB: xr.DataArray, func="njit") -> xr.DataArray:
    """Count precursor events for every pair of locations from the precursor window of event A and the events of event B."""
    # if long_name of eventA_precursor_window is not "Precursor Window", raise ValueError
    if eventA_precursor_window.attrs["long_name"] != "Precursor Window":
        raise ValueError("long_name of eventA_precursor_window must be 'Precursor Window'")
    else:
        eca_params = eventA_precursor_window.attrs["eca_params"]

    # append spatial dimension suffixes with location order A => B
    eventB = eventB.rename({var: f"{var}B" for var in np.setdiff1d(eventB.dims, "time")})
    spatial_dimB = np.setdiff1d(eventB.dims, "time")
    eventA_precursor_window = eventA_precursor_window.rename({var: f"{var}A" for var in np.setdiff1d(eventA_precursor_window.dims, "time")})
    spatial_dimA = np.setdiff1d(eventA_precursor_window.dims, "time")

    if func == "njit":
        da_precursor = xr.apply_ufunc(_eca_precursors_pair_njit, eventA_precursor_window, eventB, vectorize=True,
                                      input_core_dims=[[spatial_dimA[-1], "time"], [spatial_dimB[-1], "time"]],
                                      output_core_dims=[[spatial_dimA[-1], spatial_dimB[-1]]],
                                      dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True},
                                      output_dtypes=[np.uint16],
                                      )
    else:
        da_precursor = xr.apply_ufunc(_eca_precursor, eventA_precursor_window, eventB, vectorize=False,
                                      input_core_dims=[["time"], ["time"]], output_core_dims=[[]],
                                      dask="parallelized",)
    da_precursor.attrs = {'long_name': 'Precursor Events', 'units': 'count',
                          'description': 'Number of precursor events (from location A to location B) in location B',
                          "eca_params": eca_params}
    return da_precursor

def get_eca_trigger(eventA_trigger_window: xr.DataArray, eventB: xr.DataArray, func="njit") -> xr.DataArray:
    """Count trigger events for every pair of locations from the trigger window of event A and the events of event B."""
    # if long_name of eventA_trigger_window is not "Trigger Window", raise ValueError
    if eventA_trigger_window.attrs["long_name"] != "Trigger Window":
        raise ValueError("long_name of eventA_trigger_window must be 'Trigger Window'")
    else:
        eca_params = eventA_trigger_window.attrs["eca_params"]

    # append spatial dimension suffixes with location order A => B
    eventB = eventB.rename({var: f"{var}B" for var in np.setdiff1d(eventB.dims, "time")})
    spatial_dimB = np.setdiff1d(eventB.dims, "time")
    eventA_trigger_window = eventA_trigger_window.rename({var: f"{var}A" for var in np.setdiff1d(eventA_trigger_window.dims, "time")})
    spatial_dimA = np.setdiff1d(eventA_trigger_window.dims, "time")

    if func == "njit":
        da_trigger = xr.apply_ufunc(_eca_triggers_pair_njit, eventA_trigger_window, eventB, vectorize=True,
                                    input_core_dims=[[spatial_dimA[-1], "time"], [spatial_dimB[-1], "time"]],
                                    output_core_dims=[[spatial_dimA[-1], spatial_dimB[-1]]],
                                    dask="parallelized", dask_gufunc_kwargs={"allow_rechunk": True},
                                    output_dtypes=[np.uint16],
                                    )
    else:
        da_trigger = xr.apply_ufunc(_eca_trigger, eventA_trigger_window, eventB, vectorize=False,
                                    input_core_dims=[["time"], ["time"]], output_core_dims=[[]],
                                    dask="parallelized",)
    da_trigger.attrs = {'long_name': 'Trigger Events', 'units': 'count',
                        'description': 'Number of trigger events (from location A to location B) in location A',
                        "eca_params": eca_params}
    return da_trigger

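# Continuing the sketch above: pairing a window with an event series yields one
# count per location pair. The spatial dimension of the window input is suffixed
# with "A" and that of the event input with "B" (names follow the sketch above
# and are assumptions):
#
#     prec = get_eca_precursor(prec_win, events, func="njit")
#     trig = get_eca_trigger(trig_win, events, func="njit")
#     # prec and trig have dims ("locationA", "locationB") and dtype uint16
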
def get_eca_precursor_from_events(eventA: xr.DataArray, eventB: xr.DataArray, delt: int=2, sym: bool=True, tau: int=0,
                                  func="njit") -> xr.DataArray:
    """
    Calculate precursor events from eventA (events in location A) to eventB (events in location B)
    based on event coincidence analysis.

    Parameters
    ----------
    eventA : xr.DataArray
        Binary event time series at location A
    eventB : xr.DataArray
        Binary event time series at location B
    delt : int, optional
        Length of the coincidence window, by default 2
    sym : bool, optional
        If True, use symmetric window, by default True
    tau : int, optional
        Time lag from eventA to eventB, by default 0
    func : str, optional
        Function to use for calculation - "njit" for numba optimized, or "guvectorize" for gufunc, by default "njit"

    Returns
    -------
    xr.DataArray
        Number of precursor events from location A to location B
    """
    # get precursor window
    eventA_precursor_window = get_eca_precursor_window(eventA, delt, sym, tau)
    # get precursor
    da_precursor = get_eca_precursor(eventA_precursor_window, eventB, func)
    return da_precursor

def get_eca_trigger_from_events(eventA: xr.DataArray, eventB: xr.DataArray, delt: int=2, sym: bool=True, tau: int=0,
                                func="njit") -> xr.DataArray:
    """
    Calculate trigger events from eventA (events in location A) to eventB (events in location B)
    based on event coincidence analysis.

    Parameters
    ----------
    eventA : xr.DataArray
        Binary event time series at location A
    eventB : xr.DataArray
        Binary event time series at location B
    delt : int, optional
        Length of the coincidence window, by default 2
    sym : bool, optional
        If True, use symmetric window, by default True
    tau : int, optional
        Time lag from eventA to eventB, by default 0
    func : str, optional
        Function to use for calculation - "njit" for numba optimized, or "guvectorize" for gufunc, by default "njit"

    Returns
    -------
    xr.DataArray
        Number of trigger events from location A to location B
    """
    # get trigger window
    eventA_trigger_window = get_eca_trigger_window(eventA, delt, sym, tau)
    # get trigger
    da_trigger = get_eca_trigger(eventA_trigger_window, eventB, func)
    return da_trigger

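# One-call form of the same computation (sketch; `events` as in the examples above):
#
#     prec = get_eca_precursor_from_events(events, events, delt=2, sym=True, tau=0)
#     trig = get_eca_trigger_from_events(events, events, delt=2, sym=True, tau=0)
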
def get_eca_precursor_confidence(precursor: xr.DataArray, eventA: xr.DataArray, eventB: xr.DataArray,
                                 min_eventnum: int=2) -> xr.DataArray:
    """Analytical confidence of the precursor counts under a binomial null model of random coincidences."""
    # get eca_params from precursor
    eca_params = json.loads(precursor.attrs["eca_params"])
    TOL = eca_params["delt"] * eca_params["sym"] + 1
    tau = eca_params["tau"]
    T = eventA.sizes["time"]  # examine

    # get NA, NB
    NA = eventA.sum(dim="time").rename({f"{var}": f"{var}A" for var in np.setdiff1d(eventA.dims, "time")})
    NB = eventB.sum(dim="time").rename({f"{var}": f"{var}B" for var in np.setdiff1d(eventB.dims, "time")})
    # assert NA, NB are in the coordinates of precursor
    assert np.all(np.isin(NA.dims, precursor.dims))
    assert np.all(np.isin(NB.dims, precursor.dims))

    prec_conf = xr.apply_ufunc(prec_confidence, precursor, NA, NB, dask="parallelized",
                               kwargs={"TOL": TOL, "T": T, "tau": tau}).rename("prec_conf")
    if min_eventnum > 0:
        prec_conf = prec_conf.where(NA >= min_eventnum, 0.0)
        prec_conf = prec_conf.where(NB >= min_eventnum, 0.0)
    prec_conf.attrs = {'long_name': 'Precursor confidence', 'units': "",
                       'description': 'Confidence of precursor rates (from location A to location B) in location B',
                       "eca_params": precursor.attrs["eca_params"]}
    return prec_conf

def get_eca_trigger_confidence(trigger: xr.DataArray, eventA: xr.DataArray, eventB: xr.DataArray,
                               min_eventnum: int=2) -> xr.DataArray:
    """Analytical confidence of the trigger counts under a binomial null model of random coincidences."""
    # get eca_params from trigger
    eca_params = json.loads(trigger.attrs["eca_params"])
    TOL = eca_params["delt"] * eca_params["sym"] + 1
    tau = eca_params["tau"]
    T = eventA.sizes["time"]  # examine

    # get NA, NB
    NA = eventA.sum(dim="time").rename({f"{var}": f"{var}A" for var in np.setdiff1d(eventA.dims, "time")})
    NB = eventB.sum(dim="time").rename({f"{var}": f"{var}B" for var in np.setdiff1d(eventB.dims, "time")})

    # calculate confidence
    trigger_conf = xr.apply_ufunc(trig_confidence, trigger, NA, NB, dask="parallelized",
                                  kwargs={"TOL": TOL, "T": T, "tau": tau}).rename("trigger_conf")
    if min_eventnum > 0:
        trigger_conf = trigger_conf.where(NA >= min_eventnum, 0.0)
        trigger_conf = trigger_conf.where(NB >= min_eventnum, 0.0)
    trigger_conf.attrs = {'long_name': 'Trigger confidence', 'units': "",
                          'description': 'Confidence of trigger rates (from location A to location B) in location A',
                          "eca_params": trigger.attrs["eca_params"]}
    return trigger_conf

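# Sketch of the confidence step (event arrays as in the examples above; the 0.95
# cut-off is an illustrative choice, not part of the module):
#
#     prec_conf = get_eca_precursor_confidence(prec, events, events, min_eventnum=2)
#     trig_conf = get_eca_trigger_confidence(trig, events, events, min_eventnum=2)
#     significant = prec_conf > 0.95   # pairs with more coincidences than the null expects
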
""" Legacy stack-style calculation """ # @njit(parallel=False) # def eca(b1, b2, b1w, b2wr, dtype='uint16'): # # TODO: 拆分成precursor & trigger 两个函数;因为有时只需要一种 # KRprec = np.zeros((b1.shape[0], b2.shape[0]), dtype=dtype) # precursor rates # KRtrig = np.zeros((b1.shape[0], b2.shape[0]), dtype=dtype) # triggering rates # for j in range(b1.shape[0]): # for k in range(b2.shape[0]): # TODO: 规范化代码时考虑只在这一层使用多核,对比一下速度;避免MPI,从而避免稀疏要求 # KRprec[j, k] = np.sum(b2[k, :] & b1w[j, :]) # precursor: b1 => (b2) # KRtrig[j, k] = np.sum(b1[j, :] & b2wr[k, :]) # trigger: (b1) => b2 # return KRprec, KRtrig # @njit(parallel=True) # def eca_parallel(b1, b2, b1w, b2wr, dtype='uint16'): # KRprec = np.zeros((b1.shape[0], b2.shape[0]), dtype=dtype) # precursor rates # KRtrig = np.zeros((b1.shape[0], b2.shape[0]), dtype=dtype) # triggering rates # for j in prange(b1.shape[0]): # for k in range(b2.shape[0]): # TODO: 规范化代码时考虑只在这一层使用多核,对比一下速度;避免MPI,从而避免稀疏要求 # KRprec[j, k] = np.sum(b2[k, :] & b1w[j, :]) # b1 => (b2) # KRtrig[j, k] = np.sum(b1[j, :] & b2wr[k, :]) # (b1) => b2 # return KRprec, KRtrig # def eca_dataset(b1: xr.DataArray, b2: xr.DataArray, b1w: xr.DataArray, b2wr: xr.DataArray, dtype=None, parallel=True): # # TODO: 修正loc A/B的命名 # # infer dtype based on the length of time dimension # if dtype is None: # dtype = np.uint8 if b1.shape[0] < 256 else np.uint16 if b1.shape[0] < 65536 else np.uint32 # Cannot use string here in numba # # get the location name from dims # xdim = np.setdiff1d(b1.dims, "time")[0] # layernames = [f"{b1.name}_{xdim}A", f"{b2.name}_{xdim}B"] # # make sure all DataArray is in ("location", "time") coordinate order if not # if b1.dims[0] != 'location': # b1 = b1.transpose('location', 'time') # if b2.dims[0] != 'location': # b2 = b2.transpose('location', 'time') # if b1w.dims[0] != 'location': # b1w = b1w.transpose('location', 'time') # if b2wr.dims[0] != 'location': # b2wr = b2wr.transpose('location', 'time') # # calculate the ECA # if parallel: # ECRprec, ECRtrig = eca_parallel(b1.values, b2.values, b1w.values, b2wr.values, dtype=dtype) # else: # ECRprec, ECRtrig = eca(b1.values, b2.values, b1w.values, b2wr.values, dtype=dtype) # # create DataArray # coords_locA = b1.indexes['location'].rename(["lat_locA", "lon_locA"]) # 这里一定不能用b1.coords['location'] rename,因为不会作用于MultiIndex # coords_locB = b2.indexes['location'].rename(["lat_locB", "lon_locB"]) # ECRprec = xr.DataArray(ECRprec, coords=[coords_locA, coords_locB], dims=layernames, name="prec_evt", # attrs={'long_name': 'Precursor Events', 'units': 'count', 'dtype': dtype.__name__, # 'description': 'Number of precursor events (from location A to location B) in location B', # 'eca_params': b1w.attrs["eca_params"]}) # ECRtrig = xr.DataArray(ECRtrig, coords=[coords_locA, coords_locB], dims=layernames, name="trig_evt", # attrs={'long_name': 'Trigger Events', 'units': 'count', 'dtype': dtype.__name__, # 'description': 'Number of trigger events (from location A to location B) in location A', # 'eca_params': b2wr.attrs["eca_params"]}) # return ECRprec, ECRtrig # def eca_window(b, delt=2, sym=True, tau=0): # """ # b: 一维向量,表示时间序列 # delt: ECA窗口的大小 # sym: ECA窗口是否对称 # tau: ECA窗口的延迟 # return: # bw: 用于计算precursor的窗口 # bwr: 用于计算trigger的窗口 # note: # 这里窗口的计算服务于后续与之对应的点的ECA计算, 因此此处不是反映precursor/trigger的对应点, 而是另外一侧 # """ # if tau < 0: # raise ValueError("tau must be non-negative") # window = np.ones((1 + 1 * sym) * delt + 1) # # 用于precursor计算的窗口 # if delt == 0: # bw = b #.copy() # else: # # bw = np.apply_along_axis(lambda x: np.convolve(x, window)[sym*delt:-delt] >= 
0.5, 1, b) # bw = (np.convolve(b, window)[sym*delt:-delt] >= 0.5) # if tau > 0: # bw = np.roll(bw, tau) # bw[:, :tau] = False # # 用于trigger计算的窗口 # if sym: # bwr = bw.copy() # else: # bwr = np.convolve(b, window)[delt:] >= 0.5 # if tau > 0: # bwr = np.roll(bwr, -tau) # bwr[:, -tau:] = False # return bw, bwr # def get_eca_window(da: xr.DataArray, delt: int=2, sym: bool=True, tau: int=0) -> xr.DataArray: # eca_params = {'delt': delt, 'sym': sym, 'tau': tau} # da_prec_window, da_trig_window = xr.apply_ufunc(eca_window, da, input_core_dims=[["time"]], output_core_dims=[["time"], ["time"]], # vectorize=True, dask="parallelized", kwargs=eca_params) # da_prec_window.attrs = {'long_name': 'Precursor Window', 'units': '1', 'description': 'Window for precursor event identification', # "eca_params": json.dumps(eca_params)} # da_trig_window.attrs = {'long_name': 'Trigger Window', 'units': '1', 'description': 'Window for trigger event identification', # "eca_params": json.dumps(eca_params)} # return da_prec_window, da_trig_window """ Confidence calculation """ def prec_confidence(kp, na, nb, TOL, T, tau): return binom.cdf(kp, n=nb, p=1-(1-TOL/(T-tau))**na).astype(np.float32) #不需要reshape,xarray会自动broadcast def trig_confidence(kt, na, nb, TOL, T, tau): return binom.cdf(kt, n=na, p=1-(1-TOL/(T-tau))**nb).astype(np.float32)
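# Reading of the null model used above (an interpretation of the existing formulas,
# no new computation): with NA events placed uniformly over the T - tau usable time
# steps, the chance that a given event of the other series is covered by at least one
# coincidence window of length TOL is p = 1 - (1 - TOL/(T - tau))**NA. The number of
# coincidences among NB events is then Binomial(NB, p), and binom.cdf gives the
# probability of observing at most the counted number of coincidences under that null.
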
def get_prec_confidence(da_prec: xr.DataArray, da_layerA: xr.DataArray, da_layerB: xr.DataArray,
                        min_eventnum: int=2, time_period=None) -> xr.DataArray:
    layer_locA = da_prec.dims[0].split("_")[0]
    layer_locB = da_prec.dims[1].split("_")[0]
    da_evN_locA = da_layerA.sum(dim="time").rename({"location": f"{layer_locA}_locationA"}).rename({"lat": "lat_locA", "lon": "lon_locA"})
    da_evN_locA.indexes[f"{layer_locA}_locationA"].rename({"lat": "lat_locA", "lon": "lon_locA"}, inplace=True)
    da_evN_locB = da_layerB.sum(dim="time").rename({"location": f"{layer_locB}_locationB"}).rename({"lat": "lat_locB", "lon": "lon_locB"})
    da_evN_locB.indexes[f"{layer_locB}_locationB"].rename({"lat": "lat_locB", "lon": "lon_locB"}, inplace=True)

    eca_params = json.loads(da_prec.attrs["eca_params"])
    TOL = eca_params["delt"] * eca_params["sym"] + 1
    tau = eca_params["tau"]
    T = da_layerA.sizes["time"]  # examine
    # print(f"Calculating confidence with TOL={TOL}, tau={tau}; T={T}")

    prec_sig = xr.apply_ufunc(prec_confidence, da_prec, da_evN_locA, da_evN_locB, dask="parallelized",
                              kwargs={"TOL": TOL, "T": T, "tau": tau}).rename("prec_sig")
    # prec_sig = prec_sig.compute()  # for debug
    if min_eventnum > 0:
        prec_sig = prec_sig.where(da_evN_locA >= min_eventnum, 0.0)
        prec_sig = prec_sig.where(da_evN_locB >= min_eventnum, 0.0)
    prec_sig.attrs = {'long_name': 'Precursor confidence', 'units': "",
                      'description': 'Confidence of precursor rates (from location A to location B) in location B',
                      "eca_params": da_prec.attrs["eca_params"]}
    return prec_sig

def get_trig_confidence(da_trig: xr.DataArray, da_layerA: xr.DataArray, da_layerB: xr.DataArray,
                        min_eventnum: int=2, time_period=None) -> xr.DataArray:
    layer_locA = da_trig.dims[0].split("_")[0]
    layer_locB = da_trig.dims[1].split("_")[0]
    da_evN_locA = da_layerA.sum(dim="time").rename({"location": f"{layer_locA}_locationA"}).rename({"lat": "lat_locA", "lon": "lon_locA"})
    da_evN_locA.indexes[f"{layer_locA}_locationA"].rename({"lat": "lat_locA", "lon": "lon_locA"}, inplace=True)
    da_evN_locB = da_layerB.sum(dim="time").rename({"location": f"{layer_locB}_locationB"}).rename({"lat": "lat_locB", "lon": "lon_locB"})
    da_evN_locB.indexes[f"{layer_locB}_locationB"].rename({"lat": "lat_locB", "lon": "lon_locB"}, inplace=True)

    eca_params = json.loads(da_trig.attrs["eca_params"])
    TOL = eca_params["delt"] * eca_params["sym"] + 1
    tau = eca_params["tau"]
    T = da_layerA.sizes["time"]  # examine
    # print(f"Calculating confidence with TOL={TOL}, tau={tau}; T={T}")

    trig_sig = xr.apply_ufunc(trig_confidence, da_trig, da_evN_locA, da_evN_locB, dask="parallelized",
                              kwargs={"TOL": TOL, "T": T, "tau": tau}).rename("trig_sig")
    # trig_sig = trig_sig.compute()  # for debug
    if min_eventnum > 0:
        trig_sig = trig_sig.where(da_evN_locA >= min_eventnum, 0.0)
        trig_sig = trig_sig.where(da_evN_locB >= min_eventnum, 0.0)
    trig_sig.attrs = {'long_name': 'Trigger confidence', 'units': "",
                      'description': 'Confidence of trigger rates (from location A to location B) in location A',
                      "eca_params": da_trig.attrs["eca_params"]}
    return trig_sig