from multiprocessing import Process
import multiprocessing
import os
import datetime
import time
import numpy as np
from tqdm import tqdm
import trilearn.graph
from trilearn import set_process as sp
from trilearn.distributions import sequential_junction_tree_distributions as seqdist
from trilearn.graph import trajectory as mcmctraj, junction_tree as jtlib
from trilearn.smc import approximate, approximate_cond
from trilearn import auxiliary_functions as aux
[docs]def sample_trajectory(smc_N, alpha, beta, radius, n_samples, seq_dist,
jt_traj=None, debug=False, reset_cache=True):
""" A particle Gibbs implementation for approximating distributions over
junction trees.
Args:
smc_N (int): Number of particles in SMC in each Gibbs iteration
n_samples (int): Number of Gibbs iterations (samples)
alpha (float): sparsity parameter for the Christmas tree algorithm
beta (float): sparsity parameter for the Christmas tree algorithm
radius (float): defines the radius within which ned nodes are selected
seq_dist (SequentialJTDistributions): the distribution to be sampled from
Returns:
Trajectory: Markov chain of the underlying graphs of the junction trees sampled by pgibbs.
"""
graph_traj = mcmctraj.Trajectory()
graph_traj.set_sampling_method({"method": "pgibbs",
"params": {"N": smc_N,
"alpha": alpha,
"beta": beta,
"radius": radius}})
graph_traj.set_sequential_distribution(seq_dist)
neig_set_cache = {}
(trees, log_w) = (None, None)
prev_tree = None
for i in tqdm(range(n_samples), desc="Particle Gibbs samples"):
if reset_cache is True:
seq_dist.cache = {}
start_time = time.time()
if i == 0:
#start_graph = nx.Graph()
#start_graph.add_nodes_from(range(seqdist.p))
#start_tree = dlib.junction_tree(start_graph)
(trees, log_w) = approximate(smc_N, alpha, beta, radius, seq_dist, neig_set_cache=neig_set_cache)
else:
# Sample backwards trajectories
perm_traj = sp.backward_perm_traj_sample(seq_dist.p, radius)
T_traj = trilearn.graph.junction_tree_collapser.backward_jt_traj_sample(perm_traj,
prev_tree)
(trees, log_w, Is) = approximate_cond(smc_N,
alpha,
beta,
radius,
seq_dist,
T_traj,
perm_traj, neig_set_cache=neig_set_cache)
# Sample T from T_1..p
log_w_array = np.array(log_w.T)[seq_dist.p - 1]
log_w_rescaled = log_w_array - max(log_w_array)
w_rescaled = np.exp(log_w_rescaled)
norm_w = w_rescaled / sum(w_rescaled)
I = np.random.choice(smc_N, size=1, p=norm_w)[0]
T = trees[I]
prev_tree = T
graph = jtlib.graph(T)
end_time = time.time()
graph_traj.add_sample(graph, end_time - start_time)
return graph_traj
[docs]def trajectory_to_file(n_particles, n_samples, alpha, beta, radius, seqdist, node_labels,
reset_cache=True, dir=".", output_filename="trajectory.csv", reseed=False):
""" Writes the trajectory of graphs generated by particle Gibbs to file.
Args:
n_particles (int): Number of particles in SMC in each Gibbs iteration
n_samples (int): Number of Gibbs iterations (samples)
alpha (float): sparsity parameter for the Christmas tree algorithm
beta (float): sparsity parameter for the Christmas tree algorithm
radius (float): defines the radius within which ned nodes are selected
seq_dist (SequentialJTDistributions): the distribution to be sampled from
filename_prefix (string): prefix to the filename
Returns:
Trajectory: Markov chain of underlying graphs of the junction trees sampled by pgibbs.
"""
if reseed is True:
np.random.seed()
graph_trajectory = sample_trajectory(n_particles, alpha, beta, radius, n_samples, seqdist, reset_cache=reset_cache)
date = datetime.datetime.today().strftime('%Y%m%d%H%m%S')
if not os.path.exists(dir):
os.mkdir(dir)
print(graph_trajectory)
if output_filename is None:
# This is the format that can be analyzed by trilearn. (Should be changed to Benchpress format in the future)
filename1 = dir + "/" + str(graph_trajectory) +"_"+ date + ".json"
graph_trajectory.write_file(filename=filename1)
print("wrote file: " + filename1)
else:
# This is for the Benchpress format
df = graph_trajectory.graph_diff_trajectory_df(node_labels)
df.to_csv(dir +"/"+output_filename, sep=",", index=False)
return graph_trajectory
[docs]def trajectory_to_queue(n_particles, n_samples, alpha, beta, radius, seqdist, queue,
reset_cache=True, reseed=False):
""" Writes the trajectory of graphs generated by particle Gibbs to file.
Args:
n_particles (int): Number of particles in SMC in each Gibbs iteration
n_samples (int): Number of Gibbs iterations (samples)
alpha (float): sparsity parameter for the Christmas tree algorithm
beta (float): sparsity parameter for the Christmas tree algorithm
radius (float): defines the radius within which ned nodes are selected
seq_dist (SequentialJTDistributions): the distribution to be sampled from
filename_prefix (string): prefix to the filename
Returns:
Trajectory: Markov chain of underlying graphs of the junction trees sampled by pgibbs.
"""
if reseed is True:
np.random.seed()
#print (n_particles, alpha, beta, radius, n_samples, str(seqdist), reset_cache)
graph_trajectory = sample_trajectory(n_particles, alpha, beta, radius, n_samples, seqdist, reset_cache=reset_cache)
queue.put(graph_trajectory)
[docs]def sample_trajectory_ggm(dataframe, n_particles, n_samples, D=None, delta=1.0, alpha=0.5, beta=0.5,
radius=None, reset_cache=True, **args):
""" Particle Gibbs for approximating distributions over
Gaussian graphical models.
Args:
n_particles (int): Number of particles in SMC in each Gibbs iteration
n_samples (int): Number of Gibbs iterations (samples)
alpha (float): sparsity parameter for the Christmas tree algorithm
beta (float): sparsity parameter for the Christmas tree algorithm
radius (float): defines the radius within which ned nodes are selected
dataframe (np.matrix): row matrix of data
D (np.matrix): matrix parameter for the hyper inverse wishart prior
delta (float): degrees of freedom for the hyper inverse wishart prior
cache (dict): cache for clique likelihoods
Returns:
Trajectory: Markov chain of the underlying graphs of the junction trees sampled by pgibbs.
"""
p = dataframe.shape[1]
if D is None:
D = np.identity(p)
if radius is None:
radius = p
sd = seqdist.GGMJTPosterior()
sd.init_model(np.asmatrix(dataframe), D, delta, {})
return sample_trajectory(n_particles, alpha, beta, radius, n_samples, sd, reset_cache=reset_cache)
[docs]def sample_trajectories_ggm(dataframe, n_particles, n_samples, D=None, delta=1.0, alphas=[0.5], betas=[0.5],
radii=[None], reset_cache=True, reps=1, **args):
graph_trajectories = []
for _ in range(reps):
for N in n_particles:
for T in n_samples:
for rad in radii:
for alpha in alphas:
for beta in betas:
if rad is None:
rad = dataframe.shape[1]
graph_trajectory = sample_trajectory_ggm(dataframe, n_particles=N,
n_samples=T, D=D, delta=delta,
alpha=alpha, beta=beta,
radius=rad, reset_cache=reset_cache)
graph_trajectories.append(graph_trajectory)
return graph_trajectories
[docs]def sample_trajectories_ggm_to_file(dataframe, n_particles, n_samples, D=None, delta=1.0, alphas=[0.5], betas=[0.5],
radii=[None], reset_cache=True, reps=1, output_directory=".",
output_filename="trajectory.json",
**args):
p = dataframe.shape[1]
if D is None:
D = np.identity(p)
if radii == [None]:
radii = [p]
node_labels = np.array(dataframe.columns, dtype=str)
graph_trajectories = []
for _ in range(reps):
for N in n_particles:
for T in n_samples:
for rad in radii:
for alpha in alphas:
for beta in betas:
sd = seqdist.GGMJTPosterior()
sd.init_model(np.asmatrix(dataframe), D, delta, {})
graph_trajectory = trajectory_to_file(N, T, alpha, beta, rad, sd, node_labels,
reset_cache=reset_cache,
output_filename=output_filename,
dir=output_directory)
graph_trajectories.append(graph_trajectory)
return graph_trajectories
[docs]def sample_trajectories_ggm_parallel(dataframe, n_particles, n_samples, D=None, delta=1.0, alphas=[0.5], betas=[0.5],
radii=[None], reset_cache=True, reps=1, **args):
p = dataframe.shape[1]
if D is None:
D = np.identity(p)
if radii == [None]:
radii = [p]
queue = multiprocessing.Queue()
processes = []
rets = []
for _ in range(reps):
for N in n_particles:
for T in n_samples:
for rad in radii:
for alpha in alphas:
for beta in betas:
sd = seqdist.GGMJTPosterior()
sd.init_model(np.asmatrix(dataframe), D, delta, {})
print("Starting: " + str((N, T, alpha, beta, rad,
str(sd), reset_cache, True)))
proc = Process(target=trajectory_to_queue,
args=(N, T, alpha, beta, rad,
sd, queue, reset_cache, True))
processes.append(proc)
proc.start()
time.sleep(2)
for _ in processes:
ret = queue.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets
[docs]def sample_trajectory_loglin(dataframe, n_particles, n_samples, pseudo_obs=1.0, alpha=0.5, beta=0.5,
radius=None, reset_cache=True, **args):
p = dataframe.shape[1]
if radius is None:
radius = p
n_levels = np.array(dataframe.columns.get_level_values(1), dtype=int)
levels = np.array([range(l) for l in n_levels])
sd = seqdist.LogLinearJTPosterior()
sd.init_model(dataframe.get_values(), pseudo_obs, levels, {})
return sample_trajectory(n_particles, alpha, beta, radius, n_samples, sd, reset_cache=reset_cache)
[docs]def sample_trajectories_loglin(dataframe, n_particles, n_samples, pseudo_observations=[1.0], alphas=[0.5], betas=[0.5],
radii=[None], reset_cache=True, reps=1, **args):
graph_trajectories = []
for _ in range(reps):
for N in n_particles:
for T in n_samples:
for rad in radii:
for alpha in alphas:
for beta in betas:
for pseudo_obs in pseudo_observations:
graph_trajectory = sample_trajectory_loglin(dataframe, n_particles=N, n_samples=T,
pseudo_obs=pseudo_obs, alpha=alpha,
beta=beta, radius=rad, reset_cache=reset_cache)
graph_trajectories.append(graph_trajectory)
return graph_trajectories
[docs]def sample_trajectories_loglin_to_file(dataframe, n_particles, n_samples, pseudo_observations=[1.0],
alphas=[0.5], betas=[0.5],
radii=[None], reset_cache=True, reps=1,
output_directory=".",
output_filename="trajectory.json", **args):
p = dataframe.shape[1]
if radii == [None]:
radii = [p]
n_levels = np.array(dataframe.columns.get_level_values(1), dtype=int)
levels = np.array([range(l) for l in n_levels])
node_labels = np.array(dataframe.columns.get_level_values(0))
graph_trajectories = []
cache = {}
for _ in range(reps):
for N in n_particles:
for T in n_samples:
for rad in radii:
for alpha in alphas:
for beta in betas:
for pseudo_obs in pseudo_observations:
sd = seqdist.LogLinearJTPosterior()
sd.init_model(dataframe.get_values(), pseudo_obs, levels, cache_complete_set_prob=cache)
graph_trajectory = trajectory_to_file(N, T, alpha, beta, rad, sd, node_labels,
reset_cache=reset_cache,
output_filename=output_filename,
dir=output_directory)
graph_trajectories.append(graph_trajectory)
return graph_trajectories
[docs]def sample_trajectories_loglin_parallel(dataframe, n_particles, n_samples, pseudo_observations=[1.0], alphas=[0.5], betas=[0.5],
radii=[None], reset_cache=True, reps=1, output_directory=".", **args):
p = dataframe.shape[1]
if radii == [None]:
radii = [p]
n_levels = np.array(dataframe.columns.get_level_values(1), dtype=int)
levels = np.array([range(l) for l in n_levels])
node_labels = np.array(dataframe.columns.get_level_values(0))
queue = multiprocessing.Queue()
processes = []
rets = []
cache = {}
for _ in range(reps):
for N in n_particles:
for T in n_samples:
for rad in radii:
for alpha in alphas:
for beta in betas:
for pseudo_obs in pseudo_observations:
sd = seqdist.LogLinearJTPosterior()
sd.init_model(dataframe.get_values(), pseudo_obs, levels, cache_complete_set_prob=cache)
print("Starting: " + str((N, T, alpha, beta, rad,
str(sd), reset_cache, output_directory, True)))
proc = Process(target=trajectory_to_queue,
args=(N, T, alpha, beta, rad,
sd, queue, reset_cache, True))
proc.start()
processes.append(proc)
time.sleep(2)
for _ in processes:
ret = queue.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets