Code Reproducibility

Code Reproducibility#

import numpy as np
from graspologic.simulations import sbm
from graphbook_code import dcsbm

# the block matrix for the neurons before learning
B0 = 0.05*np.ones((3, 3))
np.fill_diagonal(B0, 0.4)

nk = 40
ns = np.repeat(nk, 3)

theta = np.tile(np.linspace(np.sqrt(2), np.sqrt(2) - 1, nk), 3)
zs = np.repeat([1,2,3], nk)

T = 12
networks = np.array([sbm(ns, B0) if (t < 6 or t >= 9) else dcsbm(zs, theta, B0) for t in range(T)])
from graspologic.inference import latent_position_test
import warnings
from tqdm import tqdm

pvalues = [latent_position_test(networks[t + 1], networks[t], n_components=3,
                                n_bootstraps=1000, workers=-1)[1] for t in tqdm(range(T-1))]
from statsmodels.stats.multitest import multipletests

alpha = 0.05
_, adj_pvals, _, _ = multipletests(pvalues, alpha=alpha, method="holm")
import numpy as np

pi_astronaut = 0.45
pi_earthling = 0.55
M = 200

# roll a 2-sided die 200 times, with probability 0.55 of landing on side 1 (earthling)p
# and robability 0.45 of landing on side 2 (astronaut)
classnames = ["Earthling", "Earthling"]
ys = np.random.choice([1, 2], p=[pi_earthling, pi_astronaut], size=M)
print(f"Number of individuals who are earthlings: {(ys == 1).sum():d}")
print(f"Number of individuals who are astronauts: {(ys == 2).sum():d}")
Number of individuals who are earthlings: 102
Number of individuals who are astronauts: 98
n = 5
P_earthling = np.full(shape=(n, n), fill_value=0.3)

nodenames = [
    "SI", "L", "H/E", 
    "T/M", "BS"

signal_subnetwork = np.full(shape=(n, n), fill_value=False)
signal_subnetwork[1:n, 0] = True
signal_subnetwork[0, 1:n] = True
P_astronaut = np.copy(P_earthling)

# probabilities for signal edges are higher in astronauts than earthlings
P_astronaut[signal_subnetwork] = np.tile(np.linspace(0.4, 0.9, num=4), reps=2)
from graspologic.simulations import sample_edges

# the probability matrices for each class
Ps = [P_earthling, P_astronaut]

# sample networks with the indicated probability matrix
As = np.stack([sample_edges(P=Ps[y-1]) for y in ys], axis=2)
def generate_table(As, ys, i, j):
    A function to generate a contingency table for a given edge.
    # count the number of earthlings with edge i,j
    a = As[i,j,ys == 1].sum()
    # count the number of astronauts with edge i,j
    b = As[i,j,ys == 2].sum()

    c = len(As[i,j,ys == 1]) - a
    d = len(As[i,j,ys == 2]) - b
    edge_tab = np.array([[a, b], [c, d]])
    return edge_tab

# edge (0, 4) corresponds to SI to BS
edge_tab = generate_table(As, ys, 0, 4)
[[29. 86.]
 [73. 12.]]
from scipy.stats import fisher_exact

_, pval = fisher_exact(edge_tab)
print(f"p-value: {pval:.4f}")
p-value: 0.0000
p-value: 0.0000
_, pval = fisher_exact(generate_table(As, ys, 2, 1))
print(f"p-value: {pval:.4f}")
p-value: 0.7600
p-value: 0.7600
from graspologic.utils import symmetrize
from scipy.stats import rankdata

fisher_mtx = np.empty((n, n))
fisher_mtx[:] = np.nan

for i in range(0, n):
    for j in range(i+1, n):
        fisher_mtx[i, j] = fisher_exact(generate_table(As, ys, i, j))[1]
fisher_mtx = symmetrize(fisher_mtx, method="triu")
# use rankdata on -fisher_mtx, to rank from largest p-value to smallest p-value
edge_imp = rankdata(-fisher_mtx, method="dense", nan_policy="omit").reshape(fisher_mtx.shape)
np.fill_diagonal(edge_imp, 0)
from graspologic.subgraph import SignalSubgraph

K = 8  # the number of edges in the subgraph
ssn_mod = SignalSubgraph()
# graspologic signal subgraph module assumes labels are 0, ..., K-1
# so use ys - 1 to rescale from (1, 2) to (0, 1)
ssn_mod.fit_transform(As, labels=ys - 1, constraints=K);

sn_est = np.zeros((n,n))  # initialize empty matrix
sn_est[ssn_mod.sigsub_] = 1
D = As[ssn_mod.sigsub_[0], ssn_mod.sigsub_[1],:].T
from sklearn.naive_bayes import BernoulliNB

classifier = BernoulliNB()
# fit the classifier using the vector of classes for each sample, ys)
# number of holdout samples
Mp = 200
# new random seed so heldout samples differ
y_heldout = np.random.choice([1, 2], p=[pi_earthling, pi_astronaut], size=Mp)
# sample networks with the appropriate probability matrix
A_heldout = np.stack([sample_edges(Ps[y-1]) for y in y_heldout], axis=2)

# compute testing data on the estimated signal subnetwork
D_heldout = A_heldout[ssn_mod.sigsub_[0], ssn_mod.sigsub_[1],:].T

yhat_heldout = classifier.predict(D_heldout)

# classifier accuracy is the fraction of predictions that are correct
heldout_acc = np.mean(yhat_heldout == y_heldout)
print(f"Classifier Testing Accuracy: {heldout_acc:.3f}")
Classifier Testing Accuracy: 0.810
Classifier Testing Accuracy: 0.810
def train_and_eval_ssn(Atrain, ytrain, Atest, ytest, K):
    A function which trains and tests an incoherent signal subnetwork
    classifier with K signal edges.
    ssn_mod = SignalSubgraph()
    ssn_mod.fit_transform(Atrain, labels=ytrain - 1, constraints=int(K));

    Dtrain = Atrain[ssn_mod.sigsub_[0], ssn_mod.sigsub_[1],:].T
    classifier = BernoulliNB()
    # fit the classifier using the vector of classes for each sample, ytrain)

    # compute testing data on the estimated signal subnetwork
    Dtest = Atest[ssn_mod.sigsub_[0], ssn_mod.sigsub_[1],:].T
    yhat_test = classifier.predict(Dtest)
    # classifier accuracy is the fraction of predictions that are correct
    return (np.mean(yhat_test == ytest), ssn_mod, classifier)
from sklearn.model_selection import KFold
import pandas as pd

kf = KFold(n_splits=20, shuffle=True, random_state=0)
xv_res = []
for l, (train_index, test_index) in enumerate(kf.split(range(0, M))):
    A_train, A_test = As[:,:,train_index], As[:,:,test_index]
    y_train, y_test = ys[train_index], ys[test_index]
    nl = len(test_index)
    for k in np.arange(2, 20, step=2):
        acc_kl, _, _ = train_and_eval_ssn(A_train, y_train, A_test, y_test, k)
        xv_res.append({"Fold": l, "k": k, "nl": nl, "Accuracy": acc_kl})
xv_data = pd.DataFrame(xv_res)

def weighted_avg(group):
    acc = group['Accuracy']
    nl = group['nl']
    return (acc * nl).sum() / nl.sum()

xv_acc = xv_data.groupby(["k"]).apply(weighted_avg)
2     0.795
4     0.795
6     0.830
8     0.830
10    0.820
12    0.825
14    0.825
16    0.820
18    0.820
dtype: float64
import numpy as np
from graspologic.simulations import sample_edges

nodenames = [
    "SI", "L", "H/E", 
    "T/M", "BS"

# generate probability matrices
n = 5  # the number of nodes
P_earthling = 0.3*np.ones((n, n))
signal_subnetwork = np.zeros((n, n), dtype=bool)
signal_subnetwork[1:n, 0] = True
signal_subnetwork[0, 1:n] = True
P_astronaut = np.copy(P_earthling)
P_astronaut[signal_subnetwork] = np.tile(np.linspace(0.4, 0.9, num=4), 2)

# sample the classes of each sample
M = 200  # the number of training and testing samples
pi_astronaut = 0.45
pi_earthling = 0.55
ytrain = np.random.choice([1,2], p=[pi_earthling, pi_astronaut], size=M)
ytest = np.random.choice([1,2], p=[pi_earthling, pi_astronaut], size=M)

# sample network realizations given the class of each sample
Ps = [P_earthling, P_astronaut]
Atrain = np.stack([sample_edges(Ps[y-1]) for y in ytrain], axis=2)
Atest = np.stack([sample_edges(Ps[y-1]) for y in ytest], axis=2)
from graspologic.subgraph import SignalSubgraph
K = 8  # the number of signal edges
V = 1  # the number of signal nodes

# the incoherent signal subnetwork estimator
ssn_est_inco = SignalSubgraph()
ssn_est_inco.fit_transform(Atrain, labels=ytrain-1, constraints=K)

# the coherent signal subnetwork estimator
ssn_est_coherent = SignalSubgraph()
ssn_est_coherent.fit_transform(Atrain, labels=ytrain-1, constraints=[K, V])
(array([0, 4, 0, 2, 0, 3, 0, 1]), array([4, 0, 2, 0, 3, 0, 1, 0]))
ssn_coherent = np.zeros((n, n))
ssn_incoherent = np.zeros((n, n))

ssn_incoherent[ssn_est_inco.sigsub_] = 1
ssn_coherent[ssn_est_coherent.sigsub_] = 1
from sklearn.naive_bayes import BernoulliNB

def train_and_eval_coherent_ssn(Atrain, ytrain, Atest, ytest, K, V):
    A function which trains and tests an incoherent signal subnetwork
    classifier with K signal edges and V signal nodes.
    ssn_mod = SignalSubgraph()
    ssn_mod.fit_transform(Atrain, labels=ytrain-1, constraints=[int(K), int(V)]);

    Dtrain = Atrain[ssn_mod.sigsub_[0], ssn_mod.sigsub_[1],:].T
    classifier = BernoulliNB()
    # fit the classifier using the vector of classes for each sample, ytrain)

    # compute testing data on the estimated signal subnetwork
    Dtest = Atest[ssn_mod.sigsub_[0], ssn_mod.sigsub_[1],:].T
    yhat_test = classifier.predict(Dtest)
    # classifier accuracy is the fraction of predictions that are correct
    return (np.mean(yhat_test == ytest), ssn_mod, classifier)
from sklearn.model_selection import KFold
import pandas as pd
from tqdm import tqdm

kf = KFold(n_splits=20, shuffle=True, random_state=0)
xv_res = []
for l, (train_index, test_index) in tqdm(enumerate(kf.split(range(0, M)))):
    A_train, A_test = Atrain[:,:,train_index], Atrain[:,:,test_index]
    y_train, y_test = ytrain[train_index], ytrain[test_index]
    nl = len(test_index)
    for k in np.arange(2, n*(n-1), step=2):
        for v in range(1, n+1):
                acc_kl, _, _ = train_and_eval_coherent_ssn(A_train, y_train, A_test, y_test, k, v)
                xv_res.append({"Fold": l, "k": k, "nl": nl, "v": v, "Accuracy": acc_kl})
                xv_res.append({"Fold": l, "k": k, "nl": nl, "v": v, "Accuracy": np.nan})
xv_data = pd.DataFrame(xv_res)

def weighted_avg(group):
    acc = group['Accuracy']
    nl = group['nl']
    return (acc * nl).sum() / nl.sum()

xv_acc = xv_data.groupby(["k", "v"]).apply(weighted_avg).reset_index(name='Accuracy')
# convert the pandas dataframe (long format) to a data matrix (wide format)
df_hm = xv_acc.pivot(index="k", columns="v", values="Accuracy")
# the coherent signal subnetwork estimator, using the parameters from xv
ssn_est_coherent_xv = SignalSubgraph()
ssn_est_coherent_xv.fit_transform(Atrain, labels=ytrain-1, constraints=[6, 1])

ssn_coherent_xv = np.zeros((n, n))
ssn_coherent_xv[ssn_est_coherent_xv.sigsub_] = 1