More work on file transfer
This commit is contained in:
parent
60b8023928
commit
69e54cb9c1
@ -3,10 +3,12 @@
|
||||
|
||||
import os
|
||||
import posixpath
|
||||
from collections import deque
|
||||
from contextlib import suppress
|
||||
from enum import auto
|
||||
from itertools import count
|
||||
from typing import Dict, Iterator, List, Optional
|
||||
from time import monotonic
|
||||
from typing import Deque, Dict, Iterator, List, Optional
|
||||
|
||||
from kitty.cli_stub import TransferCLIOptions
|
||||
from kitty.fast_data_types import FILE_TRANSFER_CODE
|
||||
@ -21,6 +23,7 @@ from ..tui.handler import Handler
|
||||
from ..tui.loop import Loop, debug
|
||||
from ..tui.operations import styled, without_line_wrap
|
||||
from ..tui.utils import human_size
|
||||
from .send import Transfer
|
||||
from .utils import expand_home, random_id, should_be_compressed
|
||||
|
||||
debug
|
||||
@ -38,6 +41,8 @@ class File:
|
||||
|
||||
def __init__(self, ftc: FileTransmissionCommand):
|
||||
self.expected_size = ftc.size
|
||||
self.transmit_started_at = self.done_at = 0.
|
||||
self.transmitted_bytes = 0
|
||||
self.ftype = ftc.ftype
|
||||
self.mtime = ftc.mtime
|
||||
self.spec_id = int(ftc.file_id)
|
||||
@ -56,10 +61,10 @@ class File:
|
||||
def __repr__(self) -> str:
|
||||
return f'File(rpath={self.remote_path!r}, lpath={self.expanded_local_path!r})'
|
||||
|
||||
def write_data(self, data: bytes, is_last: bool) -> None:
|
||||
def write_data(self, data: bytes, is_last: bool) -> int:
|
||||
if self.ftype is FileType.symlink:
|
||||
self.remote_symlink_value += data
|
||||
return
|
||||
return 0
|
||||
if self.ftype is FileType.regular:
|
||||
if not self.local_write_started:
|
||||
parent = os.path.dirname(self.expanded_local_path)
|
||||
@ -67,7 +72,10 @@ class File:
|
||||
os.makedirs(parent, exist_ok=True)
|
||||
self.local_write_started = True
|
||||
with open(self.expanded_local_path, 'ab') as f:
|
||||
base = f.tell()
|
||||
f.write(data)
|
||||
return f.tell() - base
|
||||
return 0
|
||||
|
||||
def apply_metadata(self) -> None:
|
||||
if self.ftype is FileType.symlink:
|
||||
@ -152,6 +160,42 @@ def files_for_receive(cli_opts: TransferCLIOptions, dest: str, files: List[File]
|
||||
yield x.entry
|
||||
|
||||
|
||||
class ProgressTracker:
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.total_size_of_all_files = 0
|
||||
self.total_bytes_to_transfer = 0
|
||||
self.active_file: Optional[File] = None
|
||||
self.total_transferred = 0
|
||||
self.transfers: Deque[Transfer] = deque()
|
||||
self.transfered_stats_amt = 0
|
||||
self.transfered_stats_interval = 0.
|
||||
self.started_at = 0.
|
||||
self.signature_bytes = 0
|
||||
|
||||
def change_active_file(self, nf: File) -> None:
|
||||
now = monotonic()
|
||||
self.active_file = nf
|
||||
nf.transmit_started_at = now
|
||||
|
||||
def start_transfer(self) -> None:
|
||||
self.transfers.append(Transfer())
|
||||
self.started_at = monotonic()
|
||||
|
||||
def file_written(self, af: File, amt: int, is_done: bool) -> None:
|
||||
self.active_file = af
|
||||
af.transmitted_bytes += amt
|
||||
self.total_transferred += amt
|
||||
self.transfers.append(Transfer(amt))
|
||||
now = self.transfers[-1].at
|
||||
while len(self.transfers) > 2 and self.transfers[0].is_too_old(now):
|
||||
self.transfers.popleft()
|
||||
self.transfered_stats_interval = now - self.transfers[0].at
|
||||
self.transfered_stats_amt = sum(t.amt for t in self.transfers)
|
||||
if is_done:
|
||||
af.done_at = monotonic()
|
||||
|
||||
|
||||
class Manager:
|
||||
|
||||
def __init__(
|
||||
@ -169,13 +213,14 @@ class Manager:
|
||||
self.suffix = '\x1b\\'
|
||||
self.state = State.waiting_for_permission
|
||||
self.files: List[File] = []
|
||||
self.total_transfer_size = 0
|
||||
self.progress_tracker = ProgressTracker()
|
||||
self.transfer_done = False
|
||||
|
||||
def start_transfer(self) -> Iterator[str]:
|
||||
yield FileTransmissionCommand(action=Action.receive, bypass=self.bypass, size=len(self.spec)).serialize()
|
||||
for i, x in enumerate(self.spec):
|
||||
yield FileTransmissionCommand(action=Action.file, file_id=str(i), name=x).serialize()
|
||||
self.progress_tracker.start_transfer()
|
||||
|
||||
def finalize_transfer(self) -> str:
|
||||
self.transfer_done = True
|
||||
@ -221,7 +266,8 @@ class Manager:
|
||||
|
||||
def collect_files(self, cli_opts: TransferCLIOptions) -> None:
|
||||
self.files = list(files_for_receive(cli_opts, self.dest, self.files, self.remote_home, self.spec))
|
||||
self.total_transfer_size = sum(max(0, f.expected_size) for f in self.files)
|
||||
self.progress_tracker.total_size_of_all_files = sum(max(0, f.expected_size) for f in self.files)
|
||||
self.progress_tracker.total_bytes_to_transfer = self.progress_tracker.total_size_of_all_files
|
||||
self.fid_map = {f.file_id: f for f in self.files}
|
||||
|
||||
def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> str:
|
||||
@ -266,11 +312,13 @@ class Manager:
|
||||
f = self.fid_map.get(ftc.file_id)
|
||||
if f is None:
|
||||
return f'Got data for unknown file id: {ftc.file_id}'
|
||||
is_last = ftc.action is Action.end_data
|
||||
try:
|
||||
f.write_data(ftc.data, ftc.action is Action.end_data)
|
||||
amt_written = f.write_data(ftc.data, is_last)
|
||||
except OSError as err:
|
||||
return str(err)
|
||||
if ftc.action is Action.end_data:
|
||||
self.progress_tracker.file_written(f, amt_written, is_last)
|
||||
if is_last:
|
||||
del self.fid_map[ftc.file_id]
|
||||
if not self.fid_map:
|
||||
return self.finalize_transfer()
|
||||
@ -353,7 +401,7 @@ class Receive(Handler):
|
||||
self.print(df.display_name, '→', end=' ')
|
||||
self.cmd.styled(df.expanded_local_path, fg='red' if os.path.lexists(df.expanded_local_path) else None)
|
||||
self.print()
|
||||
self.print(f'Transferring {len(self.manager.files)} file(s) of total size: {human_size(self.manager.total_transfer_size)}')
|
||||
self.print(f'Transferring {len(self.manager.files)} file(s) of total size: {human_size(self.manager.progress_tracker.total_size_of_all_files)}')
|
||||
self.print()
|
||||
self.print_continue_msg()
|
||||
|
||||
@ -417,6 +465,8 @@ class Receive(Handler):
|
||||
|
||||
@Handler.atomic_update
|
||||
def draw_progress(self) -> None:
|
||||
if self.manager.state is State.canceled:
|
||||
return
|
||||
with without_line_wrap(self.write):
|
||||
pass
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user