Also avoid mallocs for the output buffer

Kovid Goyal 2021-09-19 15:28:22 +05:30
parent d5d52ec8b9
commit 85be3e4ed1
3 changed files with 26 additions and 30 deletions
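
In short: iter_job used to allocate a fresh bytes object for its output on every call, growing it with _PyBytes_Resize as needed. After this change the caller passes in a bytearray that each StreamingJob allocates once and reuses, and Python-level callers get back a zero-copy memoryview slice into it. A minimal sketch of the pattern, with a hypothetical produce() standing in for the real librsync-driven iter_job (the names and buffer size here are illustrative, not kitty's API):

    IO_BUFFER_SIZE = 64 * 1024  # illustrative size, not kitty's actual constant

    def produce(input_data: bytes, out: bytearray) -> int:
        # Stand-in for the real iter_job, which drives a librsync job:
        # write into `out` and report how many bytes were produced.
        n = min(len(input_data), len(out))
        out[:n] = input_data[:n]
        return n

    class ReusedOutputJob:
        def __init__(self, output_buf_size: int = IO_BUFFER_SIZE) -> None:
            self.output_buf = bytearray(output_buf_size)  # one allocation, reused

        def __call__(self, input_data: bytes = b'') -> memoryview:
            n = produce(input_data, self.output_buf)
            # No per-call malloc: just a view over the reused buffer.
            return memoryview(self.output_buf)[:n]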


@@ -16,25 +16,25 @@ if TYPE_CHECKING:
 
 class StreamingJob:
 
-    def __init__(self, job: 'JobCapsule', expecting_output: bool = True):
+    def __init__(self, job: 'JobCapsule', output_buf_size: int = IO_BUFFER_SIZE):
         self.job = job
         self.finished = False
         self.prev_unused_input = b''
         self.calls_with_no_data = 0
-        self.expecting_output = expecting_output
+        self.output_buf = bytearray(output_buf_size)
 
-    def __call__(self, input_data: bytes = b'') -> bytes:
+    def __call__(self, input_data: bytes = b'') -> memoryview:
         if self.finished:
             if input_data:
                 raise RsyncError('There was too much input data')
-            return b''
+            return memoryview(self.output_buf)[:0]
         no_more_data = not input_data
         if no_more_data:
             self.calls_with_no_data += 1
         if self.prev_unused_input:
             input_data = self.prev_unused_input + input_data
             self.prev_unused_input = b''
-        output, self.finished, sz_of_unused_input = iter_job(self.job, input_data, no_more_data, self.expecting_output)
+        self.finished, sz_of_unused_input, output_size = iter_job(self.job, input_data, self.output_buf)
         if sz_of_unused_input > 0 and not self.finished:
             if no_more_data:
                 raise RsyncError(f"{sz_of_unused_input} bytes of input data were not used")
@@ -43,13 +43,13 @@ class StreamingJob:
             self.commit()
         elif self.calls_with_no_data > 3:
             raise RsyncError('There was not enough input data')
-        return output
+        return memoryview(self.output_buf)[:output_size]
 
     def commit(self) -> None:
         pass
 
 
-def drive_job_on_file(f: IO[bytes], job: 'JobCapsule') -> Iterator[bytes]:
+def drive_job_on_file(f: IO[bytes], job: 'JobCapsule') -> Iterator[memoryview]:
     sj = StreamingJob(job)
     input_buf = bytearray(IO_BUFFER_SIZE)
     while not sj.finished:
@@ -57,7 +57,7 @@ def drive_job_on_file(f: IO[bytes], job: 'JobCapsule') -> Iterator[bytes]:
         yield sj(memoryview(input_buf)[:sz])
 
 
-def signature_of_file(path: str) -> Iterator[bytes]:
+def signature_of_file(path: str) -> Iterator[memoryview]:
     with open(path, 'rb') as f:
         f.seek(0, os.SEEK_END)
         fsz = f.tell()
@@ -70,13 +70,13 @@ class LoadSignature(StreamingJob):
 
     def __init__(self) -> None:
         job, self.signature = begin_load_signature()
-        super().__init__(job, expecting_output=False)
+        super().__init__(job, output_buf_size=0)
 
     def commit(self) -> None:
         build_hash_table(self.signature)
 
 
-def delta_for_file(path: str, sig: 'SignatureCapsule') -> Iterator[bytes]:
+def delta_for_file(path: str, sig: 'SignatureCapsule') -> Iterator[memoryview]:
     job = begin_create_delta(sig)
     with open(path, 'rb') as f:
         yield from drive_job_on_file(f, job)
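
A consequence of the reuse worth keeping in mind: every memoryview yielded by these iterators aliases the job's single output_buf, so each chunk must be consumed, or copied with bytes(chunk), before the next iteration overwrites it. A sketch of a conforming consumer (save_signature is a hypothetical helper, not part of this commit):

    # Each yielded chunk is a view into the job's reused output_buf, so
    # write it out immediately; stashing the views for later would yield
    # corrupted data once the buffer is overwritten.
    def save_signature(src_path: str, sig_path: str) -> None:
        with open(sig_path, 'wb') as out:
            for chunk in signature_of_file(src_path):
                out.write(chunk)  # or bytes(chunk) if it must be retained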


@@ -77,19 +77,17 @@ begin_create_signature(PyObject *self UNUSED, PyObject *args) {
 static PyObject*
 iter_job(PyObject *self UNUSED, PyObject *args) {
     FREE_BUFFER_AFTER_FUNCTION Py_buffer input_buf = {0};
-    int eof = -1, expecting_output = 1;
-    PyObject *job_capsule;
-    if (!PyArg_ParseTuple(args, "O!y*|pp", &PyCapsule_Type, &job_capsule, &input_buf, &eof, &expecting_output)) return NULL;
+    FREE_BUFFER_AFTER_FUNCTION Py_buffer output_buf = {0};
+    PyObject *job_capsule, *output_array;
+    if (!PyArg_ParseTuple(args, "O!y*O!", &PyCapsule_Type, &job_capsule, &input_buf, &PyByteArray_Type, &output_array)) return NULL;
     GET_JOB_FROM_CAPSULE;
-    if (eof == -1) eof = input_buf.len > 0 ? 0 : 1;
+    if (PyObject_GetBuffer(output_array, &output_buf, PyBUF_WRITE) != 0) return NULL;
+    int eof = input_buf.len > 0 ? 0 : 1;
     rs_buffers_t buffer = {
-        .avail_in=input_buf.len, .next_in = input_buf.buf, .eof_in=eof,
-        .avail_out=expecting_output ? IO_BUFFER_SIZE : 64
+        .avail_in=input_buf.len, .next_in=input_buf.buf, .eof_in=eof,
+        .avail_out=output_buf.len, .next_out=output_buf.buf
     };
-    PyObject *ans = PyBytes_FromStringAndSize(NULL, buffer.avail_out);
-    if (!ans) return NULL;
-    buffer.next_out = PyBytes_AS_STRING(ans);
-    size_t output_size = 0;
+    Py_ssize_t output_size = 0;
     rs_result result = RS_DONE;
     while (true) {
         size_t before = buffer.avail_out;
@@ -97,20 +95,18 @@ iter_job(PyObject *self UNUSED, PyObject *args) {
         output_size += before - buffer.avail_out;
         if (result == RS_DONE || result == RS_BLOCKED) break;
         if (buffer.avail_in) {
-            if (_PyBytes_Resize(&ans, MAX(IO_BUFFER_SIZE, (size_t)PyBytes_GET_SIZE(ans) * 2)) != 0) return NULL;
-            buffer.avail_out = PyBytes_GET_SIZE(ans) - output_size;
-            buffer.next_out = PyBytes_AS_STRING(ans) + output_size;
+            PyBuffer_Release(&output_buf);
+            if (PyByteArray_Resize(output_array, MAX(IO_BUFFER_SIZE, (size_t)PyByteArray_GET_SIZE(output_array) * 2)) != 0) return NULL;
+            if (PyObject_GetBuffer(output_array, &output_buf, PyBUF_WRITE) != 0) return NULL;
+            buffer.avail_out = output_buf.len - output_size;
+            buffer.next_out = (char*)output_buf.buf + output_size;
             continue;
         }
-        Py_DECREF(ans);
         PyErr_SetString(RsyncError, rs_strerror(result));
         return NULL;
     }
-    if ((ssize_t)output_size != PyBytes_GET_SIZE(ans)) {
-        if (_PyBytes_Resize(&ans, output_size) != 0) return NULL;
-    }
     Py_ssize_t unused_input = buffer.avail_in;
-    return Py_BuildValue("NOn", ans, result == RS_DONE ? Py_True : Py_False, unused_input);
+    return Py_BuildValue("Onn", result == RS_DONE ? Py_True : Py_False, unused_input, output_size);
 }
 
 static PyObject*
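
The release/resize/re-acquire sequence in the grow path is forced by CPython's buffer protocol: a bytearray refuses to resize while buffer exports are outstanding, so the Py_buffer on output_array must be released before PyByteArray_Resize and re-acquired afterwards (resizing can also move the underlying storage, which is why next_out is recomputed). The same invariant is observable from pure Python:

    # A bytearray cannot be resized while a buffer export is outstanding.
    buf = bytearray(4)
    view = memoryview(buf)
    try:
        buf.extend(b'xy')  # implies a resize
    except BufferError as err:
        print(err)  # "Existing exports of data: object cannot be re-sized"
    view.release()
    buf.extend(b'xy')  # succeeds once the export is released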


@@ -1,4 +1,4 @@
-from typing import Callable, Optional, Tuple
+from typing import Callable, Tuple
 
 IO_BUFFER_SIZE: int
 
@@ -35,5 +35,5 @@ def begin_patch(callback: Callable[[memoryview, int], int]) -> JobCapsule:
     pass
 
 
-def iter_job(job_capsule: JobCapsule, input_data: bytes, eof: Optional[bool] = None, expecting_output: bool = True) -> Tuple[bytes, bool, int]:
+def iter_job(job_capsule: JobCapsule, input_data: bytes, output_buf: bytearray) -> Tuple[bool, int, int]:
     pass
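
The updated stub spells out the new contract: iter_job writes into the passed bytearray, growing it only if it runs out of output room while input remains, and returns (finished, sz_of_unused_input, output_size) rather than the output itself, leaving the caller to slice the buffer. Roughly (variable names illustrative):

    finished, unused, n = iter_job(job, data, out_buf)  # out_buf: reused bytearray
    chunk = memoryview(out_buf)[:n]  # zero-copy view of this call's output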