Source code for regraph.backends.networkx.hierarchies

"""NetworkX-based in-memory graph hierarchy objects.

This module contains a data structure implementing
graph hierarchies based on NetworkX graphs.

* `NXHierarchy` -- class for in-memort graph hierarchies.
import copy
import networkx as nx
import warnings

from regraph.exceptions import (HierarchyError,
from regraph.hierarchies import Hierarchy
from regraph.backends.networkx.graphs import NXGraph
from regraph.category_utils import (compose,
from regraph.utils import (normalize_attrs,

[docs]class NXHierarchy(Hierarchy, NXGraph): """Class for in-memory hierarchies. Attributes ---------- """ rel_dict_factory = dict # Implementation of abstract methods
[docs] def graphs(self, data=False): """Return a list of graphs in the hierarchy.""" if data: return [ (n, n_data["attrs"]) for n, n_data in self.nodes(True) if "graph" in n_data] else: return [n for n, n_data in self.nodes(True) if "graph" in n_data]
[docs] def typings(self, data=False): """Return a list of graph typing edges in the hierarchy.""" if data: return [ (s, t, e_data["attrs"]) for s, t, e_data in self.edges(True) if "mapping" in e_data] else: return [ (s, t) for s, t, e_data in self.edges(True) if "mapping" in e_data]
[docs] def relations(self, data=False): """Return a list of relations.""" if data: return [ (l, r, attrs) for (l, r), attrs in self.relation_edges.items() ] else: return list(set(self.relation_edges.keys()))
[docs] def successors(self, node_id): """Return the set of successors.""" return self._graph.successors(node_id)
[docs] def predecessors(self, node_id): """Return the set of predecessors.""" return self._graph.predecessors(node_id)
[docs] def get_graph(self, graph_id): """Get a graph object associated to the node 'graph_id'.""" if graph_id not in self.nodes(): raise HierarchyError( "Hierarchy node '{}' does not exist!".format(graph_id)) if not self.is_graph(graph_id): raise HierarchyError( "Hierarchy node '{}' is a rule!".format(graph_id)) return self.node[graph_id]["graph"]
[docs] def get_typing(self, source, target): """Get a typing dict associated to the edge 'source->target'.""" if (source, target) in self.edges(): if self.is_graph(source): return self.get_edge(source, target)["mapping"] else: edge = self.get_edge(source, target) return (edge["lhs_mapping"], edge["rhs_mapping"]) else: try: path = nx.shortest_path(self._graph, source, target) except: raise HierarchyError( "No path from '{}' to '{}' in the hierarchy".format( source, target)) return self.compose_path_typing(path)
[docs] def get_relation(self, left, right): """Get a relation dict associated to the rel 'left-right'.""" return self.relation_edges[(left, right)]["rel"]
[docs] def get_graph_attrs(self, graph_id): """Get attributes of a graph in the hierarchy. graph_id : hashable Id of the graph """ return self.get_node(graph_id)["attrs"]
[docs] def set_graph_attrs(self, node_id, attrs): """Set attributes of a graph in the hierarchy. graph_id : hashable Id of the graph """ return self.set_node_attrs(node_id, {"attrs": attrs})
[docs] def get_typing_attrs(self, source, target): """Get attributes of a typing in the hierarchy. source : hashable Id of the source graph target : hashable Id of the target graph """ return self.get_edge(source, target)["attrs"]
[docs] def set_typing_attrs(self, source, target, attrs): """Set attributes of a typing in the hierarchy. source : hashable Id of the source graph target : hashable Id of the target graph """ return self.set_edge_attrs(source, target, {"attrs": attrs})
[docs] def get_relation_attrs(self, left, right): """Get attributes of a reltion in the hierarchy. left : hashable Id of the left graph right : hashable Id of the right graph """ return self.relation_edges[(left, right)]["attrs"]
[docs] def set_relation_attrs(self, left, right, attrs): """Set attributes of a relation in the hierarchy. left : hashable Id of the left graph right : hashable Id of the right graph """ normalize_attrs(attrs) for k, v in attrs.items(): self.relation_edges[(left, right)]["attrs"][k] = v self.relation_edges[(right, left)]["attrs"][k] = v
[docs] def set_node_relation(self, left_graph, right_graph, left_node, right_node): """Set relation to a particular node.""" if left_node in self.relation_edges[ left_graph, right_graph]["rel"].keys(): self.relation_edges[left_graph, right_graph]["rel"][left_node].add( right_node) else: self.relation_edges[left_graph, right_graph]["rel"][left_node] = { right_node} if right_node in self.relation_edges[ right_graph, left_graph]["rel"].keys(): self.relation_edges[ right_graph, left_graph]["rel"][right_node].add(left_node) else: self.relation_edges[right_graph, left_graph]["rel"][right_node] = { left_node}
[docs] def add_graph(self, graph_id, graph, graph_attrs=None): """Add a new graph to the hierarchy. Parameters ---------- graph_id : hashable Id of a new node in the hierarchy graph : regraph.Graph Graph object corresponding to the new node of the hierarchy graph_attrs : dict, optional Dictionary containing attributes of the new node """ if graph_id in self.nodes(): raise HierarchyError( "Node '{}' already exists in the hierarchy!".format(graph_id)) self.add_node(graph_id) if graph_attrs is not None: normalize_attrs(graph_attrs) else: graph_attrs = dict() self.update_node_attrs( graph_id, { "graph": graph, "attrs": graph_attrs }, normalize=False) return
[docs] def add_empty_graph(self, graph_id, attrs=None): """"Add a new empty graph to the hierarchy. Parameters ---------- graph_id : hashable Id of a new node in the hierarchy graph_attrs : dict, optional Dictionary containing attributes of the new node """ g = NXGraph() self.add_graph(graph_id, g, attrs)
[docs] def add_graph_from_data(self, graph_id, node_list, edge_list, attrs=None): """Add a new graph to the hierarchy from the input node/edge lists. Parameters ---------- graph_id : hashable Id of a new node in the hierarchy node_list : iterable List of nodes (with attributes) edge_list : iterable List of edges (with attributes) graph_attrs : dict, optional Dictionary containing attributes of the new node """ self.add_empty_graph(graph_id, attrs) g = self.get_graph(graph_id) if node_list is not None: g.add_nodes_from(node_list) if edge_list is not None: g.add_edges_from(edge_list)
[docs] def add_typing(self, source, target, mapping=None, attrs=None): """Add homomorphism to the hierarchy. Parameters ---------- source : hashable Id of the source graph node of typing target : hashable Id of the target graph node of typing mapping : dict Dictionary representing a mapping of nodes from the source graph to target's nodes attrs : dict, optional Dictionary containing attributes of the new typing edge. Empty by default Raises ------ HierarchyError This error is raised in the following cases: * source or target ids are not found in the hierarchy * a typing edge between source and target already exists * addition of an edge between source and target creates a cycle or produces paths that do not commute with some already existing paths InvalidHomomorphism If a homomorphisms from a graph at the source to a graph at the target given by `mapping` is not a valid homomorphism. """ if source not in self.nodes(): raise HierarchyError( "Node '{}' is not defined in the hierarchy!".format(source)) if target not in self.nodes(): raise HierarchyError( "Node '{}' is not defined in the hierarchy!".format(target)) if mapping is None: mapping = dict() if (source, target) in self.edges(): raise HierarchyError( "Edge '{}->{}' already exists in the hierarchy: " "no muliple edges allowed!".format(source, target) ) if not self.is_graph(source): raise HierarchyError( "Source of a typing should be a graph," " '{}' is provided!".format( type(self.node[source])) ) if not self.is_graph(target): raise HierarchyError( "Target of a typing should be a graph, " "'{}' is provided!".format( type(self.node[target])) ) # check no cycles are produced self.add_edge(source, target) if not nx.is_directed_acyclic_graph(self._graph): self.remove_edge(source, target) raise HierarchyError( "Edge '{}->{}' creates a cycle in the hierarchy!".format( source, target) ) self.remove_edge(source, target) # check if the homomorphism is valid check_homomorphism( self.get_graph(source), self.get_graph(target), mapping, ) # check if newly created path commutes with existing shortest paths self._check_consistency(source, target, mapping) self.add_edge(source, target) if attrs is not None: normalize_attrs(attrs) else: attrs = dict() self.set_edge( source, target, { "mapping": mapping, "attrs": attrs }, normalize=False) return
[docs] def add_relation(self, left, right, relation, attrs=None): """Add relation to the hierarchy. This method adds a relation between two graphs in the hierarchy corresponding to the nodes with ids `left` and `right`, the relation itself is defined by a dictionary `relation`, where a key is a node in the `left` graph and its corresponding value is a set of nodes from the `right` graph to which the node is related. Relations in the hierarchy are symmetric (see example below). Parameters ---------- left Id of the hierarchy's node represening the `left` graph right Id of the hierarchy's node represening the `right` graph relation : dict Dictionary representing a relation of nodes from `left` to the nodes from `right`, a key of the dictionary is assumed to be a node from `left` and its value a set of ids of related nodes from `right` attrs : dict Dictionary containing attributes of the new relation Raises ------ HierarchyError This error is raised in the following cases: * node with id `left`/`right` is not defined in the hierarchy; * node with id `left`/`right` is not a graph; * a relation between `left` and `right` already exists; * some node ids specified in `relation` are not found in the `left`/`right` graph. """ if left not in self.nodes(): raise HierarchyError( "Node '{}' is not defined in the hierarchy!".format(left)) if right not in self.nodes(): raise HierarchyError( "Node '{}' is not defined in the hierarchy!".format(right)) if not self.is_graph(left): raise HierarchyError( "Relation can be defined only on graph objects" ) if not self.is_graph(right): raise HierarchyError( "Relation can be defined only on graph objects" ) if (left, right) in self.relations(): raise HierarchyError( "Relation '{}-{}' already exists in the hierarchy " "multiple edges are not allowed!".format( left, right) ) # normalize relation dict relation = normalize_relation(relation) # check relation is well-defined on left and right side for key, values in relation.items(): if key not in self.get_graph(left).nodes(): raise HierarchyError( "Relation is not valid: node '{}' does not " "exist in a graph '{}'".format(key, left) ) for v in values: if v not in self.get_graph(right).nodes(): raise HierarchyError( "Relation is not valid: node '{}' does not " "exist in a graph '{}'".format(v, right) ) if attrs is not None: normalize_attrs(attrs) else: attrs = dict() pairs = set() for k, values in relation.items(): for v in values: pairs.add((k, v)) right_relation = right_relation_dict(pairs) rel_ab_dict = { "rel": relation, "attrs": attrs } rel_ba_dict = { "rel": right_relation, "attrs": attrs } self.relation_edges.update({(left, right): rel_ab_dict}) self.relation_edges.update({(right, left): rel_ba_dict}) return
[docs] def remove_graph(self, graph_id, reconnect=False): """Remove graph from the hierarchy. Removes a graph from the hierarchy, if the `reconnect` parameter is set to True, adds typing from the predecessors of the removed node to all its successors, by composing the homomorphisms (for every predecessor `p` and for every successor 's' composes two homomorphisms `p`->`node_id` and `node_id`->`s`, then removes `node_id` and all its incident edges, by which makes node's removal a procedure of 'forgetting' one level of 'abstraction'). Parameters ---------- graph_id Id of a graph to remove reconnect : bool Reconnect the descendants of the removed node to its predecessors Raises ------ HierarchyError If graph with `node_id` is not defined in the hierarchy """ if not self.is_graph(graph_id): raise HierarchyError( "Hierarchy node '{}' is a rule! ".format(graph_id) + "Use, 'remove_rule' method instead") self.remove_node(graph_id, reconnect)
[docs] def remove_typing(self, s, t): """Remove a typing from the hierarchy.""" self.remove_edge(s, t)
[docs] def remove_relation(self, left, right): """Remove a relation from the hierarchy.""" if (left, right) not in self.relations() and\ (right, left) not in self.relations(): raise HierarchyError( "Relation '{}-{}' is not defined in the hierarchy".format( left, right) ) del self.relation_edges[left, right] del self.relation_edges[right, left]
[docs] def bfs_tree(self, graph, reverse=False): """BFS tree from the graph to all other reachable graphs.""" return list(nx.bfs_tree(self._graph, graph, reverse=reverse))[1:]
[docs] def shortest_path(self, source, target): """Shortest path from 'source' to 'target'.""" return nx.shortest_path(self._graph, source, target)
[docs] def copy_graph(self, graph_id, new_graph_id, attach_graphs=None): """Create a copy of a graph in a hierarchy.""" if attach_graphs is None: attach_graphs = [] if new_graph_id in self.graphs(): raise HierarchyError( "Graph with id '{}' already exists in the hierarchy".format( new_graph_id)) graph_copy = NXGraph.copy(self.get_graph(graph_id)) self.add_graph( new_graph_id, graph_copy, self.get_graph_attrs(graph_id)) # copy all typings to/from attach_graphs for g in attach_graphs: if g in self.successors(graph_id): self.add_typing(new_graph_id, g, self.get_typing(graph_id, g)) if g in self.predecessors(graph_id): self.add_typing(g, new_graph_id, self.get_typing(g, graph_id)) if g in self.adjacent_relations(graph_id): self.add_relation(g, new_graph_id, self.get_relation( g, graph_id))
[docs] def relabel_graph_node(self, graph_id, node, new_name): """Rename a node in a graph of the hierarchy.""" if new_name in self.get_graph(graph_id).nodes(): raise ReGraphError( "Node '{}' already exists in the graph '{}'".format( new_name, graph_id) ) if node not in self.get_graph(graph_id).nodes(): raise ReGraphError( "Node '{}' does not exist in the graph '{}".format( node, graph_id) ) self.get_graph(graph_id).relabel_node(node, new_name) for pred in self.predecessors(graph_id): mapping = self.get_typing(pred, graph_id) new_mapping = dict() for k, v in mapping.items(): if v == node: new_mapping[k] = new_name else: new_mapping[k] = v self._update_mapping(pred, graph_id, new_mapping) for suc in self.successors(graph_id): mapping = self.get_typing(graph_id, suc) new_mapping = dict() for k, v in mapping.items(): if k == node: new_mapping[new_name] = v else: new_mapping[k] = v self._update_mapping(graph_id, suc, new_mapping)
[docs] def relabel_graph(self, graph_id, new_graph_id): """Relabel a graph in the hierarchy. Parameters ---------- graph_id : hashable Id of the graph to relabel new_graph_id : hashable New graph id to assign to this graph """ self.relabel_node(graph_id, new_graph_id)
[docs] def relabel_graphs(self, mapping): """Relabel graphs in the hierarchy. Parameters ---------- mapping: dict A dictionary with keys being old graph ids and their values being new id's of the respective graphs. Raises ------ ReGraphError If new id's do not define a set of distinct graph id's. """ unique_names = set(mapping.values()) if len(unique_names) != len(self.nodes()): raise ReGraphError( "Attempt to relabel nodes failed: the IDs are not unique!") temp_names = {} # Relabeling of the nodes: if at some point new ID conflicts # with already existing ID - assign temp ID for key, value in mapping.items(): if key != value: if value not in self.nodes(): new_name = value else: new_name = self.generate_new_node_id(value) temp_names[new_name] = value self.relabel_node(key, new_name) # Relabeling the nodes with the temp ID to their new IDs for key, value in temp_names.items(): if key != value: self.relabel_node(key, value) return
def _check_consistency(self, source, target, mapping=None): all_paths = dict(nx.all_pairs_shortest_path(self._graph)) paths_to_source = {} paths_from_target = {} for s in self.nodes(): if source in all_paths[s].keys(): paths_to_source[s] = all_paths[s][source] if s == target: for key in all_paths[target].keys(): paths_from_target[key] = all_paths[target][key] for s in paths_to_source.keys(): if self.is_rule(paths_to_source[s][0]): for t in paths_from_target.keys(): # find homomorphism from s to t via new path if s == source: raise HierarchyError( "Found a rule typing some node in the self!" ) new_lhs_h, new_rhs_h = self.compose_path_typing( paths_to_source[s]) new_lhs_h = compose(new_lhs_h, mapping) new_rhs_h = compose(new_rhs_h, mapping) if t != target: new_lhs_h = compose( new_lhs_h, self.compose_path_typing(paths_from_target[t]) ) new_rhs_h = compose( new_rhs_h, self.compose_path_typing(paths_from_target[t]), ) try: # find homomorphisms from s to t via other paths s_t_paths = nx.all_shortest_paths(self._graph, s, t) for path in s_t_paths: lhs_h, rhs_h = self.compose_path_typing(path) if lhs_h != new_lhs_h: raise HierarchyError( "Invalid lhs typing: homomorphism does " "not commute with an existing " + "path from '{}' to '{}'!".format(s, t) ) if rhs_h != new_rhs_h: raise HierarchyError( "Invalid rhs typing: homomorphism does " "not commute with an existing " + "path from '{}' to '{}'!".format(s, t) ) except(nx.NetworkXNoPath): pass else: for t in paths_from_target.keys(): # find homomorphism from s to t via new path if s != source: new_homomorphism = self.compose_path_typing( paths_to_source[s]) else: new_homomorphism = dict([(key, key) for key, _ in mapping.items()]) new_homomorphism = compose( new_homomorphism, mapping) if t != target: new_homomorphism = compose( new_homomorphism, self.compose_path_typing(paths_from_target[t]) ) # find homomorphisms from s to t via other paths s_t_paths = nx.all_shortest_paths(self._graph, s, t) try: # check only the first path for path in s_t_paths: path_homomorphism = self.compose_path_typing(path) if path_homomorphism != new_homomorphism: raise HierarchyError( "Homomorphism does not commute with an " + "existing path from '{}' to '{}'!".format( s, t) ) except(nx.NetworkXNoPath): pass def _propagate_clone(self, origin_id, node_id, p_origin_m, origin_m_origin, p_typing, g_m_g, g_m_origin_m): """Propagate clones from 'origin_id' to 'graph_id'. Perform a controlled propagation of clones to 'graph' Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed p_origin_m : dict Instance of rule's interface inside the updated origin origin_m_origin : dict Map from the updated origin to the initial origin p_typing : dict Controlling relation from the nodes of 'graph_id' to the nodes of the interfaces Returns ------- g_m_g : dict Map from the updated 'graph_id' to the 'graph_id' """ if self.is_graph(node_id): g_m_g, g_m_origin_m = super()._propagate_clone( origin_id, node_id, p_origin_m, origin_m_origin, p_typing, g_m_g, g_m_origin_m) else: cloned_origin_nodes = {} for n in set(origin_m_origin.values()): clones = keys_by_value(origin_m_origin, n) if len(clones) > 1: cloned_origin_nodes[n] = clones rule = self.get_rule(node_id) origin_typing = self.get_typing(node_id, origin_id) lhs_m_lhs = g_m_g[0] p_m_p = g_m_g[1] rhs_m_rhs = g_m_g[2] lhs_m_origin_m = g_m_origin_m[0] rhs_m_origin_m = g_m_origin_m[1] if p_typing: warnings.warn( "Non-empty typing of the rule '{}' by ".format( node_id), "the interface: non-canonical propagation to rules is not " "implemented, typing is ignored", ReGraphWarning) p_origin_typing = { n: origin_typing[0][rule.p_lhs[n]] for n in rule.p.nodes() } cloned_lhs_nodes = self._produce_clones( cloned_origin_nodes, p_origin_m, rule.lhs, origin_typing[0], {}, lhs_m_lhs, lhs_m_origin_m) cloned_p_nodes = self._produce_clones( cloned_origin_nodes, p_origin_m, rule.p, p_origin_typing, {}, p_m_p, { n: lhs_m_origin_m[rule.p_lhs[n]] for n in rule.p.nodes() }) cloned_rhs_nodes = self._produce_clones( cloned_origin_nodes, p_origin_m, rule.rhs, origin_typing[1], {}, rhs_m_rhs, rhs_m_origin_m) # restore rule homomorphisms for n, p_node in cloned_p_nodes.items(): original_p_node = p_m_p[n] original_lhs_node = rule.p_lhs[original_p_node] original_rhs_node = rule.p_rhs[original_p_node] lhs_clones = keys_by_value(cloned_lhs_nodes, p_node) rhs_clones = keys_by_value(cloned_rhs_nodes, p_node) for lhs_clone in lhs_clones: if lhs_m_lhs[lhs_clone] == original_lhs_node: rule.p_lhs[n] = lhs_clone break for rhs_clone in rhs_clones: if rhs_m_rhs[rhs_clone] == original_rhs_node: rule.p_rhs[n] = rhs_clone break self._restrictive_update_incident_homs(node_id, g_m_g) self._restrictive_update_incident_rels(node_id, g_m_g) return g_m_g, g_m_origin_m def _propagate_node_removal(self, origin_id, node_id, rule, instance, g_m_g, g_m_origin_m): """Propagate node removal from 'origin_id' to 'graph_id'. Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed origin_m_origin : dict Map from the updated origin to the initial origin Returns ------- g_m_g : dict Map from the updated 'graph_id' to the 'graph_id' """ if self.is_graph(node_id): g_m_g, g_m_origin_m = super()._propagate_node_removal( origin_id, node_id, rule, instance, g_m_g, g_m_origin_m) else: object_rule = self.get_rule(node_id) origin_typing = self.get_typing(node_id, origin_id) for lhs_node in rule.removed_nodes(): origin_n = instance[lhs_node] lhs_nodes = keys_by_value(origin_typing[0], origin_n) p_nodes = [ n for n in object_rule.p.nodes() if object_rule.p_lhs[n] in lhs_nodes] rhs_nodes = keys_by_value(origin_typing[1], origin_n) lhs_m_lhs = g_m_g[0] p_m_p = g_m_g[1] rhs_m_rhs = g_m_g[1] lhs_m_origin_m = g_m_origin_m[0] p_m_origin_m = g_m_origin_m[1] rhs_m_origin_m = g_m_origin_m[1] for node in lhs_nodes: object_rule.lhs.remove_node(node) del lhs_m_lhs[node] del lhs_m_origin_m[node] for node in p_nodes: object_rule.p.remove_node(node) del p_m_p[node] del p_m_origin_m[node] for node in rhs_nodes: object_rule.rhs.remove_node(node) del rhs_m_rhs[node] del rhs_m_origin_m[node] self._restrictive_update_incident_homs(node_id, g_m_g) self._restrictive_update_incident_rels(node_id, g_m_g) return g_m_g, g_m_origin_m def _propagate_node_attrs_removal(self, origin_id, node_id, rule, p_origin_m, g_m_origin_m): """Propagate node attrs removal from 'origin_id' to 'graph_id'. Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed rule : regraph.Rule Original rewriting rule instance : dict Original instance """ if self.is_graph(node_id): super()._propagate_node_attrs_removal( origin_id, node_id, rule, p_origin_m, g_m_origin_m) else: pass # object_rule = self.get_rule(node_id) # origin_typing = self.get_typing(node_id, origin_id) # for lhs_node, attrs in rule.removed_node_attrs().items(): # lhs_nodes_to_remove_attrs = keys_by_value( # origin_typing[0], instance[lhs_node]) # for node in lhs_nodes_to_remove_attrs: # object_rule.lhs.remove_node_attrs(node, attrs) # p_nodes_to_remove_attrs = [ # n for n in object_rule.p.nodes() # if object_rule.p_lhs[n] in lhs_nodes_to_remove_attrs] # for node in p_nodes_to_remove_attrs: # object_rule.p.remove_node_attrs(node, attrs) # rhs_nodes_to_remove_attrs = keys_by_value( # origin_typing[1], instance[lhs_node]) # for node in rhs_nodes_to_remove_attrs: # object_rule.rhs.remove_node_attrs(node, attrs) def _propagate_edge_removal(self, origin_id, node_id, g_m_origin_m): """Propagate edge removal from 'origin_id' to 'graph_id'. Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting node_id : hashable ID of the node where propagation is performed p_origin_m : dict Instance of rule's interface inside the updated origin origin_m_origin : dict Map from the updated origin to the initial origin """ if self.is_graph(node_id): super()._propagate_edge_removal(origin_id, node_id, g_m_origin_m) else: rule = self.get_rule(node_id) origin_graph = self.get_graph(origin_id) for s, t in rule.lhs.edges(): origin_s = g_m_origin_m[0][s] origin_t = g_m_origin_m[0][t] if (origin_s, origin_t) not in origin_graph.edges(): rule.lhs.remove_edge(s, t) for s, t in rule.p.edges(): origin_s = g_m_origin_m[0][rule.p_lhs[s]] origin_t = g_m_origin_m[0][rule.p_lhs[t]] if (origin_s, origin_t) not in origin_graph.edges(): rule.p.remove_edge(s, t) for s, t in rule.rhs.edges(): origin_s = g_m_origin_m[1][s] origin_t = g_m_origin_m[1][t] if (origin_s, origin_t) not in origin_graph.edges(): rule.rhs.remove_edge(s, t) def _propagate_edge_attrs_removal(self, origin_id, node_id, rule, p_origin_m): """Propagate edge attrs removal from 'origin_id' to 'graph_id'. Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed rule : regraph.Rule Original rewriting rule p_origin_m : dict Instance of rule's interface inside the updated origin """ if self.is_graph(node_id): super()._propagate_edge_attrs_removal( origin_id, node_id, rule, p_origin_m) else: object_rule = self.get_rule(node_id) origin_typing = self.get_typing(node_id, origin_id) for (p_u, p_v), attrs in rule.removed_edge_attrs().items(): lhs_us = keys_by_value(origin_typing[0], p_origin_m[p_u]) lhs_vs = keys_by_value(origin_typing[0], p_origin_m[p_v]) for u in lhs_us: for v in lhs_vs: if (u, v) in object_rule.lhs.edges(): object_rule.lhs.removed_edge_attrs(u, v, attrs) p_us = [ n for n in object_rule.p.nodes if object_rule.p_lhs[n] in lhs_us] p_vs = [ n for n in object_rule.p.nodes if object_rule.p_lhs[n] in lhs_vs] for u in p_us: for v in p_vs: if (u, v) in object_rule.p.edges(): object_rule.p.removed_edge_attrs(u, v, attrs) rhs_us = keys_by_value(origin_typing[1], p_origin_m[p_u]) rhs_vs = keys_by_value(origin_typing[1], p_origin_m[p_v]) for u in rhs_us: for v in rhs_vs: if (u, v) in object_rule.rhs.edges(): object_rule.rhs.removed_edge_attrs(u, v, attrs) def _get_rule_liftings(self, graph_id, rule, instance, p_typing): pass def _get_rule_projections(self, graph_id, rule, instance, rhs_typing): pass # Implementation of the NXHierarchy-specific methods def __init__(self, attrs=None): """Initialize in-memory hierarchy.""" NXGraph.__init__(self) if attrs is None: attrs = dict() self.attrs = attrs self.rel_dict_factory = reldf = self.rel_dict_factory self.relation_edges = reldf()
[docs] def rules(self, data=True): """Return a list of rules in the hierarchy.""" if data: return [ (n, n_data["attrs"]) for n, n_data in self.nodes(True).items() if "rule" in n_data] else: return [n for n, n_data in self.nodes(True) if "rule" in n_data]
[docs] def rule_typings(self, data=True): """Return a list of rule typing edges in the hierarchy.""" if data: return [ (s, t, e_data["attrs"]) for s, t, e_data in self.edges(True).items() if "lhs_mapping" in e_data] else: return [ (s, t) for s, t, e_data in self.edges(True).items() if "lhs_mapping" in e_data]
[docs] def get_node_attrs(self, node_id): """Get attributes of a node in the hierarchy. node_id : hashable Id of the node """ return self.get_node(node_id)["attrs"]
[docs] def get_rule(self, rule_id): """Get a rule object associated to the node 'graph_id'.""" if rule_id not in self.nodes(): pass if not self.is_rule(rule_id): pass return self.node[rule_id]["rule"]
[docs] def add_rule(self, rule_id, rule, attrs=None): """Add rule to the hierarchy. Parameters ---------- rule_id : hashable Id of a new node in the hierarchy rule : regraph.rules.Rule Rule object corresponding to the new node of the hierarchy attrs : dict Dictionary containing attributes of the new node Raises ------ HierarchyError If the rule object is defined for directed/undirected graphs while the hierarchy's parameter `directed` is False/True (the hierarchy accommodates undirected/directed graphs) or if node with provided id already exists in the hierarchy """ if rule_id in self.nodes(): raise HierarchyError( "Node '{}' already exists in the hierarchy!".format( rule_id) ) self.add_node(rule_id) if attrs is not None: normalize_attrs(attrs) else: attrs = dict() self.update_node_attrs( rule_id, { "rule": rule, "attrs": attrs }, normalize=False) return
[docs] def add_rule_typing(self, rule_id, graph_id, lhs_mapping, rhs_mapping=None, lhs_total=False, rhs_total=False, attrs=None): """Add typing of a rule. source Id of a rule node to type target Id of a target graph node of typing lhs_mapping : dict Dictionary representing a mapping of nodes from the left-hand side of the rule to target's nodes rhs_mapping : dict Dictionary representing a mapping of nodes from the right-hand side of the rule to target's nodes lhs_total : bool True if left-hand side typing is total, False otherwise rhs_total : bool True if right-hand side typing is total, False otherwise attrs : dict Dictionary containing attributes of the new typing edge Raises ------ HierarchyError This error is raised in the following cases: * source or target ids are not found in the hierarchy * a typing edge between source and target already exists * a source node is not a rule * a target node is not a graph * addition of an edge produces paths that do not commute with some already existing paths InvalidHomomorphism If a homomorphisms from the left(right)-hand side to a graph at the target given by `lhs(rhs)_mapping` is not a valid homomorphism. """ if rule_id not in self.nodes(): raise HierarchyError( "Node '{}' is not defined in the hierarchy!".format(rule_id)) if graph_id not in self.nodes(): raise HierarchyError( "Node '{}' is not defined in the hierarchy!".format(graph_id)) if not self.is_rule(rule_id): raise HierarchyError( "Source of a rule typing should be a rule, " "`{}` is provided!".format( type(self.node[rule_id])) ) if not self.is_graph(graph_id): raise HierarchyError( "Target of a rule typing should be a graph, " "'{}' is provided!".format( type(self.node[graph_id]))) # check if an lhs typing is valid check_homomorphism( self.node[rule_id]["rule"].lhs, self.node[graph_id]["graph"], lhs_mapping, total=lhs_total ) new_rhs_mapping = rhs_mapping if new_rhs_mapping is None: new_rhs_mapping = dict() rule = self.node[rule_id]["rule"] for node in rule.rhs.nodes(): p_keys = keys_by_value(rule.p_rhs, node) if len(p_keys) == 1: l = rule.p_lhs[p_keys[0]] if l in lhs_mapping.keys(): new_rhs_mapping[node] = lhs_mapping[l] if len(p_keys) > 1: type_set = set() for p in p_keys: l = rule.p_lhs[p] if l in lhs_mapping.keys(): type_set.add(lhs_mapping[l]) if len(type_set) > 1: raise HierarchyError( "Invalid rule typing: rule merges nodes of different " "types (types that being merged: {})!".format(type_set) ) elif len(type_set) == 1: new_rhs_mapping[node] = list(type_set)[0] # check if an rhs typing is valid check_homomorphism( self.node[rule_id]["rule"].rhs, self.node[graph_id]["graph"], new_rhs_mapping, total=rhs_total ) # check if newly created path commutes with existing shortest paths self._check_rule_typing(rule_id, graph_id, lhs_mapping, new_rhs_mapping) self.add_edge(rule_id, graph_id) if attrs is not None: normalize_attrs(attrs) else: attrs = dict() self.set_edge( rule_id, graph_id, { "lhs_mapping": lhs_mapping, "rhs_mapping": new_rhs_mapping, "lhs_total": lhs_total, "rhs_total": rhs_total, "attrs": attrs }, normalize=False) return
[docs] def get_rule_typing(self, rule_id, graph_id): """Get typing dict of `source` by `target` (`source` is rule).""" if not self.is_rule(rule_id): raise HierarchyError("Hierarchy node '{}' is not a rule".format( rule_id)) if graph_id not in self.successors(rule_id): raise HierarchyError( "Rule '{}' is not typed by the graph '{}'".format( rule_id, graph_id)) rule = self.get_node(rule_id)["rule"] lhs_typing = self.get_edge(rule_id, graph_id)["lhs_mapping"] rhs_typing = self.get_edge(rule_id, graph_id)["rhs_mapping"] p_typing = {n: rhs_typing[rule.p_rhs[n]] for n in rule.p.nodes()} return (lhs_typing, p_typing, rhs_typing)
[docs] def is_graph(self, node_id): """Test if a hierarchy node is a graph.""" return "graph" in self.get_node(node_id)
[docs] def is_rule(self, node_id): """Test if a hierarchy node is a rule.""" return "rule" in self.get_node(node_id)
[docs] def is_typing(self, s, t): """Test if a hierarchy edge is a typing.""" return "mapping" in self.get_edge(s, t)
[docs] def is_rule_typing(self, s, t): """Test if a hierarchy edge is a rule typing.""" return "lhs_mapping" in self.get_edge(s, t)
[docs] def remove_node(self, node_id, reconnect=False): """Remove node from the hierarchy. Removes a node from the hierarchy, if the `reconnect` parameter is set to True, adds typing from the predecessors of the removed node to all its successors, by composing the homomorphisms (for every predecessor `p` and for every successor 's' composes two homomorphisms `p`->`node_id` and `node_id`->`s`, then removes `node_id` and all its incident edges, by which makes node's removal a procedure of 'forgetting' one level of 'abstraction'). Parameters ---------- node_id Id of a node to remove reconnect : bool Reconnect the descendants of the removed node to its predecessors Raises ------ HierarchyError If node with `node_id` is not defined in the hierarchy """ if node_id not in self.nodes(): raise HierarchyError( "Node '{}'' is not defined in the hierarchy!".format(node_id)) if reconnect: out_graphs = self.successors(node_id) in_graphs = self.predecessors(node_id) for source in in_graphs: for target in out_graphs: if self.is_rule_typing(source, node_id): lhs_map, rhs_map = self.get_rule_typing( source, node_id) new_lhs_map = compose( lhs_map, self.get_typing(node_id, target) ) new_rhs_map = compose( rhs_map, self.get_typing(node_id, target) ) if (source, target) not in self.edges(): self.add_rule_typing( source, target, new_lhs_map, new_rhs_map ) elif self.is_typing(source, node_id): # compose two homomorphisms mapping = compose( self.get_typing(source, node_id), self.get_typing(node_id, target) ) if (source, target) not in self.edges(): self.add_typing( source, target, mapping) nx.DiGraph.remove_node(self._graph, node_id) # Update dicts representing relations for u, v in self.relation_edges.keys(): if u == node_id or v == node_id: del self.relation_edges[u, v] return
[docs] def remove_rule(self, rule_id, reconnect=False): """Remove graph from the hierarchy. Removes a rule from the hierarchy, if the `reconnect` parameter is set to True, adds typing from the predecessors of the removed node to all its successors, by composing the homomorphisms (for every predecessor `p` and for every successor 's' composes two homomorphisms `p`->`node_id` and `node_id`->`s`, then removes `node_id` and all its incident edges, by which makes node's removal a procedure of 'forgetting' one level of 'abstraction'). Parameters ---------- rule_id Id of a rule to remove reconnect : bool Reconnect the descendants of the removed node to its predecessors Raises ------ HierarchyError If graph with `node_id` is not defined in the hierarchy """ if not self.is_rule(rule_id): raise HierarchyError( "Hierarchy node '{}' is a graph! ".format(rule_id) + "Use, 'remove_graph' method instead") self.remove_node(rule_id, reconnect)
[docs] def compose_path_typing(self, path): """Compose homomorphisms along the path. Parameters ---------- path : list List of nodes of the hierarchy forming a path Returns ------- If source node of the path is a graph homomorphism : dict Dictionary containg the typing of the nodes from the source graph of the path by the nodes of the target graph if source node of the path is a rule lhs_homomorphism : dict Dictionary containg the typing of the nodes from the left-hand side of the source rule of the path by the nodes of the target graph rhs_homomorphism : dict Dictionary containg the typing of the nodes from the right-hand side of the source rule of the path by the nodes of the target graph """ s = path[0] t = path[1] if self.is_graph(s): homomorphism = self.get_typing(s, t) for i in range(2, len(path)): s = path[i - 1] t = path[i] homomorphism = compose( homomorphism, self.get_typing(s, t) ) return homomorphism else: lhs_typing, p_typing, rhs_typing = self.get_rule_typing(s, t) for i in range(2, len(path)): s = path[i - 1] t = path[i] lhs_typing = compose( lhs_typing, self.get_typing(s, t) ) p_typing = compose( p_typing, self.get_typing(s, t) ) rhs_typing = compose( rhs_typing, self.get_typing(s, t) ) return (lhs_typing, p_typing, rhs_typing)
[docs] @classmethod def copy(cls, hierarchy): """Copy the hierarchy object.""" return copy.deepcopy(hierarchy)
[docs] def find_rule_matching(self, graph_id, rule_id): """Find matching of a rule `rule_id` form the hierarchy.""" if self.is_rule(graph_id): raise ReGraphError( "Pattern matching in a rule is not implemented!") if not self.is_rule(rule_id): raise HierarchyError("Invalid rule '{}' to match!".format(rule_id)) rule = self.node[rule_id]["rule"] lhs_typing = dict() rhs_typing = dict() rule_successors = self.successors(rule_id) for suc in rule_successors: lhs_typing[suc] = self.adj[rule_id][suc]["lhs_mapping"] rhs_typing[suc] = self.adj[rule_id][suc]["rhs_mapping"] instances = self.find_matching( graph_id, rule.lhs, lhs_typing ) return instances
def _update_graph(self, graph_id, graph_obj): """Update the graph object stored at the node of with id 'graph_id'.""" self.set_node_attrs( graph_id, { "graph": graph_obj, "attrs": self.node[graph_id]["attrs"] }, normalize=False ) def _update_mapping(self, source, target, mapping): """Update the mapping dictionary from source to target.""" if self.is_graph(source): self.update_edge_attrs( source, target, { "mapping": mapping, "attrs": self.get_typing_attrs(source, target) }, normalize=False ) else: lhs, rhs = mapping self._update_rule_homomorphism( source, target, lhs, rhs) def _update_relation(self, left, right, relation): """Update the relation dictionaries (left and right).""" old_attrs = copy.deepcopy(self.relation_edges[left, right]["attrs"]) self.remove_relation(left, right) self.add_relation(left, right, relation, old_attrs) def _update_rule(self, rule_id, rule_obj): """Update the rule object stored at the node of with id 'rule_id'.""" self.update_node_attrs( rule_id, { "rule": rule_obj, "attrs": self.node[rule_id]["attrs"] }, normalize=False ) def _update_rule_homomorphism(self, source, target, lhs_h, rhs_h): self.update_edge_attrs( source, target, { "lhs_mapping": lhs_h, "rhs_mapping": rhs_h, "attrs": self.get_typing_attrs(source, target) }, normalize=False ) def _update(self, graphs=None, homomorphisms=None, relations=None, rules=None, rule_homomorphisms=None): """Update parts of hierarchy with new objects.""" if graphs is None: graphs = dict() if homomorphisms is None: homomorphisms = dict() if relations is None: relations = dict() if rules is None: rules = dict() if rule_homomorphisms is None: rule_homomorphisms = dict() for graph, graph_obj in graphs.items(): self._update_graph(graph, graph_obj) for (s, t), mapping in homomorphisms.items(): self._update_mapping(s, t, mapping) for (l, r), relation in relations.items(): self._update_relation(l, r, relation) for rule, rule_obj in rules.items(): self._update_rule(rule, rule_obj) for (s, t), (lhs_h, rhs_h) in rule_homomorphisms.items(): self._update_rule_homomorphism(s, t, lhs_h, rhs_h) def _restrictive_update_incident_homs(self, node_id, g_m_g): """Update incident homomorphisms after a restrictive change.""" for suc in self.successors(node_id): typing = self.get_typing(node_id, suc) if self.is_graph(node_id): self._update_mapping(node_id, suc, compose(g_m_g, typing)) else: self._update_rule_homomorphism( node_id, suc, compose(g_m_g[0], typing[0]), compose(g_m_g[2], typing[1])) def _restrictive_update_incident_rels(self, graph_id, g_m_g): """Update incident relations after a restrictive change.""" if self.is_graph(graph_id): for related_g in self.adjacent_relations(graph_id): rel = self.get_relation(graph_id, related_g) new_rel = dict() for node in self.get_graph(graph_id).nodes(): old_node = g_m_g[node] if old_node in rel.keys(): new_rel[node] = rel[old_node] self._update_relation(graph_id, related_g, new_rel) def _expansive_update_incident_homs(self, graph_id, g_m_g_prime, pred_typings): """Update incident homomorphisms after an expansive change.""" for pred, typing in pred_typings.items(): if self.is_graph(pred): self._update_mapping( pred, graph_id, compose(typing, g_m_g_prime)) else: self._update_rule_homomorphism( pred, graph_id, compose(typing[0], g_m_g_prime), compose(typing[1], g_m_g_prime)) def _expansive_update_incident_rels(self, graph_id, g_m_g_prime, adj_relations): """Update incident relations after an expansive change.""" for related_g in self.adjacent_relations(graph_id): if self.is_graph(related_g): super()._expansive_update_incident_rels( graph_id, g_m_g_prime, adj_relations) else: pass
[docs] def apply_rule(self, graph_id, rule_id, instance): """Apply rule from the hierarchy.""" if self.is_rule(graph_id): raise ReGraphError("Rewriting of a rule is not implemented!") if not self.is_rule(rule_id): raise RewritingError( "Invalid rewriting rule '{}'!".format(rule_id)) rule = self.get_rule(rule_id) lhs_typing = dict() rhs_typing = dict() rule_successors = self.successors(rule_id) for suc in rule_successors: lhs_typing[suc] =\ self.adj[rule_id][suc]["lhs_mapping"] rhs_typing[suc] =\ self.adj[rule_id][suc]["rhs_mapping"] return self.rewrite( graph_id, rule, instance, rhs_typing=rhs_typing)
[docs] def to_json(self, rename_nodes=None): """Convert hierarchy to its json representation.""" return super().to_json(rename_nodes)
def _check_rule_typing(self, rule_id, graph_id, lhs_mapping, rhs_mapping): all_paths = dict(nx.all_pairs_shortest_path(self._graph)) paths_from_target = {} for s in self.nodes(): if s == graph_id: for key in all_paths[graph_id].keys(): paths_from_target[key] = all_paths[graph_id][key] for t in paths_from_target.keys(): if t != graph_id: new_lhs_h = compose( lhs_mapping, self.compose_path_typing(paths_from_target[t])) new_rhs_h = compose( rhs_mapping, self.compose_path_typing(paths_from_target[t])) try: # find homomorphisms from s to t via other paths s_t_paths = nx.all_shortest_paths(self._graph, rule_id, t) for path in s_t_paths: lhs_h, _, rhs_h = self.compose_path_typing(path) if lhs_h != new_lhs_h: raise HierarchyError( "Invalid lhs typing: homomorphism does not " "commute with an existing " "path from '{}' to '{}'!".format(s, t) ) if rhs_h != new_rhs_h: raise HierarchyError( "Invalid rhs typing: homomorphism does not " "commute with an existing " + "path from '{}' to '{}'!".format(s, t) ) except(nx.NetworkXNoPath): pass return def _get_identity_map(self, node_id): if self.is_graph(node_id): return { n: n for n in self.get_graph(node_id).nodes() } else: lhs_map = { n: n for n in self.get_rule(node_id).lhs.nodes() } p_map = { n: n for n in self.get_rule(node_id).p.nodes() } rhs_map = { n: n for n in self.get_rule(node_id).rhs.nodes() } return (lhs_map, p_map, rhs_map) def _restore_by_backward_composability(self, target_id, g_m_g, g_m_origin_ms): if self.is_graph(target_id): graph_nodes = self.get_graph(target_id).nodes() for pred in self.predecessors(target_id): pred_typing = self.get_typing(pred, target_id) if self.is_graph(pred): pred_graph = get_unique_map_to_pullback( graph_nodes, g_m_g, g_m_origin_ms[target_id], pred_typing, g_m_origin_ms[pred]) self.remove_typing(pred, target_id) self.add_typing(pred, target_id, pred_graph) else: lhs_typing, rhs_typing = pred_typing lhs_pred_graph = get_unique_map_to_pullback( graph_nodes, g_m_g, g_m_origin_ms[target_id], lhs_typing, g_m_origin_ms[pred][0]) rhs_pred_graph = get_unique_map_to_pullback( graph_nodes, g_m_g, g_m_origin_ms[target_id], rhs_typing, g_m_origin_ms[pred][1]) self.remove_typing(pred, target_id) self.add_rule_typing( pred, target_id, lhs_pred_graph, rhs_pred_graph)
[docs] def relabel_nodes(self, graph, mapping): """Relabel nodes of a graph in the hierarchy.""" graph_obj = self.get_graph(graph) graph_obj.relabel_nodes(mapping) # update homomorphisms for predecessor in self.predecessors(graph): old_typing = self.get_typing(predecessor, graph) self._update_mapping( predecessor, graph, compose(old_typing, mapping)) for successor in self.successors(graph): old_typing = self.get_typing(graph, successor) self._update_mapping( graph, successor, { mapping[k]: v for k, v in old_typing.items() } ) # update relations for adj in self.adjacent_relations(graph): old_rel = self.get_relation(graph, adj) self._update_relation( graph, adj, { mapping[k]: v for k, v in old_rel.items() } )
# def _compose_backward(self, pred_id, graph_id, g_m_g, graph_m_origin_m, # pred_m_origin_m, pred_typing): # graph_nodes = self.get_graph(graph_id).nodes() # return get_unique_map_to_pullback( # graph_nodes, # g_m_g, # graph_m_origin_m, # pred_typing, # pred_m_origin_m) def _compose_backward(self, pred_id, graph_id, g_m_g, graph_m_origin_m, pred_m_origin_m, pred_typing): if self.is_graph(pred_id): return super()._compose_backward( pred_id, graph_id, g_m_g, graph_m_origin_m, pred_m_origin_m, pred_typing) else: # rule = self.get_rule(graph_id) # lhs_nodes = rule.lhs.nodes() # rhs_nodes = rule.rhs.nodes() graph_nodes = self.get_graph(graph_id).nodes() lhs_typing, rhs_typing = pred_typing pred_m_origin_m_lhs, _, pred_m_origin_m_rhs = pred_m_origin_m lhs_hom = get_unique_map_to_pullback( graph_nodes, g_m_g, graph_m_origin_m, lhs_typing, pred_m_origin_m_lhs) rhs_hom = get_unique_map_to_pullback( graph_nodes, g_m_g, graph_m_origin_m, rhs_typing, pred_m_origin_m_lhs) return (lhs_hom, rhs_hom)