Start work on delta-based file transmission

This commit is contained in:
Kovid Goyal 2021-10-01 14:32:28 +05:30
parent e6cff61f99
commit cfeeec95fa
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 91 additions and 34 deletions

View File

@ -43,6 +43,13 @@ is assumed to be the number of a file descriptor from which to read the actual p
type=bool-set
Before actually transferring files, show a mapping of local file names to remote file names
and ask for confirmation.
--transmit-deltas -x
type=bool-set
If a file on the receiving side already exists, use the rsync algorithm to update it to match
the file on the sending side, potentially saving lots of bandwidth and also automatically resuming
partial transfers.
'''

View File

@ -28,10 +28,10 @@ from ..tui.loop import Loop, debug
from ..tui.operations import styled, without_line_wrap
from ..tui.spinners import Spinner
from ..tui.utils import human_size
from .librsync import LoadSignature, delta_for_file
from .utils import (
IdentityCompressor, ZlibCompressor, abspath, expand_home, home_path,
random_id, render_progress_in_width, safe_divide,
should_be_compressed
random_id, render_progress_in_width, safe_divide, should_be_compressed
)
debug
@ -57,9 +57,8 @@ class File:
def __init__(
self, local_path: str, expanded_local_path: str, file_id: int, stat_result: os.stat_result,
remote_base: str, file_type: FileType, ttype: TransmissionType = TransmissionType.simple
remote_base: str, file_type: FileType,
) -> None:
self.ttype = ttype
self.state = FileState.waiting_for_start
self.local_path = local_path
self.display_name = sanitize_control_codes(local_path)
@ -75,17 +74,26 @@ class File:
self.symbolic_link_target = ''
self.stat_result = stat_result
self.file_type = file_type
self.compression = Compression.zlib if (
self.file_type is FileType.regular and self.file_size > 4096 and should_be_compressed(self.expanded_local_path)
) else Compression.none
self.compression = Compression.zlib
self.compressor: Union[ZlibCompressor, IdentityCompressor] = ZlibCompressor() if self.compression is Compression.zlib else IdentityCompressor()
self.rsync_capable = self.file_type is FileType.regular and self.file_size > 4096
self.compression_capable = self.file_type is FileType.regular and self.file_size > 4096 and should_be_compressed(self.expanded_local_path)
self.remote_final_path = ''
self.remote_initial_size = -1
self.err_msg = ''
self.actual_file: Optional[IO[bytes]] = None
self.transmitted_bytes = 0
self.transmit_started_at = self.transmit_ended_at = 0.
self.signature_loader: Optional[LoadSignature] = None
self.delta_loader: Optional[Iterator[memoryview]] = None
def start_delta_calculation(self) -> None:
    """Mark this file as transmitting and create its delta iterator.

    Expects ``self.signature_loader`` to have been set. If the loader does
    not report ``finished``, it is called once with no arguments
    (presumably to flush any buffered input -- confirm against
    ``LoadSignature``) and then checked again.

    Raises:
        ValueError: if the loader is still not finished after that call.
    """
    loader = self.signature_loader
    assert loader is not None
    if not loader.finished:
        loader()
    if not loader.finished:
        raise ValueError('Delta signature is incomplete')
    # The state is switched before the delta iterator is created.
    self.state = FileState.transmitting
    self.delta_loader = delta_for_file(
        self.expanded_local_path, loader.signature,
    )
def __repr__(self) -> str:
    """Return a short debug string with the display name, file type and state."""
    name, ftype, state = self.display_name, self.file_type, self.state
    return f'File(name={name}, ft={ftype}, state={state})'
@ -99,25 +107,38 @@ class File:
self.state = FileState.finished
ans = self.hard_link_target.encode('utf-8')
return ans, len(ans)
if self.actual_file is None:
self.actual_file = open(self.expanded_local_path, 'rb')
chunk = self.actual_file.read(sz)
is_last = False
if self.delta_loader is not None:
try:
chunk: Union[bytes, memoryview] = next(self.delta_loader)
except StopIteration:
is_last = True
self.delta_loader = None
chunk = b''
else:
if self.actual_file is None:
self.actual_file = open(self.expanded_local_path, 'rb')
chunk = self.actual_file.read(sz)
is_last = not chunk or self.actual_file.tell() >= self.file_size
uncompressed_sz = len(chunk)
is_last = not chunk or self.actual_file.tell() >= self.file_size
cchunk = self.compressor.compress(chunk)
if is_last and not isinstance(self.compressor, IdentityCompressor):
cchunk += self.compressor.flush()
if is_last:
self.state = FileState.finished
self.actual_file.close()
self.actual_file = None
if self.actual_file is not None:
self.actual_file.close()
self.actual_file = None
return cchunk, uncompressed_sz
def metadata_command(self) -> FileTransmissionCommand:
def metadata_command(self, use_rsync: bool = False) -> FileTransmissionCommand:
self.ttype = TransmissionType.rsync if self.rsync_capable and use_rsync else TransmissionType.simple
self.compression = Compression.zlib if self.compression_capable else Compression.none
self.compressor: Union[ZlibCompressor, IdentityCompressor] = ZlibCompressor() if self.compression is Compression.zlib else IdentityCompressor()
return FileTransmissionCommand(
action=Action.file, compression=self.compression, ftype=self.file_type,
name=self.remote_path, permissions=self.permissions, mtime=self.mtime,
file_id=self.file_id,
file_id=self.file_id, ttype=self.ttype
)
@ -257,7 +278,12 @@ class ProgressTracker:
class SendManager:
def __init__(self, request_id: str, files: Tuple[File, ...], bypass: Optional[str] = None, file_done: Callable[[File], None] = lambda f: None):
def __init__(
self, request_id: str, files: Tuple[File, ...],
bypass: Optional[str] = None, use_rsync: bool = False,
file_done: Callable[[File], None] = lambda f: None,
):
self.use_rsync = use_rsync
self.files = files
self.bypass = encode_bypass(request_id, bypass) if bypass else ''
self.fid_map = {f.file_id: f for f in self.files}
@ -331,7 +357,7 @@ class SendManager:
def send_file_metadata(self) -> Iterator[str]:
for f in self.files:
yield f.metadata_command().serialize()
yield f.metadata_command(self.use_rsync).serialize()
def on_file_status_update(self, ftc: FileTransmissionCommand) -> None:
file = self.fid_map.get(ftc.file_id)
@ -344,6 +370,8 @@ class SendManager:
file.state = FileState.finished
else:
file.state = FileState.waiting_for_data if file.ttype is TransmissionType.rsync else FileState.transmitting
if file.state is FileState.waiting_for_data:
file.signature_loader = LoadSignature()
else:
if ftc.name and not file.remote_final_path:
file.remote_final_path = ftc.name
@ -355,12 +383,26 @@ class SendManager:
self.active_idx = None
self.update_collective_statuses()
def on_signature_data_received(self, ftc: FileTransmissionCommand) -> None:
# Feed the data carried by ftc into the signature loader of the file it
# refers to; when ftc is an end_data command, start that file's delta
# calculation.
file = self.fid_map.get(ftc.file_id)
# Ignore commands for unknown file ids and for files that are not in the
# waiting_for_data state.
if file is None or file.state is not FileState.waiting_for_data:
return
sl = file.signature_loader
assert sl is not None
sl(ftc.data)
# NOTE(review): indentation is not visible in this view, so it cannot be
# confirmed here whether update_collective_statuses() runs only for end_data
# or on every call -- presumably only for end_data; verify against the file.
if ftc.action is Action.end_data:
file.start_delta_calculation()
self.update_collective_statuses()
def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None:
    """Route a response command to the matching handler.

    * ``status`` with a file id: passed to ``on_file_status_update``.
    * ``status`` without a file id: sets ``self.state`` to
      ``permission_granted`` when ``ftc.status`` is ``'OK'``, otherwise to
      ``permission_denied``.
    * ``data`` / ``end_data`` with a file id: passed to
      ``on_signature_data_received``.

    Any other command is ignored.
    """
    if ftc.action is Action.status:
        if not ftc.file_id:
            granted = ftc.status == 'OK'
            self.state = SendState.permission_granted if granted else SendState.permission_denied
            return
        self.on_file_status_update(ftc)
        return
    if ftc.action in (Action.data, Action.end_data) and ftc.file_id:
        self.on_signature_data_received(ftc)
class Send(Handler):
@ -368,7 +410,7 @@ class Send(Handler):
def __init__(self, cli_opts: TransferCLIOptions, files: Tuple[File, ...]):
Handler.__init__(self)
self.manager = SendManager(random_id(), files, cli_opts.permissions_bypass, self.on_file_done)
self.manager = SendManager(random_id(), files, cli_opts.permissions_bypass, cli_opts.transmit_deltas, self.on_file_done)
self.cli_opts = cli_opts
self.transmit_started = False
self.file_metadata_sent = False
@ -380,6 +422,7 @@ class Send(Handler):
self.progress_drawn = True
self.done_files: List[File] = []
self.failed_files: List[File] = []
self.transmit_ok_checked = False
def send_payload(self, payload: str) -> None:
self.write(self.manager.prefix)
@ -463,12 +506,15 @@ class Send(Handler):
self.on_interrupt()
def check_for_transmit_ok(self) -> None:
# Decide whether the transfer can be started and, if so, start it.
# Once transmit_ok_checked has been set by an earlier call, later calls go
# straight to start_transfer().
if self.transmit_ok_checked:
return self.start_transfer()
# Do nothing until the manager state is permission_granted.
if self.manager.state is not SendState.permission_granted:
return
# With confirm_paths enabled, print_check_paths() is called when the manager
# reports all_started.
# NOTE(review): indentation is not visible in this view; presumably the bare
# return below belongs to the confirm_paths branch, so that the transfer is
# not started from here while confirmation is requested -- confirm.
if self.cli_opts.confirm_paths:
if self.manager.all_started:
self.print_check_paths()
return
self.transmit_ok_checked = True
self.start_transfer()
def transmit_next_chunk(self) -> None:

View File

@ -18,7 +18,7 @@ from typing import (
IO, Any, Callable, Deque, Dict, Iterator, List, Optional, Union
)
from kittens.transfer.librsync import signature_of_file
from kittens.transfer.librsync import PatchFile, signature_of_file
from kitty.fast_data_types import (
FILE_TRANSFER_CODE, OSC, add_timer, get_boss, get_options
)
@ -201,7 +201,7 @@ class FileTransmissionCommand:
if not fmap:
fmap = {k.name.encode('ascii'): k for k in fields(cls)}
setattr(cls, 'fmap', fmap)
from kittens.transfer.rsync import parse_ftc, decode_utf8_buffer
from kittens.transfer.rsync import decode_utf8_buffer, parse_ftc
def handle_item(key: memoryview, val: memoryview, has_semicolons: bool) -> None:
field = fmap.get(key)
@ -272,7 +272,7 @@ class DestFile:
self.needs_data_sent = self.ttype is not TransmissionType.simple
self.decompressor: Union[ZlibDecompressor, IdentityDecompressor] = ZlibDecompressor() if ftc.compression is Compression.zlib else IdentityDecompressor()
self.closed = self.ftype is FileType.directory
self.actual_file: Optional[IO[bytes]] = None
self.actual_file: Union[PatchFile, IO[bytes], None] = None
self.failed = False
def __repr__(self) -> str:
@ -349,15 +349,16 @@ class DestFile:
self.close()
self.apply_metadata(is_symlink=True)
elif self.ftype is FileType.regular:
data = self.decompressor(data, is_last=is_last)
if self.actual_file is None:
self.make_parent_dirs()
self.unlink_existing_if_needed()
flags = os.O_RDWR | os.O_CREAT | getattr(os, 'O_CLOEXEC', 0) | getattr(os, 'O_BINARY', 0)
if self.ttype is TransmissionType.simple:
flags |= os.O_TRUNC
self.actual_file = open(os.open(self.name, flags, self.permissions), mode='r+b', closefd=True)
data = self.decompressor(data, is_last=is_last)
self.actual_file.write(data)
if self.ttype is TransmissionType.rsync:
self.actual_file = PatchFile(self.name)
else:
self.unlink_existing_if_needed()
flags = os.O_RDWR | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_CLOEXEC', 0) | getattr(os, 'O_BINARY', 0)
self.actual_file = open(os.open(self.name, flags, self.permissions), mode='r+b', closefd=True)
self.actual_file.write(data) # type: ignore
if is_last:
self.close()
self.apply_metadata()
@ -537,6 +538,7 @@ class FileTransmission:
ttype = TransmissionType.rsync \
if sz > -1 and df.ttype is TransmissionType.rsync and df.ftype is FileType.regular else TransmissionType.simple
self.send_status_response(code=ErrorCode.STARTED, request_id=ar.id, file_id=df.file_id, name=df.name, size=sz, ttype=ttype)
df.ttype = ttype
if ttype is TransmissionType.rsync:
try:
fs = signature_of_file(df.name)
@ -588,6 +590,7 @@ class FileTransmission:
pending.popleft()
else:
self.callback_after(func, timeout=0.1)
return
try:
next_bit_of_data = next(fs)
except StopIteration:
@ -599,11 +602,12 @@ class FileTransmission:
return
has_capacity = True
pos = 0
while True:
is_last = False
while not is_last:
r = next_bit_of_data[pos:pos + 4096]
if len(r) < 1:
break
data = FileTransmissionCommand(id=receive_id, action=Action.data, file_id=file_id, data=r)
is_last = len(r) < 4096
pos += len(r)
data = FileTransmissionCommand(id=receive_id, action=Action.end_data if is_last else Action.data, file_id=file_id, data=r)
if has_capacity:
if not self.write_ftc_to_child(data, use_pending=False):
has_capacity = False