Start work on rsync integration for receiving files
This commit is contained in:
parent
bc03b4dff6
commit
63399fe975
@ -15,7 +15,7 @@ from kitty.cli_stub import TransferCLIOptions
|
||||
from kitty.fast_data_types import FILE_TRANSFER_CODE, wcswidth
|
||||
from kitty.file_transmission import (
|
||||
Action, Compression, FileTransmissionCommand, FileType, NameReprEnum,
|
||||
encode_bypass
|
||||
TransmissionType, encode_bypass, split_for_transfer
|
||||
)
|
||||
from kitty.typing import KeyEventType
|
||||
from kitty.utils import sanitize_control_codes
|
||||
@ -25,6 +25,7 @@ from ..tui.loop import Loop, debug
|
||||
from ..tui.operations import styled, without_line_wrap
|
||||
from ..tui.spinners import Spinner
|
||||
from ..tui.utils import human_size
|
||||
from .librsync import signature_of_file
|
||||
from .send import Transfer
|
||||
from .utils import (
|
||||
expand_home, random_id, render_progress_in_width, safe_divide,
|
||||
@ -208,7 +209,7 @@ class Manager:
|
||||
|
||||
def __init__(
|
||||
self, request_id: str, spec: List[str], dest: str,
|
||||
bypass: Optional[str] = None
|
||||
bypass: Optional[str] = None, use_rsync: bool = False
|
||||
):
|
||||
self.request_id = request_id
|
||||
self.spec = spec
|
||||
@ -223,6 +224,7 @@ class Manager:
|
||||
self.files: List[File] = []
|
||||
self.progress_tracker = ProgressTracker()
|
||||
self.transfer_done = False
|
||||
self.use_rsync = use_rsync
|
||||
|
||||
def start_transfer(self) -> Iterator[str]:
|
||||
yield FileTransmissionCommand(action=Action.receive, bypass=self.bypass, size=len(self.spec)).serialize()
|
||||
@ -274,10 +276,24 @@ class Manager:
|
||||
for f in self.files:
|
||||
if f.ftype is FileType.directory or (f.ftype is FileType.link and f.remote_target):
|
||||
continue
|
||||
read_signature = self.use_rsync
|
||||
if read_signature and f.ftype is FileType.regular:
|
||||
try:
|
||||
sr = os.stat(f.expanded_local_path, follow_symlinks=False)
|
||||
except OSError:
|
||||
read_signature = False
|
||||
else:
|
||||
read_signature = sr.st_size > 4096
|
||||
yield FileTransmissionCommand(
|
||||
action=Action.file, name=f.remote_path, file_id=f.file_id,
|
||||
action=Action.file, name=f.remote_path, file_id=f.file_id, ttype=TransmissionType.rsync if read_signature else TransmissionType.simple,
|
||||
compression=Compression.zlib if f.compression_capable else Compression.none
|
||||
).serialize()
|
||||
if read_signature:
|
||||
fs = signature_of_file(f.expanded_local_path)
|
||||
for chunk in fs:
|
||||
for data in split_for_transfer(chunk, file_id=f.file_id):
|
||||
yield data.serialize()
|
||||
yield FileTransmissionCommand(file_id=f.file_id, action=Action.end_data).serialize()
|
||||
|
||||
def collect_files(self, cli_opts: TransferCLIOptions) -> None:
|
||||
self.files = list(files_for_receive(cli_opts, self.dest, self.files, self.remote_home, self.spec))
|
||||
@ -345,7 +361,7 @@ class Receive(Handler):
|
||||
|
||||
def __init__(self, cli_opts: TransferCLIOptions, spec: List[str], dest: str = ''):
|
||||
self.cli_opts = cli_opts
|
||||
self.manager = Manager(random_id(), spec, dest, bypass=cli_opts.permissions_bypass)
|
||||
self.manager = Manager(random_id(), spec, dest, bypass=cli_opts.permissions_bypass, use_rsync=cli_opts.transmit_deltas)
|
||||
self.quit_after_write_code: Optional[int] = None
|
||||
self.check_paths_printed = False
|
||||
self.transmit_started = False
|
||||
@ -353,6 +369,7 @@ class Receive(Handler):
|
||||
self.spinner = Spinner()
|
||||
self.progress_update_call: Optional[TimerHandle] = None
|
||||
self.progress_drawn = False
|
||||
self.transmit_iterator: Optional[Iterator[str]] = None
|
||||
|
||||
def send_payload(self, payload: str) -> None:
|
||||
self.write(self.manager.prefix)
|
||||
@ -454,11 +471,28 @@ class Receive(Handler):
|
||||
|
||||
def start_transfer(self) -> None:
|
||||
self.transmit_started = True
|
||||
self.print(f'Queueing transfer of {len(self.manager.files)} files(s)')
|
||||
for x in self.manager.request_files():
|
||||
self.send_payload(x)
|
||||
if self.manager.use_rsync:
|
||||
self.print(f'Transmitting signatures of {len(self.manager.files)} files(s)')
|
||||
else:
|
||||
self.print(f'Queueing transfer of {len(self.manager.files)} files(s)')
|
||||
names = (f.display_name for f in self.manager.files)
|
||||
self.max_name_length = max(6, max(map(wcswidth, names)))
|
||||
self.transmit_iterator = self.manager.request_files()
|
||||
self.transmit_one()
|
||||
|
||||
def transmit_one(self) -> None:
|
||||
if self.transmit_iterator is None:
|
||||
return
|
||||
try:
|
||||
data = next(self.transmit_iterator)
|
||||
except StopIteration:
|
||||
self.transmit_iterator = None
|
||||
except Exception as err:
|
||||
self.print_err(str(err))
|
||||
self.print('Waiting to ensure terminal cancels transfer, will quit in a few seconds')
|
||||
self.abort_transfer()
|
||||
else:
|
||||
self.send_payload(data)
|
||||
|
||||
def print_err(self, msg: str) -> None:
|
||||
self.cmd.styled(msg, fg='red')
|
||||
@ -567,6 +601,8 @@ class Receive(Handler):
|
||||
def on_writing_finished(self) -> None:
|
||||
if self.quit_after_write_code is not None:
|
||||
self.quit_loop(self.quit_after_write_code)
|
||||
elif self.transmit_iterator is not None:
|
||||
self.transmit_one()
|
||||
|
||||
|
||||
def receive_main(cli_opts: TransferCLIOptions, args: List[str]) -> None:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user