diff --git a/kittens/transfer/librsync.py b/kittens/transfer/librsync.py index 57fddca20..fdf2a1979 100644 --- a/kittens/transfer/librsync.py +++ b/kittens/transfer/librsync.py @@ -31,8 +31,6 @@ class StreamingJob: raise RsyncError('There was too much input data') return memoryview(self.output_buf)[:0] no_more_data = not input_data - if no_more_data: - self.calls_with_no_data += 1 if self.prev_unused_input: input_data = self.prev_unused_input + input_data self.prev_unused_input = b'' @@ -43,7 +41,9 @@ class StreamingJob: self.prev_unused_input = bytes(input_data[-sz_of_unused_input:]) if self.finished: self.commit() - elif self.calls_with_no_data > 3: + if no_more_data and not output_size: + self.calls_with_no_data += 1 + if self.calls_with_no_data > 3: # prevent infinite loop raise RsyncError('There was not enough input data') return memoryview(self.output_buf)[:output_size] @@ -56,7 +56,9 @@ def drive_job_on_file(f: IO[bytes], job: 'JobCapsule', input_buf_size: int = IO_ input_buf = bytearray(input_buf_size) while not sj.finished: sz = f.readinto(input_buf) # type: ignore - yield sj(memoryview(input_buf)[:sz]) + result = sj(memoryview(input_buf)[:sz]) + if len(result) > 0: + yield result def signature_of_file(path: str) -> Iterator[memoryview]: @@ -108,3 +110,9 @@ class PatchFile(StreamingJob): if not self.src_file.closed: self.src_file.close() commit = close + + def __enter__(self) -> 'PatchFile': + return self + + def __exit__(self, *a: object) -> None: + self.close() diff --git a/kitty_tests/file_transmission.py b/kitty_tests/file_transmission.py index 0d462cec2..81bbb170e 100644 --- a/kitty_tests/file_transmission.py +++ b/kitty_tests/file_transmission.py @@ -10,6 +10,9 @@ import tempfile import zlib from pathlib import Path +from kittens.transfer.librsync import ( + LoadSignature, PatchFile, delta_for_file, signature_of_file +) from kittens.transfer.main import parse_transfer_args from kittens.transfer.send import files_for_send from kittens.transfer.utils import set_paths @@ -83,6 +86,38 @@ class TestFileTransmission(BaseTest): b = os.path.abspath(os.path.realpath(b)) self.ae(a, b) + def test_rsync_roundtrip(self): + a_path = os.path.join(self.tdir, 'a') + b_path = os.path.join(self.tdir, 'b') + c_path = os.path.join(self.tdir, 'c') + sz = 1024 * 1024 + 37 + with open(a_path, 'wb') as f: + f.write(os.urandom(sz)) + with open(b_path, 'wb') as f: + f.write(os.urandom(sz)) + sig_loader = LoadSignature() + for chunk in signature_of_file(a_path): + sig_loader(chunk) + sig_loader() + self.assertTrue(sig_loader.finished) + with open(c_path, 'wb') as dest, PatchFile(a_path) as patcher: + for chunk in delta_for_file(b_path, sig_loader.signature): + self.assertFalse(patcher.finished) + output = patcher(chunk) + if output: + dest.write(output) + while not patcher.finished: + output = patcher() + if output: + dest.write(output) + with open(b_path, 'rb') as b, open(c_path, 'rb') as c: + while True: + bc = b.read(4096) + cc = c.read(4096) + self.ae(bc, cc) + if not bc and not cc: + break + def test_file_put(self): # send refusal for quiet in (0, 1, 2):