From 85be3e4ed1cdf973c7fde00f6254bb4ec4f7b90c Mon Sep 17 00:00:00 2001
From: Kovid Goyal
Date: Sun, 19 Sep 2021 15:28:22 +0530
Subject: [PATCH] Also avoid mallocs for the output buffer

---
 kittens/transfer/librsync.py | 20 ++++++++++----------
 kittens/transfer/rsync.c     | 32 ++++++++++++++------------------
 kittens/transfer/rsync.pyi   |  4 ++--
 3 files changed, 26 insertions(+), 30 deletions(-)

diff --git a/kittens/transfer/librsync.py b/kittens/transfer/librsync.py
index 395d9422e..60226188b 100644
--- a/kittens/transfer/librsync.py
+++ b/kittens/transfer/librsync.py
@@ -16,25 +16,25 @@ if TYPE_CHECKING:
 
 class StreamingJob:
 
-    def __init__(self, job: 'JobCapsule', expecting_output: bool = True):
+    def __init__(self, job: 'JobCapsule', output_buf_size: int = IO_BUFFER_SIZE):
         self.job = job
         self.finished = False
         self.prev_unused_input = b''
         self.calls_with_no_data = 0
-        self.expecting_output = expecting_output
+        self.output_buf = bytearray(output_buf_size)
 
-    def __call__(self, input_data: bytes = b'') -> bytes:
+    def __call__(self, input_data: bytes = b'') -> memoryview:
         if self.finished:
             if input_data:
                 raise RsyncError('There was too much input data')
-            return b''
+            return memoryview(self.output_buf)[:0]
         no_more_data = not input_data
         if no_more_data:
             self.calls_with_no_data += 1
         if self.prev_unused_input:
             input_data = self.prev_unused_input + input_data
             self.prev_unused_input = b''
-        output, self.finished, sz_of_unused_input = iter_job(self.job, input_data, no_more_data, self.expecting_output)
+        self.finished, sz_of_unused_input, output_size = iter_job(self.job, input_data, self.output_buf)
         if sz_of_unused_input > 0 and not self.finished:
             if no_more_data:
                 raise RsyncError(f"{sz_of_unused_input} bytes of input data were not used")
@@ -43,13 +43,13 @@ class StreamingJob:
             self.commit()
         elif self.calls_with_no_data > 3:
             raise RsyncError('There was not enough input data')
-        return output
+        return memoryview(self.output_buf)[:output_size]
 
     def commit(self) -> None:
         pass
 
 
-def drive_job_on_file(f: IO[bytes], job: 'JobCapsule') -> Iterator[bytes]:
+def drive_job_on_file(f: IO[bytes], job: 'JobCapsule') -> Iterator[memoryview]:
     sj = StreamingJob(job)
     input_buf = bytearray(IO_BUFFER_SIZE)
     while not sj.finished:
@@ -57,7 +57,7 @@
         yield sj(memoryview(input_buf)[:sz])
 
 
-def signature_of_file(path: str) -> Iterator[bytes]:
+def signature_of_file(path: str) -> Iterator[memoryview]:
     with open(path, 'rb') as f:
         f.seek(0, os.SEEK_END)
         fsz = f.tell()
@@ -70,13 +70,13 @@ class LoadSignature(StreamingJob):
 
     def __init__(self) -> None:
         job, self.signature = begin_load_signature()
-        super().__init__(job, expecting_output=False)
+        super().__init__(job, output_buf_size=0)
 
     def commit(self) -> None:
         build_hash_table(self.signature)
 
 
-def delta_for_file(path: str, sig: 'SignatureCapsule') -> Iterator[bytes]:
+def delta_for_file(path: str, sig: 'SignatureCapsule') -> Iterator[memoryview]:
     job = begin_create_delta(sig)
     with open(path, 'rb') as f:
         yield from drive_job_on_file(f, job)
diff --git a/kittens/transfer/rsync.c b/kittens/transfer/rsync.c
index 35016625f..fdf717930 100644
--- a/kittens/transfer/rsync.c
+++ b/kittens/transfer/rsync.c
@@ -77,19 +77,17 @@ begin_create_signature(PyObject *self UNUSED, PyObject *args) {
 static PyObject*
 iter_job(PyObject *self UNUSED, PyObject *args) {
     FREE_BUFFER_AFTER_FUNCTION Py_buffer input_buf = {0};
-    int eof = -1, expecting_output = 1;
-    PyObject *job_capsule;
-    if (!PyArg_ParseTuple(args, "O!y*|pp", &PyCapsule_Type, &job_capsule, &input_buf, &eof, &expecting_output)) return NULL;
+    FREE_BUFFER_AFTER_FUNCTION Py_buffer output_buf = {0};
+    PyObject *job_capsule, *output_array;
+    if (!PyArg_ParseTuple(args, "O!y*O!", &PyCapsule_Type, &job_capsule, &input_buf, &PyByteArray_Type, &output_array)) return NULL;
     GET_JOB_FROM_CAPSULE;
-    if (eof == -1) eof = input_buf.len > 0 ? 0 : 1;
+    if (PyObject_GetBuffer(output_array, &output_buf, PyBUF_WRITE) != 0) return NULL;
+    int eof = input_buf.len > 0 ? 0 : 1;
     rs_buffers_t buffer = {
-        .avail_in=input_buf.len, .next_in = input_buf.buf, .eof_in=eof,
-        .avail_out=expecting_output ? IO_BUFFER_SIZE : 64
+        .avail_in=input_buf.len, .next_in=input_buf.buf, .eof_in=eof,
+        .avail_out=output_buf.len, .next_out=output_buf.buf
     };
-    PyObject *ans = PyBytes_FromStringAndSize(NULL, buffer.avail_out);
-    if (!ans) return NULL;
-    buffer.next_out = PyBytes_AS_STRING(ans);
-    size_t output_size = 0;
+    Py_ssize_t output_size = 0;
     rs_result result = RS_DONE;
     while (true) {
         size_t before = buffer.avail_out;
@@ -97,20 +95,18 @@ iter_job(PyObject *self UNUSED, PyObject *args) {
         output_size += before - buffer.avail_out;
         if (result == RS_DONE || result == RS_BLOCKED) break;
         if (buffer.avail_in) {
-            if (_PyBytes_Resize(&ans, MAX(IO_BUFFER_SIZE, (size_t)PyBytes_GET_SIZE(ans) * 2)) != 0) return NULL;
-            buffer.avail_out = PyBytes_GET_SIZE(ans) - output_size;
-            buffer.next_out = PyBytes_AS_STRING(ans) + output_size;
+            PyBuffer_Release(&output_buf);
+            if (PyByteArray_Resize(output_array, MAX(IO_BUFFER_SIZE, (size_t)PyByteArray_GET_SIZE(output_array) * 2)) != 0) return NULL;
+            if (PyObject_GetBuffer(output_array, &output_buf, PyBUF_WRITE) != 0) return NULL;
+            buffer.avail_out = output_buf.len - output_size;
+            buffer.next_out = (char*)output_buf.buf + output_size;
             continue;
         }
-        Py_DECREF(ans);
         PyErr_SetString(RsyncError, rs_strerror(result));
         return NULL;
     }
-    if ((ssize_t)output_size != PyBytes_GET_SIZE(ans)) {
-        if (_PyBytes_Resize(&ans, output_size) != 0) return NULL;
-    }
     Py_ssize_t unused_input = buffer.avail_in;
-    return Py_BuildValue("NOn", ans, result == RS_DONE ? Py_True : Py_False, unused_input);
+    return Py_BuildValue("Onn", result == RS_DONE ? Py_True : Py_False, unused_input, output_size);
 }
 
 static PyObject*
diff --git a/kittens/transfer/rsync.pyi b/kittens/transfer/rsync.pyi
index 8912dc787..09be9b19b 100644
--- a/kittens/transfer/rsync.pyi
+++ b/kittens/transfer/rsync.pyi
@@ -1,4 +1,4 @@
-from typing import Callable, Optional, Tuple
+from typing import Callable, Tuple
 
 IO_BUFFER_SIZE: int
 
@@ -35,5 +35,5 @@ def begin_patch(callback: Callable[[memoryview, int], int]) -> JobCapsule:
     pass
 
 
-def iter_job(job_capsule: JobCapsule, input_data: bytes, eof: Optional[bool] = None, expecting_output: bool = True) -> Tuple[bytes, bool, int]:
+def iter_job(job_capsule: JobCapsule, input_data: bytes, output_buf: bytearray) -> Tuple[bool, int, int]:
     pass
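Note (not part of the patch): a minimal sketch of the call pattern this change enables on the Python side. iter_job() now writes into a caller-supplied bytearray and reports how many bytes it produced, so the generators in librsync.py yield memoryview slices of one reused buffer rather than freshly allocated bytes objects. Each yielded chunk must therefore be consumed (written out or copied) before the next iteration overwrites the buffer. The write_signature_file helper below is a hypothetical example, not code from the repository.

    # Sketch, assuming the post-patch API of kittens/transfer/librsync.py:
    # signature_of_file() yields memoryview chunks backed by StreamingJob.output_buf.
    from kittens.transfer.librsync import signature_of_file

    def write_signature_file(src_path: str, sig_path: str) -> None:
        with open(sig_path, 'wb') as out:
            for chunk in signature_of_file(src_path):
                # The view points into the job's reused output buffer, which is
                # overwritten on the next iteration, so write it out immediately.
                out.write(chunk)

Compared to the previous API, the steady-state cost per chunk drops from one PyBytes allocation (plus a possible resize) to zero allocations; the only remaining allocation is the occasional PyByteArray_Resize when a chunk outgrows the current buffer.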