parent
a3bbad0060
commit
dab555ea3b
@ -10,6 +10,7 @@ import re
|
||||
import selectors
|
||||
import signal
|
||||
import sys
|
||||
import termios
|
||||
from contextlib import contextmanager
|
||||
from functools import partial
|
||||
from typing import Any, Callable, Dict, Generator, List, NamedTuple, Optional
|
||||
@ -61,8 +62,9 @@ debug = Debug()
|
||||
|
||||
class TermManager:
|
||||
|
||||
def __init__(self) -> None:
|
||||
def __init__(self, optional_actions: int = termios.TCSANOW) -> None:
|
||||
self.extra_finalize: Optional[str] = None
|
||||
self.optional_actions = optional_actions
|
||||
|
||||
def set_state_for_loop(self, set_raw: bool = True) -> None:
|
||||
if set_raw:
|
||||
@ -82,7 +84,7 @@ class TermManager:
|
||||
self.set_state_for_loop()
|
||||
|
||||
def __enter__(self) -> 'TermManager':
|
||||
self.tty_fd, self.original_termios = open_tty()
|
||||
self.tty_fd, self.original_termios = open_tty(False, self.optional_actions)
|
||||
self.set_state_for_loop(set_raw=False)
|
||||
return self
|
||||
|
||||
@ -183,7 +185,8 @@ class Loop:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
sanitize_bracketed_paste: str = '[\x03\x04\x0e\x0f\r\x07\x7f\x8d\x8e\x8f\x90\x9b\x9d\x9e\x9f]'
|
||||
sanitize_bracketed_paste: str = '[\x03\x04\x0e\x0f\r\x07\x7f\x8d\x8e\x8f\x90\x9b\x9d\x9e\x9f]',
|
||||
optional_actions: int = termios.TCSADRAIN
|
||||
):
|
||||
if is_macos:
|
||||
# On macOS PTY devices are not supported by the KqueueSelector and
|
||||
@ -193,6 +196,7 @@ class Loop:
|
||||
else:
|
||||
self.asycio_loop = asyncio.get_event_loop()
|
||||
self.return_code = 0
|
||||
self.optional_actions = optional_actions
|
||||
self.read_buf = ''
|
||||
self.decoder = codecs.getincrementaldecoder('utf-8')('ignore')
|
||||
try:
|
||||
@ -388,7 +392,7 @@ class Loop:
|
||||
handler.on_resize(handler.screen_size)
|
||||
|
||||
signal_manager = SignalManager(self.asycio_loop, _on_sigwinch, handler.on_interrupt, handler.on_term)
|
||||
with TermManager() as term_manager, signal_manager:
|
||||
with TermManager(self.optional_actions) as term_manager, signal_manager:
|
||||
self._get_screen_size: ScreenSizeGetter = screen_size_function(term_manager.tty_fd)
|
||||
image_manager = None
|
||||
if handler.image_manager_class is not None:
|
||||
|
||||
@ -105,8 +105,8 @@ put_tty_in_raw_mode(int fd, const struct termios* termios_p, bool read_with_time
|
||||
|
||||
static PyObject*
|
||||
open_tty(PyObject *self UNUSED, PyObject *args) {
|
||||
int read_with_timeout = 0;
|
||||
if (!PyArg_ParseTuple(args, "|p", &read_with_timeout)) return NULL;
|
||||
int read_with_timeout = 0, optional_actions = TCSAFLUSH;
|
||||
if (!PyArg_ParseTuple(args, "|pi", &read_with_timeout, &optional_actions)) return NULL;
|
||||
int flags = O_RDWR | O_CLOEXEC | O_NOCTTY;
|
||||
if (!read_with_timeout) flags |= O_NONBLOCK;
|
||||
static char ctty[L_ctermid+1];
|
||||
@ -115,13 +115,13 @@ open_tty(PyObject *self UNUSED, PyObject *args) {
|
||||
struct termios *termios_p = calloc(1, sizeof(struct termios));
|
||||
if (!termios_p) return PyErr_NoMemory();
|
||||
if (tcgetattr(fd, termios_p) != 0) { free(termios_p); PyErr_SetFromErrno(PyExc_OSError); return NULL; }
|
||||
if (!put_tty_in_raw_mode(fd, termios_p, read_with_timeout != 0, TCSAFLUSH)) { free(termios_p); return NULL; }
|
||||
if (!put_tty_in_raw_mode(fd, termios_p, read_with_timeout != 0, optional_actions)) { free(termios_p); return NULL; }
|
||||
return Py_BuildValue("iN", fd, PyLong_FromVoidPtr(termios_p));
|
||||
}
|
||||
|
||||
#define TTY_ARGS \
|
||||
PyObject *tp; int fd; int optional_actions = TCSAFLUSH; \
|
||||
if (!PyArg_ParseTuple(args, "iO!|", &fd, &PyLong_Type, &tp, &optional_actions)) return NULL; \
|
||||
if (!PyArg_ParseTuple(args, "iO!|i", &fd, &PyLong_Type, &tp, &optional_actions)) return NULL; \
|
||||
struct termios *termios_p = PyLong_AsVoidPtr(tp);
|
||||
|
||||
static PyObject*
|
||||
|
||||
@ -878,7 +878,7 @@ def normal_tty(fd: int, termios_ptr: TermiosPtr, optional_actions: int = termios
|
||||
pass
|
||||
|
||||
|
||||
def open_tty(read_with_timeout: bool = False) -> Tuple[int, TermiosPtr]:
|
||||
def open_tty(read_with_timeout: bool = False, optional_actions: int = termios.TCSAFLUSH) -> Tuple[int, TermiosPtr]:
|
||||
pass
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user