Module treespace_metrics.max_cst

Expand source code
from networkx import DiGraph
from networkx.algorithms.flow import min_cost_flow
import platform

from treespace_metrics.utils import get_leaves
from treespace_metrics.francis import build_path, rooted_spanning_tree
from treespace_metrics.drawing import draw_tree

plt = platform.system()


def maximum_covering_subtree(network: DiGraph, name=None, draw=False) -> [DiGraph, int]:
    """
    Implements the algorithm described in 'Maximum Covering Subtrees for Phylogenetic Networks' by Davidov et al.
    Finds the minimum number of nodes to cut to make a network tree-based.

    Args:
        network (DiGraph): The phylogenetic network as a directed graph.
        name (str, optional): The name of the file to save the output image. Defaults to None.
        draw (bool, optional): Whether to generate and save an image of the network. Defaults to False.

    Returns:
        tuple[DiGraph, int]: A tuple containing the tree-based network and the number of nodes removed.
    """
    leaves = get_leaves(network)
    # Build min-cost flow network
    # Create V_in and V_out node for each
    f = create_flow_network(network, leaves)
    flows = min_cost_flow(f)
    start = []
    matches = []
    for src, flow in flows.items():
        if src == "s":
            for destination_node, value in flow.items():
                if value == 1:
                    dst_node_id = destination_node[2:]
                    start.append(dst_node_id)
        elif src.startswith('o-'):
            for destination_node, value in flow.items():
                if destination_node == "t":
                    continue
                if value == 1:
                    src_node_id = src[2:]
                    dst_node_id = destination_node[2:]
                    matches.append((src_node_id, dst_node_id))
                    # print(str(src_node_id) + "," + str(dst_node_id))
    paths = []
    for u in start:
        # You do delete matches, so you need to pass a copy
        p = build_path(u, matches.copy())
        paths.append(p)

    # Build rooted Spanning Tree
    tree_based_network = rooted_spanning_tree(network, paths)
    diff = set(network.nodes()) - set(tree_based_network.nodes())
    n = len(diff)

    if draw:
        if name is None:
            draw_tree(network, "original network")
            draw_tree(tree_based_network, "tree-based network")
        else:
            draw_tree(network, name)
            draw_tree(tree_based_network, name + "-MAX-CST")
    return tree_based_network, n


def create_flow_network(network: DiGraph, leaves: list) -> DiGraph:
    """
    Generates the flow network required to compute the minimum number of nodes to cut.

    Args:
        network (DiGraph): The phylogenetic network as a directed graph.
        leaves (list): A list of leaf nodes in the network.

    Returns:
        DiGraph: A directed graph representing the flow network.
    """
    f = DiGraph()
    f.add_node('s', demand=-len(leaves))
    f.add_node('t', demand=len(leaves))
    # Set-up all the nodes
    for node in network.nodes():
        f.add_node("i-" + str(node))
        f.add_node("o-" + str(node))
        f.add_edge("i-" + str(node), "o-" + str(node), capacity=1, weight=-1)
        f.add_edge('s', "i-" + str(node), capacity=1, weight=0)
        if node in leaves:
            f.add_edge("o-" + str(node), 't', capacity=1, weight=0)

    # Set-up all the edges
    for edge in network.edges():
        f.add_edge("o-" + str(edge[0]), "i-" + str(edge[1]), capacity=1, weight=0)
    return f

Functions

def create_flow_network(network: networkx.classes.digraph.DiGraph, leaves: list) ‑> networkx.classes.digraph.DiGraph

Generates the flow network required to compute the minimum number of nodes to cut.

Args

network : DiGraph
The phylogenetic network as a directed graph.
leaves : list
A list of leaf nodes in the network.

Returns

DiGraph
A directed graph representing the flow network.
Expand source code
def create_flow_network(network: DiGraph, leaves: list) -> DiGraph:
    """
    Generates the flow network required to compute the minimum number of nodes to cut.

    Args:
        network (DiGraph): The phylogenetic network as a directed graph.
        leaves (list): A list of leaf nodes in the network.

    Returns:
        DiGraph: A directed graph representing the flow network.
    """
    f = DiGraph()
    f.add_node('s', demand=-len(leaves))
    f.add_node('t', demand=len(leaves))
    # Set-up all the nodes
    for node in network.nodes():
        f.add_node("i-" + str(node))
        f.add_node("o-" + str(node))
        f.add_edge("i-" + str(node), "o-" + str(node), capacity=1, weight=-1)
        f.add_edge('s', "i-" + str(node), capacity=1, weight=0)
        if node in leaves:
            f.add_edge("o-" + str(node), 't', capacity=1, weight=0)

    # Set-up all the edges
    for edge in network.edges():
        f.add_edge("o-" + str(edge[0]), "i-" + str(edge[1]), capacity=1, weight=0)
    return f
def maximum_covering_subtree(network: networkx.classes.digraph.DiGraph, name=None, draw=False) ‑> []

Implements the algorithm described in 'Maximum Covering Subtrees for Phylogenetic Networks' by Davidov et al. Finds the minimum number of nodes to cut to make a network tree-based.

Args

network : DiGraph
The phylogenetic network as a directed graph.
name : str, optional
The name of the file to save the output image. Defaults to None.
draw : bool, optional
Whether to generate and save an image of the network. Defaults to False.

Returns

tuple[DiGraph, int]
A tuple containing the tree-based network and the number of nodes removed.
Expand source code
def maximum_covering_subtree(network: DiGraph, name=None, draw=False) -> [DiGraph, int]:
    """
    Implements the algorithm described in 'Maximum Covering Subtrees for Phylogenetic Networks' by Davidov et al.
    Finds the minimum number of nodes to cut to make a network tree-based.

    Args:
        network (DiGraph): The phylogenetic network as a directed graph.
        name (str, optional): The name of the file to save the output image. Defaults to None.
        draw (bool, optional): Whether to generate and save an image of the network. Defaults to False.

    Returns:
        tuple[DiGraph, int]: A tuple containing the tree-based network and the number of nodes removed.
    """
    leaves = get_leaves(network)
    # Build min-cost flow network
    # Create V_in and V_out node for each
    f = create_flow_network(network, leaves)
    flows = min_cost_flow(f)
    start = []
    matches = []
    for src, flow in flows.items():
        if src == "s":
            for destination_node, value in flow.items():
                if value == 1:
                    dst_node_id = destination_node[2:]
                    start.append(dst_node_id)
        elif src.startswith('o-'):
            for destination_node, value in flow.items():
                if destination_node == "t":
                    continue
                if value == 1:
                    src_node_id = src[2:]
                    dst_node_id = destination_node[2:]
                    matches.append((src_node_id, dst_node_id))
                    # print(str(src_node_id) + "," + str(dst_node_id))
    paths = []
    for u in start:
        # You do delete matches, so you need to pass a copy
        p = build_path(u, matches.copy())
        paths.append(p)

    # Build rooted Spanning Tree
    tree_based_network = rooted_spanning_tree(network, paths)
    diff = set(network.nodes()) - set(tree_based_network.nodes())
    n = len(diff)

    if draw:
        if name is None:
            draw_tree(network, "original network")
            draw_tree(tree_based_network, "tree-based network")
        else:
            draw_tree(network, name)
            draw_tree(tree_based_network, name + "-MAX-CST")
    return tree_based_network, n