import numpy as np
import pytest
from pytest import approx
from beliefs.inference.belief_propagation import BeliefPropagation, ConflictingEvidenceError
from beliefs.factors.cpd import TabularCPD
from beliefs.models.belief_update_node_model import (
BeliefUpdateNodeModel,
BernoulliOrNode,
BernoulliAndNode,
Node
)
@pytest.fixture(scope='module')
def edges_five_nodes():
    """Return the (parent, child) edges of a 5-node, X-shaped polytree.

    Node 'x' is the center of the X: 'u' and 'v' point into it, and it
    points to 'y' and 'z'.
    """
    return [
        ('u', 'x'),
        ('v', 'x'),
        ('x', 'y'),
        ('x', 'z'),
    ]
@pytest.fixture(scope='module')
def simple_edges():
    """Return the 14 (parent, child) edges of a polytree with 15 nodes."""
    return [
        ('1', '3'),
        ('2', '3'),
        ('3', '5'),
        ('4', '5'),
        ('5', '10'),
        ('5', '9'),
        ('6', '8'),
        ('7', '8'),
        ('8', '9'),
        ('9', '11'),
        ('9', 'x'),
        ('14', 'x'),
        ('x', '12'),
        ('x', '13'),
    ]
@pytest.fixture(scope='module')
def many_parents_edges():
    """Return edges in which node '62' has 18 parents and no children."""
    parent_ids = ['96', '80', '98', '100', '86', '102', '104', '64', '106',
                  '108', '110', '112', '114', '116', '118', '122', '70', '94']
    # every edge points from one of the parents into the shared child '62'
    return [(parent_id, '62') for parent_id in parent_ids]
@pytest.fixture(scope='function')
def five_node_model(edges_five_nodes):
    """Build a fresh model from the X-shaped edges, using BernoulliOrNode as the node class."""
    model = BeliefUpdateNodeModel.init_from_edges(edges_five_nodes, BernoulliOrNode)
    return model
@pytest.fixture(scope='function')
def simple_model(simple_edges):
    """Build a fresh model from the 15-node polytree edges, using BernoulliOrNode as the node class."""
    model = BeliefUpdateNodeModel.init_from_edges(simple_edges, BernoulliOrNode)
    return model
@pytest.fixture(scope='function')
def many_parents_model(many_parents_edges):
    """Build a fresh model from the many-parents edges, using BernoulliOrNode as the node class."""
    model = BeliefUpdateNodeModel.init_from_edges(many_parents_edges, BernoulliOrNode)
    return model
@pytest.fixture(scope='function')
def many_parents_and_model(many_parents_edges):
    """Build a fresh model from the many-parents edges, using BernoulliAndNode as the node class."""
    model = BeliefUpdateNodeModel.init_from_edges(many_parents_edges, BernoulliAndNode)
    return model
@pytest.fixture(scope='function')
def one_node_model():
    """Build a model holding a single BernoulliOrNode, 'x', with no parents or children."""
    return BeliefUpdateNodeModel(
        nodes_dict={'x': BernoulliOrNode(label_id='x', children=[], parents=[])}
    )
@pytest.fixture(scope='function')
def five_node_and_model(edges_five_nodes):
    """Build a fresh model from the X-shaped edges, using BernoulliAndNode as the node class."""
    model = BeliefUpdateNodeModel.init_from_edges(edges_five_nodes, BernoulliAndNode)
    return model
@pytest.fixture(scope='function')
def mixed_cpd_model(edges_five_nodes):
    """
    Build the X-shaped 5-node model extended with a sixth node, 'w', that is
    an additional parent of 'z'.

    'z' is the only AND node; every other node is an OR node.
    """
    nodes = {
        'u': BernoulliOrNode(label_id='u', children=['x'], parents=[]),
        'v': BernoulliOrNode(label_id='v', children=['x'], parents=[]),
        'x': BernoulliOrNode(label_id='x', children=['y', 'z'], parents=['u', 'v']),
        'y': BernoulliOrNode(label_id='y', children=[], parents=['x']),
        # the single AND node: its parents are 'x' and 'w'
        'z': BernoulliAndNode(label_id='z', children=[], parents=['x', 'w']),
        'w': BernoulliOrNode(label_id='w', children=['z'], parents=[]),
    }
    return BeliefUpdateNodeModel(nodes_dict=nodes)
@pytest.fixture(scope='function')
def custom_cpd_model():
    """
    Build a 4-node model with edges u -> x, v -> x and x -> y.

    'u' and 'v' are OR nodes without parents. 'x' has cardinality 3
    (states 'lo', 'med', 'hi') and a custom CPD conditioned on 'u' and 'v';
    'y' has cardinality 2 and a custom CPD conditioned on 'x'.
    """
    x_cpd = TabularCPD(
        variable='x',
        variable_card=3,
        parents=['u', 'v'],
        parents_card=[2, 2],
        values=[[0.2, 0, 0.3, 0.1],
                [0.4, 1, 0.7, 0.2],
                [0.4, 0, 0, 0.7]],
        state_names={'x': ['lo', 'med', 'hi'],
                     'u': ['False', 'True'],
                     'v': ['False', 'True']},
    )
    y_cpd = TabularCPD(
        variable='y',
        variable_card=2,
        parents=['x'],
        parents_card=[3],
        values=[[0.3, 0.1, 0],
                [0.7, 0.9, 1]],
        state_names={'x': ['lo', 'med', 'hi'],
                     'y': ['False', 'True']},
    )
    nodes = {
        'u': BernoulliOrNode(label_id='u', children=['x'], parents=[]),
        'v': BernoulliOrNode(label_id='v', children=['x'], parents=[]),
        # generic nodes whose behaviour comes from the custom CPDs above
        'x': Node(children=['y'], cpd=x_cpd),
        'y': Node(children=[], cpd=y_cpd),
    }
    return BeliefUpdateNodeModel(nodes_dict=nodes)
def get_label_mapped_to_positive_belief(query_result):
    """Map each label_id in `query_result` to the entry at index 1 of its
    belief, i.e. the probability of the label being True."""
    positive_beliefs = {}
    for label_id, belief in query_result.items():
        positive_beliefs[label_id] = belief[1]
    return positive_beliefs
def compare_dictionaries(expected, observed):
    """Assert that every entry of `expected` is approximately matched in `observed`.

    Only the keys of `expected` are checked; additional keys in `observed`
    are ignored.

    Args:
        expected: dict mapping keys to the expected values.
        observed: dict mapping keys to the observed values.

    Raises:
        KeyError: if a key of `expected` is missing from `observed` (or is
            mapped to None there).
        AssertionError: if an observed value does not equal the expected
            value within the tolerance of `pytest.approx`.
    """
    for key, expected_value in expected.items():
        observed_value = observed.get(key)
        if observed_value is None:
            # Name the missing key so the failure is actionable.
            raise KeyError("Expected key {} not in observed.".format(key))
        assert observed_value == approx(expected_value), \
            "Expected {} but got {}".format(expected_value, observed_value)
#==============================================================================================
# Tests of single Bernoulli node model
def test_no_evidence_one_node_model(one_node_model):
    """With no evidence, the single node's positive belief is 0.5."""
    beliefs = BeliefPropagation(one_node_model).query(evidence={})
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'x': 0.5}, positive)
def test_virtual_evidence_one_node_model(one_node_model):
    """Virtual evidence on 'x' weights YES 10x more than NO (a curator's
    judgement), giving a positive belief of 5/5.5."""
    virtual_evidence = {'x': np.array([1, 10])}
    beliefs = BeliefPropagation(one_node_model).query(evidence=virtual_evidence)
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'x': 5 / (0.5 + 5)}, positive)
def test_MAYBE_default_evidence_one_node_model(one_node_model):
    """Evenly weighted (MAYBE) evidence on 'x' leaves its positive belief at 0.5."""
    maybe_evidence = {'x': np.array([0.5, 0.5])}
    beliefs = BeliefPropagation(one_node_model).query(evidence=maybe_evidence)
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'x': 0.5}, positive)
def test_YES_evidence_one_node_model(one_node_model):
    """Hard YES evidence on 'x' gives a positive belief of 1."""
    yes_evidence = {'x': np.array([0, 1])}
    beliefs = BeliefPropagation(one_node_model).query(evidence=yes_evidence)
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'x': 1}, positive)
def test_NO_evidence_one_node_model(one_node_model):
    """Hard NO evidence on 'x' gives a positive belief of 0."""
    no_evidence = {'x': np.array([1, 0])}
    beliefs = BeliefPropagation(one_node_model).query(evidence=no_evidence)
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'x': 0}, positive)
#==============================================================================================
# Tests of 5-node, 4-edge model
def test_no_evidence_five_node_model(five_node_model):
    """With no evidence, the center node 'x' has positive belief 1 - 0.5**2."""
    beliefs = BeliefPropagation(five_node_model).query(evidence={})
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'x': 1 - 0.5 ** 2}, positive)
def test_virtual_evidence_for_node_x_five_node_model(five_node_model):
    """Virtual evidence on the center node 'x' updates the beliefs of all five nodes."""
    propagator = BeliefPropagation(five_node_model)
    beliefs = propagator.query(evidence={'x': np.array([1, 10])})
    positive = get_label_mapped_to_positive_belief(beliefs)
    expected = {
        'x': 0.967741935483871,
        'y': 0.967741935483871,
        'z': 0.967741935483871,
        'u': 0.6451612903225806,
        'v': 0.6451612903225806,
    }
    compare_dictionaries(expected, positive)
#==============================================================================================
# Tests of 5-node, 4-edge model with AND cpds
def test_no_evidence_five_node_and_model(five_node_and_model):
    """With no evidence, the AND center node 'x' has positive belief 0.5**2."""
    beliefs = BeliefPropagation(five_node_and_model).query(evidence={})
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'x': 0.5 ** 2}, positive)
def test_one_parent_false_five_node_and_model(five_node_and_model):
    """Observing parent 'u' as False drives the AND node 'x' to positive belief 0."""
    u_false = {'u': np.array([1, 0])}
    beliefs = BeliefPropagation(five_node_and_model).query(evidence=u_false)
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'x': 0}, positive)
def test_one_parent_true_five_node_and_model(five_node_and_model):
    """Observing only parent 'u' as True leaves the AND node 'x' at positive belief 0.5."""
    u_true = {'u': np.array([0, 1])}
    beliefs = BeliefPropagation(five_node_and_model).query(evidence=u_true)
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'x': 0.5}, positive)
def test_both_parents_true_five_node_and_model(five_node_and_model):
    """Observing both parents as True makes 'x' and its children 'y', 'z' certainly True."""
    both_true = {'u': np.array([0, 1]), 'v': np.array([0, 1])}
    beliefs = BeliefPropagation(five_node_and_model).query(evidence=both_true)
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'x': 1, 'y': 1, 'z': 1}, positive)
#==============================================================================================
# Tests of mixed cpd model (all CPDs are OR, except for one AND node with 2 parents)
def test_no_evidence_mixed_cpd_model(mixed_cpd_model):
    """With no evidence, check the positive beliefs of the OR node 'x' and the AND node 'z'."""
    beliefs = BeliefPropagation(mixed_cpd_model).query(evidence={})
    positive = get_label_mapped_to_positive_belief(beliefs)
    x_positive = 1 - 0.5 ** 2
    compare_dictionaries({'x': x_positive, 'z': 0.5 * x_positive}, positive)
def test_x_false_w_true_mixed_cpd_model(mixed_cpd_model):
    """With 'x' observed False and 'w' observed True, 'u', 'v', 'y' and 'z'
    all have positive belief 0."""
    observations = {'x': np.array([1, 0]), 'w': np.array([0, 1])}
    beliefs = BeliefPropagation(mixed_cpd_model).query(evidence=observations)
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'u': 0, 'v': 0, 'y': 0, 'z': 0}, positive)
def test_x_true_w_true_mixed_cpd_model(mixed_cpd_model):
    """With both 'x' and 'w' observed True, 'y' and 'z' have positive belief 1."""
    observations = {'x': np.array([0, 1]), 'w': np.array([0, 1])}
    beliefs = BeliefPropagation(mixed_cpd_model).query(evidence=observations)
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries({'y': 1, 'z': 1}, positive)
#==============================================================================================
# Tests of simple BernoulliOr polytree model
def test_no_evidence_simple_model(simple_model):
    """With no evidence, check the positive belief of all 15 nodes of the polytree."""
    beliefs = BeliefPropagation(simple_model).query(evidence={})
    positive = get_label_mapped_to_positive_belief(beliefs)
    expected = {
        'x': 0.984375,
        '14': 0.5,
        '7': 0.5,
        '2': 0.5,
        '3': 0.75,
        '13': 0.984375,
        '6': 0.5,
        '4': 0.5,
        '8': 0.75,
        '10': 0.875,
        '1': 0.5,
        '9': 0.96875,
        '12': 0.984375,
        '5': 0.875,
        '11': 0.96875,
    }
    compare_dictionaries(expected, positive)
def test_belief_propagation_no_modify_model_inplace(simple_model):
    """Querying with inplace=False must leave the supplied model not fully initialized."""
    assert not simple_model.all_nodes_are_fully_initialized
    propagator = BeliefPropagation(simple_model, inplace=False)
    propagator.query(evidence={})
    # the model handed to BeliefPropagation should look exactly as it did before the query
    assert not simple_model.all_nodes_are_fully_initialized
def test_belief_propagation_modify_model_inplace(simple_model):
    """Querying with inplace=True must fully initialize the supplied model's
    nodes and store the propagated beliefs on them."""
    assert not simple_model.all_nodes_are_fully_initialized
    propagator = BeliefPropagation(simple_model, inplace=True)
    propagator.query(evidence={})
    assert simple_model.all_nodes_are_fully_initialized

    # read the positive beliefs straight off the model's nodes
    positive_from_model = {}
    for node_id, node in simple_model.nodes_dict.items():
        positive_from_model[node_id] = node.belief[1]

    expected = {
        'x': 0.984375,
        '14': 0.5,
        '7': 0.5,
        '2': 0.5,
        '3': 0.75,
        '13': 0.984375,
        '6': 0.5,
        '4': 0.5,
        '8': 0.75,
        '10': 0.875,
        '1': 0.5,
        '9': 0.96875,
        '12': 0.984375,
        '5': 0.875,
        '11': 0.96875,
    }
    compare_dictionaries(expected, positive_from_model)
def test_positive_evidence_node_13(simple_model):
    """Observing node '13' as True: check the positive belief of every node."""
    propagator = BeliefPropagation(simple_model)
    beliefs = propagator.query(evidence={'13': np.array([0, 1])})
    positive = get_label_mapped_to_positive_belief(beliefs)
    expected = {
        '6': 0.50793650793650791,
        '3': 0.76190476190476186,
        '9': 0.98412698412698407,
        '8': 0.76190476190476186,
        'x': 1.0,
        '4': 0.50793650793650791,
        '11': 0.98412698412698407,
        '1': 0.50793650793650791,
        '5': 0.88888888888888884,
        '2': 0.50793650793650791,
        '12': 1.0,
        '14': 0.50793650793650791,
        '13': 1,
        '10': 0.88888888888888884,
        '7': 0.50793650793650791,
    }
    compare_dictionaries(expected, positive)
def test_positive_evidence_node_5(simple_model):
    """Observing node '5' as True: check the positive belief of every node."""
    propagator = BeliefPropagation(simple_model)
    beliefs = propagator.query(evidence={'5': np.array([0, 1])})
    positive = get_label_mapped_to_positive_belief(beliefs)
    expected = {
        '1': 0.5714285714285714,
        '5': 1,
        '3': 0.8571428571428571,
        '10': 1.0,
        '8': 0.75,
        '2': 0.5714285714285714,
        '4': 0.5714285714285714,
        '6': 0.5,
        '7': 0.5,
        '14': 0.5,
        '12': 1.0,
        '13': 1.0,
        '11': 1.0,
        '9': 1.0,
        'x': 1.0,
    }
    compare_dictionaries(expected, positive)
def test_positive_evidence_node_5_negative_evidence_node_14(simple_model):
    """Observing node '5' as True and node '14' as False: check the positive
    belief of every node."""
    observations = {'5': np.array([0, 1]), '14': np.array([1, 0])}
    beliefs = BeliefPropagation(simple_model).query(evidence=observations)
    positive = get_label_mapped_to_positive_belief(beliefs)
    expected = {
        '6': 0.5,
        '7': 0.5,
        '9': 1.0,
        '3': 0.8571428571428571,
        '1': 0.57142857142857151,
        '12': 1.0,
        'x': 1.0,
        '11': 1.0,
        '14': 0.0,
        '2': 0.57142857142857151,
        '4': 0.5714285714285714,
        '5': 1.0,
        '10': 1.0,
        '13': 1.0,
        '8': 0.75,
    }
    compare_dictionaries(expected, positive)
def test_conflicting_evidence(simple_model):
    """Contradictory evidence must raise ConflictingEvidenceError.

    Node 'x' is observed False while node '5' is observed True; the query
    is expected to fail with an error whose message mentions the
    conflicting evidence.
    """
    infer = BeliefPropagation(simple_model)
    with pytest.raises(ConflictingEvidenceError) as err:
        infer.query(evidence={'x': np.array([1, 0]), '5': np.array([0, 1])})
    # The message check must live outside the `with` block: statements placed
    # after the raising call inside the block are never executed. `err.value`
    # is the exception instance that was raised.
    assert "Can't run belief propagation with conflicting evidence" in str(err.value)
#==============================================================================================
# Tests of model with 18 parents sharing a single child
def test_no_evidence_many_parents_model(many_parents_model):
    """With no evidence, each of the 18 parents has positive belief 0.5 and
    their shared child '62' is almost certainly True."""
    node_ids = ['64', '86', '62', '116', '100', '108', '122', '114', '98',
                '106', '94', '80', '102', '70', '118', '96', '104', '110',
                '112']
    expected = dict.fromkeys(node_ids, 0.5)
    expected['62'] = 0.99999618530273438
    beliefs = BeliefPropagation(many_parents_model).query(evidence={})
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries(expected, positive)
def test_positive_evidence_node_112(many_parents_model):
    """A single True parent ('112') forces the child '62' to be True, while
    the remaining parents stay at 0.5."""
    node_ids = ['64', '86', '62', '116', '100', '108', '122', '114', '98',
                '106', '94', '80', '102', '70', '118', '96', '104', '110',
                '112']
    expected = dict.fromkeys(node_ids, 0.5)
    expected['62'] = 1.0
    expected['112'] = 1.0
    propagator = BeliefPropagation(many_parents_model)
    beliefs = propagator.query(evidence={'112': np.array([0, 1])})
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries(expected, positive)
def test_negative_evidence_node_62(many_parents_model):
    """When the child '62' is observed False, every one of its parents must be False too."""
    node_ids = ['64', '86', '62', '116', '100', '108', '122', '114', '98',
                '106', '94', '80', '102', '70', '118', '96', '104', '110',
                '112']
    expected = dict.fromkeys(node_ids, 0)
    propagator = BeliefPropagation(many_parents_model)
    beliefs = propagator.query(evidence={'62': np.array([1, 0])})
    positive = get_label_mapped_to_positive_belief(beliefs)
    compare_dictionaries(expected, positive)
def test_conflicting_evidence_and_model(many_parents_and_model):
    """Contradictory evidence on the AND model must raise ConflictingEvidenceError.

    If one of the parents of node '62' is False, then node '62' has to be
    False; here '62' is observed True while its parent '112' is observed
    False, so the query is expected to fail.
    """
    infer = BeliefPropagation(many_parents_and_model)
    with pytest.raises(ConflictingEvidenceError) as err:
        infer.query(evidence={'62': np.array([0, 1]), '112': np.array([1, 0])})
    # The message check must live outside the `with` block: statements placed
    # after the raising call inside the block are never executed. `err.value`
    # is the exception instance that was raised.
    assert "Can't run belief propagation with conflicting evidence" in str(err.value)
#==============================================================================================
# Model with two custom cpds
def test_no_evidence_custom_cpd_model(custom_cpd_model):
    """With no evidence, compare the full belief vector of every node
    (including the 3-state node 'x') against the expected distributions."""
    beliefs = BeliefPropagation(custom_cpd_model).query(evidence={})
    expected = {
        'x': np.array([0.15, 0.575, 0.275]),
        'v': np.array([0.5, 0.5]),
        'u': np.array([0.5, 0.5]),
        'y': np.array([0.1025, 0.8975]),
    }
    # whole belief vectors are compared here, not just the positive entry
    compare_dictionaries(expected, beliefs)
def test_evidence_custom_cpd_model(custom_cpd_model):
    """The custom node 'x' is observed in its 'med' state; compare the full
    belief vector of every node against the expected distributions."""
    x_is_med = {'x': np.array([0, 1, 0])}
    beliefs = BeliefPropagation(custom_cpd_model).query(evidence=x_is_med)
    expected = {
        'x': np.array([0., 1., 0.]),
        'u': np.array([0.60869565, 0.39130435]),
        'v': np.array([0.47826087, 0.52173913]),
        'y': np.array([0.1, 0.9]),
    }
    # whole belief vectors are compared here, not just the positive entry
    compare_dictionaries(expected, beliefs)