#!/usr/bin/env python
# License: GPLv3 Copyright: 2021, Kovid Goyal

import os
import stat
from asyncio import TimerHandle
from collections import deque
from enum import auto
from itertools import count
from time import monotonic
from typing import IO, Callable, Deque, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Tuple, Union

from kitty.cli_stub import TransferCLIOptions
from kitty.fast_data_types import FILE_TRANSFER_CODE, wcswidth
from kitty.file_transmission import Action, Compression, FileTransmissionCommand, FileType, NameReprEnum, TransmissionType, encode_bypass, split_for_transfer
from kitty.typing import KeyEventType, ScreenSize
from kitty.utils import sanitize_control_codes

from ..tui.handler import Handler
from ..tui.loop import Loop, debug
from ..tui.operations import styled, without_line_wrap
from ..tui.spinners import Spinner
from ..tui.utils import human_size
from .librsync import LoadSignature, delta_for_file
from .utils import (
    IdentityCompressor,
    ZlibCompressor,
    abspath,
    expand_home,
    home_path,
    print_rsync_stats,
    random_id,
    render_progress_in_width,
    safe_divide,
    should_be_compressed,
)

# Bare reference so the `debug` import counts as used.
debug


def get_remote_path(local_path: str, remote_base: str) -> str:
    '''
    Return the destination path for ``local_path``.

    * No ``remote_base``: the local path itself, with ``/`` separators.
    * ``remote_base`` ending in ``/``: treated as a directory, the basename of
      ``local_path`` is appended.
    * Otherwise ``remote_base`` is returned unchanged.
    '''
    if not remote_base:
        return local_path.replace(os.sep, '/')
    if remote_base.endswith('/'):
        return os.path.join(remote_base, os.path.basename(local_path))
    return remote_base


class FileState(NameReprEnum):
    ''' Lifecycle of a single :class:`File` during a send. '''
    waiting_for_start = auto()
    waiting_for_data = auto()
    transmitting = auto()
    finished = auto()
    acknowledged = auto()


class File:
    '''
    One filesystem entry queued for sending, together with its transfer state.

    The attributes ``ttype``, ``compression`` and ``compressor`` are only
    created by :meth:`metadata_command`, so that method must be called before
    :meth:`next_chunk` or anything else that reads them.
    '''

    def __init__(
        self, local_path: str, expanded_local_path: str, file_id: int, stat_result: os.stat_result,
        remote_base: str, file_type: FileType,
    ) -> None:
        self.state = FileState.waiting_for_start
        self.local_path = local_path
        self.display_name = sanitize_control_codes(local_path)
        # The path actually used for filesystem access (see next_chunk())
        self.expanded_local_path = expanded_local_path
        self.permissions = stat.S_IMODE(stat_result.st_mode)
        self.mtime = stat_result.st_mtime_ns
        self.file_size = self.bytes_to_transmit = stat_result.st_size
        # (device, inode) pair, used by files_for_send() to group hard links
        self.file_hash = stat_result.st_dev, stat_result.st_ino
        self.remote_path = get_remote_path(self.local_path, remote_base)
        self.remote_path = self.remote_path.replace(os.sep, '/')
        # Hexadecimal, without the leading 0x
        self.file_id = hex(file_id)[2:]
        self.hard_link_target = ''
        self.symbolic_link_target = ''
        self.stat_result = stat_result
        self.file_type = file_type
        self.rsync_capable = self.file_type is FileType.regular and self.file_size > 4096
        self.compression_capable = self.file_type is FileType.regular and self.file_size > 4096 and should_be_compressed(self.expanded_local_path)
        self.remote_final_path = ''
        self.remote_initial_size = -1
        self.err_msg = ''
        self.actual_file: Optional[IO[bytes]] = None
        self.transmitted_bytes = 0
        self.reported_progress = 0
        self.transmit_started_at = self.transmit_ended_at = self.done_at = 0.
        self.signature_loader: Optional[LoadSignature] = None
        self.delta_loader: Optional[Iterator[memoryview]] = None

    def start_delta_calculation(self) -> None:
        ''' Switch to the transmitting state, sourcing data from a delta against the loaded signature. '''
        sl = self.signature_loader
        assert sl is not None
        self.state = FileState.transmitting
        self.delta_loader = delta_for_file(self.expanded_local_path, sl.signature)

    def __repr__(self) -> str:
        return f'File(name={self.display_name}, ft={self.file_type}, state={self.state})'

    def next_chunk(self, sz: int = 1024 * 1024) -> Tuple[bytes, int]:
        '''
        Return ``(data_to_send, uncompressed_size)`` for the next piece of this file.

        For symlinks and hard links the whole payload is the encoded link
        target. Otherwise data comes from the delta iterator, if one is set,
        or from reading up to ``sz`` bytes of the file, and is passed through
        the compressor. ``self.state`` becomes ``finished`` once the last
        chunk has been produced.
        '''
        if self.file_type is FileType.symlink:
            self.state = FileState.finished
            ans = self.symbolic_link_target.encode('utf-8')
            return ans, len(ans)
        if self.file_type is FileType.link:
            self.state = FileState.finished
            ans = self.hard_link_target.encode('utf-8')
            return ans, len(ans)
        is_last = False
        if self.delta_loader is not None:
            try:
                chunk: Union[bytes, memoryview] = next(self.delta_loader)
            except StopIteration:
                is_last = True
                self.delta_loader = None
                chunk = b''
        else:
            # Opened lazily on the first chunk and kept open until the last one
            if self.actual_file is None:
                self.actual_file = open(self.expanded_local_path, 'rb')
            chunk = self.actual_file.read(sz)
            is_last = not chunk or self.actual_file.tell() >= self.file_size
        uncompressed_sz = len(chunk)
        cchunk = self.compressor.compress(chunk)
        if is_last and not isinstance(self.compressor, IdentityCompressor):
            cchunk += self.compressor.flush()
        if is_last:
            self.state = FileState.finished
            if self.actual_file is not None:
                self.actual_file.close()
                self.actual_file = None
        return cchunk, uncompressed_sz

    def metadata_command(self, use_rsync: bool = False) -> FileTransmissionCommand:
        ''' Choose the transmission type and compression for this file and return the command announcing it. '''
        self.ttype = TransmissionType.rsync if self.rsync_capable and use_rsync else TransmissionType.simple
        self.compression = Compression.zlib if self.compression_capable else Compression.none
        self.compressor: Union[ZlibCompressor, IdentityCompressor] = ZlibCompressor() if self.compression is Compression.zlib else IdentityCompressor()
        return FileTransmissionCommand(
            action=Action.file, compression=self.compression, ftype=self.file_type,
            name=self.remote_path, permissions=self.permissions, mtime=self.mtime,
            file_id=self.file_id, ttype=self.ttype
        )


def process(cli_opts: TransferCLIOptions, paths: Iterable[str], remote_base: str, counter: Iterator[int]) -> Iterator[File]:
    '''
    Yield a :class:`File` for every directory, symlink and regular file in
    ``paths``, recursing into directories. Symlinks are not followed. Entries
    of any other type are skipped.

    :raises SystemExit: if a path cannot be stat'ed.
    '''
    for x in paths:
        expanded = expand_home(x)
        try:
            s = os.stat(expanded, follow_symlinks=False)
        except OSError as err:
            raise SystemExit(f'Failed to stat {x} with error: {err}') from err
        if stat.S_ISDIR(s.st_mode):
            yield File(x, expanded, next(counter), s, remote_base, FileType.directory)
            new_remote_base = remote_base
            if new_remote_base:
                new_remote_base = new_remote_base.rstrip('/') + '/' + os.path.basename(x) + '/'
            else:
                new_remote_base = x.replace(os.sep, '/').rstrip('/') + '/'
            yield from process(cli_opts, [os.path.join(x, y) for y in os.listdir(expanded)], new_remote_base, counter)
        elif stat.S_ISLNK(s.st_mode):
            yield File(x, expanded, next(counter), s, remote_base, FileType.symlink)
        elif stat.S_ISREG(s.st_mode):
            yield File(x, expanded, next(counter), s, remote_base, FileType.regular)


def process_mirrored_files(cli_opts: TransferCLIOptions, args: Sequence[str]) -> Iterator[File]:
    '''
    Yield the files for a mirror-mode transfer, where every argument is a
    local path and there is no remote base. If all paths share a common
    ancestor inside the home directory they are rewritten relative to ``~``.
    '''
    paths = [abspath(x) for x in args]
    try:
        common_path = os.path.commonpath(paths)
    except ValueError:
        common_path = ''
    home = home_path().rstrip(os.sep)
    if common_path and common_path.startswith(home + os.sep):
        paths = [os.path.join('~', os.path.relpath(x, home)) for x in paths]
    yield from process(cli_opts, paths, '', count(1))


def process_normal_files(cli_opts: TransferCLIOptions, args: Sequence[str]) -> Iterator[File]:
    '''
    Yield the files for a normal transfer: the last argument is the remote
    destination, the rest are local paths. With more than one local path the
    destination is treated as a directory.

    :raises SystemExit: if fewer than two arguments are given.
    '''
    if len(args) < 2:
        raise SystemExit('Must specify at least one local path and one remote path')
    args = list(args)
    remote_base = args.pop().replace(os.sep, '/')
    if len(args) > 1 and not remote_base.endswith('/'):
        remote_base += '/'
    paths = [abspath(x) for x in args]
    yield from process(cli_opts, paths, remote_base, count(1))


def files_for_send(cli_opts: TransferCLIOptions, args: List[str]) -> Tuple[File, ...]:
    '''
    Build the complete list of files to send, marking hard links and resolving
    symlink targets. Symlinks that cannot be read are dropped from the result.
    '''
    if cli_opts.mode == 'mirror':
        files = list(process_mirrored_files(cli_opts, args))
    else:
        files = list(process_normal_files(cli_opts, args))
    groups: Dict[Tuple[int, int], List[File]] = {}

    # detect hard links
    for f in files:
        groups.setdefault(f.file_hash, []).append(f)
    for group in groups.values():
        if len(group) > 1:
            for lf in group[1:]:
                lf.file_type = FileType.link
                lf.hard_link_target = group[0].file_id
    # detect symlinks to other transferred files
    for f in tuple(files):
        if f.file_type is FileType.symlink:
            # Use the expanded path for filesystem access: in mirror mode
            # local_path is of the form ~/... which the OS cannot resolve.
            try:
                link_dest = os.readlink(f.expanded_local_path)
            except OSError:
                files.remove(f)
                continue
            f.symbolic_link_target = f'path:{link_dest}'
            is_abs = os.path.isabs(link_dest)
            q = link_dest if is_abs else os.path.join(os.path.dirname(f.expanded_local_path), link_dest)
            try:
                st = os.stat(q)
            except OSError:
                # Dangling link or inaccessible target, keep the path: form
                pass
            else:
                fh = st.st_dev, st.st_ino
                if fh in groups:
                    g = tuple(x for x in groups[fh] if os.path.samestat(st, x.stat_result))
                    if g:
                        t = g[0]
                        prefix = 'fid_abs' if is_abs else 'fid'
                        f.symbolic_link_target = f'{prefix}:{t.file_id}'
    return tuple(files)


class SendState(NameReprEnum):
    ''' Overall state of a send request. '''
    waiting_for_permission = auto()
    permission_granted = auto()
    permission_denied = auto()
    canceled = auto()


class Transfer:
    ''' A timestamped record of ``amt`` bytes having been transmitted. '''

    def __init__(self, amt: int = 0):
        self.amt = amt
        self.at = monotonic()

    def is_too_old(self, now: float) -> bool:
        ''' True if this record is more than 30 seconds older than ``now``. '''
        return now - self.at > 30


class ProgressTracker:
    ''' Accumulates transfer statistics used to render progress and rates. '''

    def __init__(self, total_size_of_all_files: int):
        self.total_size_of_all_files = total_size_of_all_files
        self.total_bytes_to_transfer = total_size_of_all_files
        self.active_file: Optional[File] = None
        self.total_transferred = 0
        # Recent transmissions, pruned in on_transmit()
        self.transfers: Deque[Transfer] = deque()
        self.transfered_stats_amt = 0
        self.transfered_stats_interval = 0.
        self.started_at = 0.
        self.signature_bytes = 0
        self.total_reported_progress = 0

    def change_active_file(self, nf: File) -> None:
        now = monotonic()
        self.active_file = nf
        nf.transmit_started_at = now

    def start_transfer(self) -> None:
        # The zero-sized record marks the start of the measurement window
        self.transfers.append(Transfer())
        self.started_at = monotonic()

    def on_transmit(self, amt: int) -> None:
        ''' Record that ``amt`` bytes were transmitted and refresh the rate statistics. '''
        if self.active_file is not None:
            self.active_file.transmitted_bytes += amt
        self.total_transferred += amt
        self.transfers.append(Transfer(amt))
        now = self.transfers[-1].at
        # Always keep at least two records so the interval is meaningful
        while len(self.transfers) > 2 and self.transfers[0].is_too_old(now):
            self.transfers.popleft()
        self.transfered_stats_interval = now - self.transfers[0].at
        self.transfered_stats_amt = sum(t.amt for t in self.transfers)

    def on_file_progress(self, af: File, delta: int) -> None:
        if delta > 0:
            self.total_reported_progress += delta

    def on_file_done(self, af: File) -> None:
        af.done_at = monotonic()


class SendManager:
    '''
    Protocol state machine for a send request: tracks the state of every file,
    produces the commands to transmit and consumes the responses received.
    '''

    def __init__(
        self, request_id: str, files: Tuple[File, ...],
        bypass: Optional[str] = None, use_rsync: bool = False,
        file_progress: Callable[[File, int], None] = lambda f, i: None,
        file_done: Callable[[File], None] = lambda f: None,
    ):
        self.use_rsync = use_rsync
        self.files = files
        self.bypass = encode_bypass(request_id, bypass) if bypass else ''
        self.fid_map = {f.file_id: f for f in self.files}
        self.request_id = request_id
        self.state = SendState.waiting_for_permission
        self.all_acknowledged = self.all_started = self.has_transmitting = self.has_rsync = False
        self.active_idx: Optional[int] = None
        # None means no chunk is pending, see Send.on_writing_finished()
        self.current_chunk_uncompressed_sz: Optional[int] = None
        self.prefix = f'\x1b]{FILE_TRANSFER_CODE};id={self.request_id};'
        self.suffix = '\x1b\\'
        self.progress = ProgressTracker(sum(df.file_size for df in self.files if df.file_size >= 0))
        self.file_done = file_done
        self.file_progress = file_progress
        self.last_progress_file: Optional[File] = None

    @property
    def active_file(self) -> Optional[File]:
        ''' The file at ``active_idx``, but only while it is in the transmitting state. '''
        if self.active_idx is not None:
            ans = self.files[self.active_idx]
            if ans.state is FileState.transmitting:
                return ans
        return None

    def activate_next_ready_file(self) -> Optional[File]:
        ''' Make the first file in the transmitting state the active one and return it, or None if there is none. '''
        if self.active_idx is not None:
            paf = self.files[self.active_idx]
            paf.transmit_ended_at = monotonic()
        for i, f in enumerate(self.files):
            if f.state is FileState.transmitting:
                self.active_idx = i
                self.update_collective_statuses()
                self.progress.change_active_file(f)
                return f
        self.active_idx = None
        self.update_collective_statuses()
        return None

    def update_collective_statuses(self) -> None:
        '''
        Recompute the ``all_acknowledged``, ``all_started``, ``has_rsync`` and
        ``has_transmitting`` flags. Reads ``File.ttype``, so it must only be
        called once File.metadata_command() has run for every file.
        '''
        found_not_started = found_not_done = False
        has_rsync = has_transmitting = False
        for f in self.files:
            if f.state is not FileState.acknowledged:
                found_not_done = True
            if f.state is FileState.waiting_for_start:
                found_not_started = True
            elif f.state is FileState.transmitting:
                has_transmitting = True
            if f.ttype is TransmissionType.rsync:
                has_rsync = True
        self.all_acknowledged = not found_not_done
        self.all_started = not found_not_started
        self.has_rsync = has_rsync
        self.has_transmitting = has_transmitting

    def start_transfer(self) -> str:
        ''' Return the serialized command that requests a send. '''
        return FileTransmissionCommand(action=Action.send, bypass=self.bypass).serialize()

    def next_chunks(self) -> Iterator[str]:
        '''
        Yield the serialized commands carrying the next chunk of the active
        file. Yields nothing when no file is ready to transmit.
        '''
        if self.active_file is None:
            self.activate_next_ready_file()
        af = self.active_file
        if af is None:
            return
        chunk = b''
        self.current_chunk_uncompressed_sz = 0
        # The compressor may buffer input and return nothing, so keep feeding
        # it until there is something to send or the file is exhausted.
        while af.state is not FileState.finished and not chunk:
            chunk, usz = af.next_chunk()
            self.current_chunk_uncompressed_sz += usz
        is_last = af.state is FileState.finished
        if len(chunk):
            for ftc in split_for_transfer(chunk, file_id=af.file_id, mark_last=is_last):
                yield ftc.serialize()
        elif is_last:
            yield FileTransmissionCommand(action=Action.end_data, file_id=af.file_id, data=b'').serialize()

    def send_file_metadata(self) -> Iterator[str]:
        ''' Yield the serialized metadata command of every file. '''
        for f in self.files:
            yield f.metadata_command(self.use_rsync).serialize()

    def on_file_status_update(self, ftc: FileTransmissionCommand) -> None:
        '''
        Handle a status response for a single file. ``STARTED`` and
        ``PROGRESS`` are handled specially, any other status is treated as the
        final acknowledgement, with a status other than ``OK`` being recorded
        as the error message.
        '''
        file = self.fid_map.get(ftc.file_id)
        if file is None:
            return
        if ftc.status == 'STARTED':
            file.remote_final_path = ftc.name
            file.remote_initial_size = ftc.size
            if file.file_type is FileType.directory:
                # Directories have no data to send
                file.state = FileState.finished
            else:
                file.state = FileState.waiting_for_data if ftc.ttype is TransmissionType.rsync else FileState.transmitting
                if file.state is FileState.waiting_for_data:
                    file.signature_loader = LoadSignature()
            self.update_collective_statuses()
        elif ftc.status == 'PROGRESS':
            self.last_progress_file = file
            change = ftc.size - file.reported_progress
            file.reported_progress = ftc.size
            self.progress.on_file_progress(file, change)
            self.file_progress(file, change)
        else:
            if ftc.name and not file.remote_final_path:
                file.remote_final_path = ftc.name
            file.state = FileState.acknowledged
            if ftc.status == 'OK':
                if ftc.size > 0:
                    change = ftc.size - file.reported_progress
                    file.reported_progress = ftc.size
                    self.progress.on_file_progress(file, change)
                    self.file_progress(file, change)
            else:
                file.err_msg = ftc.status
            self.progress.on_file_done(file)
            self.file_done(file)
            if self.active_idx is not None and file is self.files[self.active_idx]:
                self.active_idx = None
            self.update_collective_statuses()

    def on_signature_data_received(self, ftc: FileTransmissionCommand) -> None:
        ''' Feed received signature data to the file's loader, starting the delta calculation once it is complete. '''
        file = self.fid_map.get(ftc.file_id)
        if file is None or file.state is not FileState.waiting_for_data:
            return
        sl = file.signature_loader
        assert sl is not None
        sl.add_chunk(ftc.data)
        self.progress.signature_bytes += len(ftc.data)
        if ftc.action is Action.end_data:
            sl.commit()
            file.start_delta_calculation()
            self.update_collective_statuses()

    def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None:
        ''' Dispatch a response: a status without a file id answers the permission request. '''
        if ftc.action is Action.status:
            if ftc.file_id:
                self.on_file_status_update(ftc)
            else:
                self.state = SendState.permission_granted if ftc.status == 'OK' else SendState.permission_denied
        elif ftc.action in (Action.data, Action.end_data):
            if ftc.file_id:
                self.on_signature_data_received(ftc)


class Send(Handler):
    ''' The TUI handler that drives a :class:`SendManager` and renders progress. '''
    use_alternate_screen = False

    def __init__(self, cli_opts: TransferCLIOptions, files: Tuple[File, ...]):
        Handler.__init__(self)
        self.manager = SendManager(
            random_id(), files, cli_opts.permissions_bypass, cli_opts.transmit_deltas,
            self.on_file_progress, self.on_file_done)
        self.cli_opts = cli_opts
        self.transmit_started = False
        self.file_metadata_sent = False
        # Once set, the loop quits with this code after pending output is written
        self.quit_after_write_code: Optional[int] = None
        self.check_paths_printed = False
        names = tuple(x.display_name for x in self.manager.files)
        # default=0 so that an empty file list does not raise ValueError
        self.max_name_length = max(6, max(map(wcswidth, names), default=0))
        self.spinner = Spinner()
        self.progress_drawn = True
        self.done_files: List[File] = []
        self.done_file_ids: Set[str] = set()
        self.failed_files: List[File] = []
        self.transmit_ok_checked = False
        self.progress_update_call: Optional[TimerHandle] = None

    def send_payload(self, payload: str) -> None:
        ''' Write ``payload`` wrapped in the escape code prefix and suffix of this request. '''
        self.write(self.manager.prefix)
        self.write(payload)
        self.write(self.manager.suffix)

    def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None:
        ''' Handle a response from the terminal, ignoring those for other request ids. '''
        if ftc.id != self.manager.request_id:
            return
        if ftc.status == 'CANCELED' and ftc.action is Action.status:
            self.quit_loop(1)
            return
        if self.quit_after_write_code is not None or self.manager.state is SendState.canceled:
            return
        before = self.manager.state
        self.manager.on_file_transfer_response(ftc)
        if before == SendState.waiting_for_permission:
            if self.manager.state == SendState.permission_denied:
                self.cmd.styled('Permission denied for this transfer', fg='red')
                self.print()
                self.quit_loop(1)
                return
            if self.manager.state == SendState.permission_granted:
                self.cmd.styled('Permission granted for this transfer', fg='green')
                self.print()
                self.send_file_metadata()
        self.asyncio_loop.call_soon(self.loop_tick)

    def start_transfer(self) -> None:
        ''' Begin transmitting data, if some file is ready to be transmitted. '''
        if self.manager.active_file is None:
            self.manager.activate_next_ready_file()
        if self.manager.active_file is not None:
            self.transmit_started = True
            self.manager.progress.start_transfer()
            self.transmit_next_chunk()
            self.draw_progress()

    def print_check_paths(self) -> None:
        ''' Print, once only, the list of transfers to be performed and ask for confirmation. '''
        if self.check_paths_printed:
            return
        self.check_paths_printed = True
        self.print('The following file transfers will be performed. A red destination means an existing file will be overwritten.')
        for df in self.manager.files:
            self.cmd.styled(df.file_type.short_text, fg=df.file_type.color)
            self.print(end=' ')
            self.print(df.display_name, '→', end=' ')
            self.cmd.styled(df.remote_final_path, fg='red' if df.remote_initial_size > -1 else None)
            self.print()
        self.print(f'Transferring {len(self.manager.files)} files of total size: {human_size(self.manager.progress.total_bytes_to_transfer)}')
        self.print()
        self.print_continue_msg()

    def print_continue_msg(self) -> None:
        self.print(
            'Press', styled('y', fg='green', bold=True, fg_intense=True),
            'to continue or', styled('n', fg='red', bold=True, fg_intense=True), 'to abort')

    def on_text(self, text: str, in_bracketed_paste: bool = False) -> None:
        ''' Handle the y/n answer to the confirmation prompt. Text is ignored at any other time. '''
        if self.quit_after_write_code is not None:
            return
        if self.check_paths_printed and not self.transmit_started:
            if text.lower() == 'y':
                self.start_transfer()
                if self.manager.all_acknowledged:
                    self.refresh_progress()
                    self.transfer_finished()
                return
            if text.lower() == 'n':
                del self.failed_files[:]
                self.abort_transfer()
                self.print('Sending cancel request to terminal')
                return
            self.print_continue_msg()

    def on_key(self, key_event: KeyEventType) -> None:
        ''' Esc aborts at the confirmation prompt and acts as an interrupt otherwise. '''
        if self.quit_after_write_code is not None:
            return
        if key_event.matches('esc'):
            if self.check_paths_printed and not self.transmit_started:
                del self.failed_files[:]
                self.abort_transfer()
                self.print('Sending cancel request to terminal')
            else:
                self.on_interrupt()

    def check_for_transmit_ok(self) -> None:
        '''
        Start transmitting if permission has been granted. When path
        confirmation is requested, the prompt is shown instead, once all
        files have been started.
        '''
        if self.transmit_ok_checked:
            return self.start_transfer()
        if self.manager.state is not SendState.permission_granted:
            return
        if self.cli_opts.confirm_paths:
            if self.manager.all_started:
                self.print_check_paths()
            return
        self.transmit_ok_checked = True
        self.start_transfer()

    def transmit_next_chunk(self) -> None:
        ''' Send the next chunk, or finish the transfer if nothing is left and all files are acknowledged. '''
        found_chunk = False
        for chunk in self.manager.next_chunks():
            self.send_payload(chunk)
            found_chunk = True
        if not found_chunk:
            if self.manager.all_acknowledged:
                self.transfer_finished()

    def transfer_finished(self) -> None:
        ''' Send the finish command and arrange to quit, with exit code 1 if any file failed. '''
        self.send_payload(FileTransmissionCommand(action=Action.finish).serialize())
        self.quit_after_write_code = 1 if self.failed_files else 0

    def on_writing_finished(self) -> None:
        ''' Account for the chunk just written, then either quit or schedule the next tick. '''
        chunk_transmitted = self.manager.current_chunk_uncompressed_sz is not None
        if chunk_transmitted:
            self.manager.progress.on_transmit(self.manager.current_chunk_uncompressed_sz or 0)
            self.manager.current_chunk_uncompressed_sz = None
        if self.quit_after_write_code is not None:
            self.quit_loop(self.quit_after_write_code)
            return
        if self.manager.state is SendState.permission_granted and (not self.transmit_started or chunk_transmitted):
            self.asyncio_loop.call_soon(self.loop_tick)

    def loop_tick(self) -> None:
        if self.manager.state is SendState.waiting_for_permission:
            return
        if self.transmit_started:
            self.transmit_next_chunk()
            self.refresh_progress()
        else:
            self.check_for_transmit_ok()

    def initialize(self) -> None:
        self.send_payload(self.manager.start_transfer())
        if self.cli_opts.permissions_bypass:
            # don't wait for permission, not needed with a bypass and
            # avoids a roundtrip
            self.send_file_metadata()
        self.cmd.set_cursor_visible(False)

    def finalize(self) -> None:
        self.cmd.set_cursor_visible(True)

    def send_file_metadata(self) -> None:
        ''' Send the metadata of all files. Does nothing if it was already sent. '''
        if not self.file_metadata_sent:
            for payload in self.manager.send_file_metadata():
                self.send_payload(payload)
            self.file_metadata_sent = True

    def on_term(self) -> None:
        if self.quit_after_write_code is not None:
            return
        self.cmd.styled('Terminate requested, cancelling transfer, transferred files are in undefined state', fg='red')
        self.print()
        self.abort_transfer(delay=2)

    def on_interrupt(self) -> None:
        if self.quit_after_write_code is not None:
            return
        if self.manager.state is SendState.canceled:
            self.print('Waiting for canceled acknowledgement from terminal, will abort in a few seconds if no response received')
            return
        self.cmd.styled('Interrupt requested, cancelling transfer, transferred files are in undefined state', fg='red')
        self.print()
        self.abort_transfer()

    def abort_transfer(self, delay: float = 5) -> None:
        ''' Send a cancel request and quit with exit code 1 after ``delay`` seconds, whether or not it is acknowledged. '''
        self.send_payload(FileTransmissionCommand(action=Action.cancel).serialize())
        self.manager.state = SendState.canceled
        self.asyncio_loop.call_later(delay, self.quit_loop, 1)

    def render_progress(
        self, name: str, spinner_char: str = ' ',
        bytes_so_far: int = 0, total_bytes: int = 0,
        secs_so_far: float = 0., bytes_per_sec: float = 0.,
        is_complete: bool = False
    ) -> None:
        ''' Write a single progress line. A complete transfer is always shown as having sent ``total_bytes``. '''
        if is_complete:
            bytes_so_far = total_bytes
        self.write(render_progress_in_width(
            name, width=self.screen_size.cols, max_path_length=self.max_name_length,
            spinner_char=spinner_char, bytes_so_far=bytes_so_far, total_bytes=total_bytes,
            secs_so_far=secs_so_far, bytes_per_sec=bytes_per_sec, is_complete=is_complete
        ))

    def erase_progress(self) -> None:
        if self.progress_drawn:
            # draw_progress() leaves the cursor two lines below the start of
            # the area it redraws on every refresh
            self.cmd.move_cursor_by(2, 'up')
            self.write('\r')
            self.cmd.clear_to_end_of_screen()
            self.progress_drawn = False

    def schedule_progress_update(self, delay: float = 0.1) -> None:
        ''' Schedule a progress refresh in ``delay`` seconds, unless one is already scheduled to run sooner. '''
        # NOTE(review): progress_update_call is never reset to None in this
        # class, so after the timer has fired its when() lies in the past and
        # the branch below cannot re-arm it. Confirm whether that is intended.
        if self.progress_update_call is None:
            self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress)
        elif self.asyncio_loop.time() + delay < self.progress_update_call.when():
            self.progress_update_call.cancel()
            self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress)

    def on_file_progress(self, file: File, change: int) -> None:
        self.schedule_progress_update()

    def on_file_done(self, file: File) -> None:
        self.done_files.append(file)
        if file.err_msg:
            self.failed_files.append(file)
        self.schedule_progress_update()

    @Handler.atomic_update
    def draw_progress(self) -> None:
        '''
        Print one permanent line for every newly finished file, followed by
        the two line progress area: the current file and the overall total.
        '''
        with without_line_wrap(self.write):
            for df in self.done_files:
                sc = styled('✔', fg='green') if not df.err_msg else styled('✘', fg='red')
                if df.file_type is FileType.regular:
                    self.draw_progress_for_current_file(df, spinner_char=sc, is_complete=True)
                else:
                    self.write(f'{sc} {df.display_name} {styled(df.file_type.name, dim=True, italic=True)}')
                self.print()
                self.done_file_ids.add(df.file_id)
            del self.done_files[:]
            is_complete = self.quit_after_write_code is not None
            if is_complete:
                sc = styled('✔', fg='green') if self.quit_after_write_code == 0 else styled('✘', fg='red')
            else:
                sc = self.spinner()
            p = self.manager.progress
            now = monotonic()
            if is_complete:
                # NOTE(review): render_progress() uses screen_size.cols for its
                # width, this uses screen_size.width. Confirm both are in cells.
                self.cmd.repeat('─', self.screen_size.width)
            else:
                af = self.manager.last_progress_file
                if af is None or af.file_id in self.done_file_ids:
                    if self.manager.has_rsync and not self.manager.has_transmitting:
                        self.print(sc, 'Transferring rsync signatures...', end='')
                    else:
                        self.print(sc, 'Transferring metadata...', end='')
                else:
                    self.draw_progress_for_current_file(af, spinner_char=sc)
            self.print()
            if p.total_reported_progress > 0:
                self.render_progress(
                    'Total', spinner_char=sc,
                    bytes_so_far=p.total_reported_progress, total_bytes=p.total_bytes_to_transfer,
                    secs_so_far=now - p.started_at, is_complete=is_complete,
                    bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval)
                )
            else:
                self.print('File data transfer has not yet started', end='')
            self.print()
        self.schedule_progress_update(self.spinner.interval)
        self.progress_drawn = True

    def on_resize(self, screen_size: ScreenSize) -> None:
        super().on_resize(screen_size)
        if self.progress_drawn:
            self.refresh_progress()

    def refresh_progress(self) -> None:
        if not self.transmit_started:
            return
        self.erase_progress()
        self.draw_progress()

    def draw_progress_for_current_file(self, af: File, spinner_char: str = ' ', is_complete: bool = False) -> None:
        p = self.manager.progress
        now = monotonic()
        self.render_progress(
            af.display_name, spinner_char=spinner_char, is_complete=is_complete,
            bytes_so_far=af.reported_progress, total_bytes=af.bytes_to_transmit,
            secs_so_far=(af.done_at or now) - af.transmit_started_at,
            bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval)
        )


def send_main(cli_opts: TransferCLIOptions, args: List[str]) -> None:
    '''
    Entry point for sending files: scan, run the transfer UI and report the
    results.

    :raises SystemExit: always, with the return code of the UI loop, or with
        an error message if there is nothing to send.
    '''
    print('Scanning files…')
    files = files_for_send(cli_opts, args)
    if not files:
        # Happens when only unsupported file types (or no paths) were given;
        # there is nothing to negotiate with the terminal.
        raise SystemExit('No files found to send')
    print(f'Found {len(files)} files and directories, requesting transfer permission…')
    loop = Loop()
    handler = Send(cli_opts, files)
    loop.loop(handler)
    p = handler.manager.progress
    if handler.manager.has_rsync and p.total_transferred + p.signature_bytes:
        tsf = 0
        for f in files:
            if f.ttype is TransmissionType.rsync:
                tsf += f.file_size
        if tsf:
            print_rsync_stats(tsf, p.total_transferred, p.signature_bytes)
    if handler.failed_files:
        print(f'Transfer of {len(handler.failed_files)} out of {len(handler.manager.files)} files failed')
        for ff in handler.failed_files:
            print(styled(ff.display_name, fg='red'))
            print('  ', ff.err_msg)
    raise SystemExit(loop.return_code)