diff --git a/kittens/transfer/main.py b/kittens/transfer/main.py index 9f7792420..e64e8aa7e 100644 --- a/kittens/transfer/main.py +++ b/kittens/transfer/main.py @@ -43,6 +43,13 @@ is assumed to be the number of a file descriptor from which to read the actual p type=bool-set Before actually transferring files, show a mapping of local file names to remote file names and ask for confirmation. + + +--transmit-deltas -x +type=bool-set +If a file on the receiving side already exists, use the rsync algorithm to update it to match +the file on the sending side, potentially saving lots of bandwidth and also automatically resuming +partial transfers. ''' diff --git a/kittens/transfer/send.py b/kittens/transfer/send.py index dfb321a9e..6731ae4ef 100644 --- a/kittens/transfer/send.py +++ b/kittens/transfer/send.py @@ -28,10 +28,10 @@ from ..tui.loop import Loop, debug from ..tui.operations import styled, without_line_wrap from ..tui.spinners import Spinner from ..tui.utils import human_size +from .librsync import LoadSignature, delta_for_file from .utils import ( IdentityCompressor, ZlibCompressor, abspath, expand_home, home_path, - random_id, render_progress_in_width, safe_divide, - should_be_compressed + random_id, render_progress_in_width, safe_divide, should_be_compressed ) debug @@ -57,9 +57,8 @@ class File: def __init__( self, local_path: str, expanded_local_path: str, file_id: int, stat_result: os.stat_result, - remote_base: str, file_type: FileType, ttype: TransmissionType = TransmissionType.simple + remote_base: str, file_type: FileType, ) -> None: - self.ttype = ttype self.state = FileState.waiting_for_start self.local_path = local_path self.display_name = sanitize_control_codes(local_path) @@ -75,17 +74,26 @@ class File: self.symbolic_link_target = '' self.stat_result = stat_result self.file_type = file_type - self.compression = Compression.zlib if ( - self.file_type is FileType.regular and self.file_size > 4096 and should_be_compressed(self.expanded_local_path) - ) else Compression.none - self.compression = Compression.zlib - self.compressor: Union[ZlibCompressor, IdentityCompressor] = ZlibCompressor() if self.compression is Compression.zlib else IdentityCompressor() + self.rsync_capable = self.file_type is FileType.regular and self.file_size > 4096 + self.compression_capable = self.file_type is FileType.regular and self.file_size > 4096 and should_be_compressed(self.expanded_local_path) self.remote_final_path = '' self.remote_initial_size = -1 self.err_msg = '' self.actual_file: Optional[IO[bytes]] = None self.transmitted_bytes = 0 self.transmit_started_at = self.transmit_ended_at = 0. + self.signature_loader: Optional[LoadSignature] = None + self.delta_loader: Optional[Iterator[memoryview]] = None + + def start_delta_calculation(self) -> None: + sl = self.signature_loader + assert sl is not None + if not sl.finished: + sl() + if not sl.finished: + raise ValueError('Delta signature is incomplete') + self.state = FileState.transmitting + self.delta_loader = delta_for_file(self.expanded_local_path, sl.signature) def __repr__(self) -> str: return f'File(name={self.display_name}, ft={self.file_type}, state={self.state})' @@ -99,25 +107,38 @@ class File: self.state = FileState.finished ans = self.hard_link_target.encode('utf-8') return ans, len(ans) - if self.actual_file is None: - self.actual_file = open(self.expanded_local_path, 'rb') - chunk = self.actual_file.read(sz) + is_last = False + if self.delta_loader is not None: + try: + chunk: Union[bytes, memoryview] = next(self.delta_loader) + except StopIteration: + is_last = True + self.delta_loader = None + chunk = b'' + else: + if self.actual_file is None: + self.actual_file = open(self.expanded_local_path, 'rb') + chunk = self.actual_file.read(sz) + is_last = not chunk or self.actual_file.tell() >= self.file_size uncompressed_sz = len(chunk) - is_last = not chunk or self.actual_file.tell() >= self.file_size cchunk = self.compressor.compress(chunk) if is_last and not isinstance(self.compressor, IdentityCompressor): cchunk += self.compressor.flush() if is_last: self.state = FileState.finished - self.actual_file.close() - self.actual_file = None + if self.actual_file is not None: + self.actual_file.close() + self.actual_file = None return cchunk, uncompressed_sz - def metadata_command(self) -> FileTransmissionCommand: + def metadata_command(self, use_rsync: bool = False) -> FileTransmissionCommand: + self.ttype = TransmissionType.rsync if self.rsync_capable and use_rsync else TransmissionType.simple + self.compression = Compression.zlib if self.compression_capable else Compression.none + self.compressor: Union[ZlibCompressor, IdentityCompressor] = ZlibCompressor() if self.compression is Compression.zlib else IdentityCompressor() return FileTransmissionCommand( action=Action.file, compression=self.compression, ftype=self.file_type, name=self.remote_path, permissions=self.permissions, mtime=self.mtime, - file_id=self.file_id, + file_id=self.file_id, ttype=self.ttype ) @@ -257,7 +278,12 @@ class ProgressTracker: class SendManager: - def __init__(self, request_id: str, files: Tuple[File, ...], bypass: Optional[str] = None, file_done: Callable[[File], None] = lambda f: None): + def __init__( + self, request_id: str, files: Tuple[File, ...], + bypass: Optional[str] = None, use_rsync: bool = False, + file_done: Callable[[File], None] = lambda f: None, + ): + self.use_rsync = use_rsync self.files = files self.bypass = encode_bypass(request_id, bypass) if bypass else '' self.fid_map = {f.file_id: f for f in self.files} @@ -331,7 +357,7 @@ class SendManager: def send_file_metadata(self) -> Iterator[str]: for f in self.files: - yield f.metadata_command().serialize() + yield f.metadata_command(self.use_rsync).serialize() def on_file_status_update(self, ftc: FileTransmissionCommand) -> None: file = self.fid_map.get(ftc.file_id) @@ -344,6 +370,8 @@ class SendManager: file.state = FileState.finished else: file.state = FileState.waiting_for_data if file.ttype is TransmissionType.rsync else FileState.transmitting + if file.state is FileState.waiting_for_data: + file.signature_loader = LoadSignature() else: if ftc.name and not file.remote_final_path: file.remote_final_path = ftc.name @@ -355,12 +383,26 @@ class SendManager: self.active_idx = None self.update_collective_statuses() + def on_signature_data_received(self, ftc: FileTransmissionCommand) -> None: + file = self.fid_map.get(ftc.file_id) + if file is None or file.state is not FileState.waiting_for_data: + return + sl = file.signature_loader + assert sl is not None + sl(ftc.data) + if ftc.action is Action.end_data: + file.start_delta_calculation() + self.update_collective_statuses() + def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None: if ftc.action is Action.status: if ftc.file_id: self.on_file_status_update(ftc) else: self.state = SendState.permission_granted if ftc.status == 'OK' else SendState.permission_denied + elif ftc.action in (Action.data, Action.end_data): + if ftc.file_id: + self.on_signature_data_received(ftc) class Send(Handler): @@ -368,7 +410,7 @@ class Send(Handler): def __init__(self, cli_opts: TransferCLIOptions, files: Tuple[File, ...]): Handler.__init__(self) - self.manager = SendManager(random_id(), files, cli_opts.permissions_bypass, self.on_file_done) + self.manager = SendManager(random_id(), files, cli_opts.permissions_bypass, cli_opts.transmit_deltas, self.on_file_done) self.cli_opts = cli_opts self.transmit_started = False self.file_metadata_sent = False @@ -380,6 +422,7 @@ class Send(Handler): self.progress_drawn = True self.done_files: List[File] = [] self.failed_files: List[File] = [] + self.transmit_ok_checked = False def send_payload(self, payload: str) -> None: self.write(self.manager.prefix) @@ -463,12 +506,15 @@ class Send(Handler): self.on_interrupt() def check_for_transmit_ok(self) -> None: + if self.transmit_ok_checked: + return self.start_transfer() if self.manager.state is not SendState.permission_granted: return if self.cli_opts.confirm_paths: if self.manager.all_started: self.print_check_paths() return + self.transmit_ok_checked = True self.start_transfer() def transmit_next_chunk(self) -> None: diff --git a/kitty/file_transmission.py b/kitty/file_transmission.py index b6e868ffa..0f75e0630 100644 --- a/kitty/file_transmission.py +++ b/kitty/file_transmission.py @@ -18,7 +18,7 @@ from typing import ( IO, Any, Callable, Deque, Dict, Iterator, List, Optional, Union ) -from kittens.transfer.librsync import signature_of_file +from kittens.transfer.librsync import PatchFile, signature_of_file from kitty.fast_data_types import ( FILE_TRANSFER_CODE, OSC, add_timer, get_boss, get_options ) @@ -201,7 +201,7 @@ class FileTransmissionCommand: if not fmap: fmap = {k.name.encode('ascii'): k for k in fields(cls)} setattr(cls, 'fmap', fmap) - from kittens.transfer.rsync import parse_ftc, decode_utf8_buffer + from kittens.transfer.rsync import decode_utf8_buffer, parse_ftc def handle_item(key: memoryview, val: memoryview, has_semicolons: bool) -> None: field = fmap.get(key) @@ -272,7 +272,7 @@ class DestFile: self.needs_data_sent = self.ttype is not TransmissionType.simple self.decompressor: Union[ZlibDecompressor, IdentityDecompressor] = ZlibDecompressor() if ftc.compression is Compression.zlib else IdentityDecompressor() self.closed = self.ftype is FileType.directory - self.actual_file: Optional[IO[bytes]] = None + self.actual_file: Union[PatchFile, IO[bytes], None] = None self.failed = False def __repr__(self) -> str: @@ -349,15 +349,16 @@ class DestFile: self.close() self.apply_metadata(is_symlink=True) elif self.ftype is FileType.regular: + data = self.decompressor(data, is_last=is_last) if self.actual_file is None: self.make_parent_dirs() - self.unlink_existing_if_needed() - flags = os.O_RDWR | os.O_CREAT | getattr(os, 'O_CLOEXEC', 0) | getattr(os, 'O_BINARY', 0) - if self.ttype is TransmissionType.simple: - flags |= os.O_TRUNC - self.actual_file = open(os.open(self.name, flags, self.permissions), mode='r+b', closefd=True) - data = self.decompressor(data, is_last=is_last) - self.actual_file.write(data) + if self.ttype is TransmissionType.rsync: + self.actual_file = PatchFile(self.name) + else: + self.unlink_existing_if_needed() + flags = os.O_RDWR | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_CLOEXEC', 0) | getattr(os, 'O_BINARY', 0) + self.actual_file = open(os.open(self.name, flags, self.permissions), mode='r+b', closefd=True) + self.actual_file.write(data) # type: ignore if is_last: self.close() self.apply_metadata() @@ -537,6 +538,7 @@ class FileTransmission: ttype = TransmissionType.rsync \ if sz > -1 and df.ttype is TransmissionType.rsync and df.ftype is FileType.regular else TransmissionType.simple self.send_status_response(code=ErrorCode.STARTED, request_id=ar.id, file_id=df.file_id, name=df.name, size=sz, ttype=ttype) + df.ttype = ttype if ttype is TransmissionType.rsync: try: fs = signature_of_file(df.name) @@ -588,6 +590,7 @@ class FileTransmission: pending.popleft() else: self.callback_after(func, timeout=0.1) + return try: next_bit_of_data = next(fs) except StopIteration: @@ -599,11 +602,12 @@ class FileTransmission: return has_capacity = True pos = 0 - while True: + is_last = False + while not is_last: r = next_bit_of_data[pos:pos + 4096] - if len(r) < 1: - break - data = FileTransmissionCommand(id=receive_id, action=Action.data, file_id=file_id, data=r) + is_last = len(r) < 4096 + pos += len(r) + data = FileTransmissionCommand(id=receive_id, action=Action.end_data if is_last else Action.data, file_id=file_id, data=r) if has_capacity: if not self.write_ftc_to_child(data, use_pending=False): has_capacity = False