143 lines
4.1 KiB
Python
143 lines
4.1 KiB
Python
import re
|
|
import select
|
|
import sys
|
|
import termios
|
|
import traceback
|
|
import tty
|
|
from contextlib import contextmanager
|
|
from functools import partial
|
|
|
|
from .develoop import Abort, DeveloopRunner
|
|
|
|
ANSI_ESCAPE = re.compile(r"\x1b\[[;\d]*[A-Za-z]")
|
|
ANSI_ESCAPE_INNER = re.compile(r"[\x1b\[;\d]")
|
|
ANSI_ESCAPE_END = re.compile(r"[A-Za-z~]")
|
|
|
|
|
|
@contextmanager
|
|
def cbreak():
|
|
old_attrs = termios.tcgetattr(sys.stdin)
|
|
tty.setcbreak(sys.stdin)
|
|
try:
|
|
yield
|
|
finally:
|
|
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attrs)
|
|
|
|
|
|
def read_chars():
|
|
esc = None
|
|
try:
|
|
while True:
|
|
ready, _, _ = select.select([sys.stdin], [], [], 0.02)
|
|
if ready:
|
|
# Sometimes, e.g. when pressing an up arrow, multiple
|
|
# characters are buffered, and read1() is the only way
|
|
# I found to read precisely what was buffered. select
|
|
# seems unreliable in these cases, probably because the
|
|
# buffer fools it into thinking there is nothing else
|
|
# to read. So read(1) would leave some characters dangling
|
|
# in the buffer until the next keypress.
|
|
for ch in sys.stdin.buffer.read1():
|
|
ch = chr(ch)
|
|
if esc is not None:
|
|
if ANSI_ESCAPE_INNER.match(ch):
|
|
esc += ch
|
|
elif ANSI_ESCAPE_END.match(ch):
|
|
yield {"char": esc + ch, "escape": True}
|
|
esc = None
|
|
else:
|
|
yield {"char": esc, "escape": True}
|
|
esc = None
|
|
yield {"char": ch}
|
|
elif ch == "\x1b":
|
|
esc = ""
|
|
else:
|
|
yield {"char": ch}
|
|
except Abort:
|
|
pass
|
|
|
|
|
|
class BasicDeveloopRunner(DeveloopRunner):
|
|
def __init__(self, fn, args, kwargs):
|
|
super().__init__(fn, args, kwargs)
|
|
self._status = "running"
|
|
self._walltime = 0
|
|
|
|
def _pad(self, text, total):
|
|
text = f"#{self.num}: {text}"
|
|
rest = total - len(text) - 6
|
|
return f"---- {text} " + "-" * rest
|
|
|
|
def _finish(self, status, result):
|
|
print(self._pad(status, 50))
|
|
if status == "ERROR":
|
|
traceback.print_exception(
|
|
type(result), result, result.__traceback__
|
|
)
|
|
else:
|
|
print(f"{result}")
|
|
|
|
footer = [
|
|
"(c)ontinue",
|
|
"(r)erun",
|
|
"(q)uit",
|
|
]
|
|
print(self._pad(" | ".join(footer), 50))
|
|
|
|
with cbreak():
|
|
for c in read_chars():
|
|
if c["char"] == "c":
|
|
self.command("cont")()
|
|
break
|
|
elif c["char"] == "r":
|
|
self.command("go")()
|
|
break
|
|
elif c["char"] == "q":
|
|
self.command("quit")()
|
|
break
|
|
|
|
def register_updates(self, gv):
|
|
print(self._pad(self.signature(), 50))
|
|
|
|
gv["?#result"] >> partial(self._finish, "RESULT")
|
|
gv["?#error"] >> partial(self._finish, "ERROR")
|
|
|
|
gv.filter(
|
|
lambda d: not any(
|
|
k.startswith("#") and not k.startswith("$") for k in d.keys()
|
|
)
|
|
).display()
|
|
|
|
def _on(key):
|
|
# black and vscode's syntax highlighter both choke on parsing the following
|
|
# as a decorator, that's why I made a function
|
|
return gv.getitem(key, strict=False).subscribe
|
|
|
|
@_on("#status")
|
|
def _(status):
|
|
self._status = status
|
|
|
|
@_on("#walltime")
|
|
def _(walltime):
|
|
self._walltime = walltime
|
|
|
|
|
|
def readable_duration(t):
|
|
if t < 0.001:
|
|
return "<1ms"
|
|
elif t < 1:
|
|
t = int(t * 1000)
|
|
return f"{t}ms"
|
|
elif t < 10:
|
|
return f"{t:.3f}s"
|
|
elif t < 60:
|
|
return f"{t:.1f}s"
|
|
else:
|
|
s = t % 60
|
|
m = (t // 60) % 60
|
|
if t < 3600:
|
|
return f"{m:.0f}m{s:.0f}s"
|
|
else:
|
|
h = t // 3600
|
|
return f"{h:.0f}h{m:.0f}m{s:.0f}s"
|