diff --git a/kittens/transfer/send.py b/kittens/transfer/send.py index 40c842fde..25da0c76a 100644 --- a/kittens/transfer/send.py +++ b/kittens/transfer/send.py @@ -5,6 +5,7 @@ import os import stat +from asyncio import TimerHandle from collections import deque from enum import auto from itertools import count @@ -81,7 +82,8 @@ class File: self.err_msg = '' self.actual_file: Optional[IO[bytes]] = None self.transmitted_bytes = 0 - self.transmit_started_at = self.transmit_ended_at = 0. + self.reported_progress = 0 + self.transmit_started_at = self.transmit_ended_at = self.done_at = 0. self.signature_loader: Optional[LoadSignature] = None self.delta_loader: Optional[Iterator[memoryview]] = None @@ -250,6 +252,7 @@ class ProgressTracker: self.transfered_stats_amt = 0 self.transfered_stats_interval = 0. self.started_at = 0. + self.total_reported_progress = 0 def change_active_file(self, nf: File) -> None: now = monotonic() @@ -260,7 +263,7 @@ class ProgressTracker: self.transfers.append(Transfer()) self.started_at = monotonic() - def on_transfer(self, amt: int) -> None: + def on_transmit(self, amt: int) -> None: if self.active_file is not None: self.active_file.transmitted_bytes += amt self.total_transferred += amt @@ -271,12 +274,19 @@ class ProgressTracker: self.transfered_stats_interval = now - self.transfers[0].at self.transfered_stats_amt = sum(t.amt for t in self.transfers) + def on_file_progress(self, af: File, delta: int) -> None: + self.total_reported_progress += delta + + def on_file_done(self, af: File) -> None: + af.done_at = monotonic() + class SendManager: def __init__( self, request_id: str, files: Tuple[File, ...], bypass: Optional[str] = None, use_rsync: bool = False, + file_progress: Callable[[File, int], None] = lambda f, i: None, file_done: Callable[[File], None] = lambda f: None, ): self.use_rsync = use_rsync @@ -293,6 +303,7 @@ class SendManager: self.suffix = '\x1b\\' self.progress = ProgressTracker(sum(df.file_size for df in self.files if df.file_size >= 0)) self.file_done = file_done + self.file_progress = file_progress @property def active_file(self) -> Optional[File]: @@ -305,7 +316,6 @@ class SendManager: if self.active_idx is not None: paf = self.files[self.active_idx] paf.transmit_ended_at = monotonic() - self.file_done(paf) for i, f in enumerate(self.files): if f.state is FileState.transmitting: self.active_idx = i @@ -365,16 +375,28 @@ class SendManager: file.state = FileState.waiting_for_data if ftc.ttype is TransmissionType.rsync else FileState.transmitting if file.state is FileState.waiting_for_data: file.signature_loader = LoadSignature() + self.update_collective_statuses() + elif ftc.status == 'PROGRESS': + change = ftc.size - file.reported_progress + file.reported_progress = ftc.size + self.progress.on_file_progress(file, change) + self.file_progress(file, change) else: if ftc.name and not file.remote_final_path: file.remote_final_path = ftc.name file.state = FileState.acknowledged - if ftc.status != 'OK': + if ftc.status == 'OK': + change = ftc.size - file.reported_progress + file.reported_progress = ftc.size + self.progress.on_file_progress(file, change) + self.file_progress(file, change) + else: file.err_msg = ftc.status + self.progress.on_file_done(file) + self.file_done(file) if self.active_idx is not None and file is self.files[self.active_idx]: - self.file_done(self.files[self.active_idx]) self.active_idx = None - self.update_collective_statuses() + self.update_collective_statuses() def on_signature_data_received(self, ftc: FileTransmissionCommand) -> None: file = self.fid_map.get(ftc.file_id) @@ -404,7 +426,8 @@ class Send(Handler): def __init__(self, cli_opts: TransferCLIOptions, files: Tuple[File, ...]): Handler.__init__(self) - self.manager = SendManager(random_id(), files, cli_opts.permissions_bypass, cli_opts.transmit_deltas, self.on_file_done) + self.manager = SendManager( + random_id(), files, cli_opts.permissions_bypass, cli_opts.transmit_deltas, self.on_file_progress, self.on_file_done) self.cli_opts = cli_opts self.transmit_started = False self.file_metadata_sent = False @@ -417,6 +440,7 @@ class Send(Handler): self.done_files: List[File] = [] self.failed_files: List[File] = [] self.transmit_ok_checked = False + self.progress_update_call: Optional[TimerHandle] = None def send_payload(self, payload: str) -> None: self.write(self.manager.prefix) @@ -527,7 +551,7 @@ class Send(Handler): def on_writing_finished(self) -> None: chunk_transmitted = self.manager.current_chunk_uncompressed_sz is not None if chunk_transmitted: - self.manager.progress.on_transfer(self.manager.current_chunk_uncompressed_sz or 0) + self.manager.progress.on_transmit(self.manager.current_chunk_uncompressed_sz or 0) self.manager.current_chunk_uncompressed_sz = None if self.quit_after_write_code is not None: self.quit_loop(self.quit_after_write_code) @@ -602,11 +626,21 @@ class Send(Handler): self.cmd.clear_to_end_of_screen() self.progress_drawn = False + def schedule_progress_update(self, delay: float = 0.1) -> None: + if self.progress_update_call is None: + self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress) + elif self.asyncio_loop.time() + delay < self.progress_update_call.when(): + self.progress_update_call.cancel() + self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress) + + def on_file_progress(self, file: File, change: int) -> None: + self.schedule_progress_update() + def on_file_done(self, file: File) -> None: self.done_files.append(file) if file.err_msg: self.failed_files.append(file) - self.asyncio_loop.call_soon(self.refresh_progress) + self.schedule_progress_update() @Handler.atomic_update def draw_progress(self) -> None: @@ -631,12 +665,12 @@ class Send(Handler): self.print() self.render_progress( 'Total', spinner_char=sc, - bytes_so_far=p.total_transferred, total_bytes=p.total_bytes_to_transfer, + bytes_so_far=p.total_reported_progress, total_bytes=p.total_bytes_to_transfer, secs_so_far=now - p.started_at, is_complete=is_complete, bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval) ) self.print() - self.asyncio_loop.call_later(self.spinner.interval, self.refresh_progress) + self.schedule_progress_update(self.spinner.interval) self.progress_drawn = True def refresh_progress(self) -> None: @@ -648,8 +682,8 @@ class Send(Handler): now = monotonic() self.render_progress( af.display_name, spinner_char=spinner_char, is_complete=is_complete, - bytes_so_far=af.transmitted_bytes, total_bytes=af.bytes_to_transmit, - secs_so_far=(af.transmit_ended_at or now) - af.transmit_started_at, + bytes_so_far=af.reported_progress, total_bytes=af.bytes_to_transmit, + secs_so_far=(af.done_at or now) - af.transmit_started_at, bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval) ) diff --git a/kitty/file_transmission.py b/kitty/file_transmission.py index ac071779e..105ef9448 100644 --- a/kitty/file_transmission.py +++ b/kitty/file_transmission.py @@ -15,7 +15,7 @@ from functools import partial from gettext import gettext as _ from time import monotonic from typing import ( - IO, Any, Callable, Deque, Dict, Iterator, List, Optional, Union + IO, Any, Callable, Deque, Dict, Iterator, List, Optional, Union, cast ) from kittens.transfer.librsync import PatchFile, signature_of_file @@ -105,7 +105,7 @@ class TransmissionType(NameReprEnum): rsync = auto() -ErrorCode = Enum('ErrorCode', 'OK STARTED CANCELED EINVAL EPERM EISDIR') +ErrorCode = Enum('ErrorCode', 'OK STARTED CANCELED PROGRESS EINVAL EPERM EISDIR') class TransmissionError(Exception): @@ -290,6 +290,7 @@ class DestFile: self.closed = self.ftype is FileType.directory self.actual_file: Union[PatchFile, IO[bytes], None] = None self.failed = False + self.bytes_written = 0 def __repr__(self) -> str: return f'DestFile(name={self.name}, file_id={self.file_id}, actual_file={self.actual_file})' @@ -335,6 +336,7 @@ class DestFile: raise TransmissionError(file_id=self.file_id, msg='Cannot write to a closed file') if self.ftype in (FileType.symlink, FileType.link): self.link_target += data + self.bytes_written += len(data) if is_last: lt = self.link_target.decode('utf-8', 'replace') base = self.make_parent_dirs() @@ -374,7 +376,9 @@ class DestFile: self.unlink_existing_if_needed() flags = os.O_RDWR | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_CLOEXEC', 0) | getattr(os, 'O_BINARY', 0) self.actual_file = open(os.open(self.name, flags, self.permissions), mode='r+b', closefd=True) - self.actual_file.write(data) # type: ignore + af = cast(Union[IO[bytes], PatchFile], self.actual_file) + af.write(data) + self.bytes_written = af.tell() if is_last: self.close() self.apply_metadata() @@ -567,8 +571,13 @@ class FileTransmission: df = ar.add_data(cmd) if df.failed: return - if df.closed and ar.send_acknowledgements: - self.send_status_response(code=ErrorCode.OK, request_id=ar.id, file_id=df.file_id, name=df.name) + if ar.send_acknowledgements: + if df.closed: + self.send_status_response( + code=ErrorCode.OK, request_id=ar.id, file_id=df.file_id, name=df.name, size=df.bytes_written) + else: + self.send_status_response( + code=ErrorCode.PROGRESS, request_id=ar.id, file_id=df.file_id, size=df.bytes_written) except TransmissionError as err: if ar.send_errors: self.send_transmission_error(ar.id, err)