Module treespace_metrics.create_trees
Expand source code
from networkx import DiGraph, all_simple_paths, add_path
from typing import List, Tuple
from treespace_metrics.utils import get_leaves, get_root, get_all_roots, path_to_edges
from treespace_metrics.drawing import draw_tree
from treespace_metrics.francis import vertex_disjoint_paths, rooted_spanning_tree
def least_common_ancestor(network: DiGraph, leaf_ending_path: list,
omnian_nodes: list) -> Tuple[str, str]:
"""
This function helps find the least common ancestor, if one exists
Args:
network: the original phylogenetic network N
leaf_ending_path: This is a path that ends in a leaf node in tree being generated
omnian_nodes: This is the omnian ending path
Returns:
Tuple: The base tree with only leaf paths, and the list of omnian paths
"""
lca = None
for omnian_path_node in omnian_nodes:
print("[LCA] finding LCA for", omnian_path_node)
for parent in network.predecessors(omnian_path_node):
print("[LCA] Checking edge", parent, omnian_path_node, "in N")
if parent in leaf_ending_path:
print("[LCA] Found LCA", parent)
lca = (parent, omnian_path_node)
break
return lca
def continue_building_tree(generated_tree: DiGraph, network: DiGraph) -> bool:
"""
This is a helper function to iter_tree
I check the current tree being made and see if I should continue trying to check the existing omnian paths to build higher
Args:
generated_tree: the tree currently being built to solve the minimum enum problem
network: the original network N
Returns:
Boolean: True - keep working on building the tree, False - stop building the tree
"""
root = get_root(network)
root_path = None
non_root_paths = []
for path in find_disjoint_paths(generated_tree, network):
if path[0] != root:
non_root_paths.append(path)
else:
root_path = path
# Print the adjacency list of the tree
print("[Checking Building Tree] Adjacency List of Tree")
for node, neighbors in generated_tree.adj.items():
print(f"{node}: {list(neighbors)}")
print("[Checking Building Tree] Root Path", root_path)
print("[Checking Building Tree] Non-Root Paths", non_root_paths)
# There has to be a path to the root already...
# if not keep searching then damn it!
if root_path is None:
return True
else:
# Check that all non-root paths are connected to the root path
for path in non_root_paths:
path_root = path[0]
parents_in_network = set(network.predecessors(path_root))
if not any(node in generated_tree for node in parents_in_network):
return True
# Otherwise, no need to continue building!
return False
def combine_paths_based_on_edge(generated_tree: DiGraph,
omnian_path: List[str],
current_leaf_path: List[str],
opl_to_ler_edge: List[str],
lca_to_opr_edge: List[str]) -> DiGraph:
"""
This is a helper function to iter_tree
To make life easier, I can use this to add a new path to an existing leaf path
For example, if I have a path
edge: 16 -> 17
(Omnian, to be added to the graph) 10 -> 12 -> 15 -> 16
(Leaf, currently on the graph) 9 -> 14 -> 17 -> 18 -> 19 -> L2
My output will be the tree will only have path
10 -> 12 -> 15 -> 16 -> 17 -> 18 -> 19 -> L2 (drops 9 and 14 to keep the path disjoint)
The function is assuming current_leaf_path ends in a leaf node
Args:
generated_tree (DiGraph): The tree being generated in `iter_tree`.
omnian_path (list): The omnian path selected that will be added to the tree as this will give most unused nodes.
current_leaf_path (list): The path that is currently in the tree, ending in a leaf at the moment.
opl_to_ler_edge (tuple): The edge to add that will connect the updating path to the current leaf path.
lca_to_opr_edge (tuple): If the omnian path has a common ancestor with the leaf-ending path, this is how to factor that in.
Returns:
DiGraph: The updated tree with the new path added.
"""
omnian_path_leaf, leaf_ending_root = opl_to_ler_edge
if lca_to_opr_edge is None:
lca, omnian_path_root = current_leaf_path[0], omnian_path[0]
else:
lca, omnian_path_root = lca_to_opr_edge
# Identify the nodes in the first path up to the start of the edge to add
omnian_nodes = omnian_path[omnian_path_root:omnian_path.index(omnian_path_leaf) + 1]
# Identify the nodes in the second path from the end of the edge to add
leaf_nodes = current_leaf_path[current_leaf_path.index(leaf_ending_root):lca]
# Combine these nodes to create the new path
new_path = omnian_nodes + leaf_nodes
print("[UPDATE_PATH] New Path based on omnians", omnian_path)
print("[UPDATE_PATH] Current Leaf Path", current_leaf_path)
print("[UPDATE_PATH] New Path", new_path)
# Remove the old paths from the graph
generated_tree.remove_nodes_from(new_path)
generated_tree.remove_nodes_from(current_leaf_path)
# Add the new path to the graph
add_path(generated_tree, new_path)
return generated_tree
def find_disjoint_paths(generated_tree: DiGraph, network: DiGraph) -> list:
"""
This is a helper function to iter_tree.
Find all disjoint paths between all pairs of nodes in the graph.
Each graph I generate will be just disjoint paths, on the final step I will join these
disjoint paths to create a tree with root rho and the same leaf set
Args:
generated_tree: the tree that is currently being built to solve the minimum enum problem
network: the original network N
Returns:
List: The list of all paths found on the tree being generated
"""
leaves = get_leaves(network)
disjoint_paths = []
for target in generated_tree.nodes():
# target is a leaf node
if generated_tree.out_degree(target) == 0:
for source in generated_tree.nodes():
if source != target:
paths = list(all_simple_paths(generated_tree, source, target))
for path in paths:
# Check if a path is not a subset of any path in disjoint_paths
if not any(set(path).issubset(set(p)) for p in disjoint_paths):
disjoint_paths.append(path)
# Check JUST IN CASE the generated tree has a path of just a stray leaf node
current_leaves_found = set()
for path in disjoint_paths:
disjoint_path_leaf = path[len(path) - 1]
current_leaves_found.add(disjoint_path_leaf)
for leaf in leaves - current_leaves_found:
disjoint_paths.append([leaf])
return disjoint_paths
def all_nodes_covered(nodes_used: dict) -> bool:
"""
Check if all nodes were covered at least once
Args:
nodes_used: a dictionary with the number of times each node was used in a tree
Returns:
Boolean: True - all nodes were covered, False - some nodes were not covered
"""
for node_usage in nodes_used.values():
if node_usage == 0:
return False
return True
def count_untouched_score_in_path(omnian_path: list, nodes_used: dict,
start='', end='', inclusive=False) -> int:
"""
This is a helper function to iter_tree.
Used within exchange argument to prioritize which path with unmatched omnian to pick
There is an option to have a start and end parameter.
Say you have [1, 2, 3, 4, 5], and start = 2 end = 5,
you will only get the number of 0's for 3, 4; this is for inclusive=False.
However, if inclusive was true, then you would get the number of 0's for 2, 3, 4, 5.
This is to make the call whether to cut or not.
Args:
omnian_path: a list of a path that does end in an omnian node
nodes_used: a dictionary with the number of times each node was used in a tree
start: the start node to consider in the path
end: the end node to consider in the path
inclusive: a boolean to determine if the start and end nodes should be included in the count
Returns:
Int: the number of untouched nodes in the specified path slice
"""
count = 0
# Determine the slice of the path to consider
if start == '' and end == '':
path_slice = omnian_path
else:
try:
if inclusive:
start_index = omnian_path.index(start)
end_index = omnian_path.index(end) + 1
else:
start_index = omnian_path.index(start) + 1
end_index = omnian_path.index(end)
path_slice = omnian_path[start_index:end_index]
except ValueError:
raise ValueError("[COUNT_UNTOUCHED] Start or end node not found in the given path")
# Count the number of untouched nodes in the specified path slice
for node in path_slice:
if nodes_used[node] == 0:
count += 1
print("[COUNT_UNTOUCHED] Path ", path_slice, "has", count, "untouched nodes, inclusive", inclusive)
return count
def prune_tree(tree: DiGraph, network: DiGraph):
"""
This is a helper function to iter_tree
This takes care of
1- making sure you only have one root, rho. It does this by connecting loose ends up to any node
2- you only have the same leaves from the original network N. It does this by removing any loose ends.
So when I occasionally change paths, just leaving a hanging omnian will be cleaned up here
Args:
tree: the result of iter_tree, but we want to comply with the same leaf and root as N
network: the original network N
"""
root = get_root(network)
leaves = get_leaves(network)
# Make sure there is only one root in the tree you are making
current_roots = get_all_roots(tree)
while len(current_roots) != 1:
roots_to_delete = set()
for temp_root in current_roots:
if temp_root == root:
continue
else:
print("[Prune Tree] An extra root was found", temp_root)
parents = list(network.predecessors(temp_root))
found_root_path = False
for parent in parents:
# Try to pick node already existing in the generated tree
# This will make it easier to not lose track of the initial disjoint paths
if parent in tree.nodes():
tree.add_edge(parent, temp_root)
found_root_path = True
print("[Prune Tree] Will remove by adding edge: ", parent, '->', temp_root)
break
# If nothing found, just pick any predecessor from N
if not found_root_path:
tree.add_edge(parents[0], temp_root)
print("[Prune Tree] Will remove by adding edge: ", parents[0], '->', temp_root)
roots_to_delete.add(temp_root)
# Delete temp_roots and check if you need to keep adding more paths up...
for remove_node in roots_to_delete:
current_roots.remove(remove_node)
current_roots = current_roots.union(get_all_roots(tree))
# Make sure there are only the specified leaves allowed in the tree you are making
current_leaves = get_leaves(tree)
while leaves != current_leaves:
leaves_to_delete = set()
for leaf in current_leaves:
if leaf not in leaves:
print("[Prune Tree] Found invalid leaf", leaf, " deleting from generated tree")
tree.remove_node(leaf)
leaves_to_delete.add(leaf)
for leaf in leaves_to_delete:
current_leaves.remove(leaf)
current_leaves = current_leaves.union(get_leaves(tree))
def iter_tree(new_tree: DiGraph, omnian_paths: List,
nodes_used: dict, network: DiGraph) -> DiGraph:
"""
Iterate Tree:
Using the input disjoint paths, create a tree with 1 root and all leaves in N.
The general gist, the new_tree should start with disjoint paths ending with leaves in N.
I want to extend the disjoint paths using the omnian paths, to reach the root.
So taking the 'root' of these leaf ending disjoint paths, there are two cases
1- There is no least common ancestor, effectively I am appending these nodes to path to reach the root
2- There is a least common ancestor, I need update the disjoint path as follows:
For Case 1, there are a few terms I should define to make this easier to understand
- Leaf Ending Path (LEP): A disjoint path that ends in a leaf node
- Omnian Path (OP): A disjoint path that ends in an omnian node
There are two nodes to focus on the LEP, the Leaf Ending Root (LER) and a Least Common Ancestor (LCA) on the LEP.
There is a possibility that the path of omnians could lead back to the LEP when checking edges on Network N,
so to preserve the disjointness, I need to find the LCA on the LEP.
A visualisation of the problem:
LCA -> [LEP Nodes] -> LER -> [LEP Nodes] -> Leaf
LCA -> OPR -> [OP Nodes] OPL -> LER
To find the LCA, I check nodes in OP if they have an edge to LEP, if so, I can find the LCA.
From this process I will define two other others:
1. Omnian Path Leaf (OPL), the leaf of the OP, there is an edge (OPL, LER) in network N
2. Omnian Path Root (OPR), the root of the OP, there is an edge (LCA, OPR) in network N
I check if LCA -> [LEP Nodes] -> LER or LCA -> OPR -> [OP Nodes] OPL -> LER gives me more unused nodes or not.
Once the decision is made, I update the generating tree to update the LEP.
Main things I need to worry about to do this correctly are:
1- Cut [LCA, ..., LER]
2- Add [LCA, OPR, ..., OPL, LER]
Main edges to worry about are (LCA, OPR) (OPL, LER) if you decide to swap!
TODO: A note to PIs, you may want to do ONE more loop of omnian paths, because you may terminate too early
and no discover paths that only become 'visible' once you reach the root.
Args:
new_tree: the base tree based from network N with only paths ending in a leaf.
omnian_paths: a list of disjoint paths in the network N where the last node is an omnian node
nodes_used: a dictionary with the number of times each node was used in a tree
network: the original network N
Returns:
Tuple: a new tree that is the output of the iteration, with updated metrics
"""
tripwire = 0
while continue_building_tree(new_tree, network):
for leaf_ending_path in find_disjoint_paths(new_tree, network):
print("[ITER] Checking Leaf Ending Path", leaf_ending_path)
# First, check the parent of the disjoint path, if it goes to another leaf path,
# no need to check this further
path_root = leaf_ending_path[0]
parents_in_network = set(network.predecessors(path_root))
if any(node in new_tree for node in parents_in_network):
print("[ITER] This path has a parent going to another leaf ending path in Network N", leaf_ending_path)
continue
else:
# I need to poke around the omnian paths to see which nodes to go up to the root
print("[ITER] This path has no parent going to another leaf ending path", leaf_ending_path)
# Iterate through all the omnian ending paths, check g.predecessors to get the viable edges
# and respective disjoint path
candidate_edges_and_paths = []
for omnian_path in omnian_paths:
print("[ITER] Checking Omnian Path", omnian_path)
for omnian_node in omnian_path:
# I want to check which omnian paths would be connected to current disjoint paths on the tree
for potential_leaf_path_node in network.successors(omnian_node):
print("[ITER] Checking if edge exists in N, [Omnian]", omnian_node, "[Leaf]", potential_leaf_path_node)
if potential_leaf_path_node in leaf_ending_path:
opl_to_ler_edge = [omnian_node, potential_leaf_path_node]
print("[ITER] Found edge to exchange in N", opl_to_ler_edge)
candidate_edges_and_paths.append((opl_to_ler_edge, omnian_path))
# If no candidate edges, continue to the next leaf ending path that should be changed up
if len(candidate_edges_and_paths) == 0:
print("[ITER] no valid edge found")
continue
# Compute which edge to pick based on the number of untouched nodes
scores = {}
edge_to_path = {}
edge_to_lca_opl_edge = {}
for opl_to_ler_edge, omnian_path in candidate_edges_and_paths:
omnian_path_leaf, leaf_ending_root = opl_to_ler_edge
lca_omnian_root_edge = least_common_ancestor(network, leaf_ending_path, omnian_path)
# Case 1- Omnian path helps extend a leaf path up, so no least common ancestor within the path
# Case 2- Omnian path has the least common ancestor within the path
if lca_omnian_root_edge is None:
omnian_score = count_untouched_score_in_path(omnian_path, nodes_used,
inclusive=True)
tree_score = 0
else:
lca, omnian_path_root = lca_omnian_root_edge
# Remember, LCA is technically ONLY part of leaf ending path, NOT omnian path!
omnian_score = count_untouched_score_in_path(omnian_path, nodes_used,
start=omnian_path_root,
end=omnian_path_leaf,
inclusive=True)
tree_score = count_untouched_score_in_path(leaf_ending_path, nodes_used,
start=lca,
end=leaf_ending_root)
# You subtract, because you gain the nodes in omnian path,
# but might lose nodes in an existing leaf ending path
total_score = omnian_score - tree_score
scores[tuple(opl_to_ler_edge)] = total_score
edge_to_path[tuple(opl_to_ler_edge)] = omnian_path
if lca_omnian_root_edge is None:
edge_to_lca_opl_edge[tuple(opl_to_ler_edge)] = []
else:
edge_to_lca_opl_edge[tuple(opl_to_ler_edge)] = lca_omnian_root_edge
print("[ITER] edge", opl_to_ler_edge, " has a score of ", total_score)
best_opl_to_ler_edge = max(scores, key=scores.get)
best_score = scores[best_opl_to_ler_edge]
if best_score <= 0:
print("[ITER] Best score is negative or zero, skipping update since you will NOT gain new nodes!")
else:
omnian_path = edge_to_path[tuple(best_opl_to_ler_edge)]
best_edge_opl_to_ler = list(best_opl_to_ler_edge)
best_edge_lca_to_opr = edge_to_lca_opl_edge[tuple(best_opl_to_ler_edge)]
print("[ITER] Best Edge to pick", best_edge_opl_to_ler, "Updating Path now!")
combine_paths_based_on_edge(new_tree,
omnian_path,
leaf_ending_path,
best_edge_opl_to_ler,
best_edge_lca_to_opr)
print("[ITER] Completed Path Update", tripwire)
tripwire += 1
if tripwire > 3:
print("[ITER] Breaking out of loop! POST MORTEM NOW!")
break
print("[ITER] Completed Disjoint path creation, now pruning tree...")
# I should not need this function tbh...
# This should only be joining the disjoint paths
prune_tree(new_tree, network)
# Update Metrics of which nodes have been used in a tree
for node in new_tree.nodes():
counter = nodes_used[node]
counter += 1
nodes_used[node] = counter
return new_tree
def initialize_enum(g: DiGraph, disjoint_paths: list) -> Tuple[DiGraph, List]:
"""
This function has two important things to do
1- Create a Tree, with just the paths to leaves
2- Return Omnian paths
Args:
g: the original phylogenetic network N
disjoint_paths: a list of disjoint paths in the network N, each path is generated from spanning tree algorithm
Returns:
Tuple: The base tree with only leaf paths, and the list of omnian paths
"""
omnian_paths = []
leaf_paths = []
base_tree = DiGraph()
leaves = get_leaves(g)
# Essentially similar to the spanning tree:
# - Only add all paths ending in a leaf
for path in disjoint_paths:
if path[len(path) - 1] in leaves:
leaf_paths.append(path)
# Add the path to the tree, if the path is just leaf node,
# this fail-safe ensures it is added
if len(path) == 1:
base_tree.add_node(path[0])
else:
add_path(base_tree, path)
else:
omnian_paths.append(path)
for path in leaf_paths:
print("[INIT] This path is OK because it ends in a leaf", path)
for path in omnian_paths:
print("[INIT] This path ends in an unmatched omnian node", path)
return base_tree, omnian_paths
def enum_trees(g: DiGraph, graph_name: str, draw=False) -> list:
"""
The main function to compute minimum number of rooted trees spanning the network N
Args:
g: the original phylogenetic network N
graph_name: the name of the graph to be drawn, based on input file name
draw: a boolean to determine if the tree should be drawn in the images/ directory
Returns:
List: a list of DiGraphs, each representing a rooted tree
"""
trees = []
# Start with getting disjoint paths and drawing it on the graph for visualization
_, paths = vertex_disjoint_paths(g)
spanning_tree = rooted_spanning_tree(g, paths)
if draw:
draw_tree(g, graph_name + '-spanning-tree', highlight_edges=spanning_tree.edges())
draw_tree(g, graph_name + '-initial-disjoint-paths', highlight_edges=path_to_edges(paths))
node_used_count = {}
for node in g.nodes():
node_used_count[node] = 0
base_tree, omnian_tuple = initialize_enum(g, paths)
# Compute a metric for each disjoint part...
while not all_nodes_covered(node_used_count):
# 1- Use disjoint paths to create a tree with only leaves L
# Be sure to update the metrics too
tree = iter_tree(base_tree.copy(as_view=False), omnian_tuple, node_used_count, g)
trees.append(tree)
if draw:
draw_tree(tree, graph_name + '-tree-number-' + str(len(trees)))
# Have a quick and easy way to break, after making 1 tree, there are bugs,
# but it is the proof of concept that matters most.
if len(trees) >= 1:
break
return trees
Functions
def all_nodes_covered(nodes_used: dict) ‑> bool
-
Check if all nodes were covered at least once
Args
nodes_used
- a dictionary with the number of times each node was used in a tree
Returns
Boolean
- True - all nodes were covered, False - some nodes were not covered
Expand source code
def all_nodes_covered(nodes_used: dict) -> bool: """ Check if all nodes were covered at least once Args: nodes_used: a dictionary with the number of times each node was used in a tree Returns: Boolean: True - all nodes were covered, False - some nodes were not covered """ for node_usage in nodes_used.values(): if node_usage == 0: return False return True
def combine_paths_based_on_edge(generated_tree: networkx.classes.digraph.DiGraph, omnian_path: List[str], current_leaf_path: List[str], opl_to_ler_edge: List[str], lca_to_opr_edge: List[str]) ‑> networkx.classes.digraph.DiGraph
-
This is a helper function to iter_tree To make life easier, I can use this to add a new path to an existing leaf path For example, if I have a path edge: 16 -> 17 (Omnian, to be added to the graph) 10 -> 12 -> 15 -> 16 (Leaf, currently on the graph) 9 -> 14 -> 17 -> 18 -> 19 -> L2 My output will be the tree will only have path 10 -> 12 -> 15 -> 16 -> 17 -> 18 -> 19 -> L2 (drops 9 and 14 to keep the path disjoint) The function is assuming current_leaf_path ends in a leaf node
Args
generated_tree
:DiGraph
- The tree being generated in
iter_tree()
. omnian_path
:list
- The omnian path selected that will be added to the tree as this will give most unused nodes.
current_leaf_path
:list
- The path that is currently in the tree, ending in a leaf at the moment.
opl_to_ler_edge
:tuple
- The edge to add that will connect the updating path to the current leaf path.
lca_to_opr_edge
:tuple
- If the omnian path has a common ancestor with the leaf-ending path, this is how to factor that in.
Returns
DiGraph
- The updated tree with the new path added.
Expand source code
def combine_paths_based_on_edge(generated_tree: DiGraph, omnian_path: List[str], current_leaf_path: List[str], opl_to_ler_edge: List[str], lca_to_opr_edge: List[str]) -> DiGraph: """ This is a helper function to iter_tree To make life easier, I can use this to add a new path to an existing leaf path For example, if I have a path edge: 16 -> 17 (Omnian, to be added to the graph) 10 -> 12 -> 15 -> 16 (Leaf, currently on the graph) 9 -> 14 -> 17 -> 18 -> 19 -> L2 My output will be the tree will only have path 10 -> 12 -> 15 -> 16 -> 17 -> 18 -> 19 -> L2 (drops 9 and 14 to keep the path disjoint) The function is assuming current_leaf_path ends in a leaf node Args: generated_tree (DiGraph): The tree being generated in `iter_tree`. omnian_path (list): The omnian path selected that will be added to the tree as this will give most unused nodes. current_leaf_path (list): The path that is currently in the tree, ending in a leaf at the moment. opl_to_ler_edge (tuple): The edge to add that will connect the updating path to the current leaf path. lca_to_opr_edge (tuple): If the omnian path has a common ancestor with the leaf-ending path, this is how to factor that in. Returns: DiGraph: The updated tree with the new path added. """ omnian_path_leaf, leaf_ending_root = opl_to_ler_edge if lca_to_opr_edge is None: lca, omnian_path_root = current_leaf_path[0], omnian_path[0] else: lca, omnian_path_root = lca_to_opr_edge # Identify the nodes in the first path up to the start of the edge to add omnian_nodes = omnian_path[omnian_path_root:omnian_path.index(omnian_path_leaf) + 1] # Identify the nodes in the second path from the end of the edge to add leaf_nodes = current_leaf_path[current_leaf_path.index(leaf_ending_root):lca] # Combine these nodes to create the new path new_path = omnian_nodes + leaf_nodes print("[UPDATE_PATH] New Path based on omnians", omnian_path) print("[UPDATE_PATH] Current Leaf Path", current_leaf_path) print("[UPDATE_PATH] New Path", new_path) # Remove the old paths from the graph generated_tree.remove_nodes_from(new_path) generated_tree.remove_nodes_from(current_leaf_path) # Add the new path to the graph add_path(generated_tree, new_path) return generated_tree
def continue_building_tree(generated_tree: networkx.classes.digraph.DiGraph, network: networkx.classes.digraph.DiGraph) ‑> bool
-
This is a helper function to iter_tree I check the current tree being made and see if I should continue trying to check the existing omnian paths to build higher
Args
generated_tree
- the tree currently being built to solve the minimum enum problem
network
- the original network N
Returns
Boolean
- True - keep working on building the tree, False - stop building the tree
Expand source code
def continue_building_tree(generated_tree: DiGraph, network: DiGraph) -> bool: """ This is a helper function to iter_tree I check the current tree being made and see if I should continue trying to check the existing omnian paths to build higher Args: generated_tree: the tree currently being built to solve the minimum enum problem network: the original network N Returns: Boolean: True - keep working on building the tree, False - stop building the tree """ root = get_root(network) root_path = None non_root_paths = [] for path in find_disjoint_paths(generated_tree, network): if path[0] != root: non_root_paths.append(path) else: root_path = path # Print the adjacency list of the tree print("[Checking Building Tree] Adjacency List of Tree") for node, neighbors in generated_tree.adj.items(): print(f"{node}: {list(neighbors)}") print("[Checking Building Tree] Root Path", root_path) print("[Checking Building Tree] Non-Root Paths", non_root_paths) # There has to be a path to the root already... # if not keep searching then damn it! if root_path is None: return True else: # Check that all non-root paths are connected to the root path for path in non_root_paths: path_root = path[0] parents_in_network = set(network.predecessors(path_root)) if not any(node in generated_tree for node in parents_in_network): return True # Otherwise, no need to continue building! return False
def count_untouched_score_in_path(omnian_path: list, nodes_used: dict, start='', end='', inclusive=False) ‑> int
-
This is a helper function to iter_tree. Used within exchange argument to prioritize which path with unmatched omnian to pick There is an option to have a start and end parameter. Say you have [1, 2, 3, 4, 5], and start = 2 end = 5, you will only get the number of 0's for 3, 4; this is for inclusive=False. However, if inclusive was true, then you would get the number of 0's for 2, 3, 4, 5. This is to make the call whether to cut or not.
Args
omnian_path
- a list of a path that does end in an omnian node
nodes_used
- a dictionary with the number of times each node was used in a tree
start
- the start node to consider in the path
end
- the end node to consider in the path
inclusive
- a boolean to determine if the start and end nodes should be included in the count
Returns
Int
- the number of untouched nodes in the specified path slice
Expand source code
def count_untouched_score_in_path(omnian_path: list, nodes_used: dict, start='', end='', inclusive=False) -> int: """ This is a helper function to iter_tree. Used within exchange argument to prioritize which path with unmatched omnian to pick There is an option to have a start and end parameter. Say you have [1, 2, 3, 4, 5], and start = 2 end = 5, you will only get the number of 0's for 3, 4; this is for inclusive=False. However, if inclusive was true, then you would get the number of 0's for 2, 3, 4, 5. This is to make the call whether to cut or not. Args: omnian_path: a list of a path that does end in an omnian node nodes_used: a dictionary with the number of times each node was used in a tree start: the start node to consider in the path end: the end node to consider in the path inclusive: a boolean to determine if the start and end nodes should be included in the count Returns: Int: the number of untouched nodes in the specified path slice """ count = 0 # Determine the slice of the path to consider if start == '' and end == '': path_slice = omnian_path else: try: if inclusive: start_index = omnian_path.index(start) end_index = omnian_path.index(end) + 1 else: start_index = omnian_path.index(start) + 1 end_index = omnian_path.index(end) path_slice = omnian_path[start_index:end_index] except ValueError: raise ValueError("[COUNT_UNTOUCHED] Start or end node not found in the given path") # Count the number of untouched nodes in the specified path slice for node in path_slice: if nodes_used[node] == 0: count += 1 print("[COUNT_UNTOUCHED] Path ", path_slice, "has", count, "untouched nodes, inclusive", inclusive) return count
def enum_trees(g: networkx.classes.digraph.DiGraph, graph_name: str, draw=False) ‑> list
-
The main function to compute minimum number of rooted trees spanning the network N
Args
g
- the original phylogenetic network N
graph_name
- the name of the graph to be drawn, based on input file name
draw
- a boolean to determine if the tree should be drawn in the images/ directory
Returns
List
- a list of DiGraphs, each representing a rooted tree
Expand source code
def enum_trees(g: DiGraph, graph_name: str, draw=False) -> list: """ The main function to compute minimum number of rooted trees spanning the network N Args: g: the original phylogenetic network N graph_name: the name of the graph to be drawn, based on input file name draw: a boolean to determine if the tree should be drawn in the images/ directory Returns: List: a list of DiGraphs, each representing a rooted tree """ trees = [] # Start with getting disjoint paths and drawing it on the graph for visualization _, paths = vertex_disjoint_paths(g) spanning_tree = rooted_spanning_tree(g, paths) if draw: draw_tree(g, graph_name + '-spanning-tree', highlight_edges=spanning_tree.edges()) draw_tree(g, graph_name + '-initial-disjoint-paths', highlight_edges=path_to_edges(paths)) node_used_count = {} for node in g.nodes(): node_used_count[node] = 0 base_tree, omnian_tuple = initialize_enum(g, paths) # Compute a metric for each disjoint part... while not all_nodes_covered(node_used_count): # 1- Use disjoint paths to create a tree with only leaves L # Be sure to update the metrics too tree = iter_tree(base_tree.copy(as_view=False), omnian_tuple, node_used_count, g) trees.append(tree) if draw: draw_tree(tree, graph_name + '-tree-number-' + str(len(trees))) # Have a quick and easy way to break, after making 1 tree, there are bugs, # but it is the proof of concept that matters most. if len(trees) >= 1: break return trees
def find_disjoint_paths(generated_tree: networkx.classes.digraph.DiGraph, network: networkx.classes.digraph.DiGraph) ‑> list
-
This is a helper function to iter_tree. Find all disjoint paths between all pairs of nodes in the graph. Each graph I generate will be just disjoint paths, on the final step I will join these disjoint paths to create a tree with root rho and the same leaf set
Args
generated_tree
- the tree that is currently being built to solve the minimum enum problem
network
- the original network N
Returns
List
- The list of all paths found on the tree being generated
Expand source code
def find_disjoint_paths(generated_tree: DiGraph, network: DiGraph) -> list: """ This is a helper function to iter_tree. Find all disjoint paths between all pairs of nodes in the graph. Each graph I generate will be just disjoint paths, on the final step I will join these disjoint paths to create a tree with root rho and the same leaf set Args: generated_tree: the tree that is currently being built to solve the minimum enum problem network: the original network N Returns: List: The list of all paths found on the tree being generated """ leaves = get_leaves(network) disjoint_paths = [] for target in generated_tree.nodes(): # target is a leaf node if generated_tree.out_degree(target) == 0: for source in generated_tree.nodes(): if source != target: paths = list(all_simple_paths(generated_tree, source, target)) for path in paths: # Check if a path is not a subset of any path in disjoint_paths if not any(set(path).issubset(set(p)) for p in disjoint_paths): disjoint_paths.append(path) # Check JUST IN CASE the generated tree has a path of just a stray leaf node current_leaves_found = set() for path in disjoint_paths: disjoint_path_leaf = path[len(path) - 1] current_leaves_found.add(disjoint_path_leaf) for leaf in leaves - current_leaves_found: disjoint_paths.append([leaf]) return disjoint_paths
def initialize_enum(g: networkx.classes.digraph.DiGraph, disjoint_paths: list) ‑> Tuple[networkx.classes.digraph.DiGraph, List]
-
This function has two important things to do 1- Create a Tree, with just the paths to leaves 2- Return Omnian paths
Args
g
- the original phylogenetic network N
disjoint_paths
- a list of disjoint paths in the network N, each path is generated from spanning tree algorithm
Returns
Tuple
- The base tree with only leaf paths, and the list of omnian paths
Expand source code
def initialize_enum(g: DiGraph, disjoint_paths: list) -> Tuple[DiGraph, List]: """ This function has two important things to do 1- Create a Tree, with just the paths to leaves 2- Return Omnian paths Args: g: the original phylogenetic network N disjoint_paths: a list of disjoint paths in the network N, each path is generated from spanning tree algorithm Returns: Tuple: The base tree with only leaf paths, and the list of omnian paths """ omnian_paths = [] leaf_paths = [] base_tree = DiGraph() leaves = get_leaves(g) # Essentially similar to the spanning tree: # - Only add all paths ending in a leaf for path in disjoint_paths: if path[len(path) - 1] in leaves: leaf_paths.append(path) # Add the path to the tree, if the path is just leaf node, # this fail-safe ensures it is added if len(path) == 1: base_tree.add_node(path[0]) else: add_path(base_tree, path) else: omnian_paths.append(path) for path in leaf_paths: print("[INIT] This path is OK because it ends in a leaf", path) for path in omnian_paths: print("[INIT] This path ends in an unmatched omnian node", path) return base_tree, omnian_paths
def iter_tree(new_tree: networkx.classes.digraph.DiGraph, omnian_paths: List, nodes_used: dict, network: networkx.classes.digraph.DiGraph) ‑> networkx.classes.digraph.DiGraph
-
Iterate Tree: Using the input disjoint paths, create a tree with 1 root and all leaves in N. The general gist, the new_tree should start with disjoint paths ending with leaves in N. I want to extend the disjoint paths using the omnian paths, to reach the root.
So taking the 'root' of these leaf ending disjoint paths, there are two cases 1- There is no least common ancestor, effectively I am appending these nodes to path to reach the root 2- There is a least common ancestor, I need update the disjoint path as follows:
For Case 1, there are a few terms I should define to make this easier to understand - Leaf Ending Path (LEP): A disjoint path that ends in a leaf node - Omnian Path (OP): A disjoint path that ends in an omnian node There are two nodes to focus on the LEP, the Leaf Ending Root (LER) and a Least Common Ancestor (LCA) on the LEP. There is a possibility that the path of omnians could lead back to the LEP when checking edges on Network N, so to preserve the disjointness, I need to find the LCA on the LEP.
A visualisation of the problem: LCA -> [LEP Nodes] -> LER -> [LEP Nodes] -> Leaf LCA -> OPR -> [OP Nodes] OPL -> LER
To find the LCA, I check nodes in OP if they have an edge to LEP, if so, I can find the LCA. From this process I will define two other others: 1. Omnian Path Leaf (OPL), the leaf of the OP, there is an edge (OPL, LER) in network N 2. Omnian Path Root (OPR), the root of the OP, there is an edge (LCA, OPR) in network N
I check if LCA -> [LEP Nodes] -> LER or LCA -> OPR -> [OP Nodes] OPL -> LER gives me more unused nodes or not. Once the decision is made, I update the generating tree to update the LEP.
Main things I need to worry about to do this correctly are: 1- Cut [LCA, …, LER] 2- Add [LCA, OPR, …, OPL, LER] Main edges to worry about are (LCA, OPR) (OPL, LER) if you decide to swap!
TODO: A note to PIs, you may want to do ONE more loop of omnian paths, because you may terminate too early and no discover paths that only become 'visible' once you reach the root.
Args
new_tree
- the base tree based from network N with only paths ending in a leaf.
omnian_paths
- a list of disjoint paths in the network N where the last node is an omnian node
nodes_used
- a dictionary with the number of times each node was used in a tree
network
- the original network N
Returns
Tuple
- a new tree that is the output of the iteration, with updated metrics
Expand source code
def iter_tree(new_tree: DiGraph, omnian_paths: List, nodes_used: dict, network: DiGraph) -> DiGraph: """ Iterate Tree: Using the input disjoint paths, create a tree with 1 root and all leaves in N. The general gist, the new_tree should start with disjoint paths ending with leaves in N. I want to extend the disjoint paths using the omnian paths, to reach the root. So taking the 'root' of these leaf ending disjoint paths, there are two cases 1- There is no least common ancestor, effectively I am appending these nodes to path to reach the root 2- There is a least common ancestor, I need update the disjoint path as follows: For Case 1, there are a few terms I should define to make this easier to understand - Leaf Ending Path (LEP): A disjoint path that ends in a leaf node - Omnian Path (OP): A disjoint path that ends in an omnian node There are two nodes to focus on the LEP, the Leaf Ending Root (LER) and a Least Common Ancestor (LCA) on the LEP. There is a possibility that the path of omnians could lead back to the LEP when checking edges on Network N, so to preserve the disjointness, I need to find the LCA on the LEP. A visualisation of the problem: LCA -> [LEP Nodes] -> LER -> [LEP Nodes] -> Leaf LCA -> OPR -> [OP Nodes] OPL -> LER To find the LCA, I check nodes in OP if they have an edge to LEP, if so, I can find the LCA. From this process I will define two other others: 1. Omnian Path Leaf (OPL), the leaf of the OP, there is an edge (OPL, LER) in network N 2. Omnian Path Root (OPR), the root of the OP, there is an edge (LCA, OPR) in network N I check if LCA -> [LEP Nodes] -> LER or LCA -> OPR -> [OP Nodes] OPL -> LER gives me more unused nodes or not. Once the decision is made, I update the generating tree to update the LEP. Main things I need to worry about to do this correctly are: 1- Cut [LCA, ..., LER] 2- Add [LCA, OPR, ..., OPL, LER] Main edges to worry about are (LCA, OPR) (OPL, LER) if you decide to swap! TODO: A note to PIs, you may want to do ONE more loop of omnian paths, because you may terminate too early and no discover paths that only become 'visible' once you reach the root. Args: new_tree: the base tree based from network N with only paths ending in a leaf. omnian_paths: a list of disjoint paths in the network N where the last node is an omnian node nodes_used: a dictionary with the number of times each node was used in a tree network: the original network N Returns: Tuple: a new tree that is the output of the iteration, with updated metrics """ tripwire = 0 while continue_building_tree(new_tree, network): for leaf_ending_path in find_disjoint_paths(new_tree, network): print("[ITER] Checking Leaf Ending Path", leaf_ending_path) # First, check the parent of the disjoint path, if it goes to another leaf path, # no need to check this further path_root = leaf_ending_path[0] parents_in_network = set(network.predecessors(path_root)) if any(node in new_tree for node in parents_in_network): print("[ITER] This path has a parent going to another leaf ending path in Network N", leaf_ending_path) continue else: # I need to poke around the omnian paths to see which nodes to go up to the root print("[ITER] This path has no parent going to another leaf ending path", leaf_ending_path) # Iterate through all the omnian ending paths, check g.predecessors to get the viable edges # and respective disjoint path candidate_edges_and_paths = [] for omnian_path in omnian_paths: print("[ITER] Checking Omnian Path", omnian_path) for omnian_node in omnian_path: # I want to check which omnian paths would be connected to current disjoint paths on the tree for potential_leaf_path_node in network.successors(omnian_node): print("[ITER] Checking if edge exists in N, [Omnian]", omnian_node, "[Leaf]", potential_leaf_path_node) if potential_leaf_path_node in leaf_ending_path: opl_to_ler_edge = [omnian_node, potential_leaf_path_node] print("[ITER] Found edge to exchange in N", opl_to_ler_edge) candidate_edges_and_paths.append((opl_to_ler_edge, omnian_path)) # If no candidate edges, continue to the next leaf ending path that should be changed up if len(candidate_edges_and_paths) == 0: print("[ITER] no valid edge found") continue # Compute which edge to pick based on the number of untouched nodes scores = {} edge_to_path = {} edge_to_lca_opl_edge = {} for opl_to_ler_edge, omnian_path in candidate_edges_and_paths: omnian_path_leaf, leaf_ending_root = opl_to_ler_edge lca_omnian_root_edge = least_common_ancestor(network, leaf_ending_path, omnian_path) # Case 1- Omnian path helps extend a leaf path up, so no least common ancestor within the path # Case 2- Omnian path has the least common ancestor within the path if lca_omnian_root_edge is None: omnian_score = count_untouched_score_in_path(omnian_path, nodes_used, inclusive=True) tree_score = 0 else: lca, omnian_path_root = lca_omnian_root_edge # Remember, LCA is technically ONLY part of leaf ending path, NOT omnian path! omnian_score = count_untouched_score_in_path(omnian_path, nodes_used, start=omnian_path_root, end=omnian_path_leaf, inclusive=True) tree_score = count_untouched_score_in_path(leaf_ending_path, nodes_used, start=lca, end=leaf_ending_root) # You subtract, because you gain the nodes in omnian path, # but might lose nodes in an existing leaf ending path total_score = omnian_score - tree_score scores[tuple(opl_to_ler_edge)] = total_score edge_to_path[tuple(opl_to_ler_edge)] = omnian_path if lca_omnian_root_edge is None: edge_to_lca_opl_edge[tuple(opl_to_ler_edge)] = [] else: edge_to_lca_opl_edge[tuple(opl_to_ler_edge)] = lca_omnian_root_edge print("[ITER] edge", opl_to_ler_edge, " has a score of ", total_score) best_opl_to_ler_edge = max(scores, key=scores.get) best_score = scores[best_opl_to_ler_edge] if best_score <= 0: print("[ITER] Best score is negative or zero, skipping update since you will NOT gain new nodes!") else: omnian_path = edge_to_path[tuple(best_opl_to_ler_edge)] best_edge_opl_to_ler = list(best_opl_to_ler_edge) best_edge_lca_to_opr = edge_to_lca_opl_edge[tuple(best_opl_to_ler_edge)] print("[ITER] Best Edge to pick", best_edge_opl_to_ler, "Updating Path now!") combine_paths_based_on_edge(new_tree, omnian_path, leaf_ending_path, best_edge_opl_to_ler, best_edge_lca_to_opr) print("[ITER] Completed Path Update", tripwire) tripwire += 1 if tripwire > 3: print("[ITER] Breaking out of loop! POST MORTEM NOW!") break print("[ITER] Completed Disjoint path creation, now pruning tree...") # I should not need this function tbh... # This should only be joining the disjoint paths prune_tree(new_tree, network) # Update Metrics of which nodes have been used in a tree for node in new_tree.nodes(): counter = nodes_used[node] counter += 1 nodes_used[node] = counter return new_tree
def least_common_ancestor(network: networkx.classes.digraph.DiGraph, leaf_ending_path: list, omnian_nodes: list) ‑> Tuple[str, str]
-
This function helps find the least common ancestor, if one exists
Args
network
- the original phylogenetic network N
leaf_ending_path
- This is a path that ends in a leaf node in tree being generated
omnian_nodes
- This is the omnian ending path
Returns
Tuple
- The base tree with only leaf paths, and the list of omnian paths
Expand source code
def least_common_ancestor(network: DiGraph, leaf_ending_path: list, omnian_nodes: list) -> Tuple[str, str]: """ This function helps find the least common ancestor, if one exists Args: network: the original phylogenetic network N leaf_ending_path: This is a path that ends in a leaf node in tree being generated omnian_nodes: This is the omnian ending path Returns: Tuple: The base tree with only leaf paths, and the list of omnian paths """ lca = None for omnian_path_node in omnian_nodes: print("[LCA] finding LCA for", omnian_path_node) for parent in network.predecessors(omnian_path_node): print("[LCA] Checking edge", parent, omnian_path_node, "in N") if parent in leaf_ending_path: print("[LCA] Found LCA", parent) lca = (parent, omnian_path_node) break return lca
def prune_tree(tree: networkx.classes.digraph.DiGraph, network: networkx.classes.digraph.DiGraph)
-
This is a helper function to iter_tree This takes care of 1- making sure you only have one root, rho. It does this by connecting loose ends up to any node 2- you only have the same leaves from the original network N. It does this by removing any loose ends. So when I occasionally change paths, just leaving a hanging omnian will be cleaned up here
Args
tree
- the result of iter_tree, but we want to comply with the same leaf and root as N
network
- the original network N
Expand source code
def prune_tree(tree: DiGraph, network: DiGraph): """ This is a helper function to iter_tree This takes care of 1- making sure you only have one root, rho. It does this by connecting loose ends up to any node 2- you only have the same leaves from the original network N. It does this by removing any loose ends. So when I occasionally change paths, just leaving a hanging omnian will be cleaned up here Args: tree: the result of iter_tree, but we want to comply with the same leaf and root as N network: the original network N """ root = get_root(network) leaves = get_leaves(network) # Make sure there is only one root in the tree you are making current_roots = get_all_roots(tree) while len(current_roots) != 1: roots_to_delete = set() for temp_root in current_roots: if temp_root == root: continue else: print("[Prune Tree] An extra root was found", temp_root) parents = list(network.predecessors(temp_root)) found_root_path = False for parent in parents: # Try to pick node already existing in the generated tree # This will make it easier to not lose track of the initial disjoint paths if parent in tree.nodes(): tree.add_edge(parent, temp_root) found_root_path = True print("[Prune Tree] Will remove by adding edge: ", parent, '->', temp_root) break # If nothing found, just pick any predecessor from N if not found_root_path: tree.add_edge(parents[0], temp_root) print("[Prune Tree] Will remove by adding edge: ", parents[0], '->', temp_root) roots_to_delete.add(temp_root) # Delete temp_roots and check if you need to keep adding more paths up... for remove_node in roots_to_delete: current_roots.remove(remove_node) current_roots = current_roots.union(get_all_roots(tree)) # Make sure there are only the specified leaves allowed in the tree you are making current_leaves = get_leaves(tree) while leaves != current_leaves: leaves_to_delete = set() for leaf in current_leaves: if leaf not in leaves: print("[Prune Tree] Found invalid leaf", leaf, " deleting from generated tree") tree.remove_node(leaf) leaves_to_delete.add(leaf) for leaf in leaves_to_delete: current_leaves.remove(leaf) current_leaves = current_leaves.union(get_leaves(tree))