Port parsing of CSI codes

This commit is contained in:
Kovid Goyal 2016-11-23 20:22:52 +05:30
parent ce8db74154
commit 2c3893d8a4
4 changed files with 227 additions and 14 deletions

View File

@ -117,7 +117,8 @@ typedef unsigned int index_type;
return result; \
}
#define START_ALLOW_CASE_RANGE _Pragma("GCC diagnostic ignored \"-Wpedantic\"")
#define END_ALLOW_CASE_RANGE _Pragma("GCC diagnostic pop")
typedef struct {
PyObject_HEAD

View File

@ -180,7 +180,8 @@ handle_esc_mode_char(Screen *screen, uint32_t ch, PyObject DUMP_UNUSED *dump_cal
} else {
REPORT_ERROR("Unhandled charset related escape code: 0x%x 0x%x", screen->parser_buf[0], screen->parser_buf[1]);
}
SET_STATE(0); break;
SET_STATE(0);
break;
}
#undef CALL_ED
} // }}}
@ -219,12 +220,156 @@ dispatch_osc(Screen *screen, PyObject DUMP_UNUSED *dump_callback) {
break;
default:
REPORT_ERROR("Unknown OSC code: %u", code);
break;
}
Py_CLEAR(string);
#undef DISPATCH_OSC
}
// }}}
// CSI mode {{{
#define MAX_PARAMS 256
#define CSI_SECONDARY \
case ';': \
case '"': \
case '*': \
case '\'': \
case ' ':
static inline void
screen_cursor_up2(Screen *s, unsigned int count) { screen_cursor_up(s, count, false, -1); }
static inline void
screen_cursor_back1(Screen *s, unsigned int count) { screen_cursor_back(s, count, -1); }
static inline void
dispatch_csi(Screen *screen, PyObject DUMP_UNUSED *dump_callback) {
#define CALL_CSI_HANDLER1(name, defval) \
p1 = num_params > 0 ? params[0] : defval; \
REPORT_COMMAND(name, p1); \
name(screen, p1); \
break;
#define CALL_CSI_HANDLER1P(name, defval, qch) \
p1 = num_params > 0 ? params[0] : defval; \
private = start_modifier == qch; \
REPORT_COMMAND(name, p1, private); \
name(screen, p1, private); \
break;
#define CALL_CSI_HANDLER1M(name, defval) \
p1 = num_params > 0 ? params[0] : defval; \
REPORT_COMMAND(name, p1, end_modifier); \
name(screen, p1, end_modifier); \
break;
#define CALL_CSI_HANDLER2(name, defval1, defval2) \
p1 = num_params > 0 ? params[0] : defval1; \
p2 = num_params > 1 ? params[1] : defval2; \
REPORT_COMMAND(name, p1, p2); \
name(screen, p1, p2); \
break;
#define SET_MODE(func) \
p1 = start_modifier == '?' ? 5 : 0; \
for (i = 0; i < num_params; i++) { \
REPORT_COMMAND(func, params[i] << p1); \
func(screen, params[i] << p1); \
} \
break;
#define CSI_HANDLER_MULTIPLE(name) \
REPORT_COMMAND(name, num_params); \
name(screen, params, num_params); \
break;
char start_modifier = 0, end_modifier = 0;
uint32_t *buf = screen->parser_buf, code = screen->parser_buf[screen->parser_buf_pos];
unsigned int num = screen->parser_buf_pos, start, i, num_params=0, p1, p2;
static unsigned int params[MAX_PARAMS] = {0};
bool private;
if (buf[0] == '>' || buf[0] == '?' || buf[0] == '!') {
start_modifier = (char)screen->parser_buf[0];
buf++; num--;
}
if (num > 0) {
switch(buf[num-1]) {
CSI_SECONDARY
end_modifier = (char)buf[--num];
}
}
for (i=0, start=0; i < num; i++) {
switch(buf[i]) {
IS_DIGIT
break;
default:
if (i > start) params[num_params++] = utoi(buf + start, i - start);
if (num_params >= MAX_PARAMS) { i = num; start = num + 1; }
start = i + 1;
break;
}
}
if (i > start) params[num_params++] = utoi(buf + start, i - start);
switch(code) {
case ICH:
CALL_CSI_HANDLER1(screen_insert_characters, 1);
case CUU:
CALL_CSI_HANDLER1(screen_cursor_up2, 1);
case CUD:
case VPR:
CALL_CSI_HANDLER1(screen_cursor_down, 1);
case CUF:
case HPR:
CALL_CSI_HANDLER1(screen_cursor_forward, 1);
case CUB:
CALL_CSI_HANDLER1(screen_cursor_back1, 1);
case CNL:
CALL_CSI_HANDLER1(screen_cursor_down1, 1);
case CPL:
CALL_CSI_HANDLER1(screen_cursor_up1, 1);
case CHA:
case HPA:
CALL_CSI_HANDLER1(screen_cursor_to_column, 1);
case VPA:
CALL_CSI_HANDLER1(screen_cursor_to_line, 1);
case CUP:
case HVP:
CALL_CSI_HANDLER2(screen_cursor_position, 1, 1);
case ED:
CALL_CSI_HANDLER1P(screen_erase_in_display, 0, '?');
case EL:
CALL_CSI_HANDLER1P(screen_erase_in_line, 0, '?');
case IL:
CALL_CSI_HANDLER1(screen_insert_lines, 1);
case DL:
CALL_CSI_HANDLER1(screen_delete_lines, 1);
case DCH:
CALL_CSI_HANDLER1(screen_delete_characters, 1);
case ECH:
CALL_CSI_HANDLER1(screen_erase_characters, 1);
case DA:
CALL_CSI_HANDLER1P(report_device_attributes, 0, '>');
case TBC:
CALL_CSI_HANDLER1(screen_clear_tab_stop, 0);
case SM:
SET_MODE(screen_set_mode);
case RM:
SET_MODE(screen_reset_mode);
case SGR:
CSI_HANDLER_MULTIPLE(select_graphic_rendition);
case DSR:
CALL_CSI_HANDLER1P(report_device_status, 0, '?');
case DECSTBM:
CALL_CSI_HANDLER2(screen_set_margins, 0, 0);
case DECSCUSR:
CALL_CSI_HANDLER1M(screen_set_cursor, 1);
default:
REPORT_ERROR("Unknown CSI code: 0x%x", code);
}
}
// }}}
// DCS mode {{{
static inline void
dispatch_dcs(Screen *screen) {
@ -269,9 +414,9 @@ accumulate_dcs(Screen *screen, uint32_t ch, PyObject DUMP_UNUSED *dump_callback)
case DEL:
break;
case ESC:
#pragma GCC diagnostic ignored "-Wpedantic"
START_ALLOW_CASE_RANGE
case 32 ... 126:
#pragma GCC diagnostic pop
END_ALLOW_CASE_RANGE
if (screen->parser_buf_pos > 0 && screen->parser_buf[screen->parser_buf_pos-1] == ESC) {
if (ch == '\\') { screen->parser_buf_pos--; return true; }
REPORT_ERROR("DCS sequence contained non-printable character: 0x%x ignoring the sequence", ESC);
@ -289,6 +434,72 @@ accumulate_dcs(Screen *screen, uint32_t ch, PyObject DUMP_UNUSED *dump_callback)
return false;
}
static inline bool
accumulate_csi(Screen *screen, uint32_t ch, PyObject DUMP_UNUSED *dump_callback) {
#define ENSURE_SPACE \
if (screen->parser_buf_pos > PARSER_BUF_SZ - 1) { \
REPORT_ERROR("CSI sequence too long, ignoring"); \
SET_STATE(0); \
return false; \
}
switch(ch) {
IS_DIGIT
CSI_SECONDARY
ENSURE_SPACE;
screen->parser_buf[screen->parser_buf_pos++] = ch;
break;
case '?':
case '>':
case '!':
if (screen->parser_buf_pos != 0) {
REPORT_ERROR("Invalid character in CSI: 0x%x, ignoring the sequence", ch);
SET_STATE(0);
return false;
}
ENSURE_SPACE;
screen->parser_buf[screen->parser_buf_pos++] = ch;
break;
START_ALLOW_CASE_RANGE
case 'a' ... 'z':
case 'A' ... 'Z':
END_ALLOW_CASE_RANGE
case '@':
case '`':
case '{':
case '|':
case '}':
case '~':
screen->parser_buf[screen->parser_buf_pos] = ch;
return true;
case BEL:
case BS:
case HT:
case LF:
case VT:
case FF:
case NEL:
case CR:
case SO:
case SI:
case IND:
case RI:
case HTS:
handle_normal_mode_char(screen, ch, dump_callback);
break;
case NUL:
case DEL:
break; // no-op
default:
REPORT_ERROR("Invalid character in CSI: 0x%x, ignoring the sequence", ch);
SET_STATE(0);
return false;
}
return false;
#undef ENSURE_SPACE
}
static inline void
_parse_bytes(Screen *screen, uint8_t *buf, Py_ssize_t len, PyObject DUMP_UNUSED *dump_callback) {
#define HANDLE(name) handle_##name(screen, codepoint, dump_callback); break
@ -299,8 +510,9 @@ _parse_bytes(Screen *screen, uint8_t *buf, Py_ssize_t len, PyObject DUMP_UNUSED
switch(screen->parser_state) {
case ESC:
HANDLE(esc_mode_char);
/* case CSI_STATE: */
/* CALL_HANDLER(csi); */
case CSI:
if (accumulate_csi(screen, codepoint, dump_callback)) { dispatch_csi(screen, dump_callback); SET_STATE(0); }
break;
case OSC:
if (accumulate_osc(screen, codepoint, dump_callback)) { dispatch_osc(screen, dump_callback); SET_STATE(0); }
break;

View File

@ -241,7 +241,7 @@ void select_graphic_rendition(Screen *self, unsigned int *params, unsigned int c
self->cursor->reverse = false; break;
case 29:
self->cursor->strikethrough = false; break;
#pragma GCC diagnostic ignored "-Wpedantic"
START_ALLOW_CASE_RANGE
case 30 ... 37:
case 39:
case 90 ... 97:
@ -249,7 +249,7 @@ void select_graphic_rendition(Screen *self, unsigned int *params, unsigned int c
case 40 ... 47:
case 49:
case 100 ... 107:
#pragma GCC diagnostic pop
END_ALLOW_CASE_RANGE
self->cursor->bg = (attr << 8) | 1; break;
case 38:
SET_COLOR(fg);

View File

@ -151,7 +151,7 @@ class TestParser(BaseTest):
pb = partial(self.parse_bytes_dump, s)
c = Callbacks()
s.callbacks = c
pb(b'a\033]2;xyz\x07bcde', 'a', ('set_title', 'xyz'), 'bcde')
pb('a\033]2;xyz\x9cbcde', 'a', ('set_title', 'xyz'), 'bcde')
self.ae(str(s.line(0)), 'abcde')
self.ae(c.titlebuf, 'xyz')
c.clear()
@ -165,8 +165,8 @@ class TestParser(BaseTest):
pb('\033]110\x07', ('set_dynamic_color', ''))
self.ae(c.colorbuf, '')
# def test_dcs_codes(self):
# s = self.create_screen()
# pb = partial(self.parse_bytes_dump, s)
# pb(b'a\033P2;xyz\x9cbcde', 'a', 'bcde')
# self.ae(str(s.line(0)), 'abcde')
def test_dcs_codes(self):
s = self.create_screen()
pb = partial(self.parse_bytes_dump, s)
pb('a\033P2;xyz\x9cbcde', 'abcde')
self.ae(str(s.line(0)), 'abcde')