Have the receiving side send progress updates

This is both more accurate and works for rsync based transfers, where we
dont know the total size to transmit because of the streaming nature of
the rsync protocol.
This commit is contained in:
Kovid Goyal 2021-10-03 11:04:46 +05:30
parent 3a373a200c
commit 76eab44f53
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 61 additions and 18 deletions

View File

@ -5,6 +5,7 @@
import os
import stat
from asyncio import TimerHandle
from collections import deque
from enum import auto
from itertools import count
@ -81,7 +82,8 @@ class File:
self.err_msg = ''
self.actual_file: Optional[IO[bytes]] = None
self.transmitted_bytes = 0
self.transmit_started_at = self.transmit_ended_at = 0.
self.reported_progress = 0
self.transmit_started_at = self.transmit_ended_at = self.done_at = 0.
self.signature_loader: Optional[LoadSignature] = None
self.delta_loader: Optional[Iterator[memoryview]] = None
@ -250,6 +252,7 @@ class ProgressTracker:
self.transfered_stats_amt = 0
self.transfered_stats_interval = 0.
self.started_at = 0.
self.total_reported_progress = 0
def change_active_file(self, nf: File) -> None:
now = monotonic()
@ -260,7 +263,7 @@ class ProgressTracker:
self.transfers.append(Transfer())
self.started_at = monotonic()
def on_transfer(self, amt: int) -> None:
def on_transmit(self, amt: int) -> None:
if self.active_file is not None:
self.active_file.transmitted_bytes += amt
self.total_transferred += amt
@ -271,12 +274,19 @@ class ProgressTracker:
self.transfered_stats_interval = now - self.transfers[0].at
self.transfered_stats_amt = sum(t.amt for t in self.transfers)
def on_file_progress(self, af: File, delta: int) -> None:
self.total_reported_progress += delta
def on_file_done(self, af: File) -> None:
af.done_at = monotonic()
class SendManager:
def __init__(
self, request_id: str, files: Tuple[File, ...],
bypass: Optional[str] = None, use_rsync: bool = False,
file_progress: Callable[[File, int], None] = lambda f, i: None,
file_done: Callable[[File], None] = lambda f: None,
):
self.use_rsync = use_rsync
@ -293,6 +303,7 @@ class SendManager:
self.suffix = '\x1b\\'
self.progress = ProgressTracker(sum(df.file_size for df in self.files if df.file_size >= 0))
self.file_done = file_done
self.file_progress = file_progress
@property
def active_file(self) -> Optional[File]:
@ -305,7 +316,6 @@ class SendManager:
if self.active_idx is not None:
paf = self.files[self.active_idx]
paf.transmit_ended_at = monotonic()
self.file_done(paf)
for i, f in enumerate(self.files):
if f.state is FileState.transmitting:
self.active_idx = i
@ -365,16 +375,28 @@ class SendManager:
file.state = FileState.waiting_for_data if ftc.ttype is TransmissionType.rsync else FileState.transmitting
if file.state is FileState.waiting_for_data:
file.signature_loader = LoadSignature()
self.update_collective_statuses()
elif ftc.status == 'PROGRESS':
change = ftc.size - file.reported_progress
file.reported_progress = ftc.size
self.progress.on_file_progress(file, change)
self.file_progress(file, change)
else:
if ftc.name and not file.remote_final_path:
file.remote_final_path = ftc.name
file.state = FileState.acknowledged
if ftc.status != 'OK':
if ftc.status == 'OK':
change = ftc.size - file.reported_progress
file.reported_progress = ftc.size
self.progress.on_file_progress(file, change)
self.file_progress(file, change)
else:
file.err_msg = ftc.status
self.progress.on_file_done(file)
self.file_done(file)
if self.active_idx is not None and file is self.files[self.active_idx]:
self.file_done(self.files[self.active_idx])
self.active_idx = None
self.update_collective_statuses()
self.update_collective_statuses()
def on_signature_data_received(self, ftc: FileTransmissionCommand) -> None:
file = self.fid_map.get(ftc.file_id)
@ -404,7 +426,8 @@ class Send(Handler):
def __init__(self, cli_opts: TransferCLIOptions, files: Tuple[File, ...]):
Handler.__init__(self)
self.manager = SendManager(random_id(), files, cli_opts.permissions_bypass, cli_opts.transmit_deltas, self.on_file_done)
self.manager = SendManager(
random_id(), files, cli_opts.permissions_bypass, cli_opts.transmit_deltas, self.on_file_progress, self.on_file_done)
self.cli_opts = cli_opts
self.transmit_started = False
self.file_metadata_sent = False
@ -417,6 +440,7 @@ class Send(Handler):
self.done_files: List[File] = []
self.failed_files: List[File] = []
self.transmit_ok_checked = False
self.progress_update_call: Optional[TimerHandle] = None
def send_payload(self, payload: str) -> None:
self.write(self.manager.prefix)
@ -527,7 +551,7 @@ class Send(Handler):
def on_writing_finished(self) -> None:
chunk_transmitted = self.manager.current_chunk_uncompressed_sz is not None
if chunk_transmitted:
self.manager.progress.on_transfer(self.manager.current_chunk_uncompressed_sz or 0)
self.manager.progress.on_transmit(self.manager.current_chunk_uncompressed_sz or 0)
self.manager.current_chunk_uncompressed_sz = None
if self.quit_after_write_code is not None:
self.quit_loop(self.quit_after_write_code)
@ -602,11 +626,21 @@ class Send(Handler):
self.cmd.clear_to_end_of_screen()
self.progress_drawn = False
def schedule_progress_update(self, delay: float = 0.1) -> None:
if self.progress_update_call is None:
self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress)
elif self.asyncio_loop.time() + delay < self.progress_update_call.when():
self.progress_update_call.cancel()
self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress)
def on_file_progress(self, file: File, change: int) -> None:
self.schedule_progress_update()
def on_file_done(self, file: File) -> None:
self.done_files.append(file)
if file.err_msg:
self.failed_files.append(file)
self.asyncio_loop.call_soon(self.refresh_progress)
self.schedule_progress_update()
@Handler.atomic_update
def draw_progress(self) -> None:
@ -631,12 +665,12 @@ class Send(Handler):
self.print()
self.render_progress(
'Total', spinner_char=sc,
bytes_so_far=p.total_transferred, total_bytes=p.total_bytes_to_transfer,
bytes_so_far=p.total_reported_progress, total_bytes=p.total_bytes_to_transfer,
secs_so_far=now - p.started_at, is_complete=is_complete,
bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval)
)
self.print()
self.asyncio_loop.call_later(self.spinner.interval, self.refresh_progress)
self.schedule_progress_update(self.spinner.interval)
self.progress_drawn = True
def refresh_progress(self) -> None:
@ -648,8 +682,8 @@ class Send(Handler):
now = monotonic()
self.render_progress(
af.display_name, spinner_char=spinner_char, is_complete=is_complete,
bytes_so_far=af.transmitted_bytes, total_bytes=af.bytes_to_transmit,
secs_so_far=(af.transmit_ended_at or now) - af.transmit_started_at,
bytes_so_far=af.reported_progress, total_bytes=af.bytes_to_transmit,
secs_so_far=(af.done_at or now) - af.transmit_started_at,
bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval)
)

View File

@ -15,7 +15,7 @@ from functools import partial
from gettext import gettext as _
from time import monotonic
from typing import (
IO, Any, Callable, Deque, Dict, Iterator, List, Optional, Union
IO, Any, Callable, Deque, Dict, Iterator, List, Optional, Union, cast
)
from kittens.transfer.librsync import PatchFile, signature_of_file
@ -105,7 +105,7 @@ class TransmissionType(NameReprEnum):
rsync = auto()
ErrorCode = Enum('ErrorCode', 'OK STARTED CANCELED EINVAL EPERM EISDIR')
ErrorCode = Enum('ErrorCode', 'OK STARTED CANCELED PROGRESS EINVAL EPERM EISDIR')
class TransmissionError(Exception):
@ -290,6 +290,7 @@ class DestFile:
self.closed = self.ftype is FileType.directory
self.actual_file: Union[PatchFile, IO[bytes], None] = None
self.failed = False
self.bytes_written = 0
def __repr__(self) -> str:
return f'DestFile(name={self.name}, file_id={self.file_id}, actual_file={self.actual_file})'
@ -335,6 +336,7 @@ class DestFile:
raise TransmissionError(file_id=self.file_id, msg='Cannot write to a closed file')
if self.ftype in (FileType.symlink, FileType.link):
self.link_target += data
self.bytes_written += len(data)
if is_last:
lt = self.link_target.decode('utf-8', 'replace')
base = self.make_parent_dirs()
@ -374,7 +376,9 @@ class DestFile:
self.unlink_existing_if_needed()
flags = os.O_RDWR | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_CLOEXEC', 0) | getattr(os, 'O_BINARY', 0)
self.actual_file = open(os.open(self.name, flags, self.permissions), mode='r+b', closefd=True)
self.actual_file.write(data) # type: ignore
af = cast(Union[IO[bytes], PatchFile], self.actual_file)
af.write(data)
self.bytes_written = af.tell()
if is_last:
self.close()
self.apply_metadata()
@ -567,8 +571,13 @@ class FileTransmission:
df = ar.add_data(cmd)
if df.failed:
return
if df.closed and ar.send_acknowledgements:
self.send_status_response(code=ErrorCode.OK, request_id=ar.id, file_id=df.file_id, name=df.name)
if ar.send_acknowledgements:
if df.closed:
self.send_status_response(
code=ErrorCode.OK, request_id=ar.id, file_id=df.file_id, name=df.name, size=df.bytes_written)
else:
self.send_status_response(
code=ErrorCode.PROGRESS, request_id=ar.id, file_id=df.file_id, size=df.bytes_written)
except TransmissionError as err:
if ar.send_errors:
self.send_transmission_error(ar.id, err)