添加jurigged

This commit is contained in:
wcq 2023-03-13 16:27:35 +08:00
parent 76ad531a57
commit 001426841d
15 changed files with 3331 additions and 0 deletions

8
jurigged/__init__.py Normal file
View File

@ -0,0 +1,8 @@
from codefind import ConformException, code_registry as db
from .codetools import CodeFile
from .live import Watcher, watch
from .recode import Recoder, make_recoder, virtual_file
from .register import registry
from .utils import glob_filter
from .version import version as __version__

4
jurigged/__main__.py Normal file
View File

@ -0,0 +1,4 @@
from .live import cli
if __name__ == "__main__":
cli()

1150
jurigged/codetools.py Normal file

File diff suppressed because it is too large Load Diff

373
jurigged/live.py Normal file
View File

@ -0,0 +1,373 @@
import argparse
import code
import importlib
import logging
import os
import sys
import threading
import traceback
from dataclasses import dataclass
from types import ModuleType
import blessed
from ovld import ovld
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserverVFS
from . import codetools, runpy
from .register import registry
from .utils import EventSource, glob_filter
from .version import version
log = logging.getLogger(__name__)
T = blessed.Terminal()
DEFAULT_DEBOUNCE = 0.05
@dataclass
class WatchOperation:
filename: str
def __str__(self):
return f"Watch {self.filename}"
@ovld
def default_logger(event: codetools.UpdateOperation):
if isinstance(event.defn, codetools.FunctionDefinition):
print(T.bold_yellow(str(event)))
@ovld
def default_logger(event: codetools.AddOperation):
print(T.bold_green(str(event)))
@ovld
def default_logger(event: codetools.DeleteOperation):
print(T.bold_red(str(event)))
@ovld
def default_logger(event: WatchOperation):
pass
# print(T.bold(str(event)))
@ovld
def default_logger(exc: Exception):
# lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
traceback.print_exception(*sys.exc_info())
print("出现错误")
# print(T.bold_red("".join(lines)))
# 修改了
@ovld
def default_logger(exc: SyntaxError):
lines = traceback.format_exception(
type(exc), exc, exc.__traceback__, limit=0
)
print(T.bold_red("".join(lines)))
@ovld
def default_logger(event: object):
print(event)
def conservative_logger(event):
if isinstance(event, Exception):
default_logger(event)
class Watcher:
def __init__(self, registry, debounce=DEFAULT_DEBOUNCE, poll=False):
if poll:
self.observer = PollingObserverVFS(
stat=os.stat, listdir=os.scandir, polling_interval=poll
)
else:
self.observer = Observer()
self.registry = registry
self.registry.precache_activity.register(self.on_prepare)
self.debounce = debounce
self.poll = poll
self.prerun = EventSource()
self.postrun = EventSource()
def on_prepare(self, module_name, filename):
JuriggedHandler(self, filename).schedule(self.observer)
self.registry.log(WatchOperation(filename))
def refresh(self, path):
cf = self.registry.get(path)
try:
self.prerun.emit(path, cf)
cf.refresh()
self.postrun.emit(path, cf)
except Exception as exc:
self.registry.log(exc)
def start(self):
self.observer.start()
def stop(self):
self.observer.stop()
def join(self):
self.observer.join()
class JuriggedHandler(FileSystemEventHandler):
def __init__(self, watcher, filename):
self.watcher = watcher
self.filename = filename
self.normalized_filename = os.path.normpath(filename)
self.mtime = 0
self.timer = None
def _refresh(self):
self.watcher.refresh(self.filename)
self.timer = None
def on_modified(self, event):
if event.src_path == self.normalized_filename:
mtime = os.path.getmtime(event.src_path)
# The modified event sometimes fires twice for no reason
# even though the mtime is the same
if mtime != self.mtime:
self.mtime = mtime
if self.watcher.debounce:
if self.timer is not None:
self.timer.cancel()
self.timer = threading.Timer(
self.watcher.debounce, self._refresh
)
self.timer.start()
else:
self._refresh()
on_created = on_modified
def schedule(self, observer):
# Watch the directory, because when watching a file, the watcher stops when
# it is deleted and will not pick back up if the file is recreated. This happens
# when some editors save.
observer.schedule(self, os.path.dirname(self.filename))
def watch(
pattern="./*.py",
logger=default_logger,
registry=registry,
autostart=True,
debounce=DEFAULT_DEBOUNCE,
poll=False,
):
registry.auto_register(
filter=glob_filter(pattern) if isinstance(pattern, str) else pattern
)
registry.set_logger(logger)
watcher = Watcher(
registry,
debounce=debounce,
poll=poll,
)
if autostart:
watcher.start()
return watcher
def _loop_module(): # pragma: no cover
try:
from . import loop
return loop
except ModuleNotFoundError as exc:
print("ModuleNotFoundError:", exc, file=sys.stderr)
sys.exit("To use --loop or --xloop, install jurigged[develoop]")
def find_runner(opts, pattern, prepare=None): # pragma: no cover
if opts.module:
module_spec, *rest = opts.module
assert opts.script is None
sys.argv[1:] = rest
if ":" in module_spec:
module_name, func = module_spec.split(":", 1)
mod = importlib.import_module(module_name)
return mod, getattr(mod, func)
else:
_, spec, code = runpy._get_module_details(module_spec)
if pattern(spec.origin):
registry.prepare("__main__", spec.origin)
mod = ModuleType("__main__")
def run():
runpy.run_module(
module_spec, module_object=mod, prepare=prepare
)
return mod, run
elif opts.script:
path = os.path.abspath(opts.script)
if pattern(path):
# It won't auto-trigger through runpy, probably some idiosyncracy of
# module resolution
registry.prepare("__main__", path)
sys.argv[1:] = opts.rest
mod = ModuleType("__main__")
def run():
runpy.run_path(path, module_object=mod, prepare=prepare)
return mod, run
else:
mod = ModuleType("__main__")
return mod, None
def cli(): # pragma: no cover
sys.path.insert(0, os.path.abspath(os.curdir))
parser = argparse.ArgumentParser(
description="Run a Python script so that it is live-editable."
)
parser.add_argument(
"script", metavar="SCRIPT", help="Path to the script to run", nargs="?"
)
parser.add_argument(
"--interactive",
"-i",
action="store_true",
help="Run an interactive session after the program ends",
)
parser.add_argument(
"--watch",
"-w",
metavar="PATH",
help="Wildcard path/directory for which files to watch",
)
parser.add_argument(
"--debounce",
"-d",
type=float,
help="Interval to wait for to refresh a modified file, in seconds",
)
parser.add_argument(
"--poll",
type=float,
help="Poll for changes using the given interval",
)
parser.add_argument(
"-m",
dest="module",
metavar="MODULE",
nargs=argparse.REMAINDER,
help="Module or module:function to run",
)
parser.add_argument(
"--loop",
"-l",
action="append",
type=str,
help="Name of the function(s) to loop on",
)
parser.add_argument(
"--loop-interface",
type=str,
choices=("rich", "basic"),
default="rich",
help="Interface to use for --loop",
)
parser.add_argument(
"--xloop",
"-x",
action="append",
type=str,
help="Name of the function(s) to loop on if they raise an error",
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
help="Show watched files and changes as they happen",
)
parser.add_argument(
"--version",
action="store_true",
help="Print version",
)
parser.add_argument(
"rest", metavar="...", nargs=argparse.REMAINDER, help="Script arguments"
)
opts = parser.parse_args()
pattern = glob_filter(opts.watch or ".")
watch_args = {
"pattern": pattern,
"logger": default_logger if opts.verbose else conservative_logger,
"debounce": opts.debounce or DEFAULT_DEBOUNCE,
"poll": opts.poll,
}
banner = ""
if opts.version:
print(version)
sys.exit()
prepare = None
if opts.loop or opts.xloop:
import codefind
loopmod = _loop_module()
def prepare(glb):
from .rescript import redirect_code
filename = glb["__file__"]
def _getcode(ref):
if ref.startswith("/"):
_, module, *hierarchy = ref.split("/")
return codefind.find_code(*hierarchy, module=module)
elif ":" in ref:
module, hierarchy_s = ref.split(":")
hierarchy = hierarchy_s.split(".")
return codefind.find_code(*hierarchy, module=module)
else:
hierarchy = ref.split(".")
return codefind.find_code(*hierarchy, filename=filename)
for ref in opts.loop or []:
redirect_code(
_getcode(ref), loopmod.loop(interface=opts.loop_interface)
)
for ref in opts.xloop or []:
redirect_code(
_getcode(ref), loopmod.xloop(interface=opts.loop_interface)
)
mod, run = find_runner(opts, pattern, prepare=prepare)
watch(**watch_args)
if run is None:
banner = None
opts.interactive = True
else:
banner = ""
run()
if opts.interactive:
code.interact(banner=banner, local=vars(mod), exitmsg="")

64
jurigged/loop/__init__.py Normal file
View File

@ -0,0 +1,64 @@
import builtins
import functools
from types import SimpleNamespace
from giving import give, given
from .basic import BasicDeveloopRunner
from .develoop import Develoop, DeveloopRunner, RedirectDeveloopRunner
def keyword_decorator(deco):
"""Wrap a decorator to optionally takes keyword arguments."""
@functools.wraps(deco)
def new_deco(fn=None, **kwargs):
if fn is None:
@functools.wraps(deco)
def newer_deco(fn):
return deco(fn, **kwargs)
return newer_deco
else:
return deco(fn, **kwargs)
return new_deco
@keyword_decorator
def loop(fn, interface=None, only_on_error=False):
if interface is None:
try:
import rich
interface = "rich"
except ModuleNotFoundError:
interface = "basic"
if interface == "rich":
from .richloop import RichDeveloopRunner
interface = RichDeveloopRunner
elif interface == "basic":
interface = BasicDeveloopRunner
elif isinstance(interface, str):
raise Exception(f"Unknown develoop interface: '{interface}'")
return Develoop(fn, on_error=only_on_error, runner_class=interface)
loop_on_error = functools.partial(loop, only_on_error=True)
xloop = loop_on_error
__ = SimpleNamespace(
loop=loop,
loop_on_error=loop_on_error,
xloop=xloop,
give=give,
given=given,
)
def inject():
builtins.__ = __

142
jurigged/loop/basic.py Normal file
View File

@ -0,0 +1,142 @@
import re
import select
import sys
import termios
import traceback
import tty
from contextlib import contextmanager
from functools import partial
from .develoop import Abort, DeveloopRunner
ANSI_ESCAPE = re.compile(r"\x1b\[[;\d]*[A-Za-z]")
ANSI_ESCAPE_INNER = re.compile(r"[\x1b\[;\d]")
ANSI_ESCAPE_END = re.compile(r"[A-Za-z~]")
@contextmanager
def cbreak():
old_attrs = termios.tcgetattr(sys.stdin)
tty.setcbreak(sys.stdin)
try:
yield
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attrs)
def read_chars():
esc = None
try:
while True:
ready, _, _ = select.select([sys.stdin], [], [], 0.02)
if ready:
# Sometimes, e.g. when pressing an up arrow, multiple
# characters are buffered, and read1() is the only way
# I found to read precisely what was buffered. select
# seems unreliable in these cases, probably because the
# buffer fools it into thinking there is nothing else
# to read. So read(1) would leave some characters dangling
# in the buffer until the next keypress.
for ch in sys.stdin.buffer.read1():
ch = chr(ch)
if esc is not None:
if ANSI_ESCAPE_INNER.match(ch):
esc += ch
elif ANSI_ESCAPE_END.match(ch):
yield {"char": esc + ch, "escape": True}
esc = None
else:
yield {"char": esc, "escape": True}
esc = None
yield {"char": ch}
elif ch == "\x1b":
esc = ""
else:
yield {"char": ch}
except Abort:
pass
class BasicDeveloopRunner(DeveloopRunner):
def __init__(self, fn, args, kwargs):
super().__init__(fn, args, kwargs)
self._status = "running"
self._walltime = 0
def _pad(self, text, total):
text = f"#{self.num}: {text}"
rest = total - len(text) - 6
return f"---- {text} " + "-" * rest
def _finish(self, status, result):
print(self._pad(status, 50))
if status == "ERROR":
traceback.print_exception(
type(result), result, result.__traceback__
)
else:
print(f"{result}")
footer = [
"(c)ontinue",
"(r)erun",
"(q)uit",
]
print(self._pad(" | ".join(footer), 50))
with cbreak():
for c in read_chars():
if c["char"] == "c":
self.command("cont")()
break
elif c["char"] == "r":
self.command("go")()
break
elif c["char"] == "q":
self.command("quit")()
break
def register_updates(self, gv):
print(self._pad(self.signature(), 50))
gv["?#result"] >> partial(self._finish, "RESULT")
gv["?#error"] >> partial(self._finish, "ERROR")
gv.filter(
lambda d: not any(
k.startswith("#") and not k.startswith("$") for k in d.keys()
)
).display()
def _on(key):
# black and vscode's syntax highlighter both choke on parsing the following
# as a decorator, that's why I made a function
return gv.getitem(key, strict=False).subscribe
@_on("#status")
def _(status):
self._status = status
@_on("#walltime")
def _(walltime):
self._walltime = walltime
def readable_duration(t):
if t < 0.001:
return "<1ms"
elif t < 1:
t = int(t * 1000)
return f"{t}ms"
elif t < 10:
return f"{t:.3f}s"
elif t < 60:
return f"{t:.1f}s"
else:
s = t % 60
m = (t // 60) % 60
if t < 3600:
return f"{m:.0f}m{s:.0f}s"
else:
h = t // 3600
return f"{h:.0f}h{m:.0f}m{s:.0f}s"

235
jurigged/loop/develoop.py Normal file
View File

@ -0,0 +1,235 @@
import ctypes
import linecache
import sys
import threading
import time
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from queue import Queue
from types import FunctionType
from typing import Union
from executing import Source
from giving import SourceProxy, give, given
from ovld import ovld
from ..register import registry
NoneType = type(None)
@ovld
def pstr(x: Union[int, float, bool, NoneType]):
return str(x)
@ovld
def pstr(x: str):
if len(x) > 15:
return repr(x[:12] + "...")
else:
return repr(x)
@ovld
def pstr(x: FunctionType):
name = x.__qualname__
return f"<function {name}>"
@ovld
def pstr(x: object):
name = type(x).__qualname__
return f"<{name}>"
@registry.activity.append
def _(evt):
# Patch to ensure the executing module's cache is invalidated whenever
# a source file is changed.
cache = Source._class_local("__source_cache", {})
filename = evt.codefile.filename
if filename in cache:
del cache[filename]
linecache.checkcache(filename)
@give.variant
def givex(data):
return {f"#{k}": v for k, v in data.items()}
def itemsetter(coll, key):
def setter(value):
coll[key] = value
return setter
def itemappender(coll, key):
def appender(value):
coll[key] += value
return appender
class FileGiver:
def __init__(self, name):
self.name = name
def write(self, x):
give(**{self.name: x})
def flush(self):
pass
class Abort(Exception):
pass
def kill_thread(thread, exctype=Abort):
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), ctypes.py_object(exctype)
)
@contextmanager
def watching_changes():
src = SourceProxy()
registry.activity.append(src._push)
try:
yield src
finally:
registry.activity.remove(src._push)
class DeveloopRunner:
def __init__(self, fn, args, kwargs):
self.fn = fn
self.args = args
self.kwargs = kwargs
self.num = 0
self._q = Queue()
def setcommand(self, cmd):
while not self._q.empty():
self._q.get()
self._q.put(cmd)
def command(self, name, aborts=False):
def perform(_=None):
if aborts:
# Asynchronously sends the Abort exception to the
# thread in which the function runs.
kill_thread(self._loop_thread)
self.setcommand(name)
return perform
def signature(self):
name = getattr(self.fn, "__qualname__", str(self.fn))
parts = [pstr(arg) for arg in self.args]
parts += [f"{k}={pstr(v)}" for k, v in self.kwargs.items()]
args = ", ".join(parts)
return f"{name}({args})"
@contextmanager
def wrap_loop(self):
yield
@contextmanager
def wrap_run(self):
yield
def register_updates(self, gv):
raise NotImplementedError()
def run(self):
self.num += 1
outcome = [None, None] # [result, error]
with given() as gv, self.wrap_run():
t0 = time.time()
gv["?#result"] >> itemsetter(outcome, 0)
gv["?#error"] >> itemsetter(outcome, 1)
self.register_updates(gv)
try:
givex(result=self.fn(*self.args, **self.kwargs), status="done")
except Abort:
givex(status="aborted")
raise
except Exception as error:
givex(error, status="error")
givex(walltime=time.time() - t0)
return outcome
def loop(self, from_error=None):
self._loop_thread = threading.current_thread()
result = None
err = None
if from_error:
self.setcommand("from_error")
else:
self.setcommand("go")
with self.wrap_loop(), watching_changes() as chgs:
chgs.debounce(0.05) >> self.command("go", aborts=True)
while True:
try:
cmd = self._q.get()
if cmd == "go":
result, err = self.run()
elif cmd == "cont":
break
elif cmd == "abort":
pass
elif cmd == "quit":
sys.exit(1)
elif cmd == "from_error":
with given() as gv:
self.register_updates(gv)
givex(error=from_error, status="error")
result, err = None, from_error
except Abort:
continue
if err is not None:
raise err
else:
return result
class RedirectDeveloopRunner(DeveloopRunner):
@contextmanager
def wrap_run(self):
out = FileGiver("#stdout")
err = FileGiver("#stderr")
with redirect_stdout(out), redirect_stderr(err):
yield
class Develoop:
def __init__(self, fn, on_error, runner_class):
self.fn = fn
self.on_error = on_error
self.runner_class = runner_class
def __get__(self, obj, cls):
return type(self)(
self.fn.__get__(obj, cls),
on_error=self.on_error,
runner_class=self.runner_class,
)
def __call__(self, *args, **kwargs):
exc = None
if self.on_error:
try:
return self.fn(*args, **kwargs)
except Exception as _exc:
exc = _exc
return self.runner_class(self.fn, args, kwargs).loop(from_error=exc)

465
jurigged/loop/richloop.py Normal file
View File

@ -0,0 +1,465 @@
import re
import sys
from collections import deque
from contextlib import contextmanager
from dataclasses import dataclass
import reactivex as rx
from giving import ObservableProxy
from pygments import token
from rich._loop import loop_last
from rich.cells import cell_len
from rich.console import Console, Group
from rich.constrain import Constrain
from rich.highlighter import ReprHighlighter
from rich.live import Live
from rich.markup import render as markup
from rich.panel import Panel
from rich.pretty import Pretty
from rich.segment import Segment
from rich.style import Style
from rich.table import Table
from rich.text import Text
from rich.theme import Theme
from rich.traceback import Traceback
from .basic import ANSI_ESCAPE, cbreak, read_chars, readable_duration
from .develoop import RedirectDeveloopRunner, itemappender, kill_thread
REAL_STDOUT = sys.stdout
TEMP_CONSOLE = Console(color_system="standard")
class TracebackNoFrame(Traceback):
"""Variant of rich.traceback.Traceback that does not draw a frame around the traceback."""
def __rich_console__(self, console, options):
# I basically just copied this from https://github.com/willmcgugan/rich/blob/master/rich/traceback.py
# and removed calls to Panel
theme = self.theme
token_style = theme.get_style_for_token
traceback_theme = Theme(
{
"pretty": token_style(token.Text),
"pygments.text": token_style(token.Token),
"pygments.string": token_style(token.String),
"pygments.function": token_style(token.Name.Function),
"pygments.number": token_style(token.Number),
"repr.indent": token_style(token.Comment) + Style(dim=True),
"repr.str": token_style(token.String),
"repr.brace": token_style(token.Text) + Style(bold=True),
"repr.number": token_style(token.Number),
"repr.bool_true": token_style(token.Keyword.Constant),
"repr.bool_false": token_style(token.Keyword.Constant),
"repr.none": token_style(token.Keyword.Constant),
"scope.border": token_style(token.String.Delimiter),
"scope.equals": token_style(token.Operator),
"scope.key": token_style(token.Name),
"scope.key.special": token_style(token.Name.Constant)
+ Style(dim=True),
}
)
highlighter = ReprHighlighter()
for last, stack in loop_last(reversed(self.trace.stacks)):
if stack.frames:
stack_renderable = self._render_stack(stack)
stack_renderable = Constrain(stack_renderable, self.width)
with console.use_theme(traceback_theme):
yield stack_renderable
if stack.syntax_error is not None:
with console.use_theme(traceback_theme):
yield Constrain(
self._render_syntax_error(stack.syntax_error)
)
yield Text.assemble(
(f"{stack.exc_type}: ", "traceback.exc_type"),
highlighter(stack.syntax_error.msg),
)
elif stack.exc_value:
yield Text.assemble(
(f"{stack.exc_type}: ", "traceback.exc_type"),
highlighter(stack.exc_value),
)
else:
yield Text.assemble((f"{stack.exc_type}", "traceback.exc_type"))
if not last:
if stack.is_cause:
yield Text.from_markup(
"\n[i]The above exception was the direct cause of the following exception:\n",
)
else:
yield Text.from_markup(
"\n[i]During handling of the above exception, another exception occurred:\n",
)
class RawSegment(Segment):
@property
def cell_length(self):
assert not self.control
return cell_len(re.sub(ANSI_ESCAPE, "", self.text))
@dataclass
class Line:
text: str = ""
length: int = 0
def __bool__(self):
return bool(self.text)
def breakline(line, limit=80, initial=Line()):
if not line:
yield initial
return
parts = [
(x, i % 2 == 1)
for i, x in enumerate(re.split(pattern=ANSI_ESCAPE, string=line))
]
current_line = initial.text
avail = limit - initial.length
work = deque(parts)
while work:
part, escape = work.popleft()
if escape:
current_line += part
else:
if not avail:
ok, extra = "", part
else:
ok, extra = part[:avail], part[avail:]
avail -= len(ok)
current_line += ok
if extra:
work.appendleft((extra, False))
yield Line(current_line, limit - avail)
current_line = ""
avail = limit
if current_line:
yield Line(current_line, limit - avail)
class TerminalLines:
def __init__(self, title, border="white", border_highlight="bold yellow"):
self.title = title
self.border = border
self.border_highlight = border_highlight
self.height = 0
self.width = 80
self.window_size = 1
self.clear()
def set_at_end(self):
self.at_end = self.start >= (len(self) - self.window_size)
def add(self, text):
line1, *lines = text.split("\n")
self.lines[-1:] = breakline(
line1, limit=self.width, initial=self.lines[-1]
)
for line in lines:
self.lines += breakline(line, limit=self.width)
return self
def clear(self):
self.lines = [Line()]
self.start = 0
self.at_end = True
def shift(self, n, mode):
if mode == "line":
self.start = max(0, self.start + n)
elif mode == "screen":
self.start = max(0, self.start + n * self.window_size)
elif mode == "whole":
self.start = max(0, self.start + n * len(self))
self.set_at_end()
def __len__(self):
# We don't count the last line if it is empty
return len(self.lines) - 1 + bool(self.lines[-1])
def __rich_console__(self, console, options):
if self.at_end:
self.start = len(self)
self.start = max(0, min(self.start, len(self) - self.window_size))
for i, line in enumerate(self.lines[self.start : len(self)]):
yield RawSegment(line.text)
if i < len(self) - 1:
yield Segment.line()
__iadd__ = add
class StackedTerminalLines:
def __init__(self, boxes, total_height, width):
self.boxes = boxes
for b in self.boxes:
b.width = width
self.box_map = {b.title: b for b in self.boxes}
self.total_height = total_height
self.width = width
self.focus = None
def __getitem__(self, item):
return self.box_map[item]
def __setitem__(self, item, value):
pass
def clear(self):
for b in self.boxes:
b.clear()
def move_focus(self, n):
nb = len(self.boxes)
old_focus = self.focus or 0
explore = [(i + n + old_focus + nb) % nb for i in range(nb + 1)]
if n < 0:
explore.reverse()
for focus in explore:
if self.boxes[focus]:
break
self.focus = focus
def shift(self, n, mode):
self.focus = self.focus or 0
self.boxes[self.focus].shift(n, mode=mode)
def distribute_heights(self):
budget = self.total_height
boxes = self.boxes
max_height = max(len(b) for b in boxes)
nactive = len([b for b in boxes if b])
if nactive == 0:
return
max_share = budget // nactive
for i, b in enumerate(boxes):
b.height = h = min(max_share, len(b) + 2) if b else 0
if self.focus is None and len(b) > max_share:
self.focus = i
budget -= h
if budget:
for b in boxes:
if len(b) == max_height:
b.height += budget
break
for b in boxes:
b.window_size = b.height - 2
def __rich_console__(self, console, options):
self.distribute_heights()
for i, box in enumerate(self.boxes):
if box.height:
if i == self.focus:
title = f"[bold]{box.title}"
style = box.border_highlight
else:
title = box.title
style = box.border
yield Panel(
box, title=title, height=box.height, border_style=style
)
class Dash:
def __init__(self, *parts):
self.console = Console(color_system="standard", file=REAL_STDOUT)
self.lv = Live(
auto_refresh=False,
redirect_stdout=False,
redirect_stderr=False,
console=self.console,
screen=True,
)
self.stack = StackedTerminalLines(
parts, self.lv.console.height - 2, width=self.lv.console.width - 4
)
self.header = Text("<header>")
self.footer = Text("<footer>")
def clear(self):
self.stack.clear()
self.header = Text("<header>")
self.footer = Text("<footer>")
def shifter(self, n, mode):
def shift(_=None):
if mode == "line":
self.stack.shift(n, mode="line")
elif mode == "screen":
self.stack.shift(n, mode="screen")
elif mode == "whole":
self.stack.shift(n, mode="whole")
elif mode == "focus":
self.stack.move_focus(n)
else:
raise Exception(f"Unknown mode: {mode}")
self.update()
return shift
def update(self):
self.lv.update(
Group(self.header, self.stack, self.footer), refresh=True
)
def run(self):
return self.lv
class RichDeveloopRunner(RedirectDeveloopRunner):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.dash = Dash(
TerminalLines(title="stdout"),
TerminalLines(
title="stderr", border="red", border_highlight="bold red"
),
TerminalLines(title="given"),
TerminalLines(
title="error", border="red", border_highlight="bold red"
),
TerminalLines(
title="result", border="cyan", border_highlight="bold cyan"
),
)
def _update(self):
wall = (
f" in {readable_duration(self._walltime)}" if self._walltime else ""
)
footer = [
f"#{self.num} ({self._status}{wall})",
"[bold](c)[/bold]ontinue",
"[bold](r)[/bold]erun",
(not self._has_result and not self._has_error)
and "[bold](a)[/bold]bort",
"[bold](q)[/bold]uit",
]
self.dash.header = markup(f"Looping on: [bold]{self.signature()}")
self.dash.footer = markup(" | ".join(x for x in footer if x))
if self._gvn:
self.dash.stack["given"].clear()
table = Table.grid(padding=(0, 3, 0, 0))
table.add_column("key", style="bold green")
table.add_column("value")
for k, v in self._gvn.items():
table.add_row(k, Pretty(v))
with TEMP_CONSOLE.capture() as cap:
TEMP_CONSOLE.print(table)
self.dash.stack["given"].add(cap.get())
self.dash.update()
@contextmanager
def wrap_loop(self):
with self.dash.run(), cbreak():
try:
scheduler = rx.scheduler.EventLoopScheduler()
kp = ObservableProxy(
rx.from_iterable(read_chars(), scheduler=scheduler)
).share()
kp.where(char="c") >> self.command("cont")
kp.where(char="r") >> self.command("go", aborts=True)
kp.where(char="a") >> self.command("abort", aborts=True)
kp.where(char="q") >> self.command("quit", aborts=True)
# Up
kp.where(char="[A") >> self.dash.shifter(-1, mode="line")
# Down
kp.where(char="[B") >> self.dash.shifter(1, mode="line")
# Page Up
kp.where(char="[5~") >> self.dash.shifter(-1, mode="screen")
# Page Down
kp.where(char="[6~") >> self.dash.shifter(1, mode="screen")
# Home
kp.where(char="[1~") >> self.dash.shifter(-1, mode="whole")
# End
kp.where(char="[4~") >> self.dash.shifter(1, mode="whole")
# Left
kp.where(char="[D") >> self.dash.shifter(-1, mode="focus")
# Right
kp.where(char="[C") >> self.dash.shifter(1, mode="focus")
yield
finally:
kill_thread(scheduler._thread)
scheduler.dispose()
def register_updates(self, gv):
self.dash.clear()
self._has_result = False
self._has_error = False
self._status = "running"
self._walltime = 0
self._gvn = {}
# Append stdout/stderr incrementally
gv["?#stdout"] >> itemappender(self.dash.stack, "stdout")
gv["?#stderr"] >> itemappender(self.dash.stack, "stderr")
def _on(key):
# black and vscode's syntax highlighter both choke on parsing the following
# as a decorator, that's why I made a function
return gv.getitem(key, strict=False).subscribe
@_on("#result")
def _(result):
with TEMP_CONSOLE.capture() as cap:
TEMP_CONSOLE.print(result)
self.dash.stack["result"].add(cap.get())
self._has_result = True
@_on("#error")
def _(error):
tb = TracebackNoFrame(
trace=TracebackNoFrame.extract(
type(error), error, error.__traceback__
),
)
with TEMP_CONSOLE.capture() as cap:
TEMP_CONSOLE.print(tb)
self.dash.stack["error"].add(cap.get())
self._has_error = True
@_on("#status")
def _(status):
self._status = status
@_on("#walltime")
def _(walltime):
self._walltime = walltime
# Fill given table
@gv.subscribe
def _(d):
self._gvn.update(
{
k: v
for k, v in d.items()
if not k.startswith("#") and not k.startswith("$")
}
)
# TODO: this may be a bit wasteful
# Debounce is used to ignore events if they are followed by another
# event less than 0.05s later. Delay + throttle ensures we get at
# least one event every 0.25s. We of course update as soon as the
# last event is in.
(gv.debounce(0.05) | gv.delay(0.25).throttle(0.25) | gv.last()) >> (
lambda _: self._update()
)
self._update()

87
jurigged/parse.py Normal file
View File

@ -0,0 +1,87 @@
import ast
from dataclasses import dataclass, field, replace as dc_replace
from ovld import ovld
@dataclass
class Variables:
assigned: set = field(default_factory=set)
read: set = field(default_factory=set)
closure: set = field(default_factory=set)
@property
def free(self):
return self.read - self.assigned
replace = dc_replace
def __or__(self, other):
return Variables(
assigned=self.assigned | other.assigned,
read=self.read | other.read,
)
@ovld
def variables(self, seq: list, mapping):
fvs = Variables()
for node in seq:
fvs = fvs | self(node, mapping)
return fvs
@ovld
def variables(self, node: (ast.FunctionDef, ast.AsyncFunctionDef), mapping):
fvs = (
self(node.body, mapping)
| self(node.args.args, mapping)
| self(node.args.posonlyargs, mapping)
| self(node.args.kwonlyargs, mapping)
| self(node.args.kwarg, mapping)
| self(node.args.vararg, mapping)
)
mapping[node] = fvs
outer = (
self(node.decorator_list, mapping)
| self(node.args.defaults, mapping)
| self(node.args.kw_defaults, mapping)
)
return outer | Variables(assigned={node.name}, read=fvs.free)
@ovld
def variables(self, node: ast.ClassDef, mapping):
fvs = self(node.body, mapping) | Variables(assigned={"__class__"})
mapping[node] = fvs
outer = self(node.decorator_list, mapping)
return outer | Variables(assigned={node.name}, read=fvs.free)
@ovld
def variables(self, node: ast.arg, mapping):
return Variables(assigned={node.arg})
@ovld
def variables(self, node: ast.Name, mapping):
if isinstance(node.ctx, ast.Load):
read = {node.id}
if node.id == "super":
read.add("__class__")
return Variables(read=read)
elif isinstance(node.ctx, ast.Store):
return Variables(assigned={node.id})
else:
return Variables(read={node.id})
@ovld
def variables(self, node: ast.AST, mapping):
return self(list(ast.iter_child_nodes(node)), mapping)
@ovld # pragma: no cover
def variables(self, thing: object, mapping):
# Just in case
return Variables()

131
jurigged/recode.py Normal file
View File

@ -0,0 +1,131 @@
import linecache
import textwrap
from ast import _splitlines_no_ff as splitlines
from contextlib import contextmanager
from itertools import count
from .codetools import CodeFile, CodeFileOperation, LineDefinition, ModuleCode
from .register import registry
from .utils import EventSource
_count = count(1)
class OutOfSyncException(Exception):
pass
def virtual_file(name, contents):
filename = f"<{name}#{next(_count)}>"
linecache.cache[filename] = (None, None, splitlines(contents), filename)
return filename
class Recoder:
def __init__(self, name, codefile, deletable=False, focus=None):
self.name = name
self.codefile = codefile
self.deletable = deletable
self.focus = focus
self.watched = [] if focus is None else [focus]
self.status = "live"
self.on_status = EventSource()
self.codefile.activity.register(self._listen)
self._current_patch = None
self._listening = True
def set_status(self, status):
self.status = status
self.on_status.emit(self, self.status)
def _listen(self, event):
if self._listening:
if isinstance(event, CodeFileOperation):
if event.defn in self.watched:
self.set_status("out-of-sync")
@contextmanager
def _patching(self, new_code):
new_code = new_code.strip()
filename = virtual_file(self.name, new_code)
cf = CodeFile(
filename=filename,
source=new_code,
module_name=self.codefile.module_name,
)
registry.cache[filename] = self.codefile
yield cf
self._listening = False
(same, changes, additions, deletions) = self.codefile.merge(
cf, allow_deletions=self.deletable and self.focus and [self.focus]
)
self.watched = [*same, *changes, *additions]
self.set_status("live")
self._current_patch = new_code
self._listening = True
def patch(self, new_code):
def _encompasses(defn):
for x in self.focus.hierarchy():
if x.correspond(defn).corresponds:
return True
for x in defn.hierarchy():
if x.correspond(self.focus).corresponds:
return True
return False
if self.focus is None:
return self.patch_module(new_code)
for parent in list(self.focus.hierarchy())[1:]:
if not isinstance(parent, ModuleCode):
new_code = textwrap.indent(new_code, " ")
new_code = f"{parent.header()}{new_code}"
with self._patching(new_code) as cf:
(
same,
changes,
additions,
deletions,
) = self.codefile.root.correspond(cf.root).summary()
seq = [*changes, *additions]
seq = [d for d in seq if not isinstance(d, LineDefinition)]
if not all(_encompasses(culprit := d) for d in seq):
raise ValueError(
f"Recoder for {self.focus.name} cannot be used to define {culprit.name}" # noqa: F821
)
def patch_module(self, new_code):
with self._patching(new_code):
pass
def repatch(self):
if self.status == "out-of-sync" and self._current_patch is not None:
self.patch_module(self._current_patch)
def commit(self):
if self.status == "out-of-sync":
raise OutOfSyncException(
f"File {self.codefile.filename} is out of sync with the patch"
)
else:
self.codefile.commit()
self.set_status("saved")
def revert(self):
self.codefile.refresh()
def make_recoder(obj, deletable=False):
cf, defn = registry.find(obj)
return (
cf
and defn
and Recoder(
name=defn.dotpath(), codefile=cf, focus=defn, deletable=deletable
)
)

211
jurigged/register.py Normal file
View File

@ -0,0 +1,211 @@
import importlib.util
import logging
import os
import sys
from types import CodeType, FunctionType, ModuleType
from _frozen_importlib_external import SourceFileLoader
from ovld import OvldMC, ovld
from .codetools import CodeFile, FunctionDefinition
from .utils import EventSource, glob_filter
log = logging.getLogger(__name__)
class Registry(metaclass=OvldMC):
def __init__(self):
self.filename_to_module = {}
# Cache of (module_name, file_contents, mtime)
# A snapshot of the file contents may be saved before it might be modified
self.precache = {}
# Cache of CodeFile (lazy)
self.cache = {}
self.precache_activity = EventSource(save_history=True)
self.activity = EventSource()
self._log = None
def set_logger(self, log):
self._log = log
def log(self, *args, **kwargs):
if self._log is not None:
self._log(*args, **kwargs)
def prepare(self, module_name=None, filename=None):
if filename is None:
assert module_name is not None
filename = sys.modules[module_name].__file__
if filename not in self.precache and filename not in self.cache:
if module_name is None:
if filename in self.filename_to_module:
module_name = self.filename_to_module[filename]
else:
for module_name, module in sys.modules.items():
fname = getattr(module, "__file__", None)
if fname:
self.filename_to_module[fname] = module_name
if fname == filename:
break
else: # pragma: no cover
raise Exception(
f"Cannot find module that corresponds to {filename}"
)
if os.path.exists(filename):
with open(filename, "r", encoding="utf8") as f:
self.precache[filename] = (
module_name,
f.read(),
os.path.getmtime(filename),
)
self.precache_activity.emit(module_name, filename)
return module_name, filename
def get(self, filename):
if filename in self.cache:
return self.cache[filename]
if filename in self.precache:
module_name, cached_source, mtime = self.precache[filename]
if module_name not in sys.modules:
return None
cf = CodeFile(
filename, source=cached_source, module_name=module_name
)
cf.associate(sys.modules[module_name])
cf.activity.register(self.log)
# Basic forwarding of the CodeFile's events
cf.activity.register(self.activity.emit)
self.cache[filename] = cf
return cf
return None
def get_at(self, filename, lineno):
cf = self.get(filename)
if cf is None:
return None, None
for entry in cf.root.walk():
if (
isinstance(entry, FunctionDefinition)
and entry.node is not None
and (
(
entry.stashed.lineno == lineno
and entry.stashed.filename == filename
)
or (
entry.node.extent.lineno == lineno
and entry.node.extent.filename == filename
)
)
):
return cf, entry
else:
return cf, None
def auto_register(self, filter=glob_filter("./*.py")):
def prep(module_name, filename):
if (
filename is not None
and module_name is not None
and filter(filename)
):
try:
self.prepare(module_name, filename)
except (UnicodeDecodeError, OSError): # pragma: no cover
pass
for name, module in list(sys.modules.items()):
filename = getattr(module, "__file__", None)
module_name = getattr(module, "__name__", None)
prep(module_name, filename)
return add_sniffer(prep)
@ovld
def find(self, module: ModuleType):
self.prepare(module.__name__, module.__file__)
cf = self.get(module.__file__)
return cf, cf.root
@ovld
def find(self, fn: FunctionType):
co = fn.__code__
self.prepare(fn.__module__, co.co_filename)
return self.get_at(co.co_filename, co.co_firstlineno)
@ovld
def find(self, co: CodeType):
self.prepare(filename=co.co_filename)
return self.get_at(co.co_filename, co.co_firstlineno)
@ovld
def find(self, cls: type):
_, filename = self.prepare(module_name=cls.__module__)
cf = self.get(filename)
key = f"{cls.__module__}.{cls.__qualname__}"
for entry in cf.root.walk():
if entry.dotpath() == key:
return cf, entry
else:
return cf, None
registry = Registry()
class ImportSniffer:
"""A spec finder that simply sniffs for attempted imports.
Basically we install this at the front of sys.meta_path so that
importlib.util.find_spec calls it, then we call find_spec
ourselves to locate the file that's going to be read so that we
know we have to cache its contents and watch for changes.
"""
def __init__(self):
self.working = False
def find_module(self, spec, path):
if not _sniffer_callbacks:
return None
if not self.working:
self.working = True
# We call find_spec ourselves to find out where the file is.
# This will not cause an infinite loop because self.working
# is True and we will not enter the conditional. I'm not
# sure if it's dangerous to call find_spec within find_spec,
# but it seems to work, so whatever.
mspec = importlib.util.find_spec(spec, path)
if (
mspec is not None
and isinstance(mspec.loader, SourceFileLoader)
and mspec.name is not None
and mspec.origin is not None
):
for report in _sniffer_callbacks:
try:
report(mspec.name, mspec.origin)
except Exception as exc:
log.error(
f"jurigged: Error processing spec {mspec.name}",
exc_info=exc,
)
self.working = False
return None
_main_sniffer = ImportSniffer()
sys.meta_path.insert(0, _main_sniffer)
_sniffer_callbacks = []
def add_sniffer(report):
_sniffer_callbacks.append(report)
report.uninstall = lambda: _sniffer_callbacks.remove(report)
return report

105
jurigged/rescript.py Normal file
View File

@ -0,0 +1,105 @@
import ast
import io
import types
def split_script(script): # pragma: no cover
"""Split code that comes after all function definitions.
Essentially, we want to be able to instrument functions in the main script, which
requires evaluating the functions, but we want to do this before executing the main
code. So we split off code that comes after function definitions so that we can evaluate
the module and then evaluate that code separately.
Code between function definitions will be evaluated right away, but the bulk usually
comes after these definitions (because they need to use them).
"""
with io.open_code(script) as f:
source_code = f.read()
tree = ast.parse(source_code, mode="exec")
last_def = 0
for i, stmt in enumerate(tree.body):
if isinstance(
stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)
):
last_def = i + 1
mod_before = ast.copy_location(
ast.Module(
body=tree.body[:last_def],
type_ignores=[],
),
tree,
)
mod_after = ast.copy_location(
ast.Module(
body=tree.body[last_def:],
type_ignores=[],
),
tree,
)
return (
compile(mod_before, script, "exec"),
compile(mod_after, script, "exec"),
)
redirector = """
def {name}(*args, **kwargs):
return ____jurigged_wrapped_{name}(*args, **kwargs)
"""
def redirector_code(name):
"""Return a code object that calls a global with a certain name.
That code object is meant to be patched onto an existing function so that it
can redirect to something else.
"""
glb = {}
exec(redirector.format(name=name), glb)
fn = glb[name]
return fn.__code__
def redirect(orig, transform):
"""Redirect a function to a transformed version of it.
The __code__ pointer of the function will be patched to redirect to a decorated
version of the function. That way, all existing pointers for the function will
use the decorated version.
"""
saved = types.FunctionType(
orig.__code__,
orig.__globals__,
orig.__name__,
orig.__defaults__,
orig.__closure__,
)
name = orig.__name__
new_code = redirector_code(name)
orig.__code__ = new_code
orig.__globals__[f"____jurigged_wrapped_{name}"] = transform(saved)
def redirect_code(code, transform):
"""Patch the function with the given code to a transformed/decorated version.
The __code__ pointer of the function will be patched to redirect to a decorated
version of the function. That way, all existing pointers for the function will
use the decorated version.
"""
import codefind
fns = codefind.get_functions(code)
if len(fns) != 1:
raise Exception(
f"Redecoration of {code} requires exactly one function to use it."
)
(fn,) = fns
redirect(fn, transform)

306
jurigged/runpy.py Normal file
View File

@ -0,0 +1,306 @@
"""runpy.py - locating and running Python code using the module namespace
Provides support for locating and running Python scripts using the Python
module namespace instead of the native filesystem.
This allows Python code to play nicely with non-filesystem based PEP 302
importers when locating support scripts as well as when importing modules.
"""
# Written by Nick Coghlan <ncoghlan at gmail.com>
# to implement PEP 338 (Executing Modules as Scripts)
# This is a modified version of runpy.py, lifted from Python 3.9.2
# The main difference is that this version does NOT restore sys.modules["__main__"]
import importlib.machinery # importlib first so we can test #15386 via -m
import importlib.util
import io
import os
import sys
from pkgutil import get_importer, read_code
from .rescript import split_script
__all__ = [
"run_module",
"run_path",
]
class _ModifiedArgv0(object):
def __init__(self, value):
self.value = value
self._saved_value = self._sentinel = object()
def __enter__(self):
if self._saved_value is not self._sentinel:
raise RuntimeError("Already preserving saved value")
self._saved_value = sys.argv[0]
sys.argv[0] = self.value
def __exit__(self, *args):
self.value = self._sentinel
sys.argv[0] = self._saved_value
# TODO: Replace these helpers with importlib._bootstrap_external functions.
def _run_code(
code,
run_globals,
init_globals=None,
mod_name=None,
mod_spec=None,
pkg_name=None,
script_name=None,
prepare=None,
):
"""Helper to run code in nominated namespace"""
if init_globals is not None:
run_globals.update(init_globals)
if mod_spec is None:
loader = None
fname = script_name
cached = None
else:
loader = mod_spec.loader
fname = mod_spec.origin
cached = mod_spec.cached
if pkg_name is None:
pkg_name = mod_spec.parent
run_globals.update(
__name__=mod_name,
__file__=fname,
__cached__=cached,
__doc__=None,
__loader__=loader,
__package__=pkg_name,
__spec__=mod_spec,
)
if isinstance(code, tuple):
before, after = code
exec(before, run_globals)
if prepare is not None:
prepare(run_globals)
exec(after, run_globals)
else:
exec(code, run_globals)
if prepare is not None:
prepare(run_globals)
return run_globals
def _run_module_code(
code,
init_globals=None,
module_object=None,
mod_spec=None,
pkg_name=None,
script_name=None,
prepare=None,
):
"""Helper to run code in new namespace with sys modified"""
fname = script_name if mod_spec is None else mod_spec.origin
mod_name = module_object.__name__
temp_module = module_object
sys.modules[mod_name] = temp_module
with _ModifiedArgv0(fname):
mod_globals = temp_module.__dict__
_run_code(
code,
mod_globals,
init_globals,
mod_name,
mod_spec,
pkg_name,
script_name,
prepare=prepare,
)
return temp_module
# Helper to get the full name, spec and code for a module
def _get_module_details(mod_name, error=ImportError):
if mod_name.startswith("."):
raise error("Relative module names not supported")
pkg_name, _, _ = mod_name.rpartition(".")
if pkg_name:
# Try importing the parent to avoid catching initialization errors
try:
__import__(pkg_name)
except ImportError as e:
# If the parent or higher ancestor package is missing, let the
# error be raised by find_spec() below and then be caught. But do
# not allow other errors to be caught.
if e.name is None or (
e.name != pkg_name and not pkg_name.startswith(e.name + ".")
):
raise
# Warn if the module has already been imported under its normal name
existing = sys.modules.get(mod_name)
if existing is not None and not hasattr(existing, "__path__"):
from warnings import warn
msg = (
"{mod_name!r} found in sys.modules after import of "
"package {pkg_name!r}, but prior to execution of "
"{mod_name!r}; this may result in unpredictable "
"behaviour".format(mod_name=mod_name, pkg_name=pkg_name)
)
warn(RuntimeWarning(msg))
try:
spec = importlib.util.find_spec(mod_name)
except (ImportError, AttributeError, TypeError, ValueError) as ex:
# This hack fixes an impedance mismatch between pkgutil and
# importlib, where the latter raises other errors for cases where
# pkgutil previously raised ImportError
msg = "Error while finding module specification for {!r} ({}: {})"
if mod_name.endswith(".py"):
msg += (
f". Try using '{mod_name[:-3]}' instead of "
f"'{mod_name}' as the module name."
)
raise error(msg.format(mod_name, type(ex).__name__, ex)) from ex
if spec is None:
raise error("No module named %s" % mod_name)
if spec.submodule_search_locations is not None:
if mod_name == "__main__" or mod_name.endswith(".__main__"):
raise error("Cannot use package as __main__ module")
try:
pkg_main_name = mod_name + ".__main__"
return _get_module_details(pkg_main_name, error)
except error as e:
if mod_name not in sys.modules:
raise # No module loaded; being a package is irrelevant
raise error(
("%s; %r is a package and cannot " + "be directly executed")
% (e, mod_name)
)
loader = spec.loader
if loader is None:
raise error(
"%r is a namespace package and cannot be executed" % mod_name
)
try:
code = loader.get_code(mod_name)
except ImportError as e:
raise error(format(e)) from e
if code is None:
raise error("No code object available for %s" % mod_name)
return mod_name, spec, code
def run_module(
mod_name,
init_globals=None,
module_object=None,
alter_sys=True,
prepare=None,
):
"""Execute a module's code without importing it
Returns the resulting top level namespace dictionary
"""
mod_name, mod_spec, code = _get_module_details(mod_name)
if alter_sys:
return _run_module_code(
code, init_globals, module_object, mod_spec, prepare=prepare
)
else:
# Leave the sys module alone
return _run_code(
code,
module_object.__dict__,
init_globals,
module_object.__name__,
mod_spec,
prepare=prepare,
)
def _get_main_module_details(error=ImportError):
# Helper that gives a nicer error message when attempting to
# execute a zipfile or directory by invoking __main__.py
# Also moves the standard __main__ out of the way so that the
# preexisting __loader__ entry doesn't cause issues
main_name = "__main__"
saved_main = sys.modules[main_name]
del sys.modules[main_name]
try:
return _get_module_details(main_name)
except ImportError as exc:
if main_name in str(exc):
raise error(
"can't find %r module in %r" % (main_name, sys.path[0])
) from exc
raise
finally:
sys.modules[main_name] = saved_main
def _get_code_from_file(run_name, fname):
# Check for a compiled file first
decoded_path = os.path.abspath(os.fsdecode(fname))
with io.open_code(decoded_path) as f:
code = read_code(f)
if code is None:
# That didn't work, so try it as normal source code
code = split_script(fname)
return code, fname
def run_path(path_name, module_object, init_globals=None, prepare=None):
"""Execute code located at the specified filesystem location
Returns the resulting top level namespace dictionary
The file path may refer directly to a Python script (i.e.
one that could be directly executed with execfile) or else
it may refer to a zipfile or directory containing a top
level __main__.py script.
"""
run_name = pkg_name = module_object.__name__
importer = get_importer(path_name)
# Trying to avoid importing imp so as to not consume the deprecation warning.
is_NullImporter = False
if type(importer).__module__ == "imp":
if type(importer).__name__ == "NullImporter":
is_NullImporter = True
if isinstance(importer, type(None)) or is_NullImporter:
# Not a valid sys.path entry, so run the code directly
# execfile() doesn't help as we want to allow compiled files
code, fname = _get_code_from_file(run_name, path_name)
return _run_module_code(
code,
init_globals,
module_object,
pkg_name=pkg_name,
script_name=fname,
prepare=prepare,
)
else:
# Finder is defined for path, so add it to
# the start of sys.path
sys.path.insert(0, path_name)
try:
# Here's where things are a little different from the run_module
# case. There, we only had to replace the module in sys while the
# code was running and doing so was somewhat optional. Here, we
# have no choice and we have to remove it even while we read the
# code. If we don't do this, a __loader__ attribute in the
# existing __main__ module may prevent location of the new module.
mod_name, mod_spec, code = _get_main_module_details()
temp_module = module_object
sys.modules[run_name] = temp_module
with _ModifiedArgv0(path_name):
mod_globals = temp_module.__dict__
_run_code(
code,
mod_globals,
init_globals,
run_name,
mod_spec,
pkg_name,
prepare=prepare,
)
return temp_module
finally:
try:
sys.path.remove(path_name)
except ValueError:
pass

49
jurigged/utils.py Normal file
View File

@ -0,0 +1,49 @@
import fnmatch
import os
import types
class EventSource(list):
def __init__(self, *, save_history=False):
if save_history:
self._history = []
else:
self._history = None
def register(self, listener, apply_history=True):
if self._history and apply_history:
for args, kwargs in self._history:
listener(*args, **kwargs)
self.append(listener)
return listener
def emit(self, *args, **kwargs):
for listener in self:
listener(*args, **kwargs)
if self._history is not None:
self._history.append((args, kwargs))
def glob_filter(pattern):
if pattern.startswith("~"):
pattern = os.path.expanduser(pattern)
elif not pattern.startswith("/"):
pattern = os.path.abspath(pattern)
if os.path.isdir(pattern):
pattern = os.path.join(pattern, "*")
def matcher(filename):
return fnmatch.fnmatch(filename, pattern)
return matcher
def shift_lineno(co, delta):
if isinstance(co, types.CodeType):
return co.replace(
co_firstlineno=co.co_firstlineno + delta,
co_consts=tuple(shift_lineno(ct, delta) for ct in co.co_consts),
)
else:
return co

1
jurigged/version.py Normal file
View File

@ -0,0 +1 @@
version = "0.5.5"