From 77508bfe0dc351c47f05d72e2b2b73d970297a4e Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sat, 18 Sep 2021 13:08:53 +0530 Subject: [PATCH] Handle blocking with unused input in librsync --- kittens/transfer/librsync.py | 13 +++++++++++-- kittens/transfer/rsync.c | 6 +++--- kittens/transfer/rsync.pyi | 2 +- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/kittens/transfer/librsync.py b/kittens/transfer/librsync.py index ad72693c0..0b29ef6d9 100644 --- a/kittens/transfer/librsync.py +++ b/kittens/transfer/librsync.py @@ -5,7 +5,7 @@ import os from typing import Iterator -from .rsync import IO_BUFFER_SIZE, begin_create_signature, iter_job +from .rsync import IO_BUFFER_SIZE, RsyncError, begin_create_signature, iter_job def signature_of_file(path: str) -> Iterator[bytes]: @@ -15,7 +15,16 @@ def signature_of_file(path: str) -> Iterator[bytes]: job = begin_create_signature(fsz) f.seek(0) finished = False + prev_unused_input = b'' while not finished: input_data = f.read(IO_BUFFER_SIZE) - output, finished = iter_job(job, input_data) + no_more_data = not input_data + if prev_unused_input: + input_data = prev_unused_input + input_data + prev_unused_input = b'' + output, finished, sz_of_unused_input = iter_job(job, input_data) + if sz_of_unused_input > 0 and not finished: + if no_more_data: + raise RsyncError(f"{sz_of_unused_input} bytes of input data were not used") + prev_unused_input = input_data[-sz_of_unused_input:] yield output diff --git a/kittens/transfer/rsync.c b/kittens/transfer/rsync.c index 1776d4389..6beae864a 100644 --- a/kittens/transfer/rsync.c +++ b/kittens/transfer/rsync.c @@ -69,14 +69,13 @@ iter_job(PyObject *self UNUSED, PyObject *args) { size_t before = buffer.avail_out; result = rs_job_iter(job, &buffer); output_size += before - buffer.avail_out; - if (result == RS_DONE) break; + if (result == RS_DONE || result == RS_BLOCKED) break; if (buffer.avail_in) { if (_PyBytes_Resize(&ans, PyBytes_GET_SIZE(ans) * 2) != 0) return NULL; buffer.avail_out = PyBytes_GET_SIZE(ans) - output_size; buffer.next_out = PyBytes_AS_STRING(ans) + output_size; continue; } - if (result == RS_BLOCKED) break; Py_DECREF(ans); PyErr_SetString(RsyncError, rs_strerror(result)); return NULL; @@ -84,7 +83,8 @@ iter_job(PyObject *self UNUSED, PyObject *args) { if ((ssize_t)output_size != PyBytes_GET_SIZE(ans)) { if (_PyBytes_Resize(&ans, output_size) != 0) return NULL; } - return Py_BuildValue("NO", ans, result == RS_DONE ? Py_True : Py_False); + Py_ssize_t unused_input = buffer.avail_in; + return Py_BuildValue("NOn", ans, result == RS_DONE ? Py_True : Py_False, unused_input); } static PyObject* diff --git a/kittens/transfer/rsync.pyi b/kittens/transfer/rsync.pyi index e9d904db7..bd3a9302f 100644 --- a/kittens/transfer/rsync.pyi +++ b/kittens/transfer/rsync.pyi @@ -27,5 +27,5 @@ def make_hash_table(sig: SignatureCapsule) -> None: pass -def iter_job(job_capsule: JobCapsule, input_data: bytes, eof: Optional[bool] = None) -> Tuple[bytes, bool]: +def iter_job(job_capsule: JobCapsule, input_data: bytes, eof: Optional[bool] = None) -> Tuple[bytes, bool, int]: pass