Source code for squidpy.gr._ripley

"""Functions for point patterns spatial statistics."""

from __future__ import annotations

from typing import (
    TYPE_CHECKING,
    Literal,
    Union,  # noqa: F401
)

import numpy as np
import pandas as pd
from anndata import AnnData
from numpy.random import default_rng
from scanpy import logging as logg
from scipy.spatial import ConvexHull, Delaunay
from scipy.spatial.distance import pdist
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import LabelEncoder
from spatialdata import SpatialData

from squidpy._constants._constants import RipleyStat
from squidpy._constants._pkg_constants import Key
from squidpy._docs import d, inject_docs
from squidpy._utils import NDArrayA
from squidpy.gr._utils import _assert_categorical_obs, _assert_spatial_basis, _save_data

__all__ = ["ripley"]


[docs] @d.dedent @inject_docs(key=Key.obsm.spatial, rp=RipleyStat) def ripley( adata: AnnData | SpatialData, cluster_key: str, mode: Literal["F", "G", "L"] = "F", spatial_key: str = Key.obsm.spatial, metric: str = "euclidean", n_neigh: int = 2, n_simulations: int = 100, n_observations: int = 1000, max_dist: float | None = None, n_steps: int = 50, seed: int | None = None, copy: bool = False, ) -> dict[str, pd.DataFrame | NDArrayA]: r""" Calculate various Ripley's statistics for point processes. According to the `'mode'` argument, it calculates one of the following Ripley's statistics: `{rp.F.s!r}`, `{rp.G.s!r}` or `{rp.L.s!r}` statistics. `{rp.F.s!r}`, `{rp.G.s!r}` are defined as: .. math:: F(t),G(t)=P( d_{{i,j}} \le t ) Where :math:`d_{{i,j}}` represents: - distances to a random Spatial Poisson Point Process for `{rp.F.s!r}`. - distances to any other point of the dataset for `{rp.G.s!r}`. `{rp.L.s!r}` we first need to compute :math:`K(t)`, which is defined as: .. math:: K(t) = \frac{{1}}{{\lambda}} \sum_{{i \ne j}} \frac{{I(d_{{i,j}}<t)}}{{n}} and then we apply a variance-stabilizing transformation: .. math:: L(t) = (\frac{{K(t)}}{{\pi}})^{{1/2}} Parameters ---------- %(adata)s %(cluster_key)s mode Which Ripley's statistic to compute. %(spatial_key)s metric Which metric to use for computing distances. For available metrics, check out :class:`sklearn.neighbors.DistanceMetric`. n_neigh Number of neighbors to consider for the KNN graph. n_simulations How many simulations to run for computing p-values. n_observations How many observations to generate for the Spatial Poisson Point Process. max_dist Maximum distances for the support. If `None`, `max_dist=`:math:`\sqrt{{area \over 2}}`. n_steps Number of steps for the support. %(seed)s %(copy)s Returns ------- %(ripley_stat_returns)s References ---------- For reference, check out `Wikipedia <https://en.wikipedia.org/wiki/Spatial_descriptive_statistics#Ripley's_K_and_L_functions>`_ or :cite:`Baddeley2015-lm`. """ if isinstance(adata, SpatialData): adata = adata.table _assert_categorical_obs(adata, key=cluster_key) _assert_spatial_basis(adata, key=spatial_key) coordinates = adata.obsm[spatial_key] clusters = adata.obs[cluster_key].values mode = RipleyStat(mode) # type: ignore[assignment] if TYPE_CHECKING: assert isinstance(mode, RipleyStat) # prepare support N = coordinates.shape[0] hull = ConvexHull(coordinates) area = hull.volume if max_dist is None: max_dist = (area / 2) ** 0.5 support = np.linspace(0, max_dist, n_steps) # prepare labels le = LabelEncoder().fit(clusters) cluster_idx = le.transform(clusters) obs_arr = np.empty((le.classes_.shape[0], n_steps)) start = logg.info( f"Calculating Ripley's {mode} statistic for `{le.classes_.shape[0]}` clusters and `{n_simulations}` simulations" ) for i in np.arange(np.max(cluster_idx) + 1): coord_c = coordinates[cluster_idx == i, :] if mode == RipleyStat.F: random = _ppp(hull, n_simulations=1, n_observations=n_observations, seed=seed) tree_c = NearestNeighbors(metric=metric, n_neighbors=n_neigh).fit(coord_c) distances, _ = tree_c.kneighbors(random, n_neighbors=n_neigh) bins, obs_stats = _f_g_function(distances.squeeze(), support) elif mode == RipleyStat.G: tree_c = NearestNeighbors(metric=metric, n_neighbors=n_neigh).fit(coord_c) distances, _ = tree_c.kneighbors(coordinates[cluster_idx != i, :], n_neighbors=n_neigh) bins, obs_stats = _f_g_function(distances.squeeze(), support) elif mode == RipleyStat.L: distances = pdist(coord_c, metric=metric) bins, obs_stats = _l_function(distances, support, N, area) else: raise NotImplementedError(f"Mode `{mode.s!r}` is not yet implemented.") obs_arr[i] = obs_stats sims = np.empty((n_simulations, len(bins))) pvalues = np.ones((le.classes_.shape[0], len(bins))) for i in range(n_simulations): random_i = _ppp(hull, n_simulations=1, n_observations=n_observations, seed=seed) if mode == RipleyStat.F: tree_i = NearestNeighbors(metric=metric, n_neighbors=n_neigh).fit(random_i) distances_i, _ = tree_i.kneighbors(random, n_neighbors=1) _, stats_i = _f_g_function(distances_i.squeeze(), support) elif mode == RipleyStat.G: tree_i = NearestNeighbors(metric=metric, n_neighbors=n_neigh).fit(random_i) distances_i, _ = tree_i.kneighbors(coordinates, n_neighbors=1) _, stats_i = _f_g_function(distances_i.squeeze(), support) elif mode == RipleyStat.L: distances_i = pdist(random_i, metric=metric) _, stats_i = _l_function(distances_i, support, N, area) else: raise NotImplementedError(f"Mode `{mode.s!r}` is not yet implemented.") for j in range(obs_arr.shape[0]): pvalues[j] += stats_i >= obs_arr[j] sims[i] = stats_i pvalues /= n_simulations + 1 pvalues = np.minimum(pvalues, 1 - pvalues) obs_df = _reshape_res(obs_arr.T, columns=le.classes_, index=bins, var_name=cluster_key) sims_df = _reshape_res(sims.T, columns=np.arange(n_simulations), index=bins, var_name="simulations") res = {f"{mode}_stat": obs_df, "sims_stat": sims_df, "bins": bins, "pvalues": pvalues} if TYPE_CHECKING: assert isinstance(res, dict) if copy: logg.info("Finish", time=start) return res _save_data(adata, attr="uns", key=Key.uns.ripley(cluster_key, mode), data=res, time=start)
def _reshape_res(results: NDArrayA, columns: NDArrayA | list[str], index: NDArrayA, var_name: str) -> pd.DataFrame: df = pd.DataFrame(results, columns=columns, index=index) df.index.set_names(["bins"], inplace=True) df = df.melt(var_name=var_name, value_name="stats", ignore_index=False) df[var_name] = df[var_name].astype("category", copy=True) df.reset_index(inplace=True) return df def _f_g_function(distances: NDArrayA, support: NDArrayA) -> tuple[NDArrayA, NDArrayA]: counts, bins = np.histogram(distances, bins=support) fracs = np.cumsum(counts) / counts.sum() return bins, np.concatenate((np.zeros((1,), dtype=float), fracs)) def _l_function(distances: NDArrayA, support: NDArrayA, n: int, area: float) -> tuple[NDArrayA, NDArrayA]: n_pairs_less_than_d = (distances < support.reshape(-1, 1)).sum(axis=1) intensity = n / area k_estimate = ((n_pairs_less_than_d * 2) / n) / intensity l_estimate = np.sqrt(k_estimate / np.pi) return support, l_estimate def _ppp(hull: ConvexHull, n_simulations: int, n_observations: int, seed: int | None = None) -> NDArrayA: """ Simulate Poisson Point Process on a polygon. Parameters ---------- hull Convex hull of the area of interest. n_simulations Number of simulated point processes. n_observations Number of observations to sample from each simulation. seed Random seed. Returns ------- An Array with shape ``(n_simulation, n_observations, 2)``. """ rng = default_rng(None if seed is None else seed) vxs = hull.points[hull.vertices] deln = Delaunay(vxs) bbox = np.array([*vxs.min(0), *vxs.max(0)]) result = np.empty((n_simulations, n_observations, 2)) for i_sim in range(n_simulations): i_obs = 0 while i_obs < n_observations: x, y = ( rng.uniform(bbox[0], bbox[2]), rng.uniform(bbox[1], bbox[3]), ) if deln.find_simplex((x, y)) >= 0: result[i_sim, i_obs] = (x, y) i_obs += 1 return result.squeeze()