urban-investment-research/jurigged/loop/basic.py

143 lines
4.1 KiB
Python

import re
import select
import sys
import termios
import traceback
import tty
from contextlib import contextmanager
from functools import partial
from .develoop import Abort, DeveloopRunner
# Matches one complete escape sequence of the form ESC "[" <digits and ";"> <letter>.
# Not referenced by any code visible in this part of the module.
ANSI_ESCAPE = re.compile(r"\x1b\[[;\d]*[A-Za-z]")
# Matches a single character that may appear inside an escape sequence:
# ESC, "[", ";" or a digit. read_chars() uses it to keep accumulating.
ANSI_ESCAPE_INNER = re.compile(r"[\x1b\[;\d]")
# Matches a single character that terminates an escape sequence: an ASCII
# letter or "~". read_chars() uses it to emit the accumulated sequence.
ANSI_ESCAPE_END = re.compile(r"[A-Za-z~]")
@contextmanager
def cbreak():
    """Context manager that puts ``sys.stdin`` in cbreak mode for its body.

    The terminal attributes of ``sys.stdin`` are read on entry and written
    back on exit, whether the body finishes normally or raises.
    """
    # Snapshot the current settings so they can be restored afterwards.
    saved_attrs = termios.tcgetattr(sys.stdin)
    tty.setcbreak(sys.stdin)
    try:
        yield
    finally:
        # TCSADRAIN: apply the restore once pending output has been written.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, saved_attrs)
def read_chars():
    """Yield key events read from ``sys.stdin``.

    Each event is a dict:

    * ``{"char": ch}`` for an ordinary character;
    * ``{"char": seq, "escape": True}`` for an escape sequence, where ``seq``
      is everything that followed the ESC character (the ESC itself is not
      included), e.g. ``"[A"``.

    The generator polls stdin every 0.02s and ends when stdin reaches
    end-of-file or when ``Abort`` is raised inside it (``Abort`` is swallowed;
    where it is raised from is not visible in this module).
    """
    # None: not inside an escape sequence. A string: the characters collected
    # since the last ESC.
    esc = None
    try:
        while True:
            ready, _, _ = select.select([sys.stdin], [], [], 0.02)
            if not ready:
                continue
            # Sometimes, e.g. when pressing an up arrow, multiple
            # characters are buffered, and read1() is the only way
            # I found to read precisely what was buffered. select
            # seems unreliable in these cases, probably because the
            # buffer fools it into thinking there is nothing else
            # to read. So read(1) would leave some characters dangling
            # in the buffer until the next keypress.
            data = sys.stdin.buffer.read1()
            if not data:
                # End of file: select() would keep reporting stdin as
                # readable and read1() would keep returning b"", so looping
                # again would spin forever. Stop instead.
                return
            for byte in data:
                ch = chr(byte)
                if esc is not None:
                    if ANSI_ESCAPE_INNER.match(ch):
                        esc += ch
                    elif ANSI_ESCAPE_END.match(ch):
                        yield {"char": esc + ch, "escape": True}
                        esc = None
                    else:
                        # Not part of the sequence: flush what was collected,
                        # then report this character on its own.
                        yield {"char": esc, "escape": True}
                        esc = None
                        yield {"char": ch}
                elif ch == "\x1b":
                    esc = ""
                else:
                    yield {"char": ch}
    except Abort:
        pass
class BasicDeveloopRunner(DeveloopRunner):
    """Develoop runner that prints to the terminal and reads one-key commands.

    ``self.num``, ``self.signature()`` and ``self.command()`` are used here
    but not defined in this class; presumably they come from
    ``DeveloopRunner`` (verify against that class).
    """

    def __init__(self, fn, args, kwargs):
        """Forward ``fn``, ``args`` and ``kwargs`` to the base runner and
        initialise the locally tracked status and wall time."""
        super().__init__(fn, args, kwargs)
        self._status = "running"
        self._walltime = 0

    def _pad(self, text, total):
        """Return ``text`` as a ``---- #<num>: <text> ---...`` banner.

        The banner is padded with dashes up to ``total`` characters; when the
        label is already too long no padding is added.
        """
        label = f"#{self.num}: {text}"
        # The leading "---- " and the space after the label take 6 characters.
        filler = "-" * (total - len(label) - 6)
        return f"---- {label} " + filler

    def _finish(self, status, result):
        """Print the outcome of a run, then wait for a one-key command.

        ``status`` is the banner text; when it is ``"ERROR"``, ``result`` is
        treated as an exception and its traceback is printed, otherwise
        ``result`` is printed as is. Keys ``c``, ``r`` and ``q`` invoke the
        ``cont``, ``go`` and ``quit`` commands respectively; any other key is
        ignored.
        """
        print(self._pad(status, 50))
        if status != "ERROR":
            print(f"{result}")
        else:
            traceback.print_exception(
                type(result), result, result.__traceback__
            )

        menu = " | ".join(["(c)ontinue", "(r)erun", "(q)uit"])
        print(self._pad(menu, 50))

        # Key pressed -> name handed to self.command().
        actions = {"c": "cont", "r": "go", "q": "quit"}
        with cbreak():
            for event in read_chars():
                action = actions.get(event["char"])
                if action is not None:
                    self.command(action)()
                    break

    def register_updates(self, gv):
        """Print the run banner and hook this runner up to the stream ``gv``.

        ``gv`` is assumed to be the stream object supplied by the develoop
        machinery (indexing, ``filter``/``display`` and ``getitem`` are used
        on it) -- confirm against the caller.
        """
        print(self._pad(self.signature(), 50))

        # Results and errors both end up in _finish, tagged with their status.
        for key, label in (("?#result", "RESULT"), ("?#error", "ERROR")):
            gv[key] >> partial(self._finish, label)

        def _no_hash_keys(data):
            # True when no key of `data` starts with "#".
            # NOTE(review): the "$" test can never change the outcome for a
            # key that starts with "#"; it is kept exactly as written.
            for key in data.keys():
                if key.startswith("#") and not key.startswith("$"):
                    return False
            return True

        gv.filter(_no_hash_keys).display()

        def _mirror(key, attr):
            # Copy every value emitted under `key` onto self.<attr>.
            gv.getitem(key, strict=False).subscribe(
                lambda value: setattr(self, attr, value)
            )

        _mirror("#status", "_status")
        _mirror("#walltime", "_walltime")
def readable_duration(t):
    """Format a duration ``t`` (in seconds) as a short human-readable string.

    * below 1ms: ``"<1ms"``
    * below 1s: whole milliseconds, e.g. ``"250ms"``
    * below 10s: three decimals, e.g. ``"1.500s"``
    * below 60s: one decimal, e.g. ``"12.3s"``
    * otherwise: ``"<m>m<s>s"`` or, from one hour up, ``"<h>h<m>m<s>s"``
      (hours are not capped).
    """
    if t < 0.001:
        return "<1ms"
    if t < 1:
        return f"{int(t * 1000)}ms"
    if t < 10:
        return f"{t:.3f}s"
    if t < 60:
        return f"{t:.1f}s"

    # Round to whole seconds *before* splitting into units. Rounding each
    # unit at format time lets the seconds field show "60" (119.7 would
    # render as "1m60s"); rounding first carries into minutes and hours.
    # round(t, 0) keeps floats as floats, so non-finite input does not raise.
    whole = round(t, 0)
    minutes, seconds = divmod(whole, 60)
    if whole < 3600:
        return f"{minutes:.0f}m{seconds:.0f}s"
    hours, minutes = divmod(minutes, 60)
    return f"{hours:.0f}h{minutes:.0f}m{seconds:.0f}s"