Source code for trilearn.pgibbs

from multiprocessing import Process
import multiprocessing
import os
import datetime
import time

import numpy as np
from tqdm import tqdm

import trilearn.graph
from trilearn import set_process as sp
from trilearn.distributions import sequential_junction_tree_distributions as seqdist
from trilearn.graph import trajectory as mcmctraj, junction_tree as jtlib
from trilearn.smc import approximate, approximate_cond
from trilearn import auxiliary_functions as aux


[docs]def sample_trajectory(smc_N, alpha, beta, radius, n_samples, seq_dist, jt_traj=None, debug=False, reset_cache=True): """ A particle Gibbs implementation for approximating distributions over junction trees. Args: smc_N (int): Number of particles in SMC in each Gibbs iteration n_samples (int): Number of Gibbs iterations (samples) alpha (float): sparsity parameter for the Christmas tree algorithm beta (float): sparsity parameter for the Christmas tree algorithm radius (float): defines the radius within which ned nodes are selected seq_dist (SequentialJTDistributions): the distribution to be sampled from Returns: Trajectory: Markov chain of the underlying graphs of the junction trees sampled by pgibbs. """ graph_traj = mcmctraj.Trajectory() graph_traj.set_sampling_method({"method": "pgibbs", "params": {"N": smc_N, "alpha": alpha, "beta": beta, "radius": radius}}) graph_traj.set_sequential_distribution(seq_dist) neig_set_cache = {} (trees, log_w) = (None, None) prev_tree = None for i in tqdm(range(n_samples), desc="Particle Gibbs samples"): if reset_cache is True: seq_dist.cache = {} start_time = time.time() if i == 0: #start_graph = nx.Graph() #start_graph.add_nodes_from(range(seqdist.p)) #start_tree = dlib.junction_tree(start_graph) (trees, log_w) = approximate(smc_N, alpha, beta, radius, seq_dist, neig_set_cache=neig_set_cache) else: # Sample backwards trajectories perm_traj = sp.backward_perm_traj_sample(seq_dist.p, radius) T_traj = trilearn.graph.junction_tree_collapser.backward_jt_traj_sample(perm_traj, prev_tree) (trees, log_w, Is) = approximate_cond(smc_N, alpha, beta, radius, seq_dist, T_traj, perm_traj, neig_set_cache=neig_set_cache) # Sample T from T_1..p log_w_array = np.array(log_w.T)[seq_dist.p - 1] log_w_rescaled = log_w_array - max(log_w_array) w_rescaled = np.exp(log_w_rescaled) norm_w = w_rescaled / sum(w_rescaled) I = np.random.choice(smc_N, size=1, p=norm_w)[0] T = trees[I] prev_tree = T graph = jtlib.graph(T) end_time = time.time() graph_traj.add_sample(graph, end_time - start_time) return graph_traj
[docs]def trajectory_to_file(n_particles, n_samples, alpha, beta, radius, seqdist, node_labels, reset_cache=True, dir=".", output_filename="trajectory.csv", reseed=False): """ Writes the trajectory of graphs generated by particle Gibbs to file. Args: n_particles (int): Number of particles in SMC in each Gibbs iteration n_samples (int): Number of Gibbs iterations (samples) alpha (float): sparsity parameter for the Christmas tree algorithm beta (float): sparsity parameter for the Christmas tree algorithm radius (float): defines the radius within which ned nodes are selected seq_dist (SequentialJTDistributions): the distribution to be sampled from filename_prefix (string): prefix to the filename Returns: Trajectory: Markov chain of underlying graphs of the junction trees sampled by pgibbs. """ if reseed is True: np.random.seed() graph_trajectory = sample_trajectory(n_particles, alpha, beta, radius, n_samples, seqdist, reset_cache=reset_cache) date = datetime.datetime.today().strftime('%Y%m%d%H%m%S') if not os.path.exists(dir): os.mkdir(dir) print(graph_trajectory) if output_filename is None: # This is the format that can be analyzed by trilearn. (Should be changed to Benchpress format in the future) filename1 = dir + "/" + str(graph_trajectory) +"_"+ date + ".json" graph_trajectory.write_file(filename=filename1) print("wrote file: " + filename1) else: # This is for the Benchpress format df = graph_trajectory.graph_diff_trajectory_df(node_labels) df.to_csv(dir +"/"+output_filename, sep=",", index=False) return graph_trajectory
[docs]def trajectory_to_queue(n_particles, n_samples, alpha, beta, radius, seqdist, queue, reset_cache=True, reseed=False): """ Writes the trajectory of graphs generated by particle Gibbs to file. Args: n_particles (int): Number of particles in SMC in each Gibbs iteration n_samples (int): Number of Gibbs iterations (samples) alpha (float): sparsity parameter for the Christmas tree algorithm beta (float): sparsity parameter for the Christmas tree algorithm radius (float): defines the radius within which ned nodes are selected seq_dist (SequentialJTDistributions): the distribution to be sampled from filename_prefix (string): prefix to the filename Returns: Trajectory: Markov chain of underlying graphs of the junction trees sampled by pgibbs. """ if reseed is True: np.random.seed() #print (n_particles, alpha, beta, radius, n_samples, str(seqdist), reset_cache) graph_trajectory = sample_trajectory(n_particles, alpha, beta, radius, n_samples, seqdist, reset_cache=reset_cache) queue.put(graph_trajectory)
[docs]def sample_trajectory_ggm(dataframe, n_particles, n_samples, D=None, delta=1.0, alpha=0.5, beta=0.5, radius=None, reset_cache=True, **args): """ Particle Gibbs for approximating distributions over Gaussian graphical models. Args: n_particles (int): Number of particles in SMC in each Gibbs iteration n_samples (int): Number of Gibbs iterations (samples) alpha (float): sparsity parameter for the Christmas tree algorithm beta (float): sparsity parameter for the Christmas tree algorithm radius (float): defines the radius within which ned nodes are selected dataframe (np.matrix): row matrix of data D (np.matrix): matrix parameter for the hyper inverse wishart prior delta (float): degrees of freedom for the hyper inverse wishart prior cache (dict): cache for clique likelihoods Returns: Trajectory: Markov chain of the underlying graphs of the junction trees sampled by pgibbs. """ p = dataframe.shape[1] if D is None: D = np.identity(p) if radius is None: radius = p sd = seqdist.GGMJTPosterior() sd.init_model(np.asmatrix(dataframe), D, delta, {}) return sample_trajectory(n_particles, alpha, beta, radius, n_samples, sd, reset_cache=reset_cache)
[docs]def sample_trajectories_ggm(dataframe, n_particles, n_samples, D=None, delta=1.0, alphas=[0.5], betas=[0.5], radii=[None], reset_cache=True, reps=1, **args): graph_trajectories = [] for _ in range(reps): for N in n_particles: for T in n_samples: for rad in radii: for alpha in alphas: for beta in betas: if rad is None: rad = dataframe.shape[1] graph_trajectory = sample_trajectory_ggm(dataframe, n_particles=N, n_samples=T, D=D, delta=delta, alpha=alpha, beta=beta, radius=rad, reset_cache=reset_cache) graph_trajectories.append(graph_trajectory) return graph_trajectories
[docs]def sample_trajectories_ggm_to_file(dataframe, n_particles, n_samples, D=None, delta=1.0, alphas=[0.5], betas=[0.5], radii=[None], reset_cache=True, reps=1, output_directory=".", output_filename="trajectory.json", **args): p = dataframe.shape[1] if D is None: D = np.identity(p) if radii == [None]: radii = [p] node_labels = np.array(dataframe.columns, dtype=str) graph_trajectories = [] for _ in range(reps): for N in n_particles: for T in n_samples: for rad in radii: for alpha in alphas: for beta in betas: sd = seqdist.GGMJTPosterior() sd.init_model(np.asmatrix(dataframe), D, delta, {}) graph_trajectory = trajectory_to_file(N, T, alpha, beta, rad, sd, node_labels, reset_cache=reset_cache, output_filename=output_filename, dir=output_directory) graph_trajectories.append(graph_trajectory) return graph_trajectories
[docs]def sample_trajectories_ggm_parallel(dataframe, n_particles, n_samples, D=None, delta=1.0, alphas=[0.5], betas=[0.5], radii=[None], reset_cache=True, reps=1, **args): p = dataframe.shape[1] if D is None: D = np.identity(p) if radii == [None]: radii = [p] queue = multiprocessing.Queue() processes = [] rets = [] for _ in range(reps): for N in n_particles: for T in n_samples: for rad in radii: for alpha in alphas: for beta in betas: sd = seqdist.GGMJTPosterior() sd.init_model(np.asmatrix(dataframe), D, delta, {}) print("Starting: " + str((N, T, alpha, beta, rad, str(sd), reset_cache, True))) proc = Process(target=trajectory_to_queue, args=(N, T, alpha, beta, rad, sd, queue, reset_cache, True)) processes.append(proc) proc.start() time.sleep(2) for _ in processes: ret = queue.get() # will block rets.append(ret) for p in processes: p.join() return rets
[docs]def sample_trajectory_loglin(dataframe, n_particles, n_samples, pseudo_obs=1.0, alpha=0.5, beta=0.5, radius=None, reset_cache=True, **args): p = dataframe.shape[1] if radius is None: radius = p n_levels = np.array(dataframe.columns.get_level_values(1), dtype=int) levels = np.array([range(l) for l in n_levels]) sd = seqdist.LogLinearJTPosterior() sd.init_model(dataframe.get_values(), pseudo_obs, levels, {}) return sample_trajectory(n_particles, alpha, beta, radius, n_samples, sd, reset_cache=reset_cache)
[docs]def sample_trajectories_loglin(dataframe, n_particles, n_samples, pseudo_observations=[1.0], alphas=[0.5], betas=[0.5], radii=[None], reset_cache=True, reps=1, **args): graph_trajectories = [] for _ in range(reps): for N in n_particles: for T in n_samples: for rad in radii: for alpha in alphas: for beta in betas: for pseudo_obs in pseudo_observations: graph_trajectory = sample_trajectory_loglin(dataframe, n_particles=N, n_samples=T, pseudo_obs=pseudo_obs, alpha=alpha, beta=beta, radius=rad, reset_cache=reset_cache) graph_trajectories.append(graph_trajectory) return graph_trajectories
[docs]def sample_trajectories_loglin_to_file(dataframe, n_particles, n_samples, pseudo_observations=[1.0], alphas=[0.5], betas=[0.5], radii=[None], reset_cache=True, reps=1, output_directory=".", output_filename="trajectory.json", **args): p = dataframe.shape[1] if radii == [None]: radii = [p] n_levels = np.array(dataframe.columns.get_level_values(1), dtype=int) levels = np.array([range(l) for l in n_levels]) node_labels = np.array(dataframe.columns.get_level_values(0)) graph_trajectories = [] cache = {} for _ in range(reps): for N in n_particles: for T in n_samples: for rad in radii: for alpha in alphas: for beta in betas: for pseudo_obs in pseudo_observations: sd = seqdist.LogLinearJTPosterior() sd.init_model(dataframe.get_values(), pseudo_obs, levels, cache_complete_set_prob=cache) graph_trajectory = trajectory_to_file(N, T, alpha, beta, rad, sd, node_labels, reset_cache=reset_cache, output_filename=output_filename, dir=output_directory) graph_trajectories.append(graph_trajectory) return graph_trajectories
[docs]def sample_trajectories_loglin_parallel(dataframe, n_particles, n_samples, pseudo_observations=[1.0], alphas=[0.5], betas=[0.5], radii=[None], reset_cache=True, reps=1, output_directory=".", **args): p = dataframe.shape[1] if radii == [None]: radii = [p] n_levels = np.array(dataframe.columns.get_level_values(1), dtype=int) levels = np.array([range(l) for l in n_levels]) node_labels = np.array(dataframe.columns.get_level_values(0)) queue = multiprocessing.Queue() processes = [] rets = [] cache = {} for _ in range(reps): for N in n_particles: for T in n_samples: for rad in radii: for alpha in alphas: for beta in betas: for pseudo_obs in pseudo_observations: sd = seqdist.LogLinearJTPosterior() sd.init_model(dataframe.get_values(), pseudo_obs, levels, cache_complete_set_prob=cache) print("Starting: " + str((N, T, alpha, beta, rad, str(sd), reset_cache, output_directory, True))) proc = Process(target=trajectory_to_queue, args=(N, T, alpha, beta, rad, sd, queue, reset_cache, True)) proc.start() processes.append(proc) time.sleep(2) for _ in processes: ret = queue.get() # will block rets.append(ret) for p in processes: p.join() return rets