import glob
import os
import networkx as nx
import numpy as np
import pandas as pd
from numpy import linalg as la
from matplotlib import pyplot as plt
import seaborn as sns
from pandas.plotting import autocorrelation_plot
from tqdm import tqdm
import random
from sys import platform as sys_pf
if sys_pf == 'darwin':
import matplotlib
matplotlib.use("TkAgg")
[docs]def plot_heatmap(heatmap, cbar=False, annot=False, xticklabels=1, yticklabels=1):
mask = np.zeros_like(heatmap)
mask[np.triu_indices_from(mask)] = True
with sns.axes_style("white"):
sns.heatmap(heatmap, mask=mask, annot=annot,
cmap="Blues",
vmin=0.0, vmax=1.0, square=True,
cbar=cbar,
xticklabels=xticklabels, yticklabels=yticklabels)
#sns.set_style("whitegrid")
cax = plt.gcf().axes[-1]
cax.tick_params(labelsize=6)
[docs]def random_subset(A):
""" Draws a random subset of elements in a list, inclding the empty set.
Args:
A (list)
Returns:
set: Subset of A.
"""
tmp = np.array(list(A))
bin_samp = np.random.multinomial(1, [0.5, 0.5], size=len(tmp))
c = np.ma.masked_array(tmp, mask=bin_samp[:, 0])
rest = set(c.compressed())
return rest
[docs]def random_element_from_coll(A):
tmp = np.array(list(A))
ind = np.random.randint(len(A))
return tmp[ind]
[docs]def l2_loss(m1, m2):
""" L2 loss between m1 and m2.
Args:
m1 (Numpy array): A matrix
m1 (Numpy array): A matrix
Returns:
float
"""
A = np.matrix(m1)
B = np.matrix(m2)
return np.power(la.norm(A - B, "fro"), 2) # <A-B, A-B>
[docs]def l1_loss(m1, m2):
""" L1 loss.
Args:
m1 (Numpy array): A matrix
m1 (Numpy array): A matrix
Returns:
float
"""
A = np.matrix(m1)
B = np.matrix(m2)
p = A.shape[0]
(sign, logdet) = la.slogdet(A * B.I)
loss = np.trace(A.transpose() * B.I) # <A, B.I>
loss -= sign * logdet
loss -= p
return loss
[docs]def tpr(true_graph, est_graph):
""" Calculates the True positive rate of an estimated adjacency matrix.
"""
N = len(true_graph)
no_correct = 0.0
no_false_rej = 0.0
for i in range(N):
for j in range(N):
if est_graph.item(i, j) == 1 and true_graph.item(i, j) == 1:
no_correct += 1
if true_graph.item(i, j) == 1:
if est_graph.item(i, j) == 0:
no_false_rej += 1
return no_correct / (no_correct + no_false_rej)
[docs]def spc1(true_graph, est_graph):
"""
Takes 2 adjacency matrices.
"""
N = len(true_graph)
no_corr_rej = 0.0
no_wrong_incl = 0.0
for i in range(N):
for j in range(N):
if est_graph.item(i, j) == 1 and true_graph.item(i, j) == 0:
no_wrong_incl += 1
if est_graph.item(i, j) == 0 and true_graph.item(i, j) == 0:
no_corr_rej += 1
return no_corr_rej / (no_corr_rej + no_wrong_incl)
[docs]def get_marg_counts(full_data, subset):
""" Returns a contingency table in dictionary form.
Args:
data (np.array): The data in n x p form.
subset (list): The subset of interest
"""
if len(subset) == 0:
return None
counts = {}
for row in full_data:
cell = tuple(row[subset])
# print cell
if cell not in counts:
counts[cell] = 1
else:
counts[cell] += 1
return counts
[docs]def plot_matrix(m, filename, extension, title="Adjmat"):
""" Plots a 2-dim numpy array as heatmap.
Args:
m (numpy array): matrix to plot.
"""
m1 = np.array(m)
fig, ax = plt.subplots()
ax.pcolor(m1, cmap=plt.cm.Blues)
fig.suptitle(title, fontsize=20)
ax.invert_yaxis()
ax.xaxis.tick_top()
plt.savefig(filename+"."+extension, format=extension, dpi=100)
[docs]def sample_classification_datasets(mus, covmats, n_samples_in_each_class):
n_classes = len(covmats)
n_dim = covmats[0].shape[1]
# Generate training data
n_train = [n_samples_in_each_class] * n_classes
x = np.matrix(np.zeros((sum(n_train), n_dim))).reshape(sum(n_train), n_dim)
y = np.matrix(np.zeros((sum(n_train), 1), dtype=int)).reshape(sum(n_train), 1)
for c in range(n_classes):
fr = sum(n_train[:c])
to = sum(n_train[:c + 1])
x[np.ix_(range(fr, to),
range(n_dim))] = np.matrix(np.random.multivariate_normal(
np.array(mus[c]).flatten(),
covmats[c],
n_train[c]))
y[np.ix_(range(fr, to), [0])] = np.matrix(np.ones(n_train[c], dtype=int) * c).T
ys = pd.Series(np.array(y).flatten(), dtype=int)
df = pd.DataFrame(x)
df["y"] = ys
df = df[["y"] + range(n_dim)]
df.columns = ["y"] + ["x" + str(i) for i in range(1, n_dim + 1)]
return df
# return x, y
# return pd.DataFrame(x), pd.Series(np.array(y).flatten(), dtype=int)
[docs]def is_pos_def(x):
return np.all(np.linalg.eigvals(x) > 0)
[docs]def gen_prec_mat(graph, a):
def gen_mat(graph):
p = graph.order()
prec_mat = np.zeros(p * p).reshape(p, p)
adj_mat = nx.to_numpy_array(graph)
d = np.diag(np.random.uniform(low=a, high=1, size=p))
for i in range(p - 1):
for j in range(i + 1, p):
if adj_mat[i, j] == 1:
rn = random.uniform(a, 1)
if random.randint(0, 1) == 1:
rn *= -1
prec_mat[i, j] = rn
prec_mat[j, i] = rn
prec_mat += d
return prec_mat
prec_mat = gen_mat(graph)
p = graph.order()
i = 0
e = 0.1
while not is_pos_def(prec_mat):
i += 1
prec_mat += np.diag(np.ones(p)*e)
return prec_mat
[docs]def plot_multiple_traj_statistics(trajs, burnin_end,
write_to_file=False, annot=False, output_directory="./", file_extension="eps"):
#trajectories = group_trajectories_by_setting(trajs) # this must have a bug
trajectories = trajs
if not os.path.exists(output_directory):
os.mkdir(output_directory)
for param_setting, traj_list in trajectories.items():
print("Setting: " + str(traj_list[0].sampling_method))
print("Average sample time: " + str(np.mean(traj_list[0].time)))
sns.set_style("whitegrid")
for t in tqdm(traj_list, total=len(traj_list), desc="Plotting size"):
t.size(burnin_end).plot()
if write_to_file:
plt.savefig(output_directory +"/"+ str(t) + "_size." + file_extension)
plt.clf()
sns.set_style("whitegrid")
for t in tqdm(traj_list, total=len(traj_list), desc="Plotting log-likelihood"):
t.log_likelihood(burnin_end).plot()
if write_to_file:
plt.savefig(output_directory +"/"+ str(t) + "_log-likelihood."+file_extension)
plt.clf()
sns.set_style("whitegrid")
for t in tqdm(traj_list, total=len(traj_list), desc="Plotting autocorr"):
autocorrelation_plot(t.size(burnin_end))
if write_to_file:
plt.savefig(output_directory +"/"+ str(t) + "_autocorr"+"_burnin_"+str(burnin_end)+"."+file_extension)
plt.clf()
for i, t in tqdm(enumerate(traj_list), total=len(traj_list),
desc="Plotting heatmap, size auto-correlation, MAP and ML graph"):
plot_heatmap(t.empirical_distribution(burnin_end).heatmap(),
xticklabels=np.arange(1, t.seqdist.p +1),
yticklabels=np.arange(1, t.seqdist.p +1), annot=annot)
cax = plt.gcf().axes[-1]
cax.tick_params(labelsize=6)
if write_to_file:
plt.savefig(output_directory +"/"+ str(t) + "_heatmap_" + str(i) + "_burnin_"+str(burnin_end)+"."+file_extension)
plt.clf()
plot_heatmap(t.empirical_distribution(burnin_end).heatmap(), cbar=True,
xticklabels=np.arange(1, t.seqdist.p +1),
yticklabels=np.arange(1, t.seqdist.p +1), annot=annot)
cax = plt.gcf().axes[-1]
cax.tick_params(labelsize=6)
cax = plt.gcf().axes[-2]
cax.tick_params(labelsize=6)
if write_to_file:
plt.savefig(output_directory +"/"+ str(t) + "_heatmap_cbar_" + str(i) + "_burnin_"+str(burnin_end)+ "."+file_extension)
plt.clf()
sns.set_style("white")
autocorrelation_plot(t.size(burnin_end))
if write_to_file:
plt.savefig(output_directory +"/"+ str(t) + "_size_autocorr_" + str(i) + "_burnin_"+str(burnin_end)+ "."+file_extension)
plt.clf()
top = t.empirical_distribution().mode(1)
plot_heatmap(nx.to_numpy_array(top[0][0]),
xticklabels=np.arange(1, t.seqdist.p +1),
yticklabels=np.arange(1, t.seqdist.p +1)
)
cax = plt.gcf().axes[-1]
cax.tick_params(labelsize=6)
if write_to_file:
plt.savefig(output_directory +"/"+ str(t) + "_map_" + str(i) + "."+file_extension)
plt.clf()
plot_heatmap(nx.to_numpy_array(t.maximum_likelihood_graph()),
xticklabels=np.arange(1, t.seqdist.p +1),
yticklabels=np.arange(1, t.seqdist.p +1)
)
cax = plt.gcf().axes[-1]
cax.tick_params(labelsize=6)
if write_to_file:
plt.savefig(output_directory +"/"+ str(t) + "_ml_" + str(i) + "."+file_extension)
plt.clf()
[docs]def read_all_trajectories_in_dir(directory):
from trilearn.graph import trajectory as gtraj
trajlist = []
for filename in glob.glob(directory + "/*.json"):
print("Loading: " + str(filename))
t = gtraj.Trajectory()
t.read_file(filename)
trajlist.append(t)
return group_trajectories_by_setting(trajlist)
[docs]def group_trajectories_by_setting(trajlist):
# Gather all with the same parameter setting
trajectories = {}
# print(trajlist)
for t in trajlist:
if str(t) not in trajectories:
trajectories[str(t)] = []
trajectories[str(t)].append(t)
# print("trajs")
# print(trajectories)
return trajectories
[docs]def plot_graph_traj_statistics(graph_traj, write_to_file=False):
top = graph_traj.empirical_distribution().mode(5)
print("Probability\tEdge list: ")
for graph, prob in top:
print(str(prob) + "\t\t" + str(list(graph.edges())))
graph_traj.size().plot()
if write_to_file:
plt.savefig(str(graph_traj)+"_size.png")
plt.clf()
autocorrelation_plot(graph_traj.size())
if write_to_file:
plt.savefig(str(graph_traj)+"_autocorr.png")
plt.clf()
graph_traj.log_likelihood().plot()
if write_to_file:
plt.savefig(str(graph_traj)+"_loglik.png")
plt.clf()
plot_heatmap(graph_traj.empirical_distribution().heatmap())
if write_to_file:
plt.savefig(str(graph_traj)+"_heatmap.png")
plt.clf()
top = graph_traj.empirical_distribution().mode(1)
plot_heatmap(nx.to_numpy_array(top[0][0]))
if write_to_file:
plt.savefig(str(graph_traj)+"_map.png")
plt.clf()