2021-11-24 08:41:16 +05:30

752 lines
31 KiB
Python

#!/usr/bin/env python
# License: GPLv3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net>
import os
import stat
from asyncio import TimerHandle
from collections import deque
from enum import auto
from itertools import count
from time import monotonic
from typing import (
IO, Callable, Deque, Dict, Iterable, Iterator, List, Optional, Sequence,
Set, Tuple, Union
)
from kitty.cli_stub import TransferCLIOptions
from kitty.fast_data_types import FILE_TRANSFER_CODE, wcswidth
from kitty.file_transmission import (
Action, Compression, FileTransmissionCommand, FileType, NameReprEnum,
TransmissionType, encode_bypass, split_for_transfer
)
from kitty.typing import KeyEventType, ScreenSize
from kitty.utils import sanitize_control_codes
from ..tui.handler import Handler
from ..tui.loop import Loop, debug
from ..tui.operations import styled, without_line_wrap
from ..tui.spinners import Spinner
from ..tui.utils import human_size
from .librsync import LoadSignature, delta_for_file
from .utils import (
IdentityCompressor, ZlibCompressor, abspath, expand_home, home_path,
print_rsync_stats, random_id, render_progress_in_width, safe_divide,
should_be_compressed
)
debug
def get_remote_path(local_path: str, remote_base: str) -> str:
if not remote_base:
return local_path.replace(os.sep, '/')
if remote_base.endswith('/'):
return os.path.join(remote_base, os.path.basename(local_path))
return remote_base
class FileState(NameReprEnum):
waiting_for_start = auto()
waiting_for_data = auto()
transmitting = auto()
finished = auto()
acknowledged = auto()
class File:
def __init__(
self, local_path: str, expanded_local_path: str, file_id: int, stat_result: os.stat_result,
remote_base: str, file_type: FileType,
) -> None:
self.state = FileState.waiting_for_start
self.local_path = local_path
self.display_name = sanitize_control_codes(local_path)
self.expanded_local_path = expanded_local_path
self.permissions = stat.S_IMODE(stat_result.st_mode)
self.mtime = stat_result.st_mtime_ns
self.file_size = self.bytes_to_transmit = stat_result.st_size
self.file_hash = stat_result.st_dev, stat_result.st_ino
self.remote_path = get_remote_path(self.local_path, remote_base)
self.remote_path = self.remote_path.replace(os.sep, '/')
self.file_id = hex(file_id)[2:]
self.hard_link_target = ''
self.symbolic_link_target = ''
self.stat_result = stat_result
self.file_type = file_type
self.rsync_capable = self.file_type is FileType.regular and self.file_size > 4096
self.compression_capable = self.file_type is FileType.regular and self.file_size > 4096 and should_be_compressed(self.expanded_local_path)
self.remote_final_path = ''
self.remote_initial_size = -1
self.err_msg = ''
self.actual_file: Optional[IO[bytes]] = None
self.transmitted_bytes = 0
self.reported_progress = 0
self.transmit_started_at = self.transmit_ended_at = self.done_at = 0.
self.signature_loader: Optional[LoadSignature] = None
self.delta_loader: Optional[Iterator[memoryview]] = None
def start_delta_calculation(self) -> None:
sl = self.signature_loader
assert sl is not None
self.state = FileState.transmitting
self.delta_loader = delta_for_file(self.expanded_local_path, sl.signature)
def __repr__(self) -> str:
return f'File(name={self.display_name}, ft={self.file_type}, state={self.state})'
def next_chunk(self, sz: int = 1024 * 1024) -> Tuple[bytes, int]:
if self.file_type is FileType.symlink:
self.state = FileState.finished
ans = self.symbolic_link_target.encode('utf-8')
return ans, len(ans)
if self.file_type is FileType.link:
self.state = FileState.finished
ans = self.hard_link_target.encode('utf-8')
return ans, len(ans)
is_last = False
if self.delta_loader is not None:
try:
chunk: Union[bytes, memoryview] = next(self.delta_loader)
except StopIteration:
is_last = True
self.delta_loader = None
chunk = b''
else:
if self.actual_file is None:
self.actual_file = open(self.expanded_local_path, 'rb')
chunk = self.actual_file.read(sz)
is_last = not chunk or self.actual_file.tell() >= self.file_size
uncompressed_sz = len(chunk)
cchunk = self.compressor.compress(chunk)
if is_last and not isinstance(self.compressor, IdentityCompressor):
cchunk += self.compressor.flush()
if is_last:
self.state = FileState.finished
if self.actual_file is not None:
self.actual_file.close()
self.actual_file = None
return cchunk, uncompressed_sz
def metadata_command(self, use_rsync: bool = False) -> FileTransmissionCommand:
self.ttype = TransmissionType.rsync if self.rsync_capable and use_rsync else TransmissionType.simple
self.compression = Compression.zlib if self.compression_capable else Compression.none
self.compressor: Union[ZlibCompressor, IdentityCompressor] = ZlibCompressor() if self.compression is Compression.zlib else IdentityCompressor()
return FileTransmissionCommand(
action=Action.file, compression=self.compression, ftype=self.file_type,
name=self.remote_path, permissions=self.permissions, mtime=self.mtime,
file_id=self.file_id, ttype=self.ttype
)
def process(cli_opts: TransferCLIOptions, paths: Iterable[str], remote_base: str, counter: Iterator[int]) -> Iterator[File]:
for x in paths:
expanded = expand_home(x)
try:
s = os.stat(expanded, follow_symlinks=False)
except OSError as err:
raise SystemExit(f'Failed to stat {x} with error: {err}') from err
if stat.S_ISDIR(s.st_mode):
yield File(x, expanded, next(counter), s, remote_base, FileType.directory)
new_remote_base = remote_base
if new_remote_base:
new_remote_base = new_remote_base.rstrip('/') + '/' + os.path.basename(x) + '/'
else:
new_remote_base = x.replace(os.sep, '/').rstrip('/') + '/'
yield from process(cli_opts, [os.path.join(x, y) for y in os.listdir(expanded)], new_remote_base, counter)
elif stat.S_ISLNK(s.st_mode):
yield File(x, expanded, next(counter), s, remote_base, FileType.symlink)
elif stat.S_ISREG(s.st_mode):
yield File(x, expanded, next(counter), s, remote_base, FileType.regular)
def process_mirrored_files(cli_opts: TransferCLIOptions, args: Sequence[str]) -> Iterator[File]:
paths = [abspath(x) for x in args]
try:
common_path = os.path.commonpath(paths)
except ValueError:
common_path = ''
home = home_path().rstrip(os.sep)
if common_path and common_path.startswith(home + os.sep):
paths = [os.path.join('~', os.path.relpath(x, home)) for x in paths]
yield from process(cli_opts, paths, '', count(1))
def process_normal_files(cli_opts: TransferCLIOptions, args: Sequence[str]) -> Iterator[File]:
if len(args) < 2:
raise SystemExit('Must specify at least one local path and one remote path')
args = list(args)
remote_base = args.pop().replace(os.sep, '/')
if len(args) > 1 and not remote_base.endswith('/'):
remote_base += '/'
paths = [abspath(x) for x in args]
yield from process(cli_opts, paths, remote_base, count(1))
def files_for_send(cli_opts: TransferCLIOptions, args: List[str]) -> Tuple[File, ...]:
if cli_opts.mode == 'mirror':
files = list(process_mirrored_files(cli_opts, args))
else:
files = list(process_normal_files(cli_opts, args))
groups: Dict[Tuple[int, int], List[File]] = {}
# detect hard links
for f in files:
groups.setdefault(f.file_hash, []).append(f)
for group in groups.values():
if len(group) > 1:
for lf in group[1:]:
lf.file_type = FileType.link
lf.hard_link_target = group[0].file_id
# detect symlinks to other transferred files
for f in tuple(files):
if f.file_type is FileType.symlink:
try:
link_dest = os.readlink(f.local_path)
except OSError:
files.remove(f)
continue
f.symbolic_link_target = f'path:{link_dest}'
is_abs = os.path.isabs(link_dest)
q = link_dest if is_abs else os.path.join(os.path.dirname(f.local_path), link_dest)
try:
st = os.stat(q)
except OSError:
pass
else:
fh = st.st_dev, st.st_ino
if fh in groups:
g = tuple(x for x in groups[fh] if os.path.samestat(st, x.stat_result))
if g:
t = g[0]
prefix = 'fid_abs' if is_abs else 'fid'
f.symbolic_link_target = f'{prefix}:{t.file_id}'
return tuple(files)
class SendState(NameReprEnum):
waiting_for_permission = auto()
permission_granted = auto()
permission_denied = auto()
canceled = auto()
class Transfer:
def __init__(self, amt: int = 0):
self.amt = amt
self.at = monotonic()
def is_too_old(self, now: float) -> bool:
return now - self.at > 30
class ProgressTracker:
def __init__(self, total_size_of_all_files: int):
self.total_size_of_all_files = total_size_of_all_files
self.total_bytes_to_transfer = total_size_of_all_files
self.active_file: Optional[File] = None
self.total_transferred = 0
self.transfers: Deque[Transfer] = deque()
self.transfered_stats_amt = 0
self.transfered_stats_interval = 0.
self.started_at = 0.
self.signature_bytes = 0
self.total_reported_progress = 0
def change_active_file(self, nf: File) -> None:
now = monotonic()
self.active_file = nf
nf.transmit_started_at = now
def start_transfer(self) -> None:
self.transfers.append(Transfer())
self.started_at = monotonic()
def on_transmit(self, amt: int) -> None:
if self.active_file is not None:
self.active_file.transmitted_bytes += amt
self.total_transferred += amt
self.transfers.append(Transfer(amt))
now = self.transfers[-1].at
while len(self.transfers) > 2 and self.transfers[0].is_too_old(now):
self.transfers.popleft()
self.transfered_stats_interval = now - self.transfers[0].at
self.transfered_stats_amt = sum(t.amt for t in self.transfers)
def on_file_progress(self, af: File, delta: int) -> None:
if delta > 0:
self.total_reported_progress += delta
def on_file_done(self, af: File) -> None:
af.done_at = monotonic()
class SendManager:
def __init__(
self, request_id: str, files: Tuple[File, ...],
bypass: Optional[str] = None, use_rsync: bool = False,
file_progress: Callable[[File, int], None] = lambda f, i: None,
file_done: Callable[[File], None] = lambda f: None,
):
self.use_rsync = use_rsync
self.files = files
self.bypass = encode_bypass(request_id, bypass) if bypass else ''
self.fid_map = {f.file_id: f for f in self.files}
self.request_id = request_id
self.state = SendState.waiting_for_permission
self.all_acknowledged = self.all_started = self.has_transmitting = self.has_rsync = False
self.active_idx: Optional[int] = None
self.current_chunk_uncompressed_sz: Optional[int] = None
self.prefix = f'\x1b]{FILE_TRANSFER_CODE};id={self.request_id};'
self.suffix = '\x1b\\'
self.progress = ProgressTracker(sum(df.file_size for df in self.files if df.file_size >= 0))
self.file_done = file_done
self.file_progress = file_progress
self.last_progress_file: Optional[File] = None
@property
def active_file(self) -> Optional[File]:
if self.active_idx is not None:
ans = self.files[self.active_idx]
if ans.state is FileState.transmitting:
return ans
return None
def activate_next_ready_file(self) -> Optional[File]:
if self.active_idx is not None:
paf = self.files[self.active_idx]
paf.transmit_ended_at = monotonic()
for i, f in enumerate(self.files):
if f.state is FileState.transmitting:
self.active_idx = i
self.update_collective_statuses()
self.progress.change_active_file(f)
return f
self.active_idx = None
self.update_collective_statuses()
return None
def update_collective_statuses(self) -> None:
found_not_started = found_not_done = False
has_rsync = has_transmitting = False
for f in self.files:
if f.state is not FileState.acknowledged:
found_not_done = True
if f.state is FileState.waiting_for_start:
found_not_started = True
elif f.state is FileState.transmitting:
has_transmitting = True
if f.ttype is TransmissionType.rsync:
has_rsync = True
self.all_acknowledged = not found_not_done
self.all_started = not found_not_started
self.has_rsync = has_rsync
self.has_transmitting = has_transmitting
def start_transfer(self) -> str:
return FileTransmissionCommand(action=Action.send, bypass=self.bypass).serialize()
def next_chunks(self) -> Iterator[str]:
if self.active_file is None:
self.activate_next_ready_file()
af = self.active_file
if af is None:
return
chunk = b''
self.current_chunk_uncompressed_sz = 0
while af.state is not FileState.finished and not chunk:
chunk, usz = af.next_chunk()
self.current_chunk_uncompressed_sz += usz
is_last = af.state is FileState.finished
if len(chunk):
for ftc in split_for_transfer(chunk, file_id=af.file_id, mark_last=is_last):
yield ftc.serialize()
elif is_last:
yield FileTransmissionCommand(action=Action.end_data, file_id=af.file_id, data=b'').serialize()
def send_file_metadata(self) -> Iterator[str]:
for f in self.files:
yield f.metadata_command(self.use_rsync).serialize()
def on_file_status_update(self, ftc: FileTransmissionCommand) -> None:
file = self.fid_map.get(ftc.file_id)
if file is None:
return
if ftc.status == 'STARTED':
file.remote_final_path = ftc.name
file.remote_initial_size = ftc.size
if file.file_type is FileType.directory:
file.state = FileState.finished
else:
file.state = FileState.waiting_for_data if ftc.ttype is TransmissionType.rsync else FileState.transmitting
if file.state is FileState.waiting_for_data:
file.signature_loader = LoadSignature()
self.update_collective_statuses()
elif ftc.status == 'PROGRESS':
self.last_progress_file = file
change = ftc.size - file.reported_progress
file.reported_progress = ftc.size
self.progress.on_file_progress(file, change)
self.file_progress(file, change)
else:
if ftc.name and not file.remote_final_path:
file.remote_final_path = ftc.name
file.state = FileState.acknowledged
if ftc.status == 'OK':
if ftc.size > 0:
change = ftc.size - file.reported_progress
file.reported_progress = ftc.size
self.progress.on_file_progress(file, change)
self.file_progress(file, change)
else:
file.err_msg = ftc.status
self.progress.on_file_done(file)
self.file_done(file)
if self.active_idx is not None and file is self.files[self.active_idx]:
self.active_idx = None
self.update_collective_statuses()
def on_signature_data_received(self, ftc: FileTransmissionCommand) -> None:
file = self.fid_map.get(ftc.file_id)
if file is None or file.state is not FileState.waiting_for_data:
return
sl = file.signature_loader
assert sl is not None
sl.add_chunk(ftc.data)
self.progress.signature_bytes += len(ftc.data)
if ftc.action is Action.end_data:
sl.commit()
file.start_delta_calculation()
self.update_collective_statuses()
def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None:
if ftc.action is Action.status:
if ftc.file_id:
self.on_file_status_update(ftc)
else:
self.state = SendState.permission_granted if ftc.status == 'OK' else SendState.permission_denied
elif ftc.action in (Action.data, Action.end_data):
if ftc.file_id:
self.on_signature_data_received(ftc)
class Send(Handler):
use_alternate_screen = False
def __init__(self, cli_opts: TransferCLIOptions, files: Tuple[File, ...]):
Handler.__init__(self)
self.manager = SendManager(
random_id(), files, cli_opts.permissions_bypass, cli_opts.transmit_deltas, self.on_file_progress, self.on_file_done)
self.cli_opts = cli_opts
self.transmit_started = False
self.file_metadata_sent = False
self.quit_after_write_code: Optional[int] = None
self.check_paths_printed = False
names = tuple(x.display_name for x in self.manager.files)
self.max_name_length = max(6, max(map(wcswidth, names)))
self.spinner = Spinner()
self.progress_drawn = True
self.done_files: List[File] = []
self.done_file_ids: Set[str] = set()
self.failed_files: List[File] = []
self.transmit_ok_checked = False
self.progress_update_call: Optional[TimerHandle] = None
def send_payload(self, payload: str) -> None:
self.write(self.manager.prefix)
self.write(payload)
self.write(self.manager.suffix)
def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None:
if ftc.id != self.manager.request_id:
return
if ftc.status == 'CANCELED' and ftc.action is Action.status:
self.quit_loop(1)
return
if self.quit_after_write_code is not None or self.manager.state is SendState.canceled:
return
before = self.manager.state
self.manager.on_file_transfer_response(ftc)
if before == SendState.waiting_for_permission:
if self.manager.state == SendState.permission_denied:
self.cmd.styled('Permission denied for this transfer', fg='red')
self.print()
self.quit_loop(1)
return
if self.manager.state == SendState.permission_granted:
self.cmd.styled('Permission granted for this transfer', fg='green')
self.print()
self.send_file_metadata()
self.asyncio_loop.call_soon(self.loop_tick)
def start_transfer(self) -> None:
if self.manager.active_file is None:
self.manager.activate_next_ready_file()
if self.manager.active_file is not None:
self.transmit_started = True
self.manager.progress.start_transfer()
self.transmit_next_chunk()
self.draw_progress()
def print_check_paths(self) -> None:
if self.check_paths_printed:
return
self.check_paths_printed = True
self.print('The following file transfers will be performed. A red destination means an existing file will be overwritten.')
for df in self.manager.files:
self.cmd.styled(df.file_type.short_text, fg=df.file_type.color)
self.print(end=' ')
self.print(df.display_name, '', end=' ')
self.cmd.styled(df.remote_final_path, fg='red' if df.remote_initial_size > -1 else None)
self.print()
self.print(f'Transferring {len(self.manager.files)} files of total size: {human_size(self.manager.progress.total_bytes_to_transfer)}')
self.print()
self.print_continue_msg()
def print_continue_msg(self) -> None:
self.print(
'Press', styled('y', fg='green', bold=True, fg_intense=True), 'to continue or',
styled('n', fg='red', bold=True, fg_intense=True), 'to abort')
def on_text(self, text: str, in_bracketed_paste: bool = False) -> None:
if self.quit_after_write_code is not None:
return
if self.check_paths_printed and not self.transmit_started:
if text.lower() == 'y':
self.start_transfer()
if self.manager.all_acknowledged:
self.refresh_progress()
self.transfer_finished()
return
if text.lower() == 'n':
del self.failed_files[:]
self.abort_transfer()
self.print('Sending cancel request to terminal')
return
self.print_continue_msg()
def on_key(self, key_event: KeyEventType) -> None:
if self.quit_after_write_code is not None:
return
if key_event.matches('esc'):
if self.check_paths_printed and not self.transmit_started:
del self.failed_files[:]
self.abort_transfer()
self.print('Sending cancel request to terminal')
else:
self.on_interrupt()
def check_for_transmit_ok(self) -> None:
if self.transmit_ok_checked:
return self.start_transfer()
if self.manager.state is not SendState.permission_granted:
return
if self.cli_opts.confirm_paths:
if self.manager.all_started:
self.print_check_paths()
return
self.transmit_ok_checked = True
self.start_transfer()
def transmit_next_chunk(self) -> None:
found_chunk = False
for chunk in self.manager.next_chunks():
self.send_payload(chunk)
found_chunk = True
if not found_chunk:
if self.manager.all_acknowledged:
self.transfer_finished()
def transfer_finished(self) -> None:
self.send_payload(FileTransmissionCommand(action=Action.finish).serialize())
self.quit_after_write_code = 1 if self.failed_files else 0
def on_writing_finished(self) -> None:
chunk_transmitted = self.manager.current_chunk_uncompressed_sz is not None
if chunk_transmitted:
self.manager.progress.on_transmit(self.manager.current_chunk_uncompressed_sz or 0)
self.manager.current_chunk_uncompressed_sz = None
if self.quit_after_write_code is not None:
self.quit_loop(self.quit_after_write_code)
return
if self.manager.state is SendState.permission_granted and (not self.transmit_started or chunk_transmitted):
self.asyncio_loop.call_soon(self.loop_tick)
def loop_tick(self) -> None:
if self.manager.state is SendState.waiting_for_permission:
return
if self.transmit_started:
self.transmit_next_chunk()
self.refresh_progress()
else:
self.check_for_transmit_ok()
def initialize(self) -> None:
self.send_payload(self.manager.start_transfer())
if self.cli_opts.permissions_bypass:
# dont wait for permission, not needed with a bypass and
# avoids a roundtrip
self.send_file_metadata()
self.cmd.set_cursor_visible(False)
def finalize(self) -> None:
self.cmd.set_cursor_visible(True)
def send_file_metadata(self) -> None:
if not self.file_metadata_sent:
for payload in self.manager.send_file_metadata():
self.send_payload(payload)
self.file_metadata_sent = True
def on_term(self) -> None:
if self.quit_after_write_code is not None:
return
self.cmd.styled('Terminate requested, cancelling transfer, transferred files are in undefined state', fg='red')
self.print()
self.abort_transfer(delay=2)
def on_interrupt(self) -> None:
if self.quit_after_write_code is not None:
return
if self.manager.state is SendState.canceled:
self.print('Waiting for canceled acknowledgement from terminal, will abort in a few seconds if no response received')
return
self.cmd.styled('Interrupt requested, cancelling transfer, transferred files are in undefined state', fg='red')
self.print()
self.abort_transfer()
def abort_transfer(self, delay: float = 5) -> None:
self.send_payload(FileTransmissionCommand(action=Action.cancel).serialize())
self.manager.state = SendState.canceled
self.asyncio_loop.call_later(delay, self.quit_loop, 1)
def render_progress(
self, name: str, spinner_char: str = ' ', bytes_so_far: int = 0, total_bytes: int = 0,
secs_so_far: float = 0., bytes_per_sec: float = 0., is_complete: bool = False
) -> None:
if is_complete:
bytes_so_far = total_bytes
self.write(render_progress_in_width(
name, width=self.screen_size.cols, max_path_length=self.max_name_length, spinner_char=spinner_char,
bytes_so_far=bytes_so_far, total_bytes=total_bytes, secs_so_far=secs_so_far,
bytes_per_sec=bytes_per_sec, is_complete=is_complete
))
def erase_progress(self) -> None:
if self.progress_drawn:
self.cmd.move_cursor_by(2, 'up')
self.write('\r')
self.cmd.clear_to_end_of_screen()
self.progress_drawn = False
def schedule_progress_update(self, delay: float = 0.1) -> None:
if self.progress_update_call is None:
self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress)
elif self.asyncio_loop.time() + delay < self.progress_update_call.when():
self.progress_update_call.cancel()
self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress)
def on_file_progress(self, file: File, change: int) -> None:
self.schedule_progress_update()
def on_file_done(self, file: File) -> None:
self.done_files.append(file)
if file.err_msg:
self.failed_files.append(file)
self.schedule_progress_update()
@Handler.atomic_update
def draw_progress(self) -> None:
with without_line_wrap(self.write):
for df in self.done_files:
sc = styled('', fg='green') if not df.err_msg else styled('', fg='red')
if df.file_type is FileType.regular:
self.draw_progress_for_current_file(df, spinner_char=sc, is_complete=True)
else:
self.write(f'{sc} {df.display_name} {styled(df.file_type.name, dim=True, italic=True)}')
self.print()
self.done_file_ids.add(df.file_id)
del self.done_files[:]
is_complete = self.quit_after_write_code is not None
if is_complete:
sc = styled('', fg='green') if self.quit_after_write_code == 0 else styled('', fg='red')
else:
sc = self.spinner()
p = self.manager.progress
now = monotonic()
if is_complete:
self.cmd.repeat('', self.screen_size.width)
else:
af = self.manager.last_progress_file
if af is None or af.file_id in self.done_file_ids:
if self.manager.has_rsync and not self.manager.has_transmitting:
self.print(sc, 'Transferring rsync signatures...', end='')
else:
self.print(sc, 'Transferring metadata...', end='')
else:
self.draw_progress_for_current_file(af, spinner_char=sc)
self.print()
if p.total_reported_progress > 0:
self.render_progress(
'Total', spinner_char=sc,
bytes_so_far=p.total_reported_progress, total_bytes=p.total_bytes_to_transfer,
secs_so_far=now - p.started_at, is_complete=is_complete,
bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval)
)
else:
self.print('File data transfer has not yet started', end='')
self.print()
self.schedule_progress_update(self.spinner.interval)
self.progress_drawn = True
def on_resize(self, screen_size: ScreenSize) -> None:
super().on_resize(screen_size)
if self.progress_drawn:
self.refresh_progress()
def refresh_progress(self) -> None:
if not self.transmit_started:
return
self.erase_progress()
self.draw_progress()
def draw_progress_for_current_file(self, af: File, spinner_char: str = ' ', is_complete: bool = False) -> None:
p = self.manager.progress
now = monotonic()
self.render_progress(
af.display_name, spinner_char=spinner_char, is_complete=is_complete,
bytes_so_far=af.reported_progress, total_bytes=af.bytes_to_transmit,
secs_so_far=(af.done_at or now) - af.transmit_started_at,
bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval)
)
def send_main(cli_opts: TransferCLIOptions, args: List[str]) -> None:
print('Scanning files…')
files = files_for_send(cli_opts, args)
print(f'Found {len(files)} files and directories, requesting transfer permission…')
loop = Loop()
handler = Send(cli_opts, files)
loop.loop(handler)
p = handler.manager.progress
if handler.manager.has_rsync and p.total_transferred + p.signature_bytes:
tsf = 0
for f in files:
if f.ttype is TransmissionType.rsync:
tsf += f.file_size
if tsf:
print_rsync_stats(tsf, p.total_transferred, p.signature_bytes)
if handler.failed_files:
print(f'Transfer of {len(handler.failed_files)} out of {len(handler.manager.files)} files failed')
for ff in handler.failed_files:
print(styled(ff.display_name, fg='red'))
print(' ', ff.err_msg)
raise SystemExit(loop.return_code)