Source code for esda.join_counts_local_bv

import numpy as np
import pandas as pd
import warnings
from scipy import sparse
from sklearn.base import BaseEstimator
from libpysal import weights
from esda.crand import (
    crand as _crand_plus,
    njit as _njit,
    _prepare_univariate,
    _prepare_bivariate,
)


PERMUTATIONS = 999


[docs]class Join_Counts_Local_BV(BaseEstimator): """Univariate Local Join Count Statistic"""
[docs] def __init__( self, connectivity=None, permutations=PERMUTATIONS, n_jobs=1, keep_simulations=True, seed=None, island_weight=0, ): """ Initialize a Local_Join_Counts_BV estimator Parameters ---------- connectivity : scipy.sparse matrix object the connectivity structure describing the relationships between observed units. Need not be row-standardized. permutations : int number of random permutations for calculation of pseudo p_values n_jobs : int Number of cores to be used in the conditional randomisation. If -1, all available cores are used. keep_simulations : Boolean (default=True) If True, the entire matrix of replications under the null is stored in memory and accessible; otherwise, replications are not saved seed : None/int Seed to ensure reproducibility of conditional randomizations. Must be set here, and not outside of the function, since numba does not correctly interpret external seeds nor numpy.random.RandomState instances. island_weight: value to use as a weight for the "fake" neighbor for every island. If numpy.nan, will propagate to the final local statistic depending on the `stat_func`. If 0, then the lag is always zero for islands. """ self.connectivity = connectivity self.permutations = permutations self.n_jobs = n_jobs self.keep_simulations = keep_simulations self.seed = seed self.island_weight = island_weight
def fit(self, x, z, case="CLC", n_jobs=1, permutations=999): """ Parameters ---------- x : numpy.ndarray array containing binary (0/1) data z : numpy.ndarray array containing binary (0/1) data Returns ------- the fitted estimator. Notes ----- Technical details and derivations can be found in :cite:`AnselinLi2019`. Examples -------- >>> import libpysal >>> w = libpysal.weights.lat2W(4, 4) >>> x = np.ones(16) >>> x[0:8] = 0 >>> z = [0,1,0,1,1,1,1,1,0,0,1,1,0,0,1,1] >>> LJC_BV_C1 = Local_Join_Counts_BV(connectivity=w).fit(x, z, case="BJC") >>> LJC_BV_C2 = Local_Join_Counts_BV(connectivity=w).fit(x, z, case="CLC") >>> LJC_BV_C1.LJC >>> LJC_BV_C1.p_sim >>> LJC_BV_C2.LJC >>> LJC_BV_C2.p_sim Commpop data replicating GeoDa tutorial (Case 1) >>> import libpysal >>> import geopandas as gpd >>> commpop = gpd.read_file("https://github.com/jeffcsauer/GSOC2020/raw/master/validation/data/commpop.gpkg") >>> w = libpysal.weights.Queen.from_dataframe(commpop) >>> LJC_BV_Case1 = Local_Join_Counts_BV(connectivity=w).fit(commpop['popneg'], commpop['popplus'], case='BJC') >>> LJC_BV_Case1.LJC >>> LJC_BV_Case1.p_sim Guerry data replicating GeoDa tutorial (Case 2) >>> import libpysal >>> import geopandas as gpd >>> guerry = libpysal.examples.load_example('Guerry') >>> guerry_ds = gpd.read_file(guerry.get_path('Guerry.shp')) >>> guerry_ds['infq5'] = 0 >>> guerry_ds['donq5'] = 0 >>> guerry_ds.loc[(guerry_ds['Infants'] > 23574), 'infq5'] = 1 >>> guerry_ds.loc[(guerry_ds['Donatns'] > 10973), 'donq5'] = 1 >>> w = libpysal.weights.Queen.from_dataframe(guerry_ds) >>> LJC_BV_Case2 = Local_Join_Counts_BV(connectivity=w).fit(guerry_ds['infq5'], guerry_ds['donq5'], case='CLC') >>> LJC_BV_Case2.LJC >>> LJC_BV_Case2.p_sim """ # Need to ensure that the np.array() are of # dtype='float' for numba x = np.array(x, dtype="float") z = np.array(z, dtype="float") w = self.connectivity # Fill the diagonal with 0s w = weights.util.fill_diagonal(w, val=0) w.transform = "b" self.x = x self.z = z self.n = len(x) self.w = w self.case = case keep_simulations = self.keep_simulations n_jobs = self.n_jobs seed = self.seed self.LJC = self._statistic(x, z, w, case=case) if permutations: if case == "BJC": self.p_sim, self.rjoins = _crand_plus( z=np.column_stack((x, z)), w=self.w, observed=self.LJC, permutations=permutations, keep=True, n_jobs=n_jobs, stat_func=_ljc_bv_case1, island_weight=self.island_weight, ) # Set p-values for those with LJC of 0 to NaN self.p_sim[self.LJC == 0] = "NaN" elif case == "CLC": self.p_sim, self.rjoins = _crand_plus( z=np.column_stack((x, z)), w=self.w, observed=self.LJC, permutations=permutations, keep=True, n_jobs=n_jobs, stat_func=_ljc_bv_case2, island_weight=self.island_weight, ) # Set p-values for those with LJC of 0 to NaN self.p_sim[self.LJC == 0] = "NaN" else: raise NotImplementedError( f"The requested LJC method ({case}) \ is not currently supported!" ) return self @staticmethod def _statistic(x, z, w, case): # Create adjacency list. Note that remove_symmetric=False - this is # different from the esda.Join_Counts() function. adj_list = w.to_adjlist(remove_symmetric=False) # First, set up a series that maps the values # to the weights table zseries_x = pd.Series(x, index=w.id_order) zseries_z = pd.Series(z, index=w.id_order) # Map the values to the focal (i) values focal_x = zseries_x.loc[adj_list.focal].values focal_z = zseries_z.loc[adj_list.focal].values # Map the values to the neighbor (j) values neighbor_x = zseries_x.loc[adj_list.neighbor].values neighbor_z = zseries_z.loc[adj_list.neighbor].values if case == "BJC": BJC = ( (focal_x == 1) & (focal_z == 0) & (neighbor_x == 0) & (neighbor_z == 1) ) adj_list_BJC = pd.DataFrame( adj_list.focal.values, BJC.astype("uint8") ).reset_index() adj_list_BJC.columns = ["BJC", "ID"] adj_list_BJC = adj_list_BJC.groupby(by="ID").sum() return np.array(adj_list_BJC.BJC.values, dtype="float") elif case == "CLC": CLC = ( (focal_x == 1) & (focal_z == 1) & (neighbor_x == 1) & (neighbor_z == 1) ) adj_list_CLC = pd.DataFrame( adj_list.focal.values, CLC.astype("uint8") ).reset_index() adj_list_CLC.columns = ["CLC", "ID"] adj_list_CLC = adj_list_CLC.groupby(by="ID").sum() return np.array(adj_list_CLC.CLC.values, dtype="float") else: raise NotImplementedError( f"The requested LJC method ({case}) \ is not currently supported!" )
# -------------------------------------------------------------- # Conditional Randomization Function Implementations # -------------------------------------------------------------- # Note: scaling not used @_njit(fastmath=True) def _ljc_bv_case1(i, z, permuted_ids, weights_i, scaling): zx = z[:, 0] zy = z[:, 1] self_weight = weights_i[0] other_weights = weights_i[1:] zyi, zyrand = _prepare_univariate(i, zy, permuted_ids, other_weights) return zx[i] * (zyrand @ other_weights) @_njit(fastmath=True) def _ljc_bv_case2(i, z, permuted_ids, weights_i, scaling): zx = z[:, 0] zy = z[:, 1] self_weight = weights_i[0] other_weights = weights_i[1:] zxi, zxrand, zyi, zyrand = _prepare_bivariate(i, z, permuted_ids, other_weights) zf = zxrand * zyrand return zy[i] * (zf @ other_weights)