Source code for trilearn.distributions.sequential_junction_tree_distributions
"""
Junction tree distributions suitable for SMC sampling.
"""
import numpy as np
import trilearn.graph.decomposable
from trilearn.distributions import gaussian_graphical_model
from trilearn.distributions import discrete_dec_log_linear as loglin
import trilearn.graph.junction_tree as jtlib
[docs]
class SequentialJTDistribution(object):
"""
Abstract class of junction tree distributions for SMC sampling.
"""
[docs]
def log_ratio(self,
old_cliques,
old_separators,
new_cliques,
new_separators,
old_JT,
new_JT):
pass
def __str__(self):
pass
class UniformJTDistribution(SequentialJTDistribution):
""" A sequential formulation of
.. math::
P(T)= \\frac{1}{\\text{# junction trees}}
"""
def __init__(self, p):
self.p = p
def log_ratio(self,
old_cliques,
old_separators,
new_cliques,
new_separators,
old_JT,
new_JT):
return 0.0
[docs]
class CondUniformJTDistribution(SequentialJTDistribution):
""" A sequential formulation of
.. math::
P(T) = P(T|G)P(G),
where
.. math::
P(G)= \\frac{1}{\\text{# decomposable graphs}}
and
.. math::
P(T|G) = \\frac{1}{\\text{#junction trees for G}}.
"""
def __init__(self, p):
self.p = p
[docs]
def log_ratio(self,
old_cliques,
old_separators,
new_cliques,
new_separators,
old_JT,
new_JT):
return -trilearn.graph.junction_tree.log_n_junction_trees_update_ratio(new_separators,
old_JT, new_JT)
[docs]
class CondUniformGivenSizeJTDistribution(SequentialJTDistribution):
""" A sequential formulation of
.. math::
P(T) = P(T, G) = P(T|G)P(G)
, where
.. math::
P(G)=1/(\\text{#decomopsable graphs}) * I(\\text{size of } G = k)
and
.. math::
P(T|G) = 1/(\\text{#junction trees for G}).
"""
def __init__(self, p, size):
self.p = p
self.size = size
[docs]
def log_ratio(self,
old_cliques,
old_separators,
new_cliques,
new_separators,
old_JT,
new_JT):
"""Log-likelihood ratio of new_JT and old_JT.
Args:
old_cliques ([type]): [description]
old_separators ([type]): [description]
new_cliques ([type]): [description]
new_separators ([type]): [description]
old_JT ([type]): [description]
new_JT ([type]): [description]
Returns:
[type]: [description]
"""
graph = new_JT.to_graph()
if graph.size() <= self.size:
return -trilearn.graph.junction_tree.log_n_junction_trees_update_ratio(new_separators,
old_JT, new_JT)
else:
return -np.inf
[docs]
class LogLinearJTPosterior(SequentialJTDistribution):
"""
Posterior for a log-linear model.
"""
[docs]
def init_model(self, X, cell_alpha, levels, cache_complete_set_prob={},
counts={}):
"""
Args:
cell_alpha: the constant number of pseudo counts for each cell
in the full distribution.
"""
self.p = len(levels)
self.levels = levels
self.cache_complete_set_prob = cache_complete_set_prob
self.cell_alpha = cell_alpha
self.data = X
self.no_levels = np.array([len(l) for l in levels])
self.counts = counts
[docs]
def init_model_from_json(self, sd_json):
self.init_model(np.array(sd_json["data"]), # TODO: Might be a bug
sd_json["parameters"]["cell_alpha"],
np.array(sd_json["parameters"]["levels"]),
cache_complete_set_prob={})
[docs]
def get_json_model(self):
return {"name": "loglin_jt_post",
"parameters": {"cell_alpha": self.cell_alpha,
"levels": [list(l) for l in self.levels]},
"data": self.data.tolist()}
[docs]
def log_likelihood(self, graph):
tree = trilearn.graph.decomposable.junction_tree(graph)
separators = tree.get_separators()
return loglin.log_likelihood_partial(tree.nodes(), separators, self.no_levels, self.cell_alpha,
self.counts, self.data, self.levels, self.cache_complete_set_prob)
[docs]
def log_ratio(self,
old_cliques,
old_separators,
new_cliques,
new_separators,
old_JT,
new_JT):
log_mu_ratio = trilearn.graph.junction_tree.log_n_junction_trees_update_ratio(new_separators,
old_JT, new_JT)
ll_ratio = self.log_likelihood_diff(old_cliques,
old_separators,
new_cliques,
new_separators,
old_JT,
new_JT)
return ll_ratio - log_mu_ratio
[docs]
def log_likelihood_diff(self, old_cliques, old_separators,
new_cliques, new_separators, old_JT, new_JT):
""" Log-likelihood difference when cliques and separators are added and
removed.
"""
old = loglin.log_likelihood_partial(old_cliques, old_separators, self.no_levels, self.cell_alpha,
self.counts, self.data, self.levels, self.cache_complete_set_prob)
new = loglin.log_likelihood_partial(new_cliques, new_separators, self.no_levels, self.cell_alpha,
self.counts, self.data, self.levels, self.cache_complete_set_prob)
return new - old
def __str__(self):
return "loglin_posterior_n_"+str(self.data.shape[0])+"_p_"+str(self.p)+"_pseudo_obs_"+str(self.cell_alpha)
[docs]
class GGMJTPosterior(SequentialJTDistribution):
""" Posterior of Junction tree for a GGM.
"""
[docs]
def init_model(self, X, D, delta, cache={}):
self.parameters = {"delta": delta,
"D": D}
self.SS = X.T * X
self.X = X
self.cache = cache
self.n = X.shape[0]
self.p = X.shape[1]
self.idmatrices = [np.identity(i) for i in range(self.p+1)]
[docs]
def init_model_from_json(self, sd_json):
self.init_model(np.asmatrix(sd_json["data"]),
np.asmatrix(sd_json["parameters"]["D"]),
sd_json["parameters"]["delta"],
{})
[docs]
def get_json_model(self):
return {"name": "ggm_jt_post",
"parameters": {"delta": self.parameters["delta"],
"D": self.parameters["D"].tolist()},
"data": self.X.tolist()}
[docs]
def log_ratio(self,
old_cliques,
old_separators,
new_cliques,
new_separators,
old_JT,
new_JT):
log_mu_ratio = trilearn.graph.junction_tree.log_n_junction_trees_update_ratio(new_separators,
old_JT, new_JT)
log_J_ratio = self.ll_diff(old_cliques,
old_separators,
new_cliques,
new_separators,
old_JT,
new_JT)
return log_J_ratio - log_mu_ratio
[docs]
def ll_diff(self,
old_cliques,
old_separators,
new_cliques,
new_separators,
old_JT,
new_JT):
old = gaussian_graphical_model.log_likelihood_partial(self.SS, self.n,
self.parameters["D"],
self.parameters["delta"],
old_cliques,
old_separators,
self.cache,
self.idmatrices)
new = gaussian_graphical_model.log_likelihood_partial(self.SS, self.n,
self.parameters["D"],
self.parameters["delta"],
new_cliques,
new_separators,
self.cache,
self.idmatrices)
return new - old
[docs]
def log_likelihood(self, graph):
return gaussian_graphical_model.log_likelihood(graph, self.SS, self.n,
self.parameters["D"],
self.parameters["delta"],
self.cache)
[docs]
def log_likelihood_partial(self, cliques, separators):
return gaussian_graphical_model.log_likelihood_partial(self.SS, self.n,
self.parameters["D"],
self.parameters["delta"],
cliques, separators, self.cache)
def __str__(self):
return "ggm_posterior_n_" + str(self.n) + "_p_" + str(self.p) + "_prior_scale_" + str(
self.parameters["delta"]) + "_shape_x"
[docs]
class UniformJTDistribution(SequentialJTDistribution):
""" A sequential formulation of P(T) = P(T|G)P(G), where
P(G)=1/(#decomopsable graphs)
and
P(T|G) = 1/(#junction trees for G).
"""
def __init__(self, p):
self.p = p
[docs]
def log_ratio(self,
old_cliques,
old_separators,
new_cliques,
new_separators,
old_JT,
new_JT):
return 0.0