Source code for regraph.graphs

"""Abstract graphs in ReGraph.

This module contains abstract classes for graph objects in ReGraph. Such
graph objects represent simple graphs with dictionary-like attributes
on nodes and edges.
"""
import json
import os

import warnings

from abc import ABC, abstractmethod

from regraph.exceptions import (ReGraphError,
                                GraphError,
                                GraphAttrsWarning,
                                )
from regraph.utils import (load_nodes_from_json,
                           load_edges_from_json,
                           generate_new_id,
                           normalize_attrs,
                           safe_deepcopy_dict,
                           set_attrs,
                           add_attrs,
                           remove_attrs,
                           merge_attributes,
                           keys_by_value,
                           )


[docs]class Graph(ABC): """Abstract class for graph objects in ReGraph."""
[docs] @abstractmethod def nodes(self, data=False): """Return the list of nodes.""" pass
[docs] @abstractmethod def edges(self, data=False): """Return the list of edges.""" pass
[docs] @abstractmethod def get_node(self, n): """Get node attributes. Parameters ---------- s : hashable Source node id. t : hashable, Target node id. """ pass
[docs] @abstractmethod def get_edge(self, s, t): """Get edge attributes. Parameters ---------- s : hashable Source node id. t : hashable Target node id. """ pass
[docs] @abstractmethod def add_node(self, node_id, attrs=None): """Abstract method for adding a node. Parameters ---------- node_id : hashable Prefix that is prepended to the new unique name. attrs : dict, optional Node attributes. """ pass
[docs] @abstractmethod def remove_node(self, node_id): """Remove node. Parameters ---------- node_id : hashable Node to remove. """ pass
[docs] @abstractmethod def add_edge(self, s, t, attrs=None, **attr): """Add an edge to a graph. Parameters ---------- s : hashable Source node id. t : hashable Target node id. attrs : dict Edge attributes. """ pass
[docs] @abstractmethod def remove_edge(self, source_id, target_id): """Remove edge from the graph. Parameters ---------- s : hashable Source node id. t : hashable Target node id. """ pass
[docs] @abstractmethod def update_node_attrs(self, node_id, attrs, normalize=True): """Update attributes of a node. Parameters ---------- node_id : hashable Node to update. attrs : dict New attributes to assign to the node """ pass
[docs] @abstractmethod def update_edge_attrs(self, s, t, attrs, normalize=True): """Update attributes of a node. Parameters ---------- s : hashable Source node of the edge to update. t : hashable Target node of the edge to update. attrs : dict New attributes to assign to the node """ pass
[docs] @abstractmethod def successors(self, node_id): """Return the set of successors.""" pass
[docs] @abstractmethod def predecessors(self, node_id): """Return the set of predecessors.""" pass
[docs] @abstractmethod def find_matching(self, pattern, nodes=None): """Find matching of a pattern in a graph.""" pass
[docs] def print_graph(self): """Pretty-print the graph.""" print("\nNodes:\n") for n in self.nodes(): print(n, " : ", self.get_node(n)) print("\nEdges:\n") for (n1, n2) in self.edges(): print(n1, '->', n2, ' : ', self.get_edge(n1, n2)) return
def __str__(self): """String representation of the graph.""" return "Graph({} nodes, {} edges)".format( len(self.nodes()), len(self.edges())) def __eq__(self, graph): """Eqaulity operator. Parameters ---------- graph : regraph.Graph Another graph object Returns ------- bool True if two graphs are equal, False otherwise. """ if set(self.nodes()) != set(graph.nodes()): return False if set(self.edges()) != set(graph.edges()): return False for node in self.nodes(): if self.get_node(node) != graph.get_node(node): return False for s, t in self.edges(): if self.get_edge(s, t) != graph.get_edge(s, t): return False return True def __ne__(self, graph): """Non-equality operator.""" return not (self == graph)
[docs] def get_node_attrs(self, n): """Get node attributes. Parameters ---------- n : hashable Node id. """ return self.get_node(n)
[docs] def get_edge_attrs(self, s, t): """Get edge attributes. Parameters ---------- graph : networkx.(Di)Graph s : hashable, source node id. t : hashable, target node id. """ return self.get_edge(s, t)
[docs] def in_edges(self, node_id): """Return the set of in-coming edges.""" return [(p, node_id) for p in self.predecessors(node_id)]
[docs] def out_edges(self, node_id): """Return the set of out-going edges.""" return [(node_id, s) for s in self.successors(node_id)]
[docs] def add_nodes_from(self, node_list): """Add nodes from a node list. Parameters ---------- node_list : iterable Iterable containing a collection of nodes, optionally, with their attributes """ for n in node_list: if type(n) != str: try: node_id, node_attrs = n self.add_node(node_id, node_attrs) except (TypeError, ValueError): self.add_node(n) else: self.add_node(n)
[docs] def add_edges_from(self, edge_list): """Add edges from an edge list. Parameters ---------- edge_list : iterable Iterable containing a collection of edges, optionally, with their attributes """ for e in edge_list: if len(e) == 2: self.add_edge(e[0], e[1]) elif len(e) == 3: self.add_edge(e[0], e[1], e[2]) else: raise ReGraphError( "Was expecting 2 or 3 elements per tuple, got %s." % str(len(e)) )
[docs] def exists_edge(self, s, t): """Check if an edge exists. Parameters ---------- s : hashable Source node id. t : hashable Target node id. """ # print("\n\n\n\n\n\n\n\n\n\n", s, t, self.edges()) return((s, t) in self.edges())
[docs] def set_node_attrs(self, node_id, attrs, normalize=True, update=True): """Set node attrs. Parameters ---------- node_id : hashable Id of the node to update attrs : dict Dictionary with new attributes to set normalize : bool, optional Flag, when set to True attributes are normalized to be set-valued. True by default update : bool, optional Flag, when set to True attributes whose keys are not present in attrs are removed, True by default Raises ------ GraphError If a node `node_id` does not exist. """ if node_id not in self.nodes(): raise GraphError("Node '{}' does not exist!".format(node_id)) node_attrs = safe_deepcopy_dict(self.get_node(node_id)) set_attrs(node_attrs, attrs, normalize, update) self.update_node_attrs(node_id, node_attrs, normalize)
[docs] def add_node_attrs(self, node, attrs): """Add new attributes to a node. Parameters ---------- node : hashable Id of a node to add attributes to. attrs : dict Attributes to add. Raises ------ GraphError If a node `node_id` does not exist. """ if node not in self.nodes(): raise GraphError("Node '{}' does not exist!".format(node)) node_attrs = safe_deepcopy_dict(self.get_node(node)) add_attrs(node_attrs, attrs, normalize=True) self.update_node_attrs(node, node_attrs)
[docs] def remove_node_attrs(self, node_id, attrs): """Remove attrs of a node specified by attrs_dict. Parameters ---------- node_id : hashable Node whose attributes to remove. attrs : dict Dictionary with attributes to remove. Raises ------ GraphError If a node with the specified id does not exist. """ if node_id not in self.nodes(): raise GraphError("Node '%s' does not exist!" % str(node_id)) elif attrs is None: warnings.warn( "You want to remove attrs from '{}' with an empty attrs_dict!".format( node_id), GraphAttrsWarning ) node_attrs = safe_deepcopy_dict(self.get_node(node_id)) remove_attrs(node_attrs, attrs, normalize=True) self.update_node_attrs(node_id, node_attrs)
[docs] def set_edge_attrs(self, s, t, attrs, normalize=True, update=True): """Set edge attrs. Parameters ---------- attrs : dict Dictionary with new attributes to set normalize : bool, optional Flag, when set to True attributes are normalized to be set-valued. True by default update : bool, optional Flag, when set to True attributes whose keys are not present in attrs are removed, True by default Raises ------ GraphError If an edge between `s` and `t` does not exist. """ if not self.exists_edge(s, t): raise GraphError( "Edge {}->{} does not exist".format(s, t)) edge_attrs = safe_deepcopy_dict(self.get_edge(s, t)) set_attrs(edge_attrs, attrs, normalize, update) self.update_edge_attrs(s, t, edge_attrs, normalize=normalize)
[docs] def set_edge(self, s, t, attrs, normalize=True, update=True): """Set edge attrs. Parameters ---------- s : hashable Source node id. t : hashable Target node id. attrs : dictionary Dictionary with attributes to set. Raises ------ GraphError If an edge between `s` and `t` does not exist. """ self.set_edge_attrs(s, t, attrs, normalize, update)
[docs] def add_edge_attrs(self, s, t, attrs): """Add attributes of an edge in a graph. Parameters ---------- s : hashable Source node id. t : hashable Target node id. attrs : dict Dictionary with attributes to remove. Raises ------ GraphError If an edge between `s` and `t` does not exist. """ if not self.exists_edge(s, t): raise GraphError( "Edge {}->{} does not exist".format(s, t)) edge_attrs = safe_deepcopy_dict(self.get_edge(s, t)) add_attrs(edge_attrs, attrs, normalize=True) self.update_edge_attrs(s, t, edge_attrs)
[docs] def remove_edge_attrs(self, s, t, attrs): """Remove attrs of an edge specified by attrs. Parameters ---------- s : hashable Source node id. t : hashable Target node id. attrs : dict Dictionary with attributes to remove. Raises ------ GraphError If an edge between `s` and `t` does not exist. """ if not self.exists_edge(s, t): raise GraphError( "Edge {}->{} does not exist".format(s, t)) edge_attrs = safe_deepcopy_dict(self.get_edge(s, t)) remove_attrs(edge_attrs, attrs, normalize=True) self.update_edge_attrs(s, t, edge_attrs)
[docs] def clone_node(self, node_id, name=None): """Clone node. Create a new node, a copy of a node with `node_id`, and reconnect it with all the adjacent nodes of `node_id`. Parameters ---------- node_id : hashable, Id of a node to clone. name : hashable, optional Id for the clone, if is not specified, new id will be generated. Returns ------- new_node : hashable Id of the new node corresponding to the clone Raises ------ GraphError If node wiht `node_id` does not exists or a node with `name` (clone's name) already exists. """ if node_id not in self.nodes(): raise GraphError("Node '{}' does not exist!".format(node_id)) # generate new name for a clone if name is None: i = 1 new_node = str(node_id) + str(i) while new_node in self.nodes(): i += 1 new_node = str(node_id) + str(i) else: if name in self.nodes(): raise GraphError("Node '{}' already exists!".format(name)) else: new_node = name self.add_node(new_node, self.get_node(node_id)) # Connect all the edges self.add_edges_from( set([(n, new_node) for n, _ in self.in_edges(node_id) if (n, new_node) not in self.edges()])) self.add_edges_from( set([(new_node, n) for _, n in self.out_edges(node_id) if (new_node, n) not in self.edges()])) # Copy the attributes of the edges for s, t in self.in_edges(node_id): self.set_edge( s, new_node, safe_deepcopy_dict(self.get_edge(s, t))) for s, t in self.out_edges(node_id): self.set_edge( new_node, t, safe_deepcopy_dict(self.get_edge(s, t))) return new_node
[docs] def relabel_node(self, node_id, new_id): """Relabel a node in the graph. Parameters ---------- node_id : hashable Id of the node to relabel. new_id : hashable New label of a node. """ if new_id in self.nodes(): raise ReGraphError( "Cannot relabel '{}' to '{}', '{}' ".format( node_id, new_id, new_id) + "already exists in the graph") self.clone_node(node_id, new_id) self.remove_node(node_id)
[docs] def merge_nodes(self, nodes, node_id=None, method="union", edge_method="union"): """Merge a list of nodes. Parameters ---------- nodes : iterable Collection of node id's to merge. node_id : hashable, optional Id of a new node corresponding to the result of merge. method : optional Method of node attributes merge: if `"union"` the resulting node will contain the union of all attributes of the merged nodes, if `"intersection"`, the resulting node will contain their intersection. Default value is `"union"`. edge_method : optional Method of edge attributes merge: if `"union"` the edges that were merged will contain the union of all attributes, if `"intersection"` -- their ntersection. Default value is `"union"`. """ if len(nodes) > 1: if method is None: method = "union" if edge_method is None: method = "union" # Generate name for new node if node_id is None: node_id = "_".join(sorted([str(n) for n in nodes])) if node_id in self.nodes(): node_id = self.generate_new_node_id(node_id) elif node_id in self.nodes() and (node_id not in nodes): raise GraphError( "New name for merged node is not valid: " "node with name '%s' already exists!" % node_id ) # Merge data attached to node according to the method specified # restore proper connectivity if method == "union": attr_accumulator = {} elif method == "intersection": attr_accumulator = safe_deepcopy_dict( self.get_node(nodes[0])) else: raise ReGraphError("Merging method '{}' is not defined!".format( method)) self_loop = False self_loop_attrs = {} source_nodes = set() target_nodes = set() source_dict = {} target_dict = {} for node in nodes: attr_accumulator = merge_attributes( attr_accumulator, self.get_node(node), method) in_edges = self.in_edges(node) out_edges = self.out_edges(node) # manage self loops for s, t in in_edges: if s in nodes: self_loop = True if len(self_loop_attrs) == 0: self_loop_attrs = self.get_edge(s, t) else: self_loop_attrs = merge_attributes( self_loop_attrs, self.get_edge(s, t), edge_method) for s, t in out_edges: if t in nodes: self_loop = True if len(self_loop_attrs) == 0: self_loop_attrs = self.get_edge(s, t) else: self_loop_attrs = merge_attributes( self_loop_attrs, self.get_edge(s, t), edge_method) source_nodes.update( [n if n not in nodes else node_id for n, _ in in_edges]) target_nodes.update( [n if n not in nodes else node_id for _, n in out_edges]) for edge in in_edges: if not edge[0] in source_dict.keys(): attrs = self.get_edge(edge[0], edge[1]) source_dict.update({edge[0]: attrs}) else: attrs = merge_attributes( source_dict[edge[0]], self.get_edge(edge[0], edge[1]), edge_method) source_dict.update({edge[0]: attrs}) for edge in out_edges: if not edge[1] in target_dict.keys(): attrs = self.get_edge(edge[0], edge[1]) target_dict.update({edge[1]: attrs}) else: attrs = merge_attributes( target_dict[edge[1]], self.get_edge(edge[0], edge[1]), edge_method) target_dict.update({edge[1]: attrs}) self.remove_node(node) self.add_node(node_id, attr_accumulator) if self_loop: self.add_edges_from([(node_id, node_id)]) self.set_edge(node_id, node_id, self_loop_attrs) for n in source_nodes: if not self.exists_edge(n, node_id): self.add_edge(n, node_id) for n in target_nodes: if not self.exists_edge(node_id, n): self.add_edge(node_id, n) # Attach accumulated attributes to edges for node, attrs in source_dict.items(): if node not in nodes: self.set_edge(node, node_id, attrs) for node, attrs in target_dict.items(): if node not in nodes: self.set_edge(node_id, node, attrs) return node_id else: raise ReGraphError( "More than two nodes should be specified for merging!")
[docs] def copy_node(self, node_id, copy_id=None): """Copy node. Create a copy of a node in a graph. A new id for the copy is generated by regraph.primitives.unique_node_id. Parameters ---------- node_id : hashable Node to copy. Returns ------- new_name Id of the copy node. """ if copy_id is None: copy_id = self.generate_new_node_id(node_id) if copy_id in self.nodes(): raise ReGraphError( "Cannot create a copy of '{}' with id '{}', ".format( node_id, copy_id) + "node '{}' already exists in the graph".format(copy_id)) attrs = self.get_node(node_id) self.add_node(copy_id, attrs) return copy_id
[docs] def relabel_nodes(self, mapping): """Relabel graph nodes inplace given a mapping. Similar to networkx.relabel.relabel_nodes: https://networkx.github.io/documentation/development/_modules/networkx/relabel.html Parameters ---------- mapping: dict A dictionary with keys being old node ids and their values being new id's of the respective nodes. Raises ------ ReGraphError If new id's do not define a set of distinct node id's. """ unique_names = set(mapping.values()) if len(unique_names) != len(self.nodes()): raise ReGraphError( "Attempt to relabel nodes failed: the IDs are not unique!") temp_names = {} # Relabeling of the nodes: if at some point new ID conflicts # with already existing ID - assign temp ID for key, value in mapping.items(): if key != value: if value not in self.nodes(): new_name = value else: new_name = self.generate_new_node_id(value) temp_names[new_name] = value self.relabel_node(key, new_name) # Relabeling the nodes with the temp ID to their new IDs for key, value in temp_names.items(): if key != value: self.relabel_node(key, value) return
[docs] def generate_new_node_id(self, basename): """Generate new unique node identifier.""" return generate_new_id(self.nodes(), basename)
[docs] def filter_edges_by_attributes(self, attr_key, attr_cond): """Filter graph edges by attributes. Removes all the edges of the graph (inplace) that do not satisfy `attr_cond`. Parameters ---------- attrs_key : hashable Attribute key attrs_cond : callable Condition for an attribute to satisfy: callable that returns `True` if condition is satisfied, `False` otherwise. """ for (s, t) in self.edges(): edge_attrs = self.get_edge(s, t) if (attr_key not in edge_attrs.keys() or not attr_cond(edge_attrs[attr_key])): self.remove_edge(s, t)
[docs] def to_json(self): """Create a JSON representation of a graph.""" j_data = {"edges": [], "nodes": []} # dump nodes for node in self.nodes(): node_data = {} node_data["id"] = node node_attrs = self.get_node(node) if node_attrs is not None: attrs = {} for key, value in node_attrs.items(): attrs[key] = value.to_json() node_data["attrs"] = attrs j_data["nodes"].append(node_data) # dump edges for s, t in self.edges(): edge_data = {} edge_data["from"] = s edge_data["to"] = t edge_attrs = self.get_edge(s, t) if edge_attrs is not None: attrs = {} for key, value in edge_attrs.items(): attrs[key] = value.to_json() edge_data["attrs"] = attrs j_data["edges"].append(edge_data) return j_data
[docs] def to_d3_json(self, attrs=True, node_attrs_to_attach=None, edge_attrs_to_attach=None, nodes=None): """Create a JSON representation of a graph.""" j_data = {"links": [], "nodes": []} if nodes is None: nodes = self.nodes() # dump nodes for node in nodes: node_data = {} node_data["id"] = node if attrs: node_attrs = self.get_node(node) normalize_attrs(node_attrs) attrs_json = dict() for key, value in node_attrs.items(): attrs_json[key] = value.to_json() node_data["attrs"] = attrs_json else: node_attrs = self.get_node(node) if node_attrs_to_attach is not None: for key in node_attrs_to_attach: if key in node_attrs.keys(): node_data[key] = list(node_attrs[key]) j_data["nodes"].append(node_data) # dump edges for s, t in self.edges(): if s in nodes and t in nodes: edge_data = {} edge_data["source"] = s edge_data["target"] = t if attrs: edge_attrs = self.get_edge(s, t) normalize_attrs(edge_attrs) attrs_json = dict() for key, value in edge_attrs.items(): attrs_json[key] = value.to_json() edge_data["attrs"] = attrs_json else: if edge_attrs_to_attach is not None: for key in edge_attrs_to_attach: edge_attrs = self.get_edge(s, t) if key in edge_attrs.keys(): edge_data[key] = list(edge_attrs[key]) j_data["links"].append(edge_data) return j_data
[docs] def export(self, filename): """Export graph to JSON file. Parameters ---------- filename : str Name of the file to save the json serialization of the graph """ with open(filename, 'w') as f: j_data = self.to_json() json.dump(j_data, f) return
[docs] @classmethod def from_json(cls, json_data): """Create a NetworkX graph from a json-like dictionary. Parameters ---------- json_data : dict JSON-like dictionary with graph representation """ graph = cls() graph.add_nodes_from(load_nodes_from_json(json_data)) graph.add_edges_from(load_edges_from_json(json_data)) return graph
[docs] @classmethod def load(cls, filename): """Load a graph from a JSON file. Create a `networkx.(Di)Graph` object from a JSON representation stored in a file. Parameters ---------- filename : str Name of the file to load the json serialization of the graph Returns ------- Graph object Raises ------ ReGraphError If was not able to load the file """ if os.path.isfile(filename): with open(filename, "r+") as f: j_data = json.loads(f.read()) return cls.from_json(j_data) else: raise ReGraphError( "Error loading graph: file '{}' does not exist!".format( filename) )
[docs] def rewrite(self, rule, instance=None): """Perform SqPO rewiting of the graph with a rule. Parameters ---------- rule : regraph.Rule SqPO rewriting rule instance : dict, optional Instance of the input rule. If not specified, the identity map of the rule's left-hand side is used """ if instance is None: instance = { n: n for n in self.lhs.nodes() } # Restrictive phase p_g = dict() cloned_lhs_nodes = set() # Clone nodes for lhs, p_nodes in rule.cloned_nodes().items(): for i, p in enumerate(p_nodes): if i == 0: p_g[p] = instance[lhs] cloned_lhs_nodes.add(lhs) else: clone_id = self.clone_node(instance[lhs]) p_g[p] = clone_id # Delete nodes and add preserved nodes to p_g dictionary removed_nodes = rule.removed_nodes() for n in rule.lhs.nodes(): if n in removed_nodes: self.remove_node(instance[n]) elif n not in cloned_lhs_nodes: p_g[keys_by_value(rule.p_lhs, n)[0]] =\ instance[n] # Delete edges for u, v in rule.removed_edges(): self.remove_edge(p_g[u], p_g[v]) # Remove node attributes for p_node, attrs in rule.removed_node_attrs().items(): self.remove_node_attrs( p_g[p_node], attrs) # Remove edge attributes for (u, v), attrs in rule.removed_edge_attrs().items(): self.remove_edge_attrs(p_g[u], p_g[v], attrs) # Expansive phase rhs_g = dict() merged_nodes = set() # Merge nodes for rhs, p_nodes in rule.merged_nodes().items(): merge_id = self.merge_nodes( [p_g[p] for p in p_nodes]) merged_nodes.add(rhs) rhs_g[rhs] = merge_id # Add nodes and add preserved nodes to rhs_g dictionary added_nodes = rule.added_nodes() for n in rule.rhs.nodes(): if n in added_nodes: if n in self.nodes(): new_id = self.generate_new_node_id(n) else: new_id = n new_id = self.add_node(new_id) rhs_g[n] = new_id elif n not in merged_nodes: rhs_g[n] = p_g[keys_by_value(rule.p_rhs, n)[0]] # Add edges for u, v in rule.added_edges(): if (rhs_g[u], rhs_g[v]) not in self.edges(): self.add_edge(rhs_g[u], rhs_g[v]) # Add node attributes for rhs_node, attrs in rule.added_node_attrs().items(): self.add_node_attrs( rhs_g[rhs_node], attrs) # Add edge attributes for (u, v), attrs in rule.added_edge_attrs().items(): self.add_edge_attrs( rhs_g[u], rhs_g[v], attrs) return rhs_g
[docs] def number_of_edges(self, u, v): """Return number of directed edges from u to v.""" return 1