wd-rating/jurigged/recode.py

132 lines
3.9 KiB
Python

import linecache
import textwrap
from ast import _splitlines_no_ff as splitlines
from contextlib import contextmanager
from itertools import count
from .codetools import CodeFile, CodeFileOperation, LineDefinition, ModuleCode
from .register import registry
from .utils import EventSource
_count = count(1)
class OutOfSyncException(Exception):
pass
def virtual_file(name, contents):
filename = f"<{name}#{next(_count)}>"
linecache.cache[filename] = (None, None, splitlines(contents), filename)
return filename
class Recoder:
def __init__(self, name, codefile, deletable=False, focus=None):
self.name = name
self.codefile = codefile
self.deletable = deletable
self.focus = focus
self.watched = [] if focus is None else [focus]
self.status = "live"
self.on_status = EventSource()
self.codefile.activity.register(self._listen)
self._current_patch = None
self._listening = True
def set_status(self, status):
self.status = status
self.on_status.emit(self, self.status)
def _listen(self, event):
if self._listening:
if isinstance(event, CodeFileOperation):
if event.defn in self.watched:
self.set_status("out-of-sync")
@contextmanager
def _patching(self, new_code):
new_code = new_code.strip()
filename = virtual_file(self.name, new_code)
cf = CodeFile(
filename=filename,
source=new_code,
module_name=self.codefile.module_name,
)
registry.cache[filename] = self.codefile
yield cf
self._listening = False
(same, changes, additions, deletions) = self.codefile.merge(
cf, allow_deletions=self.deletable and self.focus and [self.focus]
)
self.watched = [*same, *changes, *additions]
self.set_status("live")
self._current_patch = new_code
self._listening = True
def patch(self, new_code):
def _encompasses(defn):
for x in self.focus.hierarchy():
if x.correspond(defn).corresponds:
return True
for x in defn.hierarchy():
if x.correspond(self.focus).corresponds:
return True
return False
if self.focus is None:
return self.patch_module(new_code)
for parent in list(self.focus.hierarchy())[1:]:
if not isinstance(parent, ModuleCode):
new_code = textwrap.indent(new_code, " ")
new_code = f"{parent.header()}{new_code}"
with self._patching(new_code) as cf:
(
same,
changes,
additions,
deletions,
) = self.codefile.root.correspond(cf.root).summary()
seq = [*changes, *additions]
seq = [d for d in seq if not isinstance(d, LineDefinition)]
if not all(_encompasses(culprit := d) for d in seq):
raise ValueError(
f"Recoder for {self.focus.name} cannot be used to define {culprit.name}" # noqa: F821
)
def patch_module(self, new_code):
with self._patching(new_code):
pass
def repatch(self):
if self.status == "out-of-sync" and self._current_patch is not None:
self.patch_module(self._current_patch)
def commit(self):
if self.status == "out-of-sync":
raise OutOfSyncException(
f"File {self.codefile.filename} is out of sync with the patch"
)
else:
self.codefile.commit()
self.set_status("saved")
def revert(self):
self.codefile.refresh()
def make_recoder(obj, deletable=False):
cf, defn = registry.find(obj)
return (
cf
and defn
and Recoder(
name=defn.dotpath(), codefile=cf, focus=defn, deletable=deletable
)
)