diff --git a/kittens/transfer/receive.py b/kittens/transfer/receive.py index 707128ba5..2b51470a9 100644 --- a/kittens/transfer/receive.py +++ b/kittens/transfer/receive.py @@ -15,7 +15,7 @@ from kitty.cli_stub import TransferCLIOptions from kitty.fast_data_types import FILE_TRANSFER_CODE, wcswidth from kitty.file_transmission import ( Action, Compression, FileTransmissionCommand, FileType, NameReprEnum, - encode_bypass + TransmissionType, encode_bypass, split_for_transfer ) from kitty.typing import KeyEventType from kitty.utils import sanitize_control_codes @@ -25,6 +25,7 @@ from ..tui.loop import Loop, debug from ..tui.operations import styled, without_line_wrap from ..tui.spinners import Spinner from ..tui.utils import human_size +from .librsync import signature_of_file from .send import Transfer from .utils import ( expand_home, random_id, render_progress_in_width, safe_divide, @@ -208,7 +209,7 @@ class Manager: def __init__( self, request_id: str, spec: List[str], dest: str, - bypass: Optional[str] = None + bypass: Optional[str] = None, use_rsync: bool = False ): self.request_id = request_id self.spec = spec @@ -223,6 +224,7 @@ class Manager: self.files: List[File] = [] self.progress_tracker = ProgressTracker() self.transfer_done = False + self.use_rsync = use_rsync def start_transfer(self) -> Iterator[str]: yield FileTransmissionCommand(action=Action.receive, bypass=self.bypass, size=len(self.spec)).serialize() @@ -274,10 +276,24 @@ class Manager: for f in self.files: if f.ftype is FileType.directory or (f.ftype is FileType.link and f.remote_target): continue + read_signature = self.use_rsync + if read_signature and f.ftype is FileType.regular: + try: + sr = os.stat(f.expanded_local_path, follow_symlinks=False) + except OSError: + read_signature = False + else: + read_signature = sr.st_size > 4096 yield FileTransmissionCommand( - action=Action.file, name=f.remote_path, file_id=f.file_id, + action=Action.file, name=f.remote_path, file_id=f.file_id, ttype=TransmissionType.rsync if read_signature else TransmissionType.simple, compression=Compression.zlib if f.compression_capable else Compression.none ).serialize() + if read_signature: + fs = signature_of_file(f.expanded_local_path) + for chunk in fs: + for data in split_for_transfer(chunk, file_id=f.file_id): + yield data.serialize() + yield FileTransmissionCommand(file_id=f.file_id, action=Action.end_data).serialize() def collect_files(self, cli_opts: TransferCLIOptions) -> None: self.files = list(files_for_receive(cli_opts, self.dest, self.files, self.remote_home, self.spec)) @@ -345,7 +361,7 @@ class Receive(Handler): def __init__(self, cli_opts: TransferCLIOptions, spec: List[str], dest: str = ''): self.cli_opts = cli_opts - self.manager = Manager(random_id(), spec, dest, bypass=cli_opts.permissions_bypass) + self.manager = Manager(random_id(), spec, dest, bypass=cli_opts.permissions_bypass, use_rsync=cli_opts.transmit_deltas) self.quit_after_write_code: Optional[int] = None self.check_paths_printed = False self.transmit_started = False @@ -353,6 +369,7 @@ class Receive(Handler): self.spinner = Spinner() self.progress_update_call: Optional[TimerHandle] = None self.progress_drawn = False + self.transmit_iterator: Optional[Iterator[str]] = None def send_payload(self, payload: str) -> None: self.write(self.manager.prefix) @@ -454,11 +471,28 @@ class Receive(Handler): def start_transfer(self) -> None: self.transmit_started = True - self.print(f'Queueing transfer of {len(self.manager.files)} files(s)') - for x in self.manager.request_files(): - self.send_payload(x) + if self.manager.use_rsync: + self.print(f'Transmitting signatures of {len(self.manager.files)} files(s)') + else: + self.print(f'Queueing transfer of {len(self.manager.files)} files(s)') names = (f.display_name for f in self.manager.files) self.max_name_length = max(6, max(map(wcswidth, names))) + self.transmit_iterator = self.manager.request_files() + self.transmit_one() + + def transmit_one(self) -> None: + if self.transmit_iterator is None: + return + try: + data = next(self.transmit_iterator) + except StopIteration: + self.transmit_iterator = None + except Exception as err: + self.print_err(str(err)) + self.print('Waiting to ensure terminal cancels transfer, will quit in a few seconds') + self.abort_transfer() + else: + self.send_payload(data) def print_err(self, msg: str) -> None: self.cmd.styled(msg, fg='red') @@ -567,6 +601,8 @@ class Receive(Handler): def on_writing_finished(self) -> None: if self.quit_after_write_code is not None: self.quit_loop(self.quit_after_write_code) + elif self.transmit_iterator is not None: + self.transmit_one() def receive_main(cli_opts: TransferCLIOptions, args: List[str]) -> None: