"""Audit trails for graphs and graph hierarchies.
This module contains a collection of utils for audit trails that
provide version control for transformations of graphs and graph hierarchies:
* `Versioning`, abstract class for in-memory versioning of objects;
* `VersionedGraph`, wrapper around graph objects in ReGraph that allows to track their audit trail;
* `VersionedHierarchy`, wrapper around hierarchy objects in ReGraph that allows to track their audit trail;
"""
from abc import ABC, abstractmethod
import copy
import datetime
import uuid
import warnings
import networkx as nx
from regraph.exceptions import RevisionError, RevisionWarning
from regraph.rules import (compose_rules, Rule,
_create_merging_rule,
_create_merging_rule_hierarchy,
compose_rule_hierarchies,
invert_rule_hierarchy)
from regraph.utils import keys_by_value
def _generate_new_commit_meta_data():
time = datetime.datetime.now()
commit_id = str(uuid.uuid4())
return time, commit_id
[docs]class Versioning(ABC):
"""Class for version control.
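Attributes
----------
_current_branch
Name of the current branch
_deltas : dict
Dictionary with deltas from the current branch to all other branches
_heads : dict
Dictionary mapping branch names to the ids of their head commits
_revision_graph : networkx.DiGraph
Graph of commits whose edges store the deltas between commits
Methods
-------
initial_commit()
branches()
current_branch()
print_history()
commit(delta, message=None, previous_commit=None)
branch(new_branch, message=None)
switch_branch(branch)
merge_with(branch, message=None)
rollback(rollback_commit, message=None)
to_json()
from_json(json_data)
_compose_deltas
_invert_delta
_merge_into_current_branch
_create_identity_delta
_compose_delta_path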
"""
def __init__(self, init_branch="master", current_branch=None,
             deltas=None, heads=None, revision_graph=None):
    """Set up the versioning state.

    Parameters
    ----------
    init_branch : optional
        Name of the branch that receives the initial commit when no
        `heads` are supplied; also becomes the current branch when
        `current_branch` is None.
    current_branch : optional
        Name of the branch to mark as current.
    deltas : dict, optional
        Existing branch deltas; an empty dict is used when None.
    heads : dict, optional
        Existing mapping from branch names to head commit ids. When
        None, a new revision graph holding a single "Initial commit"
        node is created.
    revision_graph : networkx.DiGraph, optional
        Existing revision graph; only consulted when `heads` is given.
    """
    self._current_branch = (
        init_branch if current_branch is None else current_branch)
    self._deltas = {} if deltas is None else deltas

    if heads is not None:
        # A history was supplied: adopt it as-is
        self._heads = heads
        self._revision_graph = revision_graph
        return

    # No history supplied: start one with an initial commit
    time, commit_id = _generate_new_commit_meta_data()
    self._heads = {}
    self._heads[init_branch] = commit_id
    self._revision_graph = nx.DiGraph()
    self._revision_graph.add_node(
        commit_id,
        branch=self._current_branch,
        message="Initial commit",
        time=time)
def initial_commit(self):
    """Return the id of the initial commit.

    The initial commit is the first node of the revision graph (in
    node-iteration order) that has no predecessors.

    Returns
    -------
    The id of that commit, or None when the revision graph has no
    node without predecessors (for example, when it is empty).
    """
    for commit in self._revision_graph.nodes():
        # Probe for a first predecessor instead of materialising the
        # whole predecessor list just to measure its length.
        has_parent = any(
            True for _ in self._revision_graph.predecessors(commit))
        if not has_parent:
            return commit
    # Without this explicit fallback the function used to reach
    # `return commit` with the name unbound (UnboundLocalError).
    return None
@abstractmethod
def _compose_deltas(self, delta1, delta2):
"""Abstract method for composing deltas."""
pass
@staticmethod
@abstractmethod
def _invert_delta(self, delta1):
"""Abstract method for inverting deltas."""
pass
@staticmethod
@abstractmethod
def _merge_into_current_branch(self, delta):
"""Abstract method for merging a branch into the current one."""
pass
@abstractmethod
def _create_identity_delta(self):
"""Abstract method for creating an identity-delta."""
pass
def _compose_delta_path(self, path):
if len(path) > 1:
result_delta = self._revision_graph.adj[
path[0]][path[1]]["delta"]
previous_commit = path[1]
for current_commit in path[2:]:
result_delta = self._compose_deltas(
result_delta,
self._revision_graph.adj[
previous_commit][current_commit]["delta"])
d = self._revision_graph.adj[previous_commit][current_commit]["delta"]
previous_commit = current_commit
return result_delta
else:
return self._create_identity_delta()
def branches(self):
    """List the names of all branches that currently have a head."""
    # Iterating a dict yields its keys
    return list(self._heads)
def current_branch(self):
    """Tell which branch the versioned object is currently on.

    Returns
    -------
    The name stored in ``_current_branch``.
    """
    active_branch = self._current_branch
    return active_branch
def print_history(self):
    """Print the history of commits.

    One line is printed per node of the revision graph, in node
    iteration order: the commit time, the commit id, the branch name
    and the commit message, separated by spaces.
    """
    # `nodes(data=True)` yields (id, attributes) pairs. It replaces the
    # `G.node[n]` lookup, which recent networkx releases no longer
    # provide.
    for commit_id, attrs in self._revision_graph.nodes(data=True):
        print(
            str(attrs["time"]),
            commit_id, attrs["branch"],
            attrs["message"])
def commit(self, delta, message=None, previous_commit=None):
    """Add a commit to the current branch.

    Parameters
    ----------
    delta
        Delta leading from the previous commit to the new one; stored
        on the new edge of the revision graph.
    message : str, optional
        Commit message; an empty string is stored when None.
    previous_commit : optional
        Id of the parent commit. Defaults to the head of the current
        branch.

    Returns
    -------
    The id of the newly created commit.
    """
    time, commit_id = _generate_new_commit_meta_data()
    if previous_commit is None:
        previous_commit = self._heads[self._current_branch]

    # Update heads and revision graph
    self._heads[self._current_branch] = commit_id
    self._revision_graph.add_node(
        commit_id,
        branch=self._current_branch,
        time=time,
        message="" if message is None else message)
    self._revision_graph.add_edge(
        previous_commit, commit_id, delta=delta)

    # Update deltas: the current branch moved forward by `delta`, so
    # every stored delta is prefixed with the inverse of `delta`.
    # Iterate over a snapshot of the keys because entries are
    # reassigned inside the loop.
    for branch in list(self._deltas):
        self._deltas[branch] = self._compose_deltas(
            self._invert_delta(delta), self._deltas[branch])
        self._refine_delta(self._deltas[branch])

    return commit_id
def switch_branch(self, branch):
    """Switch the versioned object to another branch.

    Parameters
    ----------
    branch
        Name of the branch to switch to.

    Raises
    ------
    RevisionError
        If `branch` is not an existing branch.

    Warns
    -----
    RevisionWarning
        If `branch` is already the current branch; nothing is changed
        in that case.
    """
    if branch not in self.branches():
        raise RevisionError(
            "Branch '{}' does not exist".format(branch))
    if branch == self._current_branch:
        warnings.warn(
            "Already in branch '{}'".format(branch), RevisionWarning)
        # No delta is stored for the current branch, so carrying on
        # would fail with a KeyError on `self._deltas[branch]`.
        return

    # Set as the current branch
    previous_branch = self._current_branch
    self._current_branch = branch

    # Apply delta to the versioned object
    delta = self._deltas[branch]
    self._apply_delta(delta)
    self._deltas[previous_branch] = self._invert_delta(delta)

    # Recompute deltas: every other branch is now reached by first
    # going back to the previous branch
    for name, another_delta in self._deltas.items():
        if name != previous_branch:
            self._deltas[name] = self._compose_deltas(
                self._deltas[previous_branch],
                another_delta
            )

    # The branch we are on needs no delta to itself
    del self._deltas[self._current_branch]
def branch(self, new_branch, message=None):
    """Create a new branch and make it the current one.

    The new branch starts with an identity commit whose parent is the
    head of the branch that was current when the method was called.

    Parameters
    ----------
    new_branch
        Name of the branch to create.
    message : str, optional
        Message of the identity commit; a default mentioning the
        branch name is used when None.

    Returns
    -------
    The id of the identity commit.

    Raises
    ------
    RevisionError
        If a branch called `new_branch` already exists.
    """
    if new_branch in self.branches():
        raise RevisionError(
            "Branch '{}' already exists".format(new_branch))

    commit_message = message
    if commit_message is None:
        commit_message = "Created branch '{}'".format(new_branch)

    # Remember where we branch off, then move to the new branch
    parent_branch = self._current_branch
    parent_head = self._heads[parent_branch]
    self._current_branch = new_branch

    # Nothing distinguishes the new branch from its parent yet
    identity = self._create_identity_delta()
    self._deltas[parent_branch] = identity

    new_commit = self.commit(
        identity, message=commit_message, previous_commit=parent_head)
    self._heads[self._current_branch] = new_commit
    return new_commit
def merge_with(self, branch, message=None):
    """Merge the given branch into the current branch.

    A merge commit is added to the current branch and additionally
    linked to the head of `branch`; afterwards `branch` no longer has
    a head or a stored delta.

    Parameters
    ----------
    branch
        Name of the branch to merge into the current one.
    message : str, optional
        Message of the merge commit; a default naming both branches
        is used when None.

    Returns
    -------
    The id of the merge commit.

    Raises
    ------
    RevisionError
        If `branch` is not an existing branch.
    """
    if branch not in self.branches():
        raise RevisionError(
            "Branch '{}' does not exist".format(branch))

    commit_message = message
    if commit_message is None:
        commit_message = "Merged branch '{}' into '{}'".format(
            branch, self._current_branch)

    merged = self._merge_into_current_branch(self._deltas[branch])
    delta_to_current, delta_to_branch = merged

    merge_commit = self.commit(delta_to_current, message=commit_message)

    # Second parent of the merge commit: the head of the merged branch
    merged_head = self._heads[branch]
    self._revision_graph.add_edge(
        merged_head, merge_commit, delta=delta_to_branch)

    # The merged branch ceases to exist
    del self._heads[branch]
    del self._deltas[branch]
    return merge_commit
def rollback(self, rollback_commit, message=None):
    """Rollback the current branch to a specific commit.

    The versioned object is rewound by applying the inverse of the
    composed deltas between `rollback_commit` and the head of the
    current branch. Every commit lying on a path from
    `rollback_commit` to any head is then removed from the revision
    graph, and heads and branch deltas are recomputed. The branch of
    `rollback_commit` becomes the current branch.

    Parameters
    ----------
    rollback_commit
        Id of the commit to roll back to.
    message : str, optional
        Accepted for interface compatibility; no commit is created by
        a rollback, so the value is currently not used.

    Raises
    ------
    RevisionError
        If `rollback_commit` is not in the revision graph, or if there
        is no path from it to the head of the current branch.
    """
    if rollback_commit not in self._revision_graph.nodes():
        raise RevisionError(
            "Commit '{}' does not exist in the revision graph".format(
                rollback_commit))

    # Attribute dictionaries of all commits, keyed by commit id. Taken
    # once, up front (node attributes are not modified below), it
    # replaces the `G.node[n]` lookups that recent networkx releases
    # no longer provide.
    node_attrs = dict(self._revision_graph.nodes(data=True))

    # Find paths from the last commit of the current branch
    # to the commit with id 'rollback_commit'
    try:
        shortest_path = list(nx.shortest_path(
            self._revision_graph, rollback_commit,
            self._heads[self._current_branch]))
    except nx.NetworkXNoPath:
        raise RevisionError(
            "Branch '{}' does not contain a path to the commit '{}'".format(
                self._current_branch, rollback_commit))

    # Generate a big rollback commit
    rollback_delta = self._invert_delta(
        self._compose_delta_path(shortest_path))

    # Apply the rollback commit
    self._apply_delta(rollback_delta)

    # Compute all paths from every head to the commit
    head_paths = {}
    for h in self._heads.values():
        head_paths[h] = list(nx.all_simple_paths(
            self._revision_graph, rollback_commit, h))

    # Compute new head commits (commits whose successors
    # are merge commits to be removed)
    new_heads = {}
    removed_commits = {
        n
        for pp in head_paths.values()
        for p in pp
        for n in p
        if n != rollback_commit
    }
    for n in self._revision_graph.nodes():
        for s in self._revision_graph.successors(n):
            if n not in removed_commits and s in removed_commits:
                new_heads[node_attrs[n]["branch"]] = (n, s)

    # Recompute deltas
    new_current_branch = node_attrs[rollback_commit]["branch"]
    self._current_branch = new_current_branch
    self._heads[self._current_branch] = rollback_commit

    # Find a branching point from the rollback commit: the first
    # ancestor (in BFS order) that belongs to another branch
    rollback_bfs_from_commit = nx.bfs_tree(
        self._revision_graph, rollback_commit, reverse=True)
    rollback_branching_point = None
    for n in rollback_bfs_from_commit.nodes():
        if node_attrs[n]["branch"] != self._current_branch:
            rollback_branching_point = n
            break

    # Update deltas of the preserved heads
    for head, commit in self._heads.items():
        if head != self._current_branch:
            # Find a branching point from the head
            head_bfs_from_commit = nx.bfs_tree(
                self._revision_graph, commit, reverse=True)
            head_branching_point = None
            for n in head_bfs_from_commit.nodes():
                if node_attrs[n]["branch"] != head:
                    head_branching_point = n
                    break

            if rollback_branching_point:
                # Rollback in a branched part of the revision graph
                try:
                    # Rollback happened before head
                    branching_to_head = nx.shortest_path(
                        self._revision_graph,
                        rollback_branching_point, commit)
                    branching_to_rollback = nx.shortest_path(
                        self._revision_graph,
                        rollback_branching_point, rollback_commit)
                    self._deltas[head] = self._compose_deltas(
                        self._invert_delta(
                            self._compose_delta_path(branching_to_rollback)),
                        self._compose_delta_path(branching_to_head)
                    )
                except nx.NetworkXNoPath:
                    if head_branching_point:
                        try:
                            # Rollback happened after head
                            branching_to_rollback = nx.shortest_path(
                                self._revision_graph,
                                head_branching_point, rollback_commit)
                            branching_to_head = nx.shortest_path(
                                self._revision_graph,
                                head_branching_point, commit)
                            self._deltas[head] = self._compose_deltas(
                                self._invert_delta(self._compose_delta_path(
                                    branching_to_rollback)),
                                self._compose_delta_path(branching_to_head)
                            )
                        except Exception:
                            # Rollback and head are disjoint,
                            # so no delta to compute (no undirected path).
                            # Still best-effort, but no longer a bare
                            # `except:` that would also swallow
                            # KeyboardInterrupt and SystemExit.
                            # NOTE(review): consider narrowing further to
                            # nx.NetworkXNoPath once confirmed safe.
                            pass
            else:
                # Rollback in an unbranched part of the revision graph
                # So head can be only in a branched part and only before
                # the rollback commit (otherwise removed)
                if head_branching_point:
                    branching_to_head = nx.shortest_path(
                        self._revision_graph, head_branching_point, commit)
                    branching_to_rollback = nx.shortest_path(
                        self._revision_graph,
                        head_branching_point, rollback_commit)
                    delta_branching_to_rollback = self._compose_delta_path(
                        branching_to_rollback)
                    delta_branching_to_head = self._compose_delta_path(
                        branching_to_head)
                    self._deltas[head] = self._compose_deltas(
                        self._invert_delta(delta_branching_to_rollback),
                        delta_branching_to_head
                    )

            if head in self._deltas:
                self._refine_delta(self._deltas[head])

    # Compute deltas of the new heads
    for branch, (head_commit, merge_commit) in new_heads.items():
        path_to_merge = nx.shortest_path(
            self._revision_graph, rollback_commit, merge_commit)
        delta_to_merge = self._compose_delta_path(path_to_merge)
        head_to_merge = self._revision_graph.adj[
            head_commit][merge_commit]["delta"]
        self._deltas[branch] = self._compose_deltas(
            delta_to_merge,
            self._invert_delta(head_to_merge))
        self._refine_delta(self._deltas[branch])
        self._heads[branch] = head_commit
        print("Created the new head for '{}'".format(branch))

    # All paths to the heads originating from the commit to
    # which we rollback are removed
    # NOTE(review): deltas stored for branches whose head is removed
    # here are left in `self._deltas` - confirm whether that is intended.
    for c in removed_commits:
        if c != rollback_commit:
            self._revision_graph.remove_node(c)
            if c in self._heads.values():
                for h in keys_by_value(self._heads, c):
                    print("Removed a head for '{}'".format(h))
                    del self._heads[h]
def _revision_graph_to_json(self):
data = {
"nodes": [],
"edges": []
}
for n in self._revision_graph.nodes():
data["nodes"].append({
"id": n,
"branch": self._revision_graph.node[n]["branch"],
"time": self._revision_graph.node[n]["time"],
"message": self._revision_graph.node[n]["message"]
})
for (s, t) in self._revision_graph.edges():
data["edges"].append({
"from": s,
"to": t,
"delta": self._delta_to_json(
self._revision_graph.adj[s][t]["delta"])
})
return data
@classmethod
def _revision_graph_from_json(cls, json_data):
    """Rebuild a revision graph from its dictionary representation.

    Parameters
    ----------
    json_data : dict
        Dictionary with a "nodes" list (entries with "id", "branch",
        "time", "message") and an "edges" list (entries with "from",
        "to", "delta").

    Returns
    -------
    networkx.DiGraph
        Graph with one node per commit and one edge per stored delta,
        each delta being restored with `_delta_from_json`.
    """
    attribute_names = ("branch", "time", "message")
    restored = nx.DiGraph()
    for node in json_data["nodes"]:
        attributes = {name: node[name] for name in attribute_names}
        restored.add_node(node["id"], **attributes)
    for edge in json_data["edges"]:
        restored.add_edge(
            edge["from"], edge["to"],
            delta=cls._delta_from_json(edge["delta"]))
    return restored
@staticmethod
@abstractmethod
def _delta_to_json(delta):
pass
@staticmethod
@abstractmethod
def _delta_from_json(json_data):
pass
def to_json(self):
    """Convert versioning object to JSON.

    Returns
    -------
    dict
        Dictionary with the keys "current_branch", "deltas" (branch
        name -> delta converted with `_delta_to_json`), "heads"
        (branch name -> head commit id) and "revision_graph".
    """
    return {
        "current_branch": self._current_branch,
        "deltas": {
            branch: self._delta_to_json(delta)
            for branch, delta in self._deltas.items()
        },
        # A copy, so that editing the returned data cannot silently
        # alter the heads of this versioning object
        "heads": dict(self._heads),
        "revision_graph": self._revision_graph_to_json(),
    }
def from_json(self, json_data):
    """Load the versioning state of this object from JSON data.

    Parameters
    ----------
    json_data : dict
        Dictionary with the keys "current_branch", "deltas", "heads"
        and "revision_graph", as produced by `to_json`.
    """
    self._current_branch = json_data["current_branch"]

    restored_deltas = {}
    for branch, raw_delta in json_data["deltas"].items():
        restored_deltas[branch] = self._delta_from_json(raw_delta)
    self._deltas = restored_deltas

    self._heads = json_data["heads"]

    raw_graph = json_data["revision_graph"]
    self._revision_graph = self._revision_graph_from_json(raw_graph)
[docs]class VersionedGraph(Versioning):
"""Class for versioned graphs."""
def __init__(self, graph, init_branch="master", current_branch=None,
             deltas=None, heads=None, revision_graph=None):
    """Wrap `graph` and set up its versioning state.

    Parameters
    ----------
    graph
        Graph object to put under version control; stored as
        ``self.graph``.
    init_branch, current_branch, deltas, heads, revision_graph
        Forwarded unchanged to the initializer of the base class.
    """
    self.graph = graph
    versioning_state = dict(
        init_branch=init_branch,
        current_branch=current_branch,
        deltas=deltas,
        heads=heads,
        revision_graph=revision_graph)
    super().__init__(**versioning_state)
def _refine_delta(self, delta):
lhs = delta["rule"].refine(self.graph, delta["lhs_instance"])
delta["lhs_instance"] = lhs
new_rhs = dict()
for n in delta["rule"].rhs.nodes():
if n not in delta["rhs_instance"].keys():
new_rhs[n] = lhs[delta["rule"].p_lhs[
keys_by_value(delta["rule"].p_rhs, n)[0]]]
else:
new_rhs[n] = delta["rhs_instance"][n]
delta["rhs_instance"] = new_rhs
def _compose_deltas(self, delta1, delta2):
    """Compose two graph deltas with `compose_rules`.

    Returns
    -------
    dict
        A new delta with the keys "rule", "lhs_instance" and
        "rhs_instance" holding the three values returned by
        `compose_rules`.
    """
    components = ("rule", "lhs_instance", "rhs_instance")
    # Rule, lhs and rhs of the first delta, then those of the second
    arguments = [
        delta[component]
        for delta in (delta1, delta2)
        for component in components
    ]
    rule, lhs, rhs = compose_rules(*arguments)
    return {
        "rule": rule,
        "lhs_instance": lhs,
        "rhs_instance": rhs
    }
@staticmethod
def _invert_delta(delta):
"""Reverse the direction of delta."""
return {
"rule": delta["rule"].get_inverted_rule(),
"lhs_instance": copy.deepcopy(delta["rhs_instance"]),
"rhs_instance": copy.deepcopy(delta["lhs_instance"])
}
@staticmethod
def _create_identity_delta():
    """Build a delta made of the identity rule and empty instances."""
    return {
        "rule": Rule.identity_rule(),
        "lhs_instance": {},
        "rhs_instance": {}
    }
def _apply_delta(self, delta, relabel=True):
"""Apply delta to the current graph version."""
rhs_instance = self.graph.rewrite(
delta["rule"], delta["lhs_instance"])
if relabel:
# Relabel nodes to correspond to the stored rhs
new_labels = {
v: delta["rhs_instance"][k]
for k, v in rhs_instance.items()
}
for n in self.graph.nodes():
if n not in new_labels.keys():
new_labels[n] = n
self.graph.relabel_nodes(new_labels)
rhs_instance = {
k: new_labels[v]
for k, v in rhs_instance.items()
}
return rhs_instance
def _merge_into_current_branch(self, delta):
    """Merge the branch reachable through `delta` into the current graph.

    A pair of merging rules is created from `delta`; the first one is
    applied to the current graph at the lhs instance of `delta`.

    Returns
    -------
    tuple of dict
        ``(current_to_merged, other_to_merged)``: the delta from the
        current branch and the delta from the other branch to the
        merged graph. Both refer to the same rhs instance.
    """
    lhs_instance = delta["lhs_instance"]
    rules = _create_merging_rule(
        delta["rule"], lhs_instance, delta["rhs_instance"])
    current_to_merged_rule, other_to_merged_rule = rules

    merged_instance = self.graph.rewrite(
        current_to_merged_rule, lhs_instance)

    current_to_merged_delta = dict(
        rule=current_to_merged_rule,
        lhs_instance=lhs_instance,
        rhs_instance=merged_instance)
    other_to_merged_delta = dict(
        rule=other_to_merged_rule,
        lhs_instance=delta["rhs_instance"],
        rhs_instance=merged_instance)
    return current_to_merged_delta, other_to_merged_delta
def rewrite(self, rule, instance=None, message=None):
    """Apply a rule to the versioned graph and commit the change.

    Parameters
    ----------
    rule
        Rule to apply.
    instance : dict, optional
        Instance of the rule's lhs in the graph.
    message : str, optional
        Commit message.

    Returns
    -------
    tuple
        ``(rhs_instance, commit_id)``: the rhs instance returned by
        the rewriting and the id of the new commit.
    """
    # Refine a rule to be side-effect free
    lhs_instance = rule.refine(self.graph, instance)
    rhs_instance = self.graph.rewrite(rule, lhs_instance)

    delta = {
        "rule": rule,
        "lhs_instance": lhs_instance,
        "rhs_instance": rhs_instance
    }
    return rhs_instance, self.commit(delta, message=message)
@staticmethod
def _delta_to_json(delta):
data = {}
data["rule"] = delta["rule"].to_json()
data["lhs_instance"] = delta["lhs_instance"]
data["rhs_instance"] = delta["rhs_instance"]
return data
@staticmethod
def _delta_from_json(json_data):
    """Restore a graph delta from JSON.

    The rule is rebuilt with ``Rule.from_json``; the two instance
    dictionaries are taken from `json_data` as they are.
    """
    return {
        "rule": Rule.from_json(json_data["rule"]),
        "lhs_instance": json_data["lhs_instance"],
        "rhs_instance": json_data["rhs_instance"]
    }
@classmethod
def from_json(cls, graph, json_data):
    """Create a versioned graph from a graph and JSON versioning data.

    Parameters
    ----------
    graph
        Graph object to wrap.
    json_data : dict
        Versioning data, as produced by `to_json`.

    Returns
    -------
    A new instance of `cls` wrapping `graph`, whose versioning state
    is loaded from `json_data`.
    """
    versioned = cls(graph)
    # Replace the freshly initialised state with the stored one using
    # the loader inherited from the base class
    load_state = super(VersionedGraph, cls).from_json
    load_state(versioned, json_data)
    return versioned
[docs]class VersionedHierarchy(Versioning):
"""Class for versioned hierarchies."""
def __init__(self, hierarchy, init_branch="master", current_branch=None,
             deltas=None, heads=None, revision_graph=None):
    """Wrap `hierarchy` and set up its versioning state.

    Parameters
    ----------
    hierarchy
        Hierarchy object to put under version control; stored as
        ``self.hierarchy``.
    init_branch, current_branch, deltas, heads, revision_graph
        Forwarded unchanged to the initializer of the base class.
    """
    self.hierarchy = hierarchy
    versioning_state = dict(
        init_branch=init_branch,
        current_branch=current_branch,
        deltas=deltas,
        heads=heads,
        revision_graph=revision_graph)
    super().__init__(**versioning_state)
def _refine_delta(self, delta):
lhs_instances = self.hierarchy.refine_rule_hierarchy(
delta["rule_hierarchy"],
delta["lhs_instances"])
delta["lhs_instances"] = lhs_instances
for graph in delta["rule_hierarchy"]["rules"]:
if graph not in delta["rhs_instances"]:
delta["rhs_instances"][graph] = delta[
"lhs_instances"][graph]
for graph, rule in delta["rule_hierarchy"]["rules"].items():
rule = delta["rule_hierarchy"]["rules"][graph]
rhs_instance = delta["rhs_instances"][graph]
for n in rule.rhs.nodes():
if n not in rhs_instance.keys():
rhs_instance[n] = delta["lhs_instances"][graph][
rule.p_lhs[keys_by_value(rule.p_rhs, n)[0]]]
delta["rhs_instances"][graph] = rhs_instance
def _compose_deltas(self, delta1, delta2):
    """Compose two hierarchy deltas with `compose_rule_hierarchies`.

    Returns
    -------
    dict
        A new delta with the keys "rule_hierarchy", "lhs_instances"
        and "rhs_instances" holding the three values returned by
        `compose_rule_hierarchies`.
    """
    components = ("rule_hierarchy", "lhs_instances", "rhs_instances")
    # Components of the first delta, then those of the second
    arguments = [
        delta[component]
        for delta in (delta1, delta2)
        for component in components
    ]
    rule, lhs, rhs = compose_rule_hierarchies(*arguments)
    return {
        "rule_hierarchy": rule,
        "lhs_instances": lhs,
        "rhs_instances": rhs
    }
@staticmethod
def _invert_delta(delta):
    """Build the delta that undoes `delta`.

    The rule hierarchy is inverted with `invert_rule_hierarchy` and
    the lhs and rhs instances swap places.

    Both instance dictionaries are deep-copied, as is done for graph
    deltas: the inverted delta is later refined in place, and without
    the copy those modifications would also change the instances of
    the original delta.
    """
    return {
        "rule_hierarchy": invert_rule_hierarchy(
            delta["rule_hierarchy"]),
        "lhs_instances": copy.deepcopy(delta["rhs_instances"]),
        "rhs_instances": copy.deepcopy(delta["lhs_instances"])
    }
@staticmethod
def _create_identity_delta():
"""Create an identity-delta."""
identity_delta = {
"rule_hierarchy": {
"rules": {},
"rule_homomorphisms": {}
},
"lhs_instances": {},
"rhs_instances": {}
}
return identity_delta
def _apply_delta(self, delta, relabel=True):
"""Apply delta to the current hierarchy version."""
rhs_instances = self.hierarchy.apply_rule_hierarchy(
delta["rule_hierarchy"], delta["lhs_instances"])
if relabel:
# Relabel nodes to correspond to the stored rhs
for graph, rhs_instance in delta["rhs_instances"].items():
old_rhs = rhs_instance
new_rhs = rhs_instances[graph]
new_labels = {
v: old_rhs[k]
for k, v in new_rhs.items()
if v != old_rhs[k]
}
if len(new_labels) > 0:
for n in self.hierarchy.get_graph(graph).nodes():
if n not in new_labels.keys():
new_labels[n] = n
self.hierarchy.relabel_nodes(graph, new_labels)
rhs_instances[graph] = old_rhs
return rhs_instances
def _merge_into_current_branch(self, delta):
    """Merge the branch reachable through `delta` into the current hierarchy.

    A pair of merging rule hierarchies is created from `delta`; the
    first one is applied to the current hierarchy at the lhs instances
    of `delta`.

    Returns
    -------
    tuple of dict
        ``(current_to_merged, other_to_merged)``: the delta from the
        current branch and the delta from the other branch to the
        merged hierarchy. Both refer to the same rhs instances.
    """
    lhs_instances = delta["lhs_instances"]
    rule_hierarchies = _create_merging_rule_hierarchy(
        delta["rule_hierarchy"], lhs_instances, delta["rhs_instances"])
    current_to_merged, other_to_merged = rule_hierarchies

    merged_instances = self.hierarchy.apply_rule_hierarchy(
        current_to_merged, lhs_instances)

    current_to_merged_delta = dict(
        rule_hierarchy=current_to_merged,
        lhs_instances=lhs_instances,
        rhs_instances=merged_instances)
    other_to_merged_delta = dict(
        rule_hierarchy=other_to_merged,
        lhs_instances=delta["rhs_instances"],
        rhs_instances=merged_instances)
    return current_to_merged_delta, other_to_merged_delta
def rewrite(self, graph_id, rule, instance=None,
            p_typing=None, rhs_typing=None,
            strict=False, message=""):
    """Apply a rule to a graph of the versioned hierarchy and commit.

    Parameters
    ----------
    graph_id
        Id of the graph in the hierarchy to rewrite.
    rule
        Rule to apply.
    instance : dict, optional
        Instance of the rule's lhs in the graph.
    p_typing, rhs_typing : optional
        Passed to ``get_rule_hierarchy`` of the hierarchy together
        with the arguments above.
    strict : bool, optional
        Not used by this method.
    message : str, optional
        Commit message.

    Returns
    -------
    tuple
        ``(rhs_instances, commit_id)``: the rhs instances returned by
        applying the rule hierarchy and the id of the new commit.
    """
    hierarchy = self.hierarchy
    rule_hierarchy, raw_lhs_instances = hierarchy.get_rule_hierarchy(
        graph_id, rule, instance, p_typing, rhs_typing)
    lhs_instances = hierarchy.refine_rule_hierarchy(
        rule_hierarchy, raw_lhs_instances)
    rhs_instances = hierarchy.apply_rule_hierarchy(
        rule_hierarchy, lhs_instances)

    delta = {
        "rule_hierarchy": rule_hierarchy,
        "lhs_instances": lhs_instances,
        "rhs_instances": rhs_instances
    }
    return rhs_instances, self.commit(delta, message=message)
@staticmethod
def _delta_to_json(delta):
data = {}
data["rule_hierarchy"] = {
"rules": {},
"rule_homomorphisms": delta["rule_hierarchy"]["rule_homomorphisms"]
}
for graph, rule in delta["rule_hierarchy"]["rules"].items():
data["rule_hierarchy"]["rules"][graph] = rule.to_json()
data["lhs_instances"] = delta["lhs_instances"]
data["rhs_instances"] = delta["rhs_instances"]
return data
@staticmethod
def _delta_from_json(json_data):
    """Restore a hierarchy delta from JSON.

    Every rule of the rule hierarchy is rebuilt with
    ``Rule.from_json``; the rule homomorphisms and the two instance
    dictionaries are taken from `json_data` as they are.
    """
    stored_hierarchy = json_data["rule_hierarchy"]
    rules = {
        graph: Rule.from_json(rule_json)
        for graph, rule_json in stored_hierarchy["rules"].items()
    }
    return {
        "rule_hierarchy": {
            "rules": rules,
            "rule_homomorphisms": stored_hierarchy["rule_homomorphisms"]
        },
        "lhs_instances": json_data["lhs_instances"],
        "rhs_instances": json_data["rhs_instances"]
    }
@classmethod
def from_json(cls, hierarchy, json_data):
    """Create a versioned hierarchy from a hierarchy and JSON versioning data.

    Parameters
    ----------
    hierarchy
        Hierarchy object to wrap.
    json_data : dict
        Versioning data, as produced by `to_json`.

    Returns
    -------
    A new instance of `cls` wrapping `hierarchy`, whose versioning
    state is loaded from `json_data`.
    """
    versioned = cls(hierarchy)
    # Replace the freshly initialised state with the stored one using
    # the loader inherited from the base class
    load_state = super(VersionedHierarchy, cls).from_json
    load_state(versioned, json_data)
    return versioned