diff --git a/kittens/transfer/librsync.py b/kittens/transfer/librsync.py index 4a3f6674f..d0b7291c9 100644 --- a/kittens/transfer/librsync.py +++ b/kittens/transfer/librsync.py @@ -3,30 +3,57 @@ # License: GPLv3 Copyright: 2021, Kovid Goyal import os -from typing import IO, Generator, Iterator, Optional +from typing import IO, TYPE_CHECKING, Iterator from .rsync import ( - IO_BUFFER_SIZE, JobCapsule, RsyncError, SignatureCapsule, - begin_create_delta, begin_create_signature, begin_load_signature, iter_job, - make_hash_table + IO_BUFFER_SIZE, RsyncError, begin_create_delta, begin_create_signature, + begin_load_signature, begin_patch, iter_job, build_hash_table ) +if TYPE_CHECKING: + from .rsync import JobCapsule, SignatureCapsule -def drive_job_on_file(f: IO[bytes], job: JobCapsule) -> Iterator[bytes]: - finished = False - prev_unused_input = b'' - while not finished: - input_data = f.read(IO_BUFFER_SIZE) + +class StreamingJob: + + def __init__(self, job: 'JobCapsule', expecting_output: bool = True): + self.job = job + self.finished = False + self.prev_unused_input = b'' + self.calls_with_no_data = 0 + self.expecting_output = expecting_output + + def __call__(self, input_data: bytes = b'') -> bytes: + if self.finished: + if input_data: + raise RsyncError('There was too much input data') + return b'' no_more_data = not input_data - if prev_unused_input: - input_data = prev_unused_input + input_data - prev_unused_input = b'' - output, finished, sz_of_unused_input = iter_job(job, input_data, no_more_data) - if sz_of_unused_input > 0 and not finished: + if no_more_data: + self.calls_with_no_data += 1 + if self.prev_unused_input: + input_data = self.prev_unused_input + input_data + self.prev_unused_input = b'' + output, self.finished, sz_of_unused_input = iter_job(self.job, input_data, no_more_data, self.expecting_output) + if sz_of_unused_input > 0 and not self.finished: if no_more_data: raise RsyncError(f"{sz_of_unused_input} bytes of input data were not used") - prev_unused_input = input_data[-sz_of_unused_input:] - yield output + self.prev_unused_input = input_data[-sz_of_unused_input:] + if self.finished: + self.commit() + elif self.calls_with_no_data > 3: + raise RsyncError('There was not enough input data') + return output + + def commit(self) -> None: + pass + + +def drive_job_on_file(f: IO[bytes], job: 'JobCapsule') -> Iterator[bytes]: + sj = StreamingJob(job) + while not sj.finished: + input_data = f.read(IO_BUFFER_SIZE) + yield sj(input_data) def signature_of_file(path: str) -> Iterator[bytes]: @@ -38,26 +65,34 @@ def signature_of_file(path: str) -> Iterator[bytes]: yield from drive_job_on_file(f, job) -def load_signature() -> Generator[Optional[SignatureCapsule], bytes, None]: - job, signature = begin_load_signature() - finished = False - prev_unused_input = b'' - while not finished: - input_data = yield None - no_more_data = not input_data - if prev_unused_input: - input_data = prev_unused_input + input_data - prev_unused_input = b'' - output, finished, sz_of_unused_input = iter_job(job, input_data, no_more_data, False) - if sz_of_unused_input > 0 and not finished: - if no_more_data: - raise RsyncError(f"{sz_of_unused_input} bytes of input data were not used") - prev_unused_input = input_data[-sz_of_unused_input:] - make_hash_table(signature) - yield signature +class LoadSignature(StreamingJob): + + def __init__(self) -> None: + job, self.signature = begin_load_signature() + super().__init__(job, expecting_output=False) + + def commit(self) -> None: + build_hash_table(self.signature) -def delta_for_file(path: str, sig: SignatureCapsule) -> Iterator[bytes]: +def delta_for_file(path: str, sig: 'SignatureCapsule') -> Iterator[bytes]: job = begin_create_delta(sig) with open(path, 'rb') as f: yield from drive_job_on_file(f, job) + + +class PatchFile(StreamingJob): + + def __init__(self, src_path: str): + self.src_file = open(src_path, 'rb') + job = begin_patch(self.read_from_src) + super().__init__(job) + + def read_from_src(self, b: memoryview, pos: int) -> int: + self.src_file.seek(pos) + return self.src_file.readinto(b) # type: ignore + + def close(self) -> None: + if not self.src_file.closed: + self.src_file.close() + commit = close diff --git a/kittens/transfer/rsync.c b/kittens/transfer/rsync.c index 06d56b339..61bc60837 100644 --- a/kittens/transfer/rsync.c +++ b/kittens/transfer/rsync.c @@ -8,15 +8,20 @@ #include "data-types.h" #include -#define JOB_CAPSULE "rs_job_t" #define SIGNATURE_CAPSULE "rs_signature_t" +#define JOB_WITH_CALLBACK_CAPSULE "rs_callback_job_t" +// See whole.c in the librsync source code for estimating IO_BUFFER_SIZE #define IO_BUFFER_SIZE (64u * 1024u) static PyObject *RsyncError = NULL; static void -free_job_capsule(PyObject *capsule) { - rs_job_t *job = PyCapsule_GetPointer(capsule, JOB_CAPSULE); - if (job) rs_job_free(job); +free_job_with_callback_capsule(PyObject *capsule) { + if (PyCapsule_IsValid(capsule, JOB_WITH_CALLBACK_CAPSULE)) { + void *job = PyCapsule_GetPointer(capsule, JOB_WITH_CALLBACK_CAPSULE); + if (job && job != RsyncError) rs_job_free(job); + PyObject *callback = PyCapsule_GetContext(capsule); + Py_CLEAR(callback); + } } static void @@ -25,6 +30,25 @@ free_sig_capsule(PyObject *capsule) { if (sig) rs_free_sumset(sig); } +#define CREATE_JOB(func, cb, ...) \ + PyObject *job_capsule = PyCapsule_New(RsyncError, JOB_WITH_CALLBACK_CAPSULE, free_job_with_callback_capsule); \ + if (job_capsule) { \ + rs_job_t *job = func(__VA_ARGS__); \ + if (job) { \ + if (PyCapsule_SetPointer(job_capsule, job) == 0) { \ + if (cb) { \ + if (PyCapsule_SetContext(job_capsule, cb) == 0) { Py_INCREF(cb); } \ + else { Py_CLEAR(job_capsule); } \ + } \ + } else { \ + rs_job_free(job); Py_CLEAR(job_capsule); \ + } \ + } else { \ + Py_CLEAR(job_capsule); \ + } \ + } + + static PyObject* begin_create_signature(PyObject *self UNUSED, PyObject *args) { long long file_size = -1; @@ -39,16 +63,14 @@ begin_create_signature(PyObject *self UNUSED, PyObject *args) { return NULL; } #endif - rs_job_t *job = rs_sig_begin(block_len, strong_len, magic_number); - if (!job) return PyErr_NoMemory(); - PyObject *ans = PyCapsule_New(job, JOB_CAPSULE, free_job_capsule); - if (!ans) rs_job_free(job); - return ans; + CREATE_JOB(rs_sig_begin, NULL, block_len, strong_len, magic_number); + return job_capsule; } #define GET_JOB_FROM_CAPSULE \ - rs_job_t *job = PyCapsule_GetPointer(job_capsule, JOB_CAPSULE); \ - if (!job) { PyErr_SetString(PyExc_TypeError, "Not a job capsule"); return NULL; } + rs_job_t *job = PyCapsule_GetPointer(job_capsule, JOB_WITH_CALLBACK_CAPSULE); \ + if (!job) { PyErr_SetString(PyExc_TypeError, "Not a job capsule"); return NULL; } \ + static PyObject* iter_job(PyObject *self UNUSED, PyObject *args) { @@ -56,12 +78,12 @@ iter_job(PyObject *self UNUSED, PyObject *args) { char *input_data; int eof = -1, expecting_output = 1; PyObject *job_capsule; - if (!PyArg_ParseTuple(args, "O!y#|pp", &PyCapsule_Type, &job_capsule, &input_data, &input_data_size, &eof, expecting_output)) return NULL; + if (!PyArg_ParseTuple(args, "O!y#|pp", &PyCapsule_Type, &job_capsule, &input_data, &input_data_size, &eof, &expecting_output)) return NULL; GET_JOB_FROM_CAPSULE; if (eof == -1) eof = input_data_size > 0 ? 0 : 1; rs_buffers_t buffer = { .avail_in=input_data_size, .next_in = input_data, .eof_in=eof, - .avail_out=expecting_output ? (MAX(IO_BUFFER_SIZE, 2 * (size_t)input_data_size)) : 64 + .avail_out=expecting_output ? IO_BUFFER_SIZE : 64 }; PyObject *ans = PyBytes_FromStringAndSize(NULL, buffer.avail_out); if (!ans) return NULL; @@ -93,13 +115,11 @@ iter_job(PyObject *self UNUSED, PyObject *args) { static PyObject* begin_load_signature(PyObject *self UNUSED, PyObject *args UNUSED) { rs_signature_t *sig = NULL; - rs_job_t *job = rs_loadsig_begin(&sig); - if (!sig || !job) return PyErr_NoMemory(); - PyObject *jc = PyCapsule_New(job, JOB_CAPSULE, free_job_capsule); - if (!jc) { rs_job_free(job); rs_free_sumset(sig); return NULL; } + CREATE_JOB(rs_loadsig_begin, NULL, &sig); + if (!job_capsule) { rs_free_sumset(sig); return NULL; } PyObject *sc = PyCapsule_New(sig, SIGNATURE_CAPSULE, free_sig_capsule); - if (!sc) { Py_DECREF(jc); rs_free_sumset(sig); return NULL; } - return Py_BuildValue("NN", jc, sc); + if (!sc) { Py_CLEAR(job_capsule); rs_free_sumset(sig); return NULL; } + return Py_BuildValue("NN", job_capsule, sc); } #define GET_SIG_FROM_CAPSULE \ @@ -125,20 +145,40 @@ begin_create_delta(PyObject *self UNUSED, PyObject *args) { PyObject *sig_capsule; if (!PyArg_ParseTuple(args, "O!", &PyCapsule_Type, &sig_capsule)) return NULL; GET_SIG_FROM_CAPSULE; - rs_job_t *job = rs_delta_begin(sig); - if (!job) return PyErr_NoMemory(); - PyObject *ans = PyCapsule_New(job, JOB_CAPSULE, free_job_capsule); - if (!ans) rs_job_free(job); - return ans; + CREATE_JOB(rs_delta_begin, NULL, sig); + return job_capsule; } +static rs_result +copy_callback(void *opaque, rs_long_t pos, size_t *len, void **buf) { + PyObject *callback = opaque; + long long p = pos; + PyObject *mem = PyMemoryView_FromMemory(*buf, *len, PyBUF_WRITE); + if (!mem) { PyErr_Clear(); return RS_MEM_ERROR; } + PyObject *res = PyObject_CallFunction(callback, "OL", mem, p); + Py_DECREF(mem); + if (res == NULL) { PyErr_Clear(); return RS_IO_ERROR; } + rs_result r = RS_DONE; + if (PyLong_Check(res)) { *len = PyLong_AsSize_t(res); } + else { r = RS_INTERNAL_ERROR; } + Py_DECREF(res); + return r; +} + +static PyObject* +begin_patch(PyObject *self UNUSED, PyObject *callback) { + if (!PyCallable_Check(callback)) { PyErr_SetString(PyExc_TypeError, "callback must be a callable"); return NULL; } + CREATE_JOB(rs_patch_begin, callback, copy_callback, callback); + return job_capsule; +} static PyMethodDef module_methods[] = { - {"begin_create_signature", (PyCFunction)begin_create_signature, METH_VARARGS, ""}, - {"begin_load_signature", (PyCFunction)begin_load_signature, METH_NOARGS, ""}, - {"build_hash_table", (PyCFunction)build_hash_table, METH_VARARGS, ""}, - {"begin_create_delta", (PyCFunction)begin_create_delta, METH_VARARGS, ""}, - {"iter_job", (PyCFunction)iter_job, METH_VARARGS, ""}, + {"begin_create_signature", begin_create_signature, METH_VARARGS, ""}, + {"begin_load_signature", begin_load_signature, METH_NOARGS, ""}, + {"build_hash_table", build_hash_table, METH_VARARGS, ""}, + {"begin_patch", begin_patch, METH_O, ""}, + {"begin_create_delta", begin_create_delta, METH_VARARGS, ""}, + {"iter_job", iter_job, METH_VARARGS, ""}, {NULL, NULL, 0, NULL} /* Sentinel */ }; diff --git a/kittens/transfer/rsync.pyi b/kittens/transfer/rsync.pyi index 7a8572848..8912dc787 100644 --- a/kittens/transfer/rsync.pyi +++ b/kittens/transfer/rsync.pyi @@ -1,4 +1,4 @@ -from typing import Optional, Tuple +from typing import Callable, Optional, Tuple IO_BUFFER_SIZE: int @@ -23,7 +23,7 @@ def begin_load_signature() -> Tuple[JobCapsule, SignatureCapsule]: pass -def make_hash_table(sig: SignatureCapsule) -> None: +def build_hash_table(sig: SignatureCapsule) -> None: pass @@ -31,5 +31,9 @@ def begin_create_delta(sig: SignatureCapsule) -> JobCapsule: pass +def begin_patch(callback: Callable[[memoryview, int], int]) -> JobCapsule: + pass + + def iter_job(job_capsule: JobCapsule, input_data: bytes, eof: Optional[bool] = None, expecting_output: bool = True) -> Tuple[bytes, bool, int]: pass