132 lines
3.9 KiB
Python
132 lines
3.9 KiB
Python
|
import linecache
|
||
|
import textwrap
|
||
|
from ast import _splitlines_no_ff as splitlines
|
||
|
from contextlib import contextmanager
|
||
|
from itertools import count
|
||
|
|
||
|
from .codetools import CodeFile, CodeFileOperation, LineDefinition, ModuleCode
|
||
|
from .register import registry
|
||
|
from .utils import EventSource
|
||
|
|
||
|
_count = count(1)
|
||
|
|
||
|
|
||
|
class OutOfSyncException(Exception):
|
||
|
pass
|
||
|
|
||
|
|
||
|
def virtual_file(name, contents):
|
||
|
filename = f"<{name}#{next(_count)}>"
|
||
|
linecache.cache[filename] = (None, None, splitlines(contents), filename)
|
||
|
return filename
|
||
|
|
||
|
|
||
|
class Recoder:
|
||
|
def __init__(self, name, codefile, deletable=False, focus=None):
|
||
|
self.name = name
|
||
|
self.codefile = codefile
|
||
|
self.deletable = deletable
|
||
|
self.focus = focus
|
||
|
self.watched = [] if focus is None else [focus]
|
||
|
self.status = "live"
|
||
|
self.on_status = EventSource()
|
||
|
self.codefile.activity.register(self._listen)
|
||
|
self._current_patch = None
|
||
|
self._listening = True
|
||
|
|
||
|
def set_status(self, status):
|
||
|
self.status = status
|
||
|
self.on_status.emit(self, self.status)
|
||
|
|
||
|
def _listen(self, event):
|
||
|
if self._listening:
|
||
|
if isinstance(event, CodeFileOperation):
|
||
|
if event.defn in self.watched:
|
||
|
self.set_status("out-of-sync")
|
||
|
|
||
|
@contextmanager
|
||
|
def _patching(self, new_code):
|
||
|
new_code = new_code.strip()
|
||
|
|
||
|
filename = virtual_file(self.name, new_code)
|
||
|
cf = CodeFile(
|
||
|
filename=filename,
|
||
|
source=new_code,
|
||
|
module_name=self.codefile.module_name,
|
||
|
)
|
||
|
registry.cache[filename] = self.codefile
|
||
|
|
||
|
yield cf
|
||
|
|
||
|
self._listening = False
|
||
|
(same, changes, additions, deletions) = self.codefile.merge(
|
||
|
cf, allow_deletions=self.deletable and self.focus and [self.focus]
|
||
|
)
|
||
|
self.watched = [*same, *changes, *additions]
|
||
|
self.set_status("live")
|
||
|
self._current_patch = new_code
|
||
|
self._listening = True
|
||
|
|
||
|
def patch(self, new_code):
|
||
|
def _encompasses(defn):
|
||
|
for x in self.focus.hierarchy():
|
||
|
if x.correspond(defn).corresponds:
|
||
|
return True
|
||
|
for x in defn.hierarchy():
|
||
|
if x.correspond(self.focus).corresponds:
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
if self.focus is None:
|
||
|
return self.patch_module(new_code)
|
||
|
|
||
|
for parent in list(self.focus.hierarchy())[1:]:
|
||
|
if not isinstance(parent, ModuleCode):
|
||
|
new_code = textwrap.indent(new_code, " ")
|
||
|
new_code = f"{parent.header()}{new_code}"
|
||
|
|
||
|
with self._patching(new_code) as cf:
|
||
|
(
|
||
|
same,
|
||
|
changes,
|
||
|
additions,
|
||
|
deletions,
|
||
|
) = self.codefile.root.correspond(cf.root).summary()
|
||
|
seq = [*changes, *additions]
|
||
|
seq = [d for d in seq if not isinstance(d, LineDefinition)]
|
||
|
if not all(_encompasses(culprit := d) for d in seq):
|
||
|
raise ValueError(
|
||
|
f"Recoder for {self.focus.name} cannot be used to define {culprit.name}" # noqa: F821
|
||
|
)
|
||
|
|
||
|
def patch_module(self, new_code):
|
||
|
with self._patching(new_code):
|
||
|
pass
|
||
|
|
||
|
def repatch(self):
|
||
|
if self.status == "out-of-sync" and self._current_patch is not None:
|
||
|
self.patch_module(self._current_patch)
|
||
|
|
||
|
def commit(self):
|
||
|
if self.status == "out-of-sync":
|
||
|
raise OutOfSyncException(
|
||
|
f"File {self.codefile.filename} is out of sync with the patch"
|
||
|
)
|
||
|
else:
|
||
|
self.codefile.commit()
|
||
|
self.set_status("saved")
|
||
|
|
||
|
def revert(self):
|
||
|
self.codefile.refresh()
|
||
|
|
||
|
|
||
|
def make_recoder(obj, deletable=False):
|
||
|
cf, defn = registry.find(obj)
|
||
|
return (
|
||
|
cf
|
||
|
and defn
|
||
|
and Recoder(
|
||
|
name=defn.dotpath(), codefile=cf, focus=defn, deletable=deletable
|
||
|
)
|
||
|
)
|