More work on the transfer kitten
This commit is contained in:
parent
f277cbf3f3
commit
33a6abfc07
@ -5,18 +5,23 @@
|
|||||||
import os
|
import os
|
||||||
import stat
|
import stat
|
||||||
import sys
|
import sys
|
||||||
|
from collections import deque
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
from datetime import timedelta
|
||||||
from enum import auto
|
from enum import auto
|
||||||
from itertools import count
|
from itertools import count
|
||||||
from mimetypes import guess_type
|
from mimetypes import guess_type
|
||||||
|
from time import monotonic
|
||||||
from typing import (
|
from typing import (
|
||||||
IO, Callable, Dict, Generator, Iterable, Iterator, List, Optional,
|
IO, Callable, Deque, Dict, Generator, Iterable, Iterator, List, Optional,
|
||||||
Sequence, Tuple, Union, cast
|
Sequence, Tuple, Union, cast
|
||||||
)
|
)
|
||||||
|
|
||||||
from kitty.cli import parse_args
|
from kitty.cli import parse_args
|
||||||
from kitty.cli_stub import TransferCLIOptions
|
from kitty.cli_stub import TransferCLIOptions
|
||||||
from kitty.fast_data_types import FILE_TRANSFER_CODE
|
from kitty.fast_data_types import (
|
||||||
|
FILE_TRANSFER_CODE, truncate_point_for_length, wcswidth
|
||||||
|
)
|
||||||
from kitty.file_transmission import (
|
from kitty.file_transmission import (
|
||||||
Action, Compression, FileTransmissionCommand, FileType, NameReprEnum,
|
Action, Compression, FileTransmissionCommand, FileType, NameReprEnum,
|
||||||
TransmissionType, encode_password
|
TransmissionType, encode_password
|
||||||
@ -26,13 +31,106 @@ from kitty.typing import KeyEventType
|
|||||||
|
|
||||||
from ..tui.handler import Handler
|
from ..tui.handler import Handler
|
||||||
from ..tui.loop import Loop, debug
|
from ..tui.loop import Loop, debug
|
||||||
from ..tui.operations import styled
|
from ..tui.operations import styled, without_line_wrap
|
||||||
from ..tui.utils import human_size
|
from ..tui.progress import render_progress_bar
|
||||||
|
from ..tui.spinners import Spinner
|
||||||
|
from ..tui.utils import format_number, human_size
|
||||||
|
|
||||||
_cwd = _home = ''
|
_cwd = _home = ''
|
||||||
debug
|
debug
|
||||||
|
|
||||||
|
|
||||||
|
def reduce_to_single_grapheme(text: str) -> str:
|
||||||
|
x = 1
|
||||||
|
while True:
|
||||||
|
pos = truncate_point_for_length(text, x)
|
||||||
|
if pos > 0:
|
||||||
|
return text[:pos]
|
||||||
|
pos += 1
|
||||||
|
|
||||||
|
|
||||||
|
def render_path_in_width(path: str, width: int) -> str:
|
||||||
|
if os.altsep:
|
||||||
|
path = path.replace(os.altsep, os.sep)
|
||||||
|
if wcswidth(path) <= width:
|
||||||
|
return path
|
||||||
|
parts = path.split(os.sep)
|
||||||
|
reduced = os.sep.join(map(reduce_to_single_grapheme, parts[:-1]))
|
||||||
|
path = os.path.join(reduced, parts[-1])
|
||||||
|
if wcswidth(path) <= width:
|
||||||
|
return path
|
||||||
|
x = truncate_point_for_length(path, width - 1)
|
||||||
|
return path[:x] + '…'
|
||||||
|
|
||||||
|
|
||||||
|
def render_seconds(val: float) -> str:
|
||||||
|
ans = str(timedelta(seconds=int(val)))
|
||||||
|
if ',' in ans:
|
||||||
|
days = int(ans.split(' ')[0])
|
||||||
|
if days > 99:
|
||||||
|
ans = '∞'
|
||||||
|
else:
|
||||||
|
ans = f'>{days} days'
|
||||||
|
elif len(ans) == 7:
|
||||||
|
ans = '0' + ans
|
||||||
|
return ans.rjust(8)
|
||||||
|
|
||||||
|
|
||||||
|
def ljust(text: str, width: int) -> str:
|
||||||
|
w = wcswidth(text)
|
||||||
|
if w < width:
|
||||||
|
text += ' ' * (width - w)
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
|
def rjust(text: str, width: int) -> str:
|
||||||
|
w = wcswidth(text)
|
||||||
|
if w < width:
|
||||||
|
text = ' ' * (width - w) + text
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
|
def render_progress_in_width(
|
||||||
|
path: str,
|
||||||
|
max_path_length: int = 80,
|
||||||
|
spinner_char: str = '⠋',
|
||||||
|
bytes_per_sec: float = 1024,
|
||||||
|
secs_so_far: float = 100.,
|
||||||
|
bytes_so_far: int = 33070,
|
||||||
|
total_bytes: int = 50000,
|
||||||
|
width: int = 80
|
||||||
|
) -> str:
|
||||||
|
unit_style = styled('|', dim=True)
|
||||||
|
sep, trail = unit_style.split('|')
|
||||||
|
if bytes_so_far >= total_bytes:
|
||||||
|
ratio = human_size(total_bytes, sep=sep)
|
||||||
|
rate = human_size(int(total_bytes / secs_so_far), sep=sep) + '/s'
|
||||||
|
eta = render_seconds(secs_so_far)
|
||||||
|
else:
|
||||||
|
tb = human_size(total_bytes, sep=' ', max_num_of_decimals=1)
|
||||||
|
val = float(tb.split(' ', 1)[0])
|
||||||
|
ratio = format_number(val * bytes_so_far / total_bytes, max_num_of_decimals=1) + '/' + tb.replace(' ', sep)
|
||||||
|
rate = human_size(int(bytes_per_sec), sep=sep) + '/s'
|
||||||
|
bytes_left = total_bytes - bytes_so_far
|
||||||
|
eta_seconds = bytes_left / bytes_per_sec
|
||||||
|
eta = render_seconds(eta_seconds)
|
||||||
|
lft = f'{spinner_char} '
|
||||||
|
max_space_for_path = width // 2 - wcswidth(lft)
|
||||||
|
w = min(max_path_length, max_space_for_path)
|
||||||
|
p = lft + render_path_in_width(path, w)
|
||||||
|
w += wcswidth(lft)
|
||||||
|
p = ljust(p, w)
|
||||||
|
q = f'{ratio}{trail}{styled(" @ ", fg="yellow")}{rate}{trail}'
|
||||||
|
q = rjust(q, 25) + ' '
|
||||||
|
eta = ' ' + eta
|
||||||
|
extra = width - w - wcswidth(q) - wcswidth(eta)
|
||||||
|
if extra > 4:
|
||||||
|
q += render_progress_bar(bytes_so_far / total_bytes, extra) + eta
|
||||||
|
else:
|
||||||
|
q += eta.strip()
|
||||||
|
return p + q
|
||||||
|
|
||||||
|
|
||||||
def should_be_compressed(path: str) -> bool:
|
def should_be_compressed(path: str) -> bool:
|
||||||
ext = path.rpartition(os.extsep)[-1].lower()
|
ext = path.rpartition(os.extsep)[-1].lower()
|
||||||
if ext in ('zip', 'odt', 'odp', 'pptx', 'docx', 'gz', 'bz2', 'xz', 'svgz'):
|
if ext in ('zip', 'odt', 'odp', 'pptx', 'docx', 'gz', 'bz2', 'xz', 'svgz'):
|
||||||
@ -186,6 +284,7 @@ class File:
|
|||||||
self.err_msg = ''
|
self.err_msg = ''
|
||||||
self.actual_file: Optional[IO[bytes]] = None
|
self.actual_file: Optional[IO[bytes]] = None
|
||||||
self.transmitted_bytes = 0
|
self.transmitted_bytes = 0
|
||||||
|
self.transmit_started_at = self.transmit_ended_at = 0.
|
||||||
|
|
||||||
def next_chunk(self, sz: int = 1024 * 1024) -> Tuple[bytes, int]:
|
def next_chunk(self, sz: int = 1024 * 1024) -> Tuple[bytes, int]:
|
||||||
if self.file_type is FileType.symlink:
|
if self.file_type is FileType.symlink:
|
||||||
@ -310,9 +409,54 @@ class SendState(NameReprEnum):
|
|||||||
canceled = auto()
|
canceled = auto()
|
||||||
|
|
||||||
|
|
||||||
|
class Transfer:
|
||||||
|
|
||||||
|
def __init__(self, amt: int = 0):
|
||||||
|
self.amt = amt
|
||||||
|
self.at = monotonic()
|
||||||
|
|
||||||
|
def is_too_old(self, now: float) -> bool:
|
||||||
|
return now - self.at > 30
|
||||||
|
|
||||||
|
|
||||||
|
class ProgressTracker:
|
||||||
|
|
||||||
|
def __init__(self, total_size_of_all_files: int):
|
||||||
|
self.total_size_of_all_files = total_size_of_all_files
|
||||||
|
self.total_bytes_to_transfer = total_size_of_all_files
|
||||||
|
self.active_file: Optional[File] = None
|
||||||
|
self.total_transferred = 0
|
||||||
|
self.transfers: Deque[Transfer] = deque()
|
||||||
|
self.transfered_stats_amt = 0
|
||||||
|
self.transfered_stats_interval = 0.
|
||||||
|
self.started_at = 0.
|
||||||
|
|
||||||
|
def change_active_file(self, nf: File) -> None:
|
||||||
|
now = monotonic()
|
||||||
|
if self.active_file is not None:
|
||||||
|
self.active_file.transmit_ended_at = now
|
||||||
|
self.active_file = nf
|
||||||
|
nf.transmit_started_at = now
|
||||||
|
|
||||||
|
def start_transfer(self) -> None:
|
||||||
|
self.transfers.append(Transfer())
|
||||||
|
self.started_at = monotonic()
|
||||||
|
|
||||||
|
def on_transfer(self, amt: int) -> None:
|
||||||
|
if self.active_file is not None:
|
||||||
|
self.active_file.transmitted_bytes += amt
|
||||||
|
self.total_transferred += amt
|
||||||
|
self.transfers.append(Transfer(amt))
|
||||||
|
now = self.transfers[-1].at
|
||||||
|
while len(self.transfers) > 2 and self.transfers[0].is_too_old(now):
|
||||||
|
self.transfers.popleft()
|
||||||
|
self.transfered_stats_interval = now - self.transfers[0].at
|
||||||
|
self.transfered_stats_amt = sum(t.amt for t in self.transfers)
|
||||||
|
|
||||||
|
|
||||||
class SendManager:
|
class SendManager:
|
||||||
|
|
||||||
def __init__(self, request_id: str, files: Tuple[File, ...], pw: Optional[str] = None):
|
def __init__(self, request_id: str, files: Tuple[File, ...], pw: Optional[str] = None, file_done: Callable[[File], None] = lambda f: None):
|
||||||
self.files = files
|
self.files = files
|
||||||
self.password = encode_password(request_id, pw) if pw else ''
|
self.password = encode_password(request_id, pw) if pw else ''
|
||||||
self.fid_map = {f.file_id: f for f in self.files}
|
self.fid_map = {f.file_id: f for f in self.files}
|
||||||
@ -324,7 +468,8 @@ class SendManager:
|
|||||||
self.current_chunk_uncompressed_sz: Optional[int] = None
|
self.current_chunk_uncompressed_sz: Optional[int] = None
|
||||||
self.prefix = f'\x1b]{FILE_TRANSFER_CODE};id={self.request_id};'
|
self.prefix = f'\x1b]{FILE_TRANSFER_CODE};id={self.request_id};'
|
||||||
self.suffix = '\x1b\\'
|
self.suffix = '\x1b\\'
|
||||||
self.total_size_of_all_files = sum(df.file_size for df in self.files if df.file_size >= 0)
|
self.progress = ProgressTracker(sum(df.file_size for df in self.files if df.file_size >= 0))
|
||||||
|
self.file_done = file_done
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def active_file(self) -> Optional[File]:
|
def active_file(self) -> Optional[File]:
|
||||||
@ -334,10 +479,14 @@ class SendManager:
|
|||||||
return ans
|
return ans
|
||||||
|
|
||||||
def activate_next_ready_file(self) -> Optional[File]:
|
def activate_next_ready_file(self) -> Optional[File]:
|
||||||
|
af = self.active_file
|
||||||
|
if af is not None:
|
||||||
|
self.file_done(af)
|
||||||
for i, f in enumerate(self.files):
|
for i, f in enumerate(self.files):
|
||||||
if f.state is FileState.transmitting:
|
if f.state is FileState.transmitting:
|
||||||
self.active_idx = i
|
self.active_idx = i
|
||||||
self.update_collective_statuses()
|
self.update_collective_statuses()
|
||||||
|
self.progress.change_active_file(f)
|
||||||
return f
|
return f
|
||||||
self.active_idx = None
|
self.active_idx = None
|
||||||
self.update_collective_statuses()
|
self.update_collective_statuses()
|
||||||
@ -358,7 +507,7 @@ class SendManager:
|
|||||||
return FileTransmissionCommand(action=Action.send, password=self.password).serialize()
|
return FileTransmissionCommand(action=Action.send, password=self.password).serialize()
|
||||||
|
|
||||||
def next_chunks(self) -> Iterator[str]:
|
def next_chunks(self) -> Iterator[str]:
|
||||||
if self.active_file is None:
|
if self.active_file is None or self.active_file.state is FileState.finished:
|
||||||
self.activate_next_ready_file()
|
self.activate_next_ready_file()
|
||||||
af = self.active_file
|
af = self.active_file
|
||||||
if af is None:
|
if af is None:
|
||||||
@ -369,8 +518,6 @@ class SendManager:
|
|||||||
chunk, usz = af.next_chunk()
|
chunk, usz = af.next_chunk()
|
||||||
self.current_chunk_uncompressed_sz += usz
|
self.current_chunk_uncompressed_sz += usz
|
||||||
is_last = af.state is FileState.finished
|
is_last = af.state is FileState.finished
|
||||||
if is_last:
|
|
||||||
self.activate_next_ready_file()
|
|
||||||
mv = memoryview(chunk)
|
mv = memoryview(chunk)
|
||||||
pos = 0
|
pos = 0
|
||||||
limit = len(chunk)
|
limit = len(chunk)
|
||||||
@ -413,14 +560,18 @@ class SendManager:
|
|||||||
class Send(Handler):
|
class Send(Handler):
|
||||||
use_alternate_screen = False
|
use_alternate_screen = False
|
||||||
|
|
||||||
def __init__(self, cli_opts: TransferCLIOptions, manager: SendManager):
|
def __init__(self, cli_opts: TransferCLIOptions, files: Tuple[File, ...]):
|
||||||
Handler.__init__(self)
|
Handler.__init__(self)
|
||||||
self.manager = manager
|
self.manager = SendManager(random_id(), files, cli_opts.permissions_password, self.on_file_done)
|
||||||
self.cli_opts = cli_opts
|
self.cli_opts = cli_opts
|
||||||
self.transmit_started = False
|
self.transmit_started = False
|
||||||
self.file_metadata_sent = False
|
self.file_metadata_sent = False
|
||||||
self.quit_after_write_code: Optional[int] = None
|
self.quit_after_write_code: Optional[int] = None
|
||||||
self.check_paths_printed = False
|
self.check_paths_printed = False
|
||||||
|
names = tuple(x.local_path for x in self.manager.files)
|
||||||
|
self.max_name_length = max(6, max(map(wcswidth, names)))
|
||||||
|
self.spinner = Spinner()
|
||||||
|
self.progress_drawn = True
|
||||||
|
|
||||||
def send_payload(self, payload: str) -> None:
|
def send_payload(self, payload: str) -> None:
|
||||||
self.write(self.manager.prefix)
|
self.write(self.manager.prefix)
|
||||||
@ -456,7 +607,9 @@ class Send(Handler):
|
|||||||
self.manager.activate_next_ready_file()
|
self.manager.activate_next_ready_file()
|
||||||
if self.manager.active_file is not None:
|
if self.manager.active_file is not None:
|
||||||
self.transmit_started = True
|
self.transmit_started = True
|
||||||
|
self.manager.progress.start_transfer()
|
||||||
self.transmit_next_chunk()
|
self.transmit_next_chunk()
|
||||||
|
self.draw_progress()
|
||||||
|
|
||||||
def print_check_paths(self) -> None:
|
def print_check_paths(self) -> None:
|
||||||
if self.check_paths_printed:
|
if self.check_paths_printed:
|
||||||
@ -469,7 +622,7 @@ class Send(Handler):
|
|||||||
self.print(df.local_path, '→', end=' ')
|
self.print(df.local_path, '→', end=' ')
|
||||||
self.cmd.styled(df.remote_final_path, fg='red' if df.remote_initial_size > -1 else None)
|
self.cmd.styled(df.remote_final_path, fg='red' if df.remote_initial_size > -1 else None)
|
||||||
self.print()
|
self.print()
|
||||||
self.print(f'Transferring {len(self.manager.files)} files of total size: {human_size(self.manager.total_size_of_all_files)}')
|
self.print(f'Transferring {len(self.manager.files)} files of total size: {human_size(self.manager.progress.total_bytes_to_transfer)}')
|
||||||
self.print()
|
self.print()
|
||||||
self.print_continue_msg()
|
self.print_continue_msg()
|
||||||
|
|
||||||
@ -523,6 +676,7 @@ class Send(Handler):
|
|||||||
|
|
||||||
def on_writing_finished(self) -> None:
|
def on_writing_finished(self) -> None:
|
||||||
if self.manager.current_chunk_uncompressed_sz is not None:
|
if self.manager.current_chunk_uncompressed_sz is not None:
|
||||||
|
self.manager.progress.on_transfer(self.manager.current_chunk_uncompressed_sz)
|
||||||
self.manager.current_chunk_uncompressed_sz = None
|
self.manager.current_chunk_uncompressed_sz = None
|
||||||
if self.quit_after_write_code is not None:
|
if self.quit_after_write_code is not None:
|
||||||
self.quit_loop(self.quit_after_write_code)
|
self.quit_loop(self.quit_after_write_code)
|
||||||
@ -535,6 +689,7 @@ class Send(Handler):
|
|||||||
return
|
return
|
||||||
if self.transmit_started:
|
if self.transmit_started:
|
||||||
self.transmit_next_chunk()
|
self.transmit_next_chunk()
|
||||||
|
self.refresh_progress()
|
||||||
else:
|
else:
|
||||||
self.check_for_transmit_ok()
|
self.check_for_transmit_ok()
|
||||||
|
|
||||||
@ -577,13 +732,68 @@ class Send(Handler):
|
|||||||
self.manager.state = SendState.canceled
|
self.manager.state = SendState.canceled
|
||||||
self.asyncio_loop.call_later(delay, self.quit_loop, 1)
|
self.asyncio_loop.call_later(delay, self.quit_loop, 1)
|
||||||
|
|
||||||
|
def render_progress(
|
||||||
|
self, name: str, spinner_char: str = ' ', bytes_so_far: int = 0, total_bytes: int = 0,
|
||||||
|
secs_so_far: float = 0., bytes_per_sec: float = 0.
|
||||||
|
) -> None:
|
||||||
|
self.write(render_progress_in_width(
|
||||||
|
'Total', width=self.screen_size.cols, max_path_length=self.max_name_length, spinner_char=spinner_char,
|
||||||
|
bytes_so_far=bytes_so_far, total_bytes=total_bytes, secs_so_far=secs_so_far,
|
||||||
|
bytes_per_sec=bytes_per_sec
|
||||||
|
))
|
||||||
|
|
||||||
|
def erase_progress(self) -> None:
|
||||||
|
if self.progress_drawn:
|
||||||
|
self.cmd.move_cursor_by(2, 'up')
|
||||||
|
self.write('\r')
|
||||||
|
self.cmd.clear_to_end_of_screen()
|
||||||
|
self.progress_drawn = False
|
||||||
|
|
||||||
|
def on_file_done(self, file: File) -> None:
|
||||||
|
with self.pending_update(), without_line_wrap(self.write):
|
||||||
|
self.erase_progress()
|
||||||
|
self.draw_progress_for_current_file(file)
|
||||||
|
self.draw_progress()
|
||||||
|
|
||||||
|
def draw_progress(self) -> None:
|
||||||
|
with self.pending_update(), without_line_wrap(self.write):
|
||||||
|
sc = self.spinner()
|
||||||
|
p = self.manager.progress
|
||||||
|
af = self.manager.active_file
|
||||||
|
now = monotonic()
|
||||||
|
if af is not None:
|
||||||
|
self.draw_progress_for_current_file(af, spinner_char=sc)
|
||||||
|
self.print()
|
||||||
|
self.render_progress(
|
||||||
|
'Total', spinner_char=sc,
|
||||||
|
bytes_so_far=p.total_transferred, total_bytes=p.total_bytes_to_transfer, secs_so_far=now - p.started_at,
|
||||||
|
bytes_per_sec=p.transfered_stats_amt / p.transfered_stats_interval
|
||||||
|
)
|
||||||
|
self.print()
|
||||||
|
self.asyncio_loop.call_later(self.spinner.interval, self.refresh_progress)
|
||||||
|
self.progress_drawn = True
|
||||||
|
|
||||||
|
def refresh_progress(self) -> None:
|
||||||
|
self.erase_progress()
|
||||||
|
self.draw_progress()
|
||||||
|
|
||||||
|
def draw_progress_for_current_file(self, af: File, spinner_char: str = ' ') -> None:
|
||||||
|
p = self.manager.progress
|
||||||
|
now = monotonic()
|
||||||
|
self.render_progress(
|
||||||
|
af.local_path, spinner_char=spinner_char,
|
||||||
|
bytes_so_far=af.transmitted_bytes, total_bytes=af.bytes_to_transmit,
|
||||||
|
secs_so_far=(af.transmit_ended_at or now) - af.transmit_started_at,
|
||||||
|
bytes_per_sec=p.transfered_stats_amt / p.transfered_stats_interval
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def send_main(cli_opts: TransferCLIOptions, args: List[str]) -> None:
|
def send_main(cli_opts: TransferCLIOptions, args: List[str]) -> None:
|
||||||
print('Scanning files…')
|
print('Scanning files…')
|
||||||
files = files_for_send(cli_opts, args)
|
files = files_for_send(cli_opts, args)
|
||||||
print(f'Found {len(files)} files and directories, requesting transfer permission…')
|
print(f'Found {len(files)} files and directories, requesting transfer permission…')
|
||||||
loop = Loop()
|
loop = Loop()
|
||||||
handler = Send(cli_opts, SendManager(random_id(), files, cli_opts.permissions_password))
|
handler = Send(cli_opts, files)
|
||||||
loop.loop(handler)
|
loop.loop(handler)
|
||||||
raise SystemExit(loop.return_code)
|
raise SystemExit(loop.return_code)
|
||||||
|
|
||||||
|
|||||||
@ -105,6 +105,15 @@ def set_line_wrapping(yes_or_no: bool) -> str:
|
|||||||
return set_mode(Mode.DECAWM) if yes_or_no else reset_mode(Mode.DECAWM)
|
return set_mode(Mode.DECAWM) if yes_or_no else reset_mode(Mode.DECAWM)
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def without_line_wrap(write: Callable[[str], None]) -> Generator[None, None, None]:
|
||||||
|
write(set_line_wrapping(False))
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
write(set_line_wrapping(True))
|
||||||
|
|
||||||
|
|
||||||
@cmd
|
@cmd
|
||||||
def set_cursor_visible(yes_or_no: bool) -> str:
|
def set_cursor_visible(yes_or_no: bool) -> str:
|
||||||
return set_mode(Mode.DECTCEM) if yes_or_no else reset_mode(Mode.DECTCEM)
|
return set_mode(Mode.DECTCEM) if yes_or_no else reset_mode(Mode.DECTCEM)
|
||||||
|
|||||||
@ -30,6 +30,14 @@ def get_key_press(allowed: str, default: str) -> str:
|
|||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
def format_number(val: float, max_num_of_decimals: int = 2) -> str:
|
||||||
|
ans = str(val)
|
||||||
|
pos = ans.find('.')
|
||||||
|
if pos > -1:
|
||||||
|
ans = ans[:pos + max_num_of_decimals + 1]
|
||||||
|
return ans.rstrip('0').rstrip('.')
|
||||||
|
|
||||||
|
|
||||||
def human_size(
|
def human_size(
|
||||||
size: int, sep: str = ' ',
|
size: int, sep: str = ' ',
|
||||||
max_num_of_decimals: int = 2,
|
max_num_of_decimals: int = 2,
|
||||||
@ -40,8 +48,4 @@ def human_size(
|
|||||||
return f'{size}{sep}{unit_list[0]}'
|
return f'{size}{sep}{unit_list[0]}'
|
||||||
from math import log
|
from math import log
|
||||||
exponent = min(int(log(size, 1024)), len(unit_list) - 1)
|
exponent = min(int(log(size, 1024)), len(unit_list) - 1)
|
||||||
ans = str(size / 1024**exponent)
|
return format_number(size / 1024**exponent, max_num_of_decimals) + sep + unit_list[exponent]
|
||||||
pos = ans.find('.')
|
|
||||||
if pos > -1:
|
|
||||||
ans = ans[:pos + max_num_of_decimals + 1]
|
|
||||||
return ans.rstrip('0').rstrip('.') + sep + unit_list[exponent]
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user