From 5729e33412dd29a8d19e545d4ac4ddaf2a7b4e5d Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sat, 2 Oct 2021 09:26:35 +0530 Subject: [PATCH] librsync actually blocks on output buffer size as well as input availability. So handle that --- kittens/transfer/librsync.py | 73 +++++++++++++++++++------------- kittens/transfer/rsync.c | 27 ++++-------- kittens/transfer/send.py | 3 +- kitty_tests/file_transmission.py | 44 +++++++++++-------- 4 files changed, 80 insertions(+), 67 deletions(-) diff --git a/kittens/transfer/librsync.py b/kittens/transfer/librsync.py index e8f6616c9..ad05e36be 100644 --- a/kittens/transfer/librsync.py +++ b/kittens/transfer/librsync.py @@ -4,7 +4,7 @@ import os import tempfile -from typing import IO, TYPE_CHECKING, Iterator +from typing import IO, TYPE_CHECKING, Iterator, Union from .rsync import ( IO_BUFFER_SIZE, RsyncError, begin_create_delta, begin_create_signature, @@ -22,34 +22,41 @@ class StreamingJob: def __init__(self, job: 'JobCapsule', output_buf_size: int = IO_BUFFER_SIZE): self.job = job self.finished = False - self.prev_unused_input = b'' self.calls_with_no_data = 0 self.output_buf = bytearray(output_buf_size) + self.uncomsumed_data = b'' - def __call__(self, input_data: bytes = b'') -> memoryview: + def __call__(self, input_data: Union[memoryview, bytes] = b'') -> Iterator[memoryview]: if self.finished: if input_data: raise RsyncError('There was too much input data') return memoryview(self.output_buf)[:0] - no_more_data = not input_data - if self.prev_unused_input: - input_data = self.prev_unused_input + input_data - self.prev_unused_input = b'' - self.finished, sz_of_unused_input, output_size = iter_job(self.job, input_data, self.output_buf) - if sz_of_unused_input > 0 and not self.finished: - if no_more_data: - raise RsyncError(f"{sz_of_unused_input} bytes of input data were not used") - self.prev_unused_input = bytes(input_data[-sz_of_unused_input:]) - if self.finished: - self.commit() - if no_more_data and not output_size: - self.calls_with_no_data += 1 - if self.calls_with_no_data > 3: # prevent infinite loop - raise RsyncError('There was not enough input data') - return memoryview(self.output_buf)[:output_size] + if self.uncomsumed_data: + input_data = self.uncomsumed_data + bytes(input_data) + self.uncomsumed_data = b'' + while True: + self.finished, sz_of_unused_input, output_size = iter_job(self.job, input_data, self.output_buf) + if output_size: + yield memoryview(self.output_buf)[:output_size] + if self.finished: + break + if not sz_of_unused_input and len(input_data): + break + consumed_some_input = sz_of_unused_input < len(input_data) + produced_some_output = output_size > 0 + if not consumed_some_input and not produced_some_output: + break + input_data = memoryview(input_data)[-sz_of_unused_input:] + if sz_of_unused_input: + self.uncomsumed_data = bytes(input_data[-sz_of_unused_input:]) - def commit(self) -> None: - pass + def get_remaining_output(self) -> Iterator[memoryview]: + if not self.finished: + yield from self() + if not self.finished: + raise RsyncError('Insufficient input data') + if self.uncomsumed_data: + raise RsyncError(f'{len(self.uncomsumed_data)} bytes if unconsumed input data') def drive_job_on_file(f: IO[bytes], job: 'JobCapsule', input_buf_size: int = IO_BUFFER_SIZE, output_buf_size: int = IO_BUFFER_SIZE) -> Iterator[memoryview]: @@ -57,9 +64,11 @@ def drive_job_on_file(f: IO[bytes], job: 'JobCapsule', input_buf_size: int = IO_ input_buf = bytearray(input_buf_size) while not sj.finished: sz = f.readinto(input_buf) # type: ignore - result = sj(memoryview(input_buf)[:sz]) - if len(result) > 0: - yield result + if not sz: + del input_buf + yield from sj.get_remaining_output() + break + yield from sj(memoryview(input_buf)[:sz]) def signature_of_file(path: str) -> Iterator[memoryview]: @@ -83,7 +92,13 @@ class LoadSignature(StreamingJob): job, self.signature = begin_load_signature() super().__init__(job, output_buf_size=0) + def add_chunk(self, chunk: bytes) -> None: + for ignored in self(chunk): + pass + def commit(self) -> None: + for ignored in self.get_remaining_output(): + pass build_hash_table(self.signature) @@ -115,6 +130,7 @@ class PatchFile(StreamingJob): def close(self) -> None: if not self.src_file.closed: + self.get_remaining_output() self.src_file.close() count = 100 while not self.finished: @@ -127,8 +143,7 @@ class PatchFile(StreamingJob): os.replace(self.dest_file.name, self.src_file.name) def write(self, data: bytes) -> None: - output = self(data) - if output: + for output in self(data): self.dest_file.write(output) def __enter__(self) -> 'PatchFile': @@ -144,12 +159,10 @@ def develop() -> None: sig_loader = LoadSignature() with open(src + '.sig', 'wb') as f: for chunk in signature_of_file(src): - sig_loader(chunk) + sig_loader.add_chunk(chunk) f.write(chunk) - sig_loader() + sig_loader.commit() with open(src + '.delta', 'wb') as f, PatchFile(src, src + '.output') as patcher: for chunk in delta_for_file(src, sig_loader.signature): f.write(chunk) patcher.write(chunk) - if not patcher.finished: - patcher.write(b'') diff --git a/kittens/transfer/rsync.c b/kittens/transfer/rsync.c index 58f5116a1..5bd1e9df4 100644 --- a/kittens/transfer/rsync.c +++ b/kittens/transfer/rsync.c @@ -89,26 +89,15 @@ iter_job(PyObject *self UNUSED, PyObject *args) { .avail_in=input_buf.len, .next_in=input_buf.buf, .eof_in=eof, .avail_out=output_buf.len, .next_out=output_buf.buf }; - Py_ssize_t output_size = 0; - rs_result result = RS_DONE; - while (true) { - size_t before = buffer.avail_out; - result = rs_job_iter(job, &buffer); - output_size += before - buffer.avail_out; - if (result == RS_DONE || result == RS_BLOCKED) break; - if (buffer.avail_in) { - PyBuffer_Release(&output_buf); - if (PyByteArray_Resize(output_array, MAX(IO_BUFFER_SIZE, (size_t)PyByteArray_GET_SIZE(output_array) * 2)) != 0) return NULL; - if (PyObject_GetBuffer(output_array, &output_buf, PyBUF_WRITE) != 0) return NULL; - buffer.avail_out = output_buf.len - output_size; - buffer.next_out = (char*)output_buf.buf + output_size; - continue; - } - PyErr_SetString(RsyncError, rs_strerror(result)); - return NULL; + size_t before = buffer.avail_out; + rs_result result = rs_job_iter(job, &buffer); + Py_ssize_t output_size = before - buffer.avail_out; + if (result == RS_DONE || result == RS_BLOCKED) { + Py_ssize_t unused_input = buffer.avail_in; + return Py_BuildValue("Onn", result == RS_DONE ? Py_True : Py_False, unused_input, output_size); } - Py_ssize_t unused_input = buffer.avail_in; - return Py_BuildValue("Onn", result == RS_DONE ? Py_True : Py_False, unused_input, output_size); + PyErr_SetString(RsyncError, rs_strerror(result)); + return NULL; } static PyObject* diff --git a/kittens/transfer/send.py b/kittens/transfer/send.py index a5606b11a..2a0d2cdb6 100644 --- a/kittens/transfer/send.py +++ b/kittens/transfer/send.py @@ -392,8 +392,9 @@ class SendManager: return sl = file.signature_loader assert sl is not None - sl(ftc.data) + sl.add_chunk(ftc.data) if ftc.action is Action.end_data: + sl.commit() file.start_delta_calculation() self.update_collective_statuses() diff --git a/kitty_tests/file_transmission.py b/kitty_tests/file_transmission.py index 11b1bb6d5..9aded2f66 100644 --- a/kitty_tests/file_transmission.py +++ b/kitty_tests/file_transmission.py @@ -91,28 +91,38 @@ class TestFileTransmission(BaseTest): a_path = os.path.join(self.tdir, 'a') b_path = os.path.join(self.tdir, 'b') c_path = os.path.join(self.tdir, 'c') + + def files_equal(a_path, c_path): + self.ae(os.path.getsize(a_path), os.path.getsize(b_path)) + with open(b_path, 'rb') as b, open(c_path, 'rb') as c: + self.ae(b.read(), c.read()) + + def patch(old_path, new_path, output_path, max_delta_len=0): + sig_loader = LoadSignature() + for chunk in signature_of_file(old_path): + sig_loader.add_chunk(chunk) + sig_loader.commit() + self.assertTrue(sig_loader.finished) + delta_len = 0 + with PatchFile(old_path, output_path) as patcher: + for chunk in delta_for_file(new_path, sig_loader.signature): + self.assertFalse(patcher.finished) + patcher.write(chunk) + delta_len += len(chunk) + self.assertTrue(patcher.finished) + if max_delta_len: + self.assertLessEqual(delta_len, max_delta_len) + files_equal(output_path, new_path) + sz = 1024 * 1024 + 37 with open(a_path, 'wb') as f: f.write(os.urandom(sz)) with open(b_path, 'wb') as f: f.write(os.urandom(sz)) - sig_loader = LoadSignature() - for chunk in signature_of_file(a_path): - sig_loader(chunk) - sig_loader() - self.assertTrue(sig_loader.finished) - with PatchFile(a_path, c_path) as patcher: - for chunk in delta_for_file(b_path, sig_loader.signature): - self.assertFalse(patcher.finished) - patcher.write(chunk) - self.assertTrue(patcher.finished) - with open(b_path, 'rb') as b, open(c_path, 'rb') as c: - while True: - bc = b.read(4096) - cc = c.read(4096) - self.ae(bc, cc) - if not bc and not cc: - break + + patch(a_path, b_path, c_path) + # test size of delta + patch(a_path, a_path, c_path, max_delta_len=256) def test_file_put(self): # send refusal