Source code for regraph.hierarchies

"""Abstract hierarchies of graphs in ReGraph.

This module contains abstact data structures for
graph hierarchies. A graph hierarchy is a DAG, whose nodes
are graphs and whose directed edges represent
homomorphisms between graphs. In addition, hierarchies are
equipped with relations on graphs (which can be thought of as
undirected edges or, alternatively, spans).
"""

from abc import ABC, abstractmethod
import copy
import json

import os

from regraph.exceptions import (HierarchyError,
                                ReGraphError,
                                InvalidHomomorphism,
                                RewritingError)

from regraph.category_utils import (compose,
                                    check_homomorphism,
                                    relation_to_span,
                                    pushout,
                                    get_unique_map_to_pullback,
                                    get_unique_map_from_pushout,
                                    is_monic,
                                    pullback,
                                    image_factorization,
                                    get_unique_map_to_pullback_complement)
from regraph.rules import Rule
from regraph.utils import (attrs_from_json,
                           attrs_to_json,
                           keys_by_value,
                           normalize_typing_relation,
                           test_strictness)


[docs]class Hierarchy(ABC): """Abstract class for graph hierarchy objects in ReGraph. A graph hierarchy is a DAG, where nodes are graphs with attributes and edges are homomorphisms representing graph typing in the system. """
[docs] @abstractmethod def graphs(self, data=False): """Return a list of graphs in the hierarchy.""" pass
[docs] @abstractmethod def typings(self, data=False): """Return a list of graph typing edges in the hierarchy.""" pass
[docs] @abstractmethod def relations(self, data=False): """Return a list of relations.""" pass
[docs] @abstractmethod def successors(self, node_id): """Return the set of successors.""" pass
[docs] @abstractmethod def predecessors(self, node_id): """Return the set of predecessors.""" pass
[docs] @abstractmethod def get_graph(self, graph_id): """Get a graph object associated to the node 'graph_id'.""" pass
[docs] @abstractmethod def get_typing(self, source_id, target_id): """Get a typing dict associated to the edge 'source_id->target_id'.""" pass
[docs] @abstractmethod def get_relation(self, left_id, right_id): """Get a relation dict associated to the rel 'left_id->target_id'.""" pass
[docs] @abstractmethod def get_graph_attrs(self, graph_id): """Get attributes of a graph in the hierarchy. Parameters ---------- graph_id : hashable Id of the graph """ pass
[docs] @abstractmethod def set_graph_attrs(self, node_id, attrs): """Set attributes of a graph in the hierarchy. Parameters ---------- graph_id : hashable Id of the graph """ pass
[docs] @abstractmethod def get_typing_attrs(self, source, target): """Get attributes of a typing in the hierarchy. Parameters ---------- source : hashable Id of the source graph target : hashable Id of the target graph """ pass
[docs] @abstractmethod def set_typing_attrs(self, source, target, attrs): """Set attributes of a typing in the hierarchy. Parameters ---------- source : hashable Id of the source graph target : hashable Id of the target graph """ pass
[docs] @abstractmethod def get_relation_attrs(self, left, right): """Get attributes of a reltion in the hierarchy. Parameters ---------- left : hashable Id of the left graph right : hashable Id of the right graph """ pass
[docs] @abstractmethod def set_relation_attrs(self, left, right, attrs): """Set attributes of a relation in the hierarchy. Parameters ---------- left : hashable Id of the left graph right : hashable Id of the right graph """ pass
[docs] @abstractmethod def set_node_relation(self, left_graph, right_graph, left_node, right_node): """Set relation for a particular node. Parameters ---------- """ pass
[docs] @abstractmethod def add_graph(self, graph_id, graph, graph_attrs=None): """Add a new graph to the hierarchy. Parameters ---------- graph_id : hashable Id of a new node in the hierarchy graph : regraph.Graph Graph object corresponding to the new node of the hierarchy graph_attrs : dict, optional Dictionary containing attributes of the new node """ pass
[docs] @abstractmethod def add_graph_from_data(self, graph_id, node_list, edge_list, attrs=None): """Add a new graph to the hierarchy from the input node/edge lists. Parameters ---------- graph_id : hashable Id of a new node in the hierarchy node_list : iterable List of nodes (with attributes) edge_list : iterable List of edges (with attributes) graph_attrs : dict, optional Dictionary containing attributes of the new node """ pass
[docs] @abstractmethod def add_empty_graph(self, graph_id, attrs=None): """"Add a new empty graph to the hierarchy. Parameters ---------- graph_id : hashable Id of a new node in the hierarchy graph_attrs : dict, optional Dictionary containing attributes of the new node """ pass
[docs] @abstractmethod def add_typing(self, source, target, mapping, attrs=None): """Add homomorphism to the hierarchy. Parameters ---------- source : hashable Id of the source graph node of typing target : hashable Id of the target graph node of typing mapping : dict Dictionary representing a mapping of nodes from the source graph to target's nodes attrs : dict Dictionary containing attributes of the new typing edge Raises ------ HierarchyError This error is raised in the following cases: * source or target ids are not found in the hierarchy * a typing edge between source and target already exists * addition of an edge between source and target creates a cycle or produces paths that do not commute with some already existing paths InvalidHomomorphism If a homomorphisms from a graph at the source to a graph at the target given by `mapping` is not a valid homomorphism. """ pass
[docs] @abstractmethod def add_relation(self, left, right, relation, attrs=None): """Add relation to the hierarchy. This method adds a relation between two graphs in the hierarchy corresponding to the nodes with ids `left` and `right`, the relation itself is defined by a dictionary `relation`, where a key is a node in the `left` graph and its corresponding value is a set of nodes from the `right` graph to which the node is related. Relations in the hierarchy are symmetric (see example below). Parameters ---------- left Id of the hierarchy's node represening the `left` graph right Id of the hierarchy's node represening the `right` graph relation : dict Dictionary representing a relation of nodes from `left` to the nodes from `right`, a key of the dictionary is assumed to be a node from `left` and its value a set of ids of related nodes from `right` attrs : dict Dictionary containing attributes of the new relation Raises ------ HierarchyError This error is raised in the following cases: * node with id `left`/`right` is not defined in the hierarchy; * node with id `left`/`right` is not a graph; * a relation between `left` and `right` already exists; * some node ids specified in `relation` are not found in the `left`/`right` graph. """ pass
[docs] @abstractmethod def remove_graph(self, graph_id, reconnect=False): """Remove graph from the hierarchy. Removes a graph from the hierarchy, if the `reconnect` parameter is set to True, adds typing from the predecessors of the removed node to all its successors, by composing the homomorphisms (for every predecessor `p` and for every successor 's' composes two homomorphisms `p`->`node_id` and `node_id`->`s`, then removes `node_id` and all its incident edges, by which makes node's removal a procedure of 'forgetting' one level of 'abstraction'). Parameters ---------- node_id Id of a graph to remove reconnect : bool Reconnect the descendants of the removed node to its predecessors Raises ------ HierarchyError If graph with `node_id` is not defined in the hierarchy """ pass
[docs] @abstractmethod def remove_typing(self, s, t): """Remove a typing from the hierarchy.""" pass
[docs] @abstractmethod def remove_relation(self, left, right): """Remove a relation from the hierarchy.""" pass
[docs] @abstractmethod def bfs_tree(self, graph, reverse=False): """BFS tree from the graph to all other reachable graphs.""" pass
[docs] @abstractmethod def shortest_path(self, source, target): """Shortest path from 'source' to 'target'.""" pass
[docs] @abstractmethod def copy_graph(self, graph_id, new_graph_id, attach_graphs=[]): """Create a copy of a graph in a hierarchy.""" pass
[docs] @abstractmethod def relabel_graph_node(self, graph_id, node, new_name): """Rename a node in a graph of the hierarchy.""" pass
[docs] @abstractmethod def relabel_graph(self, graph_id, new_graph_id): """Relabel a graph in the hierarchy. Parameters ---------- graph_id : hashable Id of the graph to relabel new_graph_id : hashable New graph id to assign to this graph """ pass
[docs] @abstractmethod def relabel_graphs(self, mapping): """Relabel graphs in the hierarchy. Parameters ---------- mapping: dict A dictionary with keys being old graph ids and their values being new id's of the respective graphs. Raises ------ ReGraphError If new id's do not define a set of distinct graph id's. """ pass
@abstractmethod def _update_mapping(self, source, target, mapping): """Update the mapping dictionary from source to target.""" pass @abstractmethod def _update_relation(self, left, right, relation): """Update the relation dictionaries (left and right).""" pass # Implemented methods def __str__(self): """String representation of the hierarchy.""" res = "" res += "\nGraphs:\n" for n, attrs in self.graphs(True): res += "\n{} {}\n".format(n, attrs) res += "\nTyping homomorphisms: \n" for n1, n2, attrs in self.typings(True): res += "{} -> {}: {}\n".format( n1, n2, attrs) res += "\nRelations:\n" for n1, n2, attrs in self.relations(True): res += "{}-{}: {}\n".format( n1, n2, attrs) return res def __eq__(self, hierarchy): """Hierarchy equality test.""" for node, attrs in self.graphs(True): if node not in hierarchy.graphs(): return False if attrs != hierarchy.get_graph_attrs(node): return False for s, t, attrs in self.typings(True): if (s, t) not in hierarchy.edges(): return False if attrs != hierarchy.get_typing_attrs(s, t): return False for n1, n2, attrs in self.relations(True): if (n1, n2) not in hierarchy.relations() and\ (n2, n1) not in hierarchy.relations(): return False if attrs != hierarchy.get_relation_attrs(n1, n2): return False return True def __ne__(self, hierarchy): """Non-equality operator.""" return not (self == hierarchy)
[docs] def add_graph_from_json(self, graph_id, json_data, attrs=None): """Add a new graph to the hirarchy from its JSON-reprsentation. Parameters ---------- graph_id : hashable Id of the new graph json_data : dict JSON-like dictionary containing the representation of the graph attrs : dict Attributes to attach to the new graph """ node_list = [] edge_list = [] for n in json_data["nodes"]: node_list.append((n["id"], attrs_from_json(n["attrs"]))) for e in json_data["edges"]: edge_list.append((e["from"], e["to"], attrs_from_json(e["attrs"]))) self.add_graph_from_data(graph_id, node_list, edge_list, attrs)
[docs] def to_json(self, rename_nodes=None): """Return json representation of the hierarchy. Parameters ---------- rename_nodes : dict, optional Dictionary specifying mapping of node ids from the original graph to its JSON-representation """ json_data = { "graphs": [], "typing": [], "relations": [] } for node, attrs in self.graphs(True): if rename_nodes and node in rename_nodes.keys(): node_id = rename_nodes[node] else: node_id = node json_data["graphs"].append({ "id": node_id, "graph": self.get_graph(node).to_json(), "attrs": attrs_to_json(attrs) }) for s, t, attrs in self.typings(True): if rename_nodes and s in rename_nodes.keys(): s_id = rename_nodes[s] else: s_id = s if rename_nodes and t in rename_nodes.keys(): t_id = rename_nodes[t] else: t_id = t json_data["typing"].append({ "from": s_id, "to": t_id, "mapping": self.get_typing(s, t), "attrs": attrs_to_json(attrs) }) visited = set() for u, v, attrs in self.relations(True): if rename_nodes and u in rename_nodes.keys(): u_id = rename_nodes[u] else: u_id = u if rename_nodes and v in rename_nodes.keys(): v_id = rename_nodes[v] else: v_id = v if not (u, v) in visited and not (v, u) in visited: visited.add((u, v)) json_data["relations"].append({ "from": u_id, "to": v_id, "rel": { a: list(b) for a, b in self.get_relation(u, v).items() }, "attrs": attrs_to_json(attrs) }) return json_data
[docs] @classmethod def from_json(cls, json_data, ignore=None): """Create a hierarchy object from JSON-representation. Parameters ---------- json_data : dict JSON-like dict containing representation of a hierarchy ignore : dict, optional Dictionary containing components to ignore in the process of converting from JSON, dictionary should respect the following format: { "graphs": <collection of ids of graphs to ignore>, "rules": <collection of ids of rules to ignore>, "typing": <collection of tuples containing typing edges to ignore>, "rule_typing": <collection of tuples containing rule typing edges to ignore>>, "relations": <collection of tuples containing relations to ignore>, } Returns ------- hierarchy : regraph.hierarchies.Hierarchy """ hierarchy = cls() # add graphs for graph_data in json_data["graphs"]: if ignore is not None and\ "graphs" in ignore.keys() and\ graph_data["id"] in ignore["graphs"]: pass else: if "attrs" not in graph_data.keys(): attrs = dict() else: attrs = attrs_from_json(graph_data["attrs"]) hierarchy.add_graph_from_json( graph_data["id"], graph_data["graph"], attrs) # add typing for typing_data in json_data["typing"]: if ignore is not None and\ "typing" in ignore.keys() and\ (typing_data["from"], typing_data["to"]) in ignore["typing"]: pass else: if "attrs" not in typing_data.keys(): attrs = dict() else: attrs = attrs_from_json(typing_data["attrs"]) hierarchy.add_typing( typing_data["from"], typing_data["to"], typing_data["mapping"], attrs) # add relations for relation_data in json_data["relations"]: from_g = relation_data["from"] to_g = relation_data["to"] if ignore is not None and\ "relations" in ignore.keys() and\ ((from_g, to_g) in ignore["relations"] or (to_g, from_g) in ignore["relations"]): pass else: if "attrs" not in relation_data.keys(): attrs = dict() else: attrs = attrs_from_json(relation_data["attrs"]) if (from_g, to_g) not in hierarchy.relations(): hierarchy.add_relation( relation_data["from"], relation_data["to"], {a: set(b) for a, b in relation_data["rel"].items()}, attrs ) return hierarchy
[docs] @classmethod def load(cls, filename, ignore=None): """Load the hierarchy from a file. Parameters ---------- filename : str Path to the file containing JSON-representation of the hierarchy ignore : dict Dictionary with graph elemenets to ignore when loading Returns ------- hierarchy : regraph.hierarchies.Hierarchy """ if os.path.isfile(filename): with open(filename, "r+") as f: json_data = json.loads(f.read()) hierarchy = cls.from_json(json_data, ignore) return hierarchy else: raise ReGraphError("File '{}' does not exist!".format(filename))
[docs] def export(self, filename): """Export the hierarchy to a file.""" with open(filename, 'w') as f: j_data = self.to_json() json.dump(j_data, f)
[docs] def adjacent_relations(self, g): """Return a list of related graphs.""" if g not in self.graphs(): raise HierarchyError( "Graph node '{}' does not exist in the hierarchy!".format(g)) return [ r for l, r in self.relations() if l == g ] + [ l for l, r in self.relations() if r == g ]
[docs] def node_type(self, graph_id, node_id): """Get a list of the immediate types of a node.""" if graph_id not in self.graphs(): raise HierarchyError( "Graph '{}' is not defined in the hierarchy!".format(graph_id) ) if node_id not in self.get_graph(graph_id).nodes(): raise HierarchyError( "Graph '{}'' does not have a node with id '{}'!".format( graph_id, node_id) ) types = dict() for successor in self.successors(graph_id): mapping = self.get_typing(graph_id, successor) if node_id in mapping.keys(): types[successor] = mapping[node_id] return types
[docs] def get_ancestors(self, graph_id): """Return ancestors of a graph with the typing morphisms.""" ancestors = dict() for pred in self.predecessors(graph_id): typing = self.get_typing(pred, graph_id) pred_ancestors = self.get_ancestors(pred) if pred in ancestors.keys(): ancestors.update(pred_ancestors) else: ancestors[pred] = typing for anc, anc_typing in pred_ancestors.items(): if anc in ancestors.keys(): ancestors[anc].update(compose(anc_typing, typing)) else: ancestors[anc] = compose(anc_typing, typing) return ancestors
[docs] def get_descendants(self, graph_id, maybe=None): """Return descendants of a graph with the typing morphisms.""" descendants = dict() for successor in self.successors(graph_id): mapping = self.get_typing(graph_id, successor) typing_descendants = self.get_descendants(successor, maybe) if successor in descendants.keys(): descendants[successor].update(mapping) else: descendants[successor] = mapping for anc, typ in typing_descendants.items(): if anc in descendants.keys(): descendants[anc].update(compose(mapping, typ)) else: descendants[anc] = compose(mapping, typ) return descendants
[docs] def compose_path_typing(self, path): """Compose homomorphisms along the path. Parameters ---------- path : list List of nodes of the hierarchy forming a path Returns ------- If source node of the path is a graph homomorphism : dict Dictionary containg the typing of the nodes from the source graph of the path by the nodes of the target graph if source node of the path is a rule lhs_homomorphism : dict Dictionary containg the typing of the nodes from the left-hand side of the source rule of the path by the nodes of the target graph rhs_homomorphism : dict Dictionary containg the typing of the nodes from the right-hand side of the source rule of the path by the nodes of the target graph """ s = path[0] t = path[1] homomorphism = self.get_typing(s, t) for i in range(2, len(path)): s = path[i - 1] t = path[i] homomorphism = compose( homomorphism, self.get_typing(s, t) ) return homomorphism
[docs] def get_rule_hierarchy(self, origin_id, rule, instance=None, p_typing=None, rhs_typing=None): """Find rule hierarchy corresponding to the input rewriting. Parameters ---------- graph_id : hashable Id of the graph to rewrite rule : regraph.Rule Rewriting rule instance : dict, optional Instance of the rule in the graph. If not specified, the identity of the left-hand side is used p_typing : dict Relations controlling backward propagation. The keys are ancestors of the rewritten graph, values are dictionaries containing individual relations between the nodes of a given ancestor and the preserved part of the rule rhs_typing : dict Relation controlling forward propagation. The keys are descendants of the rewritten graph, values are dictionaries containing individual relations between the right-hand side of the rule and the nodes of a given descendant Returns ------- rule_hierarchy : dictionary Dictionary contains two keys: (1) `rules` whose value is a dictionary with id's of the graphs in the hierarchy and the computed propagation rules; (2) `rule_homomorphisms` whose value is a dictionary with pairs of graphs in the hierarchy and the computed homomorphisms between rules. """ instance, p_typing, rhs_typing = self._check_rule_instance_typing( origin_id, rule, instance, p_typing, rhs_typing, False) instances = {origin_id: instance} rule_hierarchy = { "rules": {origin_id: rule}, "rule_homomorphisms": {} } # Compute rules and their map to the original rule l_g_ls = {} # LHS's of rule liftings to LHS of the original rule p_g_ps = {} # interfaces of rule liftings to LHS of the original rule ancestors = self.get_ancestors(origin_id) descendants = self.get_descendants(origin_id) # Compute rule liftings for ancestor, origin_typing in ancestors.items(): # Compute L_G l_g, l_g_g, l_g_l = pullback( self.get_graph(ancestor), rule.lhs, self.get_graph(origin_id), origin_typing, instance) # Compute canonical P_G canonical_p_g, p_g_l_g, p_g_p = pullback( l_g, rule.p, rule.lhs, l_g_l, rule.p_lhs) # Remove controlled things from P_G if ancestor in p_typing.keys(): l_g_factorization = { keys_by_value(l_g_g, k)[0]: v for k, v in p_typing[ancestor].items() } p_g_nodes_to_remove = set() for n in canonical_p_g.nodes(): l_g_node = p_g_l_g[n] # If corresponding L_G node is specified in # the controlling relation, remove all # the instances of P nodes not mentioned # in this relations if l_g_node in l_g_factorization.keys(): p_nodes = l_g_factorization[l_g_node] if p_g_p[n] not in p_nodes: del p_g_p[n] del p_g_l_g[n] p_g_nodes_to_remove.add(n) for n in p_g_nodes_to_remove: canonical_p_g.remove_node(n) rule_hierarchy["rules"][ancestor] =\ Rule(p=canonical_p_g, lhs=l_g, p_lhs=p_g_l_g) instances[ancestor] = l_g_g l_g_ls[ancestor] = l_g_l p_g_ps[ancestor] = p_g_p l_l_ts = {} # Original rule LHS to the LHS of rule projections p_p_ts = {} # Original rule interface to the inter. of rule projections r_r_ts = {} # Original rule RHS to the RHS of rule projections # Compute rule projections for descendant, origin_typing in descendants.items(): # Compute canonical P_T l_t, l_l_t, l_t_t = image_factorization( rule.lhs, self.get_graph(descendant), compose(instance, origin_typing)) # Compute canonical R_T r_t, l_t_r_t, r_r_t = pushout( rule.p, l_t, rule.rhs, compose(rule.p_lhs, l_l_t), rule.p_rhs) # Modify P_T and R_T according to the controlling # relation rhs_typing if descendant in rhs_typing.keys(): r_t_factorization = { r_r_t[k]: v for k, v in rhs_typing[descendant].items() } added_t_nodes = set() for n in r_t.nodes(): if n in r_t_factorization.keys(): # If corresponding R_T node is specified in # the controlling relation add nodes of T # that type it to P t_nodes = r_t_factorization[n] for t_node in t_nodes: if t_node not in l_t_t.values() and\ t_node not in added_t_nodes: new_p_node = l_t.generate_new_node_id( t_node) l_t.add_node(new_p_node) added_t_nodes.add(t_node) l_t_r_t[new_p_node] = n l_t_t[new_p_node] = t_node else: l_t_r_t[keys_by_value(l_t_t, t_node)[0]] = n rule_hierarchy["rules"][descendant] =\ Rule(lhs=l_t, p=l_t, rhs=r_t, p_rhs=l_t_r_t) instances[descendant] = l_t_t l_l_ts[descendant] = l_l_t p_p_ts[descendant] = {k: l_l_t[v] for k, v in rule.p_lhs.items()} r_r_ts[descendant] = r_r_t # Compute homomorphisms between rules for graph_id, graph_rule in rule_hierarchy["rules"].items(): if graph_id in ancestors: for successor in self.successors(graph_id): old_typing = self.get_typing(graph_id, successor) if successor == origin_id: graph_lhs_successor_lhs = l_g_ls[graph_id] graph_p_successor_p = p_g_ps[graph_id] rule_hierarchy["rule_homomorphisms"][ (graph_id, successor)] = ( graph_lhs_successor_lhs, graph_p_successor_p, compose( graph_p_successor_p, rule_hierarchy["rules"][successor].p_rhs) ) else: l_graph_successor = compose( instances[graph_id], old_typing) # already lifted to the successor if successor in ancestors: graph_rule = rule_hierarchy["rules"][graph_id] suc_rule = rule_hierarchy["rules"][successor] graph_lhs_successor_lhs = get_unique_map_to_pullback( suc_rule.lhs.nodes(), instances[successor], l_g_ls[successor], compose( instances[graph_id], old_typing), l_g_ls[graph_id]) graph_p_successor_p = get_unique_map_to_pullback( suc_rule.p.nodes(), suc_rule.p_lhs, p_g_ps[successor], compose( graph_rule.p_lhs, graph_lhs_successor_lhs), p_g_ps[graph_id]) rule_hierarchy["rule_homomorphisms"][ (graph_id, successor)] = ( graph_lhs_successor_lhs, graph_p_successor_p, graph_p_successor_p ) elif successor in descendants: rule_hierarchy["rule_homomorphisms"][(graph_id, successor)] = ( compose(l_g_ls[graph_id], l_l_ts[successor]), compose(p_g_ps[graph_id], p_p_ts[successor]), compose( compose(p_g_ps[graph_id], rule.p_rhs), r_r_ts[successor]) ) # didn't touch the successor or projected to it else: pass if graph_id in descendants: for predecessor in self.predecessors(graph_id): old_typing = self.get_typing(predecessor, graph_id) if predecessor == origin_id: predecessor_l_graph_l = l_l_ts[graph_id] predecessor_p_graph_p = p_p_ts[graph_id] predecessor_rhs_graph_rhs = r_r_ts[graph_id] rule_hierarchy["rule_homomorphisms"][ (predecessor, graph_id)] = ( predecessor_l_graph_l, predecessor_p_graph_p, predecessor_rhs_graph_rhs ) else: # already projected to the predecessor if predecessor in descendants: l_pred_graph = compose( instances[predecessor], old_typing) predecessor_l_graph_l = {} for k, v in instances[ predecessor].items(): predecessor_l_graph_l[k] = keys_by_value( instances[graph_id], l_pred_graph[k])[0] predecessor_rhs_graph_rhs = get_unique_map_from_pushout( rule_hierarchy["rules"][predecessor].rhs.nodes(), rule_hierarchy["rules"][predecessor].p_rhs, r_r_ts[predecessor], compose( predecessor_l_graph_l, rule_hierarchy["rules"][graph_id].p_rhs), r_r_ts[graph_id]) rule_hierarchy["rule_homomorphisms"][ (predecessor, graph_id)] = ( predecessor_l_graph_l, predecessor_l_graph_l, predecessor_rhs_graph_rhs ) # didn't touch the predecessor or lifter to it else: pass return rule_hierarchy, instances
[docs] def refine_rule_hierarchy(self, rule_hierarchy, instances): """Refine the input rule hierarchy to its reversible version. Parameters ---------- rule_hierarchy : dict Rule hierarchy to refine instances : dict of dict Dictionary containing ids of the graphs in the hierarchy as keys and dictionaries represening instances of the corresponding rules Returns ------- new_instances : dict of dict Dictionary containing ids of the graphs in the hierarchy as keys and dictionaries represening new instances of the corresponding refined rules """ new_lhs_instances = {} new_rules = {} new_rule_homomorphisms = {} # Refine individual rules for graph, rule in rule_hierarchy["rules"].items(): # refine rule new_lhs_instance = rule.refine( self.get_graph(graph), instances[graph]) new_lhs_instances[graph] = new_lhs_instance # Update rule homomorphisms for (source, target), (lhs_h, p_h, rhs_h) in rule_hierarchy[ "rule_homomorphisms"].items(): typing = self.get_typing(source, target) source_rule = rule_hierarchy["rules"][source] target_rule = rule_hierarchy["rules"][target] for node in source_rule.lhs.nodes(): if node not in lhs_h.keys(): source_node = new_lhs_instances[source][node] target_node = typing[source_node] target_lhs_node = keys_by_value( new_lhs_instances[target], target_node)[0] lhs_h[node] = target_lhs_node if node in source_rule.p_lhs.values(): source_p_node = keys_by_value( source_rule.p_lhs, node)[0] target_p_node = keys_by_value( target_rule.p_lhs, target_lhs_node)[0] p_h[source_p_node] = target_p_node source_rhs_node = source_rule.p_rhs[source_p_node] target_rhs_node = target_rule.p_rhs[target_p_node] rhs_h[source_rhs_node] = target_rhs_node # Add necessary node attrs for node in source_rule.lhs.nodes(): t_node = lhs_h[node] target_rule._add_node_attrs_lhs(t_node, source_rule.lhs.get_node(node)) # Add necessary node edges for s_lhs_s, s_lhs_t in source_rule.lhs.edges(): t_lhs_s = lhs_h[s_lhs_s] t_lhs_t = lhs_h[s_lhs_t] if not target_rule.lhs.exists_edge(t_lhs_s, t_lhs_t): target_rule._add_edge_lhs( t_lhs_s, t_lhs_t, source_rule.lhs.get_edge(s_lhs_s, s_lhs_t)) else: target_rule._add_edge_attrs_lhs( t_lhs_s, t_lhs_t, source_rule.lhs.get_edge(s_lhs_s, s_lhs_t)) # If merge is performed, add all the instances of the merged nodes for (source, target), (lhs_h, p_h, rhs_h) in rule_hierarchy[ "rule_homomorphisms"].items(): source_rule = rule_hierarchy["rules"][source] target_rule = rule_hierarchy["rules"][target] typing = self.get_typing(source, target) if len(target_rule.merged_nodes()) > 0: for rhs_node, p_nodes in target_rule.merged_nodes().items(): # rhs_node_instances = keys_by_value(typing, rhs_node) for p_node in p_nodes: lhs_node_instance = new_lhs_instances[target][ target_rule.p_lhs[p_node]] source_nodes = [ n for n in keys_by_value(typing, lhs_node_instance) ] source_nodes_to_add = [ s for s in source_nodes if s not in new_lhs_instances[source].values() ] for n in source_nodes_to_add: lhs_s_node = source_rule.lhs.generate_new_node_id(n) p_s_node, rhs_s_node = source_rule._add_node_lhs( lhs_s_node) # Add the instances of the new lhs node new_lhs_instances[source][lhs_s_node] = n # Add maps to the target for the new lhs node lhs_h[lhs_s_node] = target_rule.p_lhs[p_node] p_h[p_s_node] = p_node rhs_h[rhs_s_node] = rhs_node # Add identity rules where needed if len(rule_hierarchy["rules"]) == 0: # if the are no rules in the rule hierarchy, # create the identity rule for every graph for graph in self.graphs(): rule_hierarchy["rules"][graph] = Rule.identity_rule() new_lhs_instances[graph] = dict() for (s, t) in self.typings(): rule_hierarchy["rule_homomorphisms"][(s, t)] = ( dict(), dict(), dict()) else: # add identity rules where needed # to preserve the info on p/rhs_typing # add ancestors that are not included in rule hierarchy for graph, rule in rule_hierarchy["rules"].items(): for ancestor, typing in self.get_ancestors(graph).items(): if ancestor not in rule_hierarchy["rules"] and\ ancestor not in new_rules: # Find a typing of ancestor by the graph l_pred, l_pred_pred, l_pred_l_graph = pullback( self.get_graph(ancestor), rule.lhs, self.get_graph(graph), typing, new_lhs_instances[graph]) new_rules[ancestor] = Rule(p=l_pred, lhs=l_pred) new_lhs_instances[ancestor] = l_pred_pred r_pred_r_graph = { v: rule.p_rhs[k] for k, v in l_pred_l_graph.items() } for successor in self.successors(ancestor): if successor in rule_hierarchy["rules"]: if successor == graph: new_rule_homomorphisms[ (ancestor, graph)] = ( l_pred_l_graph, l_pred_l_graph, r_pred_r_graph ) else: path = self.shortest_path(graph, successor) lhs_h, p_h, rhs_h = rule_hierarchy[ "rule_homomorphisms"][ (path[0], path[1])] for i in range(2, len(path)): new_lhs_h, new_p_h, new_rhs_h =\ rule_hierarchy[ "rule_homomorphisms"][ (path[i - 1], path[i])] lhs_h = compose(lhs_h, new_lhs_h) p_h = compose(p_h, new_p_h) rhs_h = compose(rhs_h, new_rhs_h) new_rule_homomorphisms[ (ancestor, successor)] = ( compose(l_pred_l_graph, lhs_h), compose(l_pred_l_graph, p_h), compose(r_pred_r_graph, rhs_h) ) if successor in new_rules: lhs_h = { k: keys_by_value( new_lhs_instances[successor], self.get_typing( ancestor, successor)[v])[0] for k, v in new_lhs_instances[ ancestor].items() } new_rule_homomorphisms[ (ancestor, successor)] = ( lhs_h, lhs_h, lhs_h ) for predecessor in self.predecessors(ancestor): if predecessor in rule_hierarchy["rules"] or\ predecessor in new_rules: lhs_h = { k: keys_by_value( new_lhs_instances[ancestor], self.get_typing( predecessor, ancestor)[v])[0] for k, v in new_lhs_instances[ predecessor].items() } new_rule_homomorphisms[ (predecessor, ancestor)] = ( lhs_h, lhs_h, lhs_h ) for descendant, typing in self.get_descendants(graph).items(): if descendant not in rule_hierarchy["rules"] and\ descendant not in new_rules: l_suc, l_graph_l_suc, l_suc_suc = image_factorization( rule.lhs, self.get_graph(descendant), compose( new_lhs_instances[graph], typing)) new_rules[descendant] = Rule(p=l_suc, lhs=l_suc) new_lhs_instances[descendant] = l_suc_suc p_graph_p_suc = { k: l_graph_l_suc[v] for k, v in rule.p_lhs.items() } for predecessor in self.predecessors(descendant): if predecessor in rule_hierarchy["rules"]: if predecessor == graph: new_rule_homomorphisms[ (predecessor, descendant)] = ( l_graph_l_suc, p_graph_p_suc, p_graph_p_suc ) else: path = self.shortest_path( predecessor, graph) lhs_h, p_h, rhs_h = rule_hierarchy[ "rule_homomorphisms"][ (path[0], path[1])] for i in range(2, len(path)): new_lhs_h, new_p_h, new_rhs_h =\ rule_hierarchy[ "rule_homomorphisms"][ (path[i - 1], path[i])] lhs_h = compose(lhs_h, new_lhs_h) p_h = compose(p_h, new_p_h) rhs_h = compose(rhs_h, new_rhs_h) new_rule_homomorphisms[ (predecessor, descendant)] = ( compose(lhs_h, l_graph_l_suc), compose(p_h, p_graph_p_suc), compose(rhs_h, p_graph_p_suc) ) if predecessor in new_rules: lhs_h = { k: keys_by_value( new_lhs_instances[descendant], self.get_typing( predecessor, descendant)[v])[0] for k, v in new_lhs_instances[ predecessor].items() } new_rule_homomorphisms[ (predecessor, descendant)] = ( lhs_h, lhs_h, lhs_h ) for successor in self.successors(descendant): if successor in rule_hierarchy["rules"] or\ successor in new_rules: lhs_h = { k: keys_by_value( new_lhs_instances[successor], self.get_typing( descendant, successor)[v])[0] for k, v in new_lhs_instances[ descendant].items() } new_rule_homomorphisms[ (descendant, successor)] = ( lhs_h, lhs_h, lhs_h ) rule_hierarchy["rules"].update(new_rules) rule_hierarchy["rule_homomorphisms"].update( new_rule_homomorphisms) return new_lhs_instances
[docs] def unique_graph_id(self, prefix): """Generate a new graph id starting with a prefix.""" if prefix not in self.graphs(): return prefix i = 0 while "{}_{}".format(prefix, i) in self.graphs(): i += 1 return "{}_{}".format(prefix, i)
[docs] def duplicate_subgraph(self, graph_dict, attach_graphs=[]): """Duplicate a subgraph induced by the set of nodes. Parameters ---------- graph_dict : dict Dictionary contaning names of graphs to duplicate as keys and their new IDs in the hierarchy as values attach_graphs : list, optional List of not duplicated graph IDs that should be reattached to the duplicated graphs, if empty, duplicated subgraph is disconnected from the rest of the hierarchy """ old_graphs = self.graphs() for original, new in graph_dict.items(): if new in old_graphs: raise HierarchyError( "Cannot duplicate the graph '{}' as '{}': ".format( original, new) + "the graph '{}' ".format(new) + "already exists in the hierarchy!") # copy graphs for original, new in graph_dict.items(): self.copy_graph(original, new, attach_graphs) # copy typing between duplicated graphs visited = set() for g in graph_dict.keys(): preds = [ p for p in self.predecessors(g) if p in graph_dict.keys() and (p, g) not in visited] sucs = [ p for p in self.successors(g) if p in graph_dict.keys() and (g, p) not in visited] for s in sucs: self.add_typing( graph_dict[g], graph_dict[s], self.get_typing(g, s)) visited.add((g, s)) for p in preds: self.add_typing( graph_dict[p], graph_dict[g], self.get_typing(p, g)) visited.add((p, g))
[docs] def relation_to_span(self, left, right, edges=False, attrs=False): """Convert relation to a span. This method computes the span of the form `left` <- `common` -> `right` from a binary symmetric relation between two graphs in the hierarchy. Parameters ---------- left Id of the hierarchy's node represening the `left` graph right Id of the hierarchy's node represening the `right` graph edges : bool, optional If True, maximal set of edges is added to the common part graph attrs : bool, optional If True, maximal dict of attrs is added to the nodes of the common part graph Returns ------- common : nx.(Di)Graph Graph representing the common part graph induced by the relation left_h : dict Homomorphism from the common part graph to the left graph of the relation right_h : dict Homomorphism from the common part graph to the right graph of the relation Raises ------ HierarchyError If nodes corresponding to either `left` or `right` ids do not exist in the hierarchy, or there is no relation between them. """ if left not in self.nodes(): raise HierarchyError( "Node '{}' is not defined in the hierarchy!".format(left)) if right not in self.nodes(): raise HierarchyError( "Node '{}' is not defined in the hierarchy!".format(right)) if (left, right) not in self.relations() and\ (right, left) not in self.relations(): raise HierarchyError( "Relation between graphs '{}' and '{}' is not defined".format( left, right) ) common, left_h, right_h = relation_to_span( self.get_graph(left), self.get_graph(right), self.get_relation(left, right), edges, attrs) return common, left_h, right_h
[docs] def find_matching(self, graph_id, pattern, pattern_typing=None, nodes=None): """Find an instance of a pattern in a specified graph. Parameters ---------- graph_id : hashable Id of a graph in the hierarchy to search for matches pattern : Graph object A pattern to match pattern_typing : dict A dictionary that specifies a typing of a pattern, keys of the dictionary -- graph id that types a pattern, this graph should be among parents of the `graph_id` graph; values are mappings of nodes from pattern to the typing graph; nodes : iterable Subset of nodes where matching should be performed Returns ------- instances : list of dict List of matched instances """ if pattern_typing is None: pattern_typing = dict() # Check that 'typing_graph' and 'pattern_typing' are correctly # specified descendants = self.get_descendants(graph_id) if pattern_typing is not None: for typing_graph, _ in pattern_typing.items(): if typing_graph not in descendants.keys(): raise HierarchyError( "Pattern typing graph '{}' is not in " "the (transitive) typing graphs of '{}'!".format( typing_graph, graph_id) ) # Check pattern typing is a valid homomorphism for typing_graph, mapping in pattern_typing.items(): try: check_homomorphism( pattern, self.get_graph(typing_graph), mapping ) except InvalidHomomorphism as e: raise ReGraphError( "Specified pattern is not valid in the " "hierarchy (it produces the following error: " "{}) ".format(e) ) graph_typing = { typing_graph: self.get_typing(graph_id, typing_graph) for typing_graph in pattern_typing.keys() } instances = self.get_graph(graph_id).find_matching( pattern, nodes, graph_typing, pattern_typing) return instances
[docs] def rewrite(self, graph_id, rule, instance, p_typing=None, rhs_typing=None, strict=False): """Rewrite and propagate the changes backward & forward. Rewriting in the hierarchy cosists of an application of the SqPO-rewriting rule (given by the 'rule' parameter) to a graph in the hierarchy. Such rewriting often triggers a set of changes that are applied to other graphs and homomorphisms in the hierarchy, which are necessary to ensure that the hierarchy stays consistent. If the rule is restrictive (deletes nodes/edges/attrs or clones nodes), in general, the respective changes to all the graphs (transitively) typed by the graph subject to rewriting are made. On the other hand, if the rule is relaxing (adds nodes/edges/attrs or merges nodes), in general, the respective changes to all the graphs that (tansitively) type the graph subject to rewriting are made. Parameters ---------- graph_id Id of the graph in the hierarchy to rewrite rule : regraph.rule.Rule Rule object to apply instance : dict, optional Dictionary containing an instance of the lhs of the rule in the graph subject to rewriting, by default, tries to construct identity morphism of the nodes of the pattern p_typing : dict, optional Dictionary containing typing of graphs in the hierarchy by the interface of the rule, keys are ids of hierarchy graphs, values are dictionaries containing the mapping of nodes from the hierarchy graphs to the inteface nodes (note that a node from a graph can be typed by a set of nodes in the interface of the rule, e.g. if we want to perform cloning of some types, etc). rhs_typing : dict, optional Dictionary containing typing of the rhs by graphs of the hierarchy, keys are ids of hierarchy graphs, values are dictionaries containing the mapping of nodes from the rhs to the nodes of the typing graph given by the respective key of the value (note that a node from the rhs can be typed by a set of nodes of some graph, e.g. if we want to perform merging of some types, etc). strict : bool, optional Rewriting is strict when propagation down is not allowed Raises ------ HierarchyError If the graph is not in the database RewritingError If the provided p and rhs typing are inconsistent """ # Type check the input rule, its instance and typing instance, p_typing, rhs_typing = self._check_rule_instance_typing( graph_id, rule, instance, p_typing, rhs_typing, strict) # Perform a restrictive rewrite p_g_m, g_m_g = self._restrictive_rewrite(graph_id, rule, instance) # Propagate backward and fix broken homomorphisms self._propagate_backward( graph_id, rule, instance, p_g_m, g_m_g, p_typing) # Propagate forward and fix broken homomorphisms rhs_g_prime = self._expansive_rewrite_and_propagate_forward( graph_id, rule, instance, p_g_m, rhs_typing) return rhs_g_prime
[docs] def new_apply_rule_hierarchy(self, rule_hierarchy, instances): """Apply rule hierarchy. Parameters ---------- rule_hierarchy : dict Dictionary containing the input rule hierarchy instances : dict Dictionary containing an instance for every rule in the hierarchy Returns ------- rhs_instances : dict Dictionary containing the RHS instances for every graph in the hierarchy """ # Check if the rule hierarchy is applicable self._check_applicability(rule_hierarchy, instances) pass
[docs] def apply_rule_hierarchy(self, rule_hierarchy, instances): """Apply rule hierarchy. Parameters ---------- rule_hierarchy : dict Dictionary containing the input rule hierarchy instances : dict Dictionary containing an instance for every rule in the hierarchy Returns ------- rhs_instances : dict Dictionary containing the RHS instances for every graph in the hierarchy """ # Check if the rule hierarchy is applicable self._check_applicability(rule_hierarchy, instances) updated_graphs = {} # Apply rules to the hierarchy for graph_id, rule in rule_hierarchy["rules"].items(): instance = instances[graph_id] graph_obj = self.get_graph(graph_id) if rule.is_restrictive(): p_g_m, g_m_g = self._restrictive_rewrite( graph_id, rule, instance) else: p_g_m = { k: instance[v] for k, v in rule.p_lhs.items() } g_m_g = { n: n for n in graph_obj.nodes() } updated_graphs[graph_id] = { "p_g_m": p_g_m, "g_m_g": g_m_g } # Restore homomorphisms after restrictive rewrite # and backward propagation for (source, target), (_, p_h, _) in rule_hierarchy[ "rule_homomorphisms"].items(): old_typing = self.get_typing(source, target) source_m_target_m = get_unique_map_to_pullback_complement( updated_graphs[target]["p_g_m"], updated_graphs[target]["g_m_g"], p_h, updated_graphs[source]["p_g_m"], compose( updated_graphs[source]["g_m_g"], old_typing) ) self._update_mapping(source, target, source_m_target_m) for graph_id, rule in rule_hierarchy["rules"].items(): graph_obj = self.get_graph(graph_id) if rule.is_relaxing(): r_g_prime, g_m_g_prime = self._expansive_rewrite( graph_id, rule, updated_graphs[graph_id]["p_g_m"]) else: g_m_g_prime = { n: n for n in graph_obj.nodes() } r_g_prime = { v: updated_graphs[graph_id]["p_g_m"][k] for k, v in rule.p_rhs.items() } updated_graphs[graph_id] = { "g_m_g_prime": g_m_g_prime, "r_g_prime": r_g_prime } # Restore homomorphisms after expansive rewrite # and forward propagation for (source, target), (_, _, rhs_h) in rule_hierarchy[ "rule_homomorphisms"].items(): old_typing = self.get_typing(source, target) source_p_target_p = get_unique_map_from_pushout( self.get_graph(source).nodes(), updated_graphs[source]["g_m_g_prime"], updated_graphs[source]["r_g_prime"], compose( old_typing, updated_graphs[target]["g_m_g_prime"]), compose( rhs_h, updated_graphs[target]["r_g_prime"]) ) self._update_mapping(source, target, source_p_target_p) return {k: v["r_g_prime"] for k, v in updated_graphs.items()}
def _check_rule_instance_typing(self, origin_id, rule, instance, p_typing, rhs_typing, strict): # Normalize the instance, p_typing and rhs_typing if instance is None: instance = { n: n for n in rule.lhs.nodes() } if p_typing is None: p_typing = dict() else: p_typing = normalize_typing_relation(p_typing) if rhs_typing is None: rhs_typing = dict() else: rhs_typing = normalize_typing_relation(rhs_typing) # Check that the instance is valid try: check_homomorphism( rule.lhs, self.get_graph(origin_id), instance, total=True ) except InvalidHomomorphism as e: raise RewritingError( "Homomorphism from the pattern to the instance subgraph " "is not valid, got: '{}'".format(e)) # Check that the instance is a mono if not is_monic(instance): raise RewritingError( "Homomorphism from the pattern to the instance subgraph " "is not injective") # Check p_typing does not retype nodes for graph_id, typing in p_typing.items(): graph_to_origin = self.get_typing(graph_id, origin_id) for k, v in typing.items(): for vv in v: if graph_to_origin[k] != instance[rule.p_lhs[vv]]: raise RewritingError( "The specified typing of '{}' ".format(graph_id) + "by the interface is not valid: " "node '{}' is typed by '{}' ".format( k, graph_to_origin[k]) + "in the origin of rewriting, while the interface " "node '{}' is typed by '{}'.".format( vv, instance[rule.p_lhs[vv]])) # Check composability of p_typing for graph_id, typing in p_typing.items(): predecessors = self.predecessors(graph_id) for pred in predecessors: if pred not in p_typing: # check that the typing of 'graph_id' is canonical canonical = False for graph_n, p_nodes in typing.items(): if len(p_nodes) > 0: lhs_n = rule.p_lhs[list(p_nodes)[0]] canonical_clones = set(keys_by_value(rule.p_lhs, lhs_n)) if p_nodes == canonical_clones: canonical = False if not canonical: raise RewritingError( "Typing of '{}' by the interface ".format( graph_id) + "is not composable with the " "typig of '{}': ".format(pred) + "propagation to '{}' ".format(pred) + "is canonical and produces instances for {}, ".format( canonical_clones) + "while propagation to '{}' ".format( graph_id) + "produces only for '{}' ".format( p_nodes) ) successors = self.successors(graph_id) for suc in successors: suc_typing = self.get_typing(graph_id, suc) # check p_typing for suc is composable for graph_n, p_nodes in typing.items(): suc_n = suc_typing[graph_n] if suc in p_typing and suc_n in p_typing[suc]: suc_p_nodes = p_typing[suc][suc_n] if not p_nodes.issubset(suc_p_nodes): raise RewritingError( "Typing of '{}' by the interface ".format( graph_id) + "is not composable with the " "typig of '{}': ".format(suc) + "propagation to the node " "'{}' of '{}' ".format(graph_n, graph_id) + "will produce instances for {} ".format( p_nodes) + "while propagation to '{}' ".format( suc_n) + "typing it produces only {} ".format( suc_p_nodes) ) else: # ok, because suc_n is canonically cloned pass # Autocomplete and check rhs_typing new_rhs_typing = {} for graph_id, typing in rhs_typing.items(): for descendant, descendant_typing in self.get_descendants( graph_id).items(): if descendant not in rhs_typing: # autocomplete descendant typing in the new rhs typing new_rhs_typing[descendant] = { rhs_n: { descendant_typing[graph_n] for graph_n in graph_ns } for rhs_n, graph_ns in typing.items() } else: # autocomplete descendant typing with missing rhs nodes # and check that already specified types for descendant # are composable with the typing for 'graph_id' descendant_rhs_typing = rhs_typing[descendant] for rhs_n, graph_ns in typing.items(): if rhs_n in descendant_rhs_typing: descendant_ns = descendant_rhs_typing[rhs_n] for graph_n in graph_ns: if descendant_typing[graph_n] not in descendant_ns: raise RewritingError( "Typing of the RHS " "by '{}' is not composable ".format( graph_id) + "with its typing by '{}': ".format( descendant) + "node '{}' is typed by '{}' ".format( rhs_n, graph_n) + "in '{}' that is not typed by ".format( graph_id) + "either of {} from '{}'".format( descendant_ns, descendant) ) else: new_rhs_typing[descendant] = rhs_typing[descendant] new_rhs_typing[descendant][rhs_n] = { descendant_typing[graph_n] for graph_n in graph_ns } for g, t in new_rhs_typing.items(): rhs_typing[g] = t # Check rhs_typing does not retype nodes for graph_id, typing in rhs_typing.items(): origin_to_graph = self.get_typing(origin_id, graph_id) for k, v in typing.items(): p_nodes = keys_by_value(rule.p_rhs, k) if len(p_nodes) > 0: graph_nodes = set([ origin_to_graph[instance[rule.p_lhs[p_node]]] for p_node in p_nodes]) if graph_nodes != v: raise RewritingError( "The specified typing of the RHS " "by the graph '{}' ".format(graph_id) + "is not valid: " "node '{}' is a typed by {} ".format( k, graph_nodes) + "in the origin of rewriting, while it is " "typed by {} in the typing.".format(v)) # If rewriting is strict, check p_typing types all clones, # there are no instances of removed elements and # and rhs typing types all new nodes and no different # types are merged if strict is True: test_strictness(self, origin_id, rule, instance, p_typing, rhs_typing) return instance, p_typing, rhs_typing def _restrictive_rewrite(self, graph_id, rule, instance): """Perform a restrictive rewrite of the specified graph. This method rewrites the graph and updates its typing by the immediate successors. Note that as the result of this update, some homomorphisms (from ancestors) are broken! """ # Extract the restrictive part of the rule restrictive_rule = Rule(p=rule.p, lhs=rule.lhs, p_lhs=rule.p_lhs) g = self.get_graph(graph_id) p_g_m = g.rewrite( restrictive_rule, instance) g_m_g = { v: instance[restrictive_rule.p_lhs[k]] for k, v in p_g_m.items() } g_m_g.update( { n: n for n in g.nodes() if n not in p_g_m.values() }) self._restrictive_update_incident_homs(graph_id, g_m_g) self._restrictive_update_incident_rels(graph_id, g_m_g) return p_g_m, g_m_g def _expansive_rewrite(self, graph_id, rule, instance): """Perform an expansive rewrite of the specified graph. This method rewrites the graph and updates its typing by the immediate predecessors. Note that as the result of this update, some homomorphisms (to descendants) are broken! """ # Extract the expansive part of the rule expansive_rule = Rule(p=rule.p, rhs=rule.rhs, p_rhs=rule.p_rhs) g = self.get_graph(graph_id) pred_typings = { p: self.get_typing(p, graph_id) for p in self.predecessors(graph_id) } adj_relations = { a: self.get_relation(graph_id, a) for a in self.adjacent_relations(graph_id) } r_g_prime = g.rewrite( expansive_rule, instance) g_m_g_prime = { v: r_g_prime[expansive_rule.p_rhs[k]] for k, v in instance.items() } g_m_g_prime.update( { n: n for n in g.nodes() if n not in instance.values() }) self._expansive_update_incident_homs(graph_id, g_m_g_prime, pred_typings) self._expansive_update_incident_rels(graph_id, g_m_g_prime, adj_relations) return r_g_prime, g_m_g_prime def _compose_backward(self, pred_id, graph_id, g_m_g, graph_m_origin_m, pred_m_origin_m, pred_typing): graph_nodes = self.get_graph(graph_id).nodes() return get_unique_map_to_pullback( graph_nodes, g_m_g, graph_m_origin_m, pred_typing, pred_m_origin_m) def _propagate_backward(self, origin_id, rule, instance, p_origin_m, origin_m_origin, p_typing): """Peform backward propagation of the original rewriting. Parameters ---------- Returns ------- """ g_m_gs = {origin_id: origin_m_origin} g_m_origin_ms = {} for graph_id in self.bfs_tree(origin_id, reverse=True): graph_p_typing = {} if graph_id in p_typing.keys(): graph_p_typing = p_typing[graph_id] origin_typing = self.get_typing(graph_id, origin_id) g_m_g = self._get_identity_map(graph_id) g_m_origin_m = copy.deepcopy(origin_typing) # Propagate node clones if len(rule.cloned_nodes()) > 0: self._propagate_clone( origin_id, graph_id, p_origin_m, origin_m_origin, graph_p_typing, g_m_g, g_m_origin_m) # Propagate node deletes if len(rule.removed_nodes()) > 0: self._propagate_node_removal( origin_id, graph_id, rule, instance, g_m_g, g_m_origin_m) # Propagate node attrs deletes if len(rule.removed_node_attrs()) > 0: self._propagate_node_attrs_removal( origin_id, graph_id, rule, p_origin_m, g_m_origin_m) # Propagate edge deletes if len(rule.removed_edges()) > 0: self._propagate_edge_removal( origin_id, graph_id, g_m_origin_m) # Propagate edge attrs deletes if len(rule.removed_edge_attrs()) > 0: self._propagate_edge_attrs_removal( origin_id, graph_id, rule, p_origin_m, g_m_origin_m) g_m_gs[graph_id] = g_m_g g_m_origin_ms[graph_id] = g_m_origin_m # Reconnect broken homomorphisms by composability for graph_id, g_m_g in g_m_gs.items(): for pred in self.predecessors(graph_id): pred_typing = self.get_typing(pred, graph_id) if graph_id != origin_id: pred_graph = self._compose_backward( pred, graph_id, g_m_g, g_m_origin_ms[graph_id], g_m_origin_ms[pred], pred_typing) else: pred_graph = g_m_origin_ms[pred] self._update_mapping(pred, graph_id, pred_graph) def _expansive_rewrite_and_propagate_forward(self, origin_id, rule, instance, p_origin_m, rhs_typing): bfs_tree = self.bfs_tree(origin_id) bfs_tree.reverse() g_g_primes = {} rhs_g_primes = {} for graph in bfs_tree: origin_typing = self.get_typing(origin_id, graph) rhs_graph_typing = {} if graph in rhs_typing.keys(): rhs_graph_typing = rhs_typing[graph] g_g_prime = { n: n for n in self.get_graph(graph).nodes() } rhs_g_prime = compose(p_origin_m, origin_typing) pred_typings = { p: self.get_typing(p, graph) for p in self.predecessors(graph) } adj_relations = { a: self.get_relation(graph, a) for a in self.adjacent_relations(graph) } # Propagate node merges if len(rule.merged_nodes()) > 0: self._propagate_merge( origin_id, graph, rule, p_origin_m, g_g_prime, rhs_g_prime) # Propagate node additions if len(rule.added_nodes()) > 0: self._propagate_node_addition( origin_id, graph, rule, rhs_graph_typing, g_g_prime, rhs_g_prime) self._expansive_update_incident_homs( graph, g_g_prime, pred_typings) self._expansive_update_incident_rels( graph, g_g_prime, adj_relations) keys_to_remove = set() keys_to_add = dict() for k, v in rhs_g_prime.items(): if k not in rule.rhs.nodes(): if k in rule.p.nodes(): keys_to_add[rule.p_rhs[k]] = v keys_to_remove.add(k) for k in keys_to_remove: del rhs_g_prime[k] for k, v in keys_to_add.items(): rhs_g_prime[k] = v # Propagate node attrs additions if len(rule.added_node_attrs()) > 0: self._propagate_node_attrs_addition( origin_id, graph, rule, rhs_g_prime) # Propagate edge additions if len(rule.added_edges()) > 0: self._propagate_edge_addition( origin_id, graph, rule, rhs_g_prime) # Propagate edge attrs additions if len(rule.added_edge_attrs()) > 0: self._propagate_edge_attrs_addition( origin_id, graph, rule, rhs_g_prime) g_g_primes[graph] = g_g_prime rhs_g_primes[graph] = rhs_g_prime # Perform an expansive rewrite rhs_origin_prime, origin_m_origin_prime = self._expansive_rewrite( origin_id, rule, p_origin_m) rhs_g_primes[origin_id] = rhs_origin_prime g_g_primes[origin_id] = origin_m_origin_prime # Reconnect broken homomorphisms by composability for graph_id, g_g_prime in g_g_primes.items(): graph_nodes = self.get_graph(graph_id).nodes() for suc in self.successors(graph_id): suc_typing = self.get_typing(graph_id, suc) for rhs_node in rule.rhs.nodes(): g_nodes = keys_by_value( g_g_prime, rhs_g_primes[graph_id][rhs_node]) suc_node = rhs_g_primes[suc][rhs_node] for n in g_nodes: suc_typing[n] = suc_node graph_suc = get_unique_map_from_pushout( graph_nodes, rhs_g_primes[graph_id], g_g_prime, rhs_g_primes[suc], suc_typing) self._update_mapping(graph_id, suc, graph_suc) return rhs_origin_prime @staticmethod def _produce_clones(origin_clones, p_origin_m, g, origin_typing, graph_p, local_g_m_g, local_g_m_origin_m): cloned_nodes = {} for n, clones in origin_clones.items(): nodes_to_clone = keys_by_value(origin_typing, n) for node in nodes_to_clone: if node in graph_p.keys(): p_nodes = graph_p[node] else: p_nodes = [ keys_by_value(p_origin_m, c)[0] for c in clones ] for i, p_node in enumerate(p_nodes): if i == 0: cloned_nodes[node] = p_node local_g_m_g[node] = node local_g_m_origin_m[node] = p_origin_m[p_node] else: new_name = g.clone_node(node) cloned_nodes[new_name] = p_node local_g_m_g[new_name] = node local_g_m_origin_m[new_name] = p_origin_m[p_node] return cloned_nodes def _propagate_clone(self, origin_id, node_id, p_origin_m, origin_m_origin, p_typing, g_m_g, g_m_origin_m): """Propagate clones from 'origin_id' to 'graph_id'. Perform a controlled propagation of clones to 'graph' Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed p_origin_m : dict Instance of rule's interface inside the updated origin origin_m_origin : dict Map from the updated origin to the initial origin p_typing : dict Controlling relation from the nodes of 'graph_id' to the nodes of the interfaces Returns ------- g_m_g : dict Map from the updated 'graph_id' to the 'graph_id' """ cloned_origin_nodes = {} for n in set(origin_m_origin.values()): clones = keys_by_value(origin_m_origin, n) if len(clones) > 1: cloned_origin_nodes[n] = clones graph = self.get_graph(node_id) origin_typing = self.get_typing(node_id, origin_id) cloned_origin_nodes = {} for n in set(origin_m_origin.values()): clones = keys_by_value(origin_m_origin, n) if len(clones) > 1: cloned_origin_nodes[n] = clones self._produce_clones( cloned_origin_nodes, p_origin_m, graph, origin_typing, p_typing, g_m_g, g_m_origin_m) self._restrictive_update_incident_homs(node_id, g_m_g) self._restrictive_update_incident_rels(node_id, g_m_g) return g_m_g, g_m_origin_m def _propagate_node_removal(self, origin_id, node_id, rule, instance, g_m_g, g_m_origin_m): """Propagate node removal from 'origin_id' to 'graph_id'. Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed origin_m_origin : dict Map from the updated origin to the initial origin Returns ------- g_m_g : dict Map from the updated 'graph_id' to the 'graph_id' """ graph = self.get_graph(node_id) origin_typing = self.get_typing(node_id, origin_id) for lhs_node in rule.removed_nodes(): origin_n = instance[lhs_node] graph_nodes = keys_by_value(origin_typing, origin_n) for node in graph_nodes: graph.remove_node(node) del g_m_g[node] del g_m_origin_m[node] for node in graph.nodes(): if node not in origin_typing.keys(): graph.remove_node(node) del g_m_g[node] del g_m_origin_m[node] self._restrictive_update_incident_homs(node_id, g_m_g) self._restrictive_update_incident_rels(node_id, g_m_g) return g_m_g, g_m_origin_m def _propagate_node_attrs_removal(self, origin_id, node_id, rule, p_origin_m, g_m_origin_m): """Propagate node attrs removal from 'origin_id' to 'graph_id'. Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed rule : regraph.Rule Original rewriting rule instance : dict Original instance """ graph = self.get_graph(node_id) # origin_typing = self.get_typing(node_id, origin_id) for p_node, attrs in rule.removed_node_attrs().items(): nodes_to_remove_attrs = keys_by_value( g_m_origin_m, p_origin_m[p_node]) for node in nodes_to_remove_attrs: graph.remove_node_attrs(node, attrs) def _propagate_edge_removal(self, origin_id, node_id, g_m_origin_m): """Propagate edge removal from 'origin_id' to 'graph_id'. Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting node_id : hashable ID of the node where propagation is performed p_origin_m : dict Instance of rule's interface inside the updated origin origin_m_origin : dict Map from the updated origin to the initial origin """ graph = self.get_graph(node_id) origin_graph = self.get_graph(origin_id) edges_to_remove = set() for s, t in graph.edges(): origin_s = g_m_origin_m[s] origin_t = g_m_origin_m[t] if (origin_s, origin_t) not in origin_graph.edges(): edges_to_remove.add((s, t)) for s, t in edges_to_remove: graph.remove_edge(s, t) def _propagate_edge_attrs_removal(self, origin_id, node_id, rule, p_origin_m, g_m_origin_m): """Propagate edge attrs removal from 'origin_id' to 'graph_id'. Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed rule : regraph.Rule Original rewriting rule p_origin_m : dict Instance of rule's interface inside the updated origin """ graph = self.get_graph(node_id) for (p_u, p_v), attrs in rule.removed_edge_attrs().items(): us = keys_by_value(g_m_origin_m, p_origin_m[p_u]) vs = keys_by_value(g_m_origin_m, p_origin_m[p_v]) for u in us: for v in vs: if (u, v) in graph.edges(): graph.remove_edge_attrs(u, v, attrs) def _propagate_merge(self, origin_id, graph_id, rule, p_origin_m, g_g_prime, rhs_g_prime): """Propagate merges from 'origin_id' to 'graph_id'. Perform a propagation of merges to 'graph' Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed rule : regraph.Rule Original rewriting rule p_origin_m : dict Instance of rule's interface inside the updated origin rhs_origin_prime : dict Instance of rule's rhs inside the updated origin g_g_prime : dict Map from the nodes of the graph 'graph_id' to the updated graph rhs_g_prime : dict Map from the RHS to the updated graph with 'graph_id' """ graph = self.get_graph(graph_id) for rhs_node, p_nodes in rule.merged_nodes().items(): graph_nodes = set([ rhs_g_prime[p_node] for p_node in p_nodes ]) if len(graph_nodes) > 1: merged_node_id = graph.merge_nodes(graph_nodes) for n in graph_nodes: g_g_prime[n] = merged_node_id for p_node in p_nodes: if p_node in rhs_g_prime: del rhs_g_prime[p_node] rhs_g_prime[rhs_node] = merged_node_id map_to_merge_node = set() for rhs_n, g_prime_n in rhs_g_prime.items(): if g_prime_n in graph_nodes: map_to_merge_node.add(rhs_n) for n in map_to_merge_node: rhs_g_prime[n] = merged_node_id else: for p_node in p_nodes: if p_node in rhs_g_prime: del rhs_g_prime[p_node] rhs_g_prime[rhs_node] = list(graph_nodes)[0] def _propagate_node_addition(self, origin_id, graph_id, rule, rhs_typing, g_g_prime, rhs_g_prime): """Propagate node additions from 'origin_id' to 'graph_id'. Perform a propagation of additions to 'graph' Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed rule : regraph.Rule Original rewriting rule rhs_origin_prime : dict Instance of rule's rhs inside the updated origin rhs_typing : dict Typing of the nodes from the rhs in 'graph_id' rhs_g_prime : dict Map from the RHS to the updated graph with 'graph_id' """ graph = self.get_graph(graph_id) for rhs_node in rule.added_nodes(): if rhs_node in rhs_typing: if len(rhs_typing[rhs_node]) == 1: rhs_g_prime[rhs_node] = list( rhs_typing[rhs_node])[0] else: nodes_to_merge = rhs_typing[rhs_node] new_node_id = graph.merge_nodes(nodes_to_merge) for n in nodes_to_merge: g_g_prime[n] = new_node_id p_nodes = keys_by_value(rhs_g_prime, n) for p_node in p_nodes: if p_node in rhs_g_prime: del rhs_g_prime[p_node] rhs_g_prime[rhs_node] = new_node_id map_to_merge_node = set() for rhs_n, g_prime_n in rhs_g_prime.items(): if g_prime_n in nodes_to_merge: map_to_merge_node.add(rhs_n) for n in map_to_merge_node: rhs_g_prime[n] = new_node_id else: new_node_id = graph.generate_new_node_id(rhs_node) graph.add_node(new_node_id) rhs_g_prime[rhs_node] = new_node_id def _propagate_node_attrs_addition(self, origin_id, graph_id, rule, rhs_g_prime): """Propagate node attrs additions from 'origin_id' to 'graph_id'. Perform a propagation of additions to 'graph' Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed rule : regraph.Rule Original rewriting rule rhs_origin_prime : dict Instance of rule's rhs inside the updated origin origin_prime_g_prime : dict Map from the updated origin to the updated graph with 'graph_id' """ graph = self.get_graph(graph_id) for rhs_node, attrs in rule.added_node_attrs().items(): graph.add_node_attrs( rhs_g_prime[rhs_node], attrs) def _propagate_edge_addition(self, origin_id, graph_id, rule, rhs_g_prime): """Propagate edge additions from 'origin_id' to 'graph_id'. Perform a propagation of additions to 'graph' Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed rule : regraph.Rule Original rewriting rule rhs_origin_prime : dict Instance of rule's rhs inside the updated origin origin_prime_g_prime : dict Map from the updated origin to the updated graph with 'graph_id' """ graph = self.get_graph(graph_id) for s, t in rule.added_edges(): g_s = rhs_g_prime[s] g_t = rhs_g_prime[t] if (g_s, g_t) not in graph.edges(): graph.add_edge(g_s, g_t) def _propagate_edge_attrs_addition(self, origin_id, graph_id, rule, rhs_g_prime): """Propagate edge attrs additions from 'origin_id' to 'graph_id'. Perform a propagation of additions to 'graph' Parameters ---------- origin_id : hashable ID of the graph corresponding to the origin of rewriting graph_id : hashable ID of the graph where propagation is performed """ graph = self.get_graph(graph_id) for (s, t), attrs in rule.added_edge_attrs().items(): g_s = rhs_g_prime[s] g_t = rhs_g_prime[t] graph.add_edge_attrs(g_s, g_t, attrs) def _get_identity_map(self, graph_id): return { n: n for n in self.get_graph(graph_id).nodes() } def _restrictive_update_incident_homs(self, node_id, g_m_g): for suc in self.successors(node_id): typing = self.get_typing(node_id, suc) self._update_mapping(node_id, suc, compose(g_m_g, typing)) def _restrictive_update_incident_rels(self, graph_id, g_m_g): for related_g in self.adjacent_relations(graph_id): rel = self.get_relation(graph_id, related_g) new_rel = dict() for node in self.get_graph(graph_id).nodes(): old_node = g_m_g[node] if old_node in rel.keys(): new_rel[node] = rel[old_node] self._update_relation(graph_id, related_g, new_rel) def _expansive_update_incident_homs(self, graph_id, g_m_g_prime, pred_typings): for pred, typing in pred_typings.items(): self._update_mapping( pred, graph_id, compose(typing, g_m_g_prime)) def _expansive_update_incident_rels(self, graph_id, g_m_g_prime, adj_relations): for related_g, rel in adj_relations.items(): new_rel = dict() for node in self.get_graph(graph_id).nodes(): if node in g_m_g_prime: new_node = g_m_g_prime[node] if node in rel.keys(): new_rel[new_node] = rel[node] self._update_relation(graph_id, related_g, new_rel) def _check_applicability(self, rule_hierarchy, instances): """Check if a rule hierarchy is applicable.""" # Check that instances commute for (s, t) in self.typings(): typing = self.get_typing(s, t) lhs_s_t1 = compose(instances[s], typing) lhs_s_t2 = compose( rule_hierarchy["rule_homomorphisms"][(s, t)][0], instances[t]) if (lhs_s_t1 != lhs_s_t2): raise RewritingError( "Instance of a specified rule for '{}' ({}) ".format( s, instances[s]) + "is not compatible with such instance for '{}' ({})".format( t, instances[t])) # Check the applicability t_rule = rule_hierarchy["rules"][t] if t_rule.is_restrictive(): # Check the square L_S -> S -> T and L_S -> L_T -> T is a PB for lhs_t_node in t_rule.lhs.nodes(): t_node = instances[t][lhs_t_node] s_nodes = keys_by_value(typing, t_node) if lhs_t_node in t_rule.removed_nodes(): for s_node in s_nodes: error = False if s_node not in instances[s].values(): error = True elif keys_by_value(instances[s], s_node)[ 0] not in rule_hierarchy[ "rules"][s].removed_nodes(): error = True if error: raise RewritingError( "Specified rule hierarchy is not applicable " "the typing of the node '{}' ".format(s_node) + "from the graph '{}' is removed ".format(s) + "by rewriting of '{}', but ".format(t) + "this node is not removed by the rule " + "applied to '{}'".format(s) ) if lhs_t_node in t_rule.cloned_nodes(): for s_node in s_nodes: if s_node not in instances[s].values(): raise RewritingError( "Specified rule hierarchy is not applicable " "the typing of the node '{}' ".format(s_node) + "from the graph '{}' is cloned ".format(s) + "by rewriting of '{}', but the ".format(t) + "retyping of this node is not specified, i.e. " + "'{}' is not in the instances of ".format(s_node) + "the rule applied to '{}'".format(s) )
[docs] def relabel_nodes(self, graph, mapping): """Relabel nodes of a graph in the hierarchy.""" graph = self.get_graph(graph) graph.relabel_nodes(mapping)
[docs] def graphs_typed_by_node(self, graph_id, node_id): """Get graphs typed by 'node_id' in 'graph_id'.""" graphs = [] for p in self.predecessors(graph_id): p_typing = self.get_typing(p, graph_id) if node_id in p_typing.values(): graphs.append(p) return graphs