import asyncio
import enum
import errno
import fnmatch
import json
import logging
import os
import re
import time
from dataclasses import dataclass
from email.utils import parsedate_to_datetime
from http import HTTPStatus
from pathlib import Path
from stat import S_ISREG
from typing import (
AbstractSet,
Any,
AsyncGenerator,
AsyncIterator,
Awaitable,
Callable,
Dict,
Iterable,
Mapping,
Optional,
Tuple,
Union,
cast,
)
import aiohttp
import attr
from yarl import URL
from ._abc import (
AbstractDeleteProgress,
AbstractFileProgress,
AbstractRecursiveFileProgress,
StorageProgressComplete,
StorageProgressDelete,
StorageProgressEnterDir,
StorageProgressFail,
StorageProgressLeaveDir,
StorageProgressStart,
StorageProgressStep,
_AsyncAbstractDeleteProgress,
_AsyncAbstractFileProgress,
_AsyncAbstractRecursiveFileProgress,
)
from ._config import Config
from ._core import _Core
from ._errors import NDJSONError, ResourceNotFound
from ._file_filter import AsyncFilterFunc, FileFilter
from ._rewrite import rewrite_module
from ._url_utils import (
_extract_path,
normalize_local_path_uri,
normalize_storage_path_uri,
)
from ._users import Action
from ._utils import (
NoPublicConstructor,
QueuedCall,
aclosing,
asyncgeneratorcontextmanager,
queue_calls,
retries,
)
log = logging.getLogger(__package__)
MAX_OPEN_FILES = 20  # max number of concurrently open local files
READ_SIZE = 2**20  # 1 MiB
TIME_THRESHOLD = 1.0  # allowance for local/server clock skew, in seconds
Printer = Callable[[str], None]
@rewrite_module
class FileStatusType(str, enum.Enum):
DIRECTORY = "DIRECTORY"
FILE = "FILE"
SYMLINK = "SYMLINK"
UNKNOWN = "UNKNOWN"
@rewrite_module
@dataclass(frozen=True)
class FileStatus:
path: str
size: int
type: FileStatusType
modification_time: int
permission: Action
uri: URL
target: Optional[str] = None
def is_file(self) -> bool:
return self.type == FileStatusType.FILE
def is_dir(self) -> bool:
return self.type == FileStatusType.DIRECTORY
def is_symlink(self) -> bool:
return self.type == FileStatusType.SYMLINK
@property
def name(self) -> str:
return Path(self.path).name
@rewrite_module
@dataclass(frozen=True)
class DiskUsageInfo:
cluster_name: str
total: int
used: int
free: int
org_name: str
uri: Optional[URL] = None
@rewrite_module
class Storage(metaclass=NoPublicConstructor):
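    # Usage sketch (illustrative, not part of the API: assumes an SDK client
    # object exposing this class as ``client.storage``; the actual entry
    # point lives outside this module):
    #
    #     async with client.storage.list(URL("storage:folder")) as it:
    #         async for status in it:
    #             print(status.name, status.size)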
def __init__(self, core: _Core, config: Config) -> None:
self._core = core
self._config = config
self._file_sem = asyncio.BoundedSemaphore(MAX_OPEN_FILES)
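        # Estimated bounds on the difference between local and server clocks;
        # refined from response headers by _set_time_diff().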
self._min_time_diff = 0.0
self._max_time_diff = 0.0
def _normalize_uri(self, uri: URL) -> URL:
return normalize_storage_path_uri(
uri,
self._config.project_name_or_raise,
self._config.cluster_name,
self._config.org_name,
)
def _get_storage_url(self, uri: URL, *, normalized: bool = False) -> URL:
if not normalized:
uri = self._normalize_uri(uri)
assert uri.host is not None
return self._config.get_cluster(uri.host).storage_url / uri.path.lstrip("/")
def _set_time_diff(self, request_time: float, resp: aiohttp.ClientResponse) -> None:
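        # Estimate the client/server clock difference from the response's
        # Date header; _check_upload/_check_download use these bounds.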
response_time = time.time()
try:
server_dt = parsedate_to_datetime(resp.headers.get("Date", ""))
        except (TypeError, ValueError):  # TypeError before Python 3.10
return
server_time = server_dt.timestamp()
# Remove 1 because server time has been truncated
# and can be up to 1 second less than the actual value.
self._min_time_diff = request_time - server_time - 1.0
self._max_time_diff = response_time - server_time
def _check_upload(
self, local: os.stat_result, remote: FileStatus, update: bool, continue_: bool
) -> Optional[int]:
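        # Decide how to upload: None -- skip (the remote copy is good enough),
        # 0 -- upload the whole file, N > 0 -- resume the upload at offset N.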
if (
local.st_mtime - remote.modification_time
> self._min_time_diff - TIME_THRESHOLD
):
# Local is likely newer.
return 0
# Remote is definitely newer.
if update:
return None
if continue_:
if local.st_size == remote.size: # complete
return None
if local.st_size > remote.size: # partial
return remote.size
return 0
def _check_download(
self, local: os.stat_result, remote: FileStatus, update: bool, continue_: bool
) -> Optional[int]:
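        # Decide how to download: None -- skip (the local copy is good enough),
        # 0 -- download the whole file, N > 0 -- resume the download at offset N.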
# Add 1 because remote.modification_time has been truncated
# and can be up to 1 second less than the actual value.
if (
local.st_mtime - remote.modification_time
< self._max_time_diff + TIME_THRESHOLD + 1.0
):
# Remote is likely newer.
return 0
# Local is definitely newer.
if update:
return None
if continue_:
if local.st_size == remote.size: # complete
return None
if local.st_size < remote.size: # partial
return local.st_size
return 0
@asyncgeneratorcontextmanager
async def list(self, uri: URL) -> AsyncIterator[FileStatus]:
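        """List a directory on the storage, yielding a FileStatus per entry."""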
uri = self._normalize_uri(uri)
url = self._get_storage_url(uri, normalized=True)
url = url.with_query(op="LISTSTATUS")
headers = {"Accept": "application/x-ndjson"}
request_time = time.time()
auth = await self._config._api_auth()
        # NB: the storage server returns bare file names in FileStatus for
        # LISTSTATUS but full paths for GETFILESTATUS
async with self._core.request("GET", url, headers=headers, auth=auth) as resp:
self._set_time_diff(request_time, resp)
if resp.headers.get("Content-Type", "").startswith("application/x-ndjson"):
async for line in resp.content:
server_message = json.loads(line)
self.check_for_server_error(server_message)
status = server_message["FileStatus"]
yield _file_status_from_api_ls(uri, status)
else:
res = await resp.json()
for status in res["FileStatuses"]["FileStatus"]:
yield _file_status_from_api_ls(uri, status)
@asyncgeneratorcontextmanager
async def glob(self, uri: URL, *, dironly: bool = False) -> AsyncIterator[URL]:
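        """Expand glob wildcards in ``uri``, yielding the matching URLs."""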
if not _has_magic(uri.path):
yield uri
return
basename = uri.name
glob_in_dir: Callable[[URL, str, bool], AsyncGenerator[URL, None]]
trailing_slash = uri.path.endswith("/")
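        # Dispatch on the last path component: _glob0 matches a literal name,
        # _glob1 a wildcard within a single directory, _glob2 a recursive "**".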
if not _has_magic(basename):
glob_in_dir = self._glob0
elif not _isrecursive(basename):
glob_in_dir = self._glob1
else:
glob_in_dir = self._glob2
async with self.glob(uri.parent, dironly=True) as parent_iter:
async for parent in parent_iter:
async with aclosing(glob_in_dir(parent, basename, dironly)) as it:
async for x in it:
sx = str(x)
if trailing_slash and not sx.endswith("/"):
yield URL(sx + "/")
else:
yield x
async def _glob2(
self, parent: URL, pattern: str, dironly: bool
) -> AsyncGenerator[URL, None]:
assert _isrecursive(pattern)
yield parent
async with aclosing(self._rlistdir(parent, dironly)) as it:
async for x in it:
yield x
async def _glob1(
self, parent: URL, pattern: str, dironly: bool
) -> AsyncGenerator[URL, None]:
allow_hidden = _ishidden(pattern)
match = re.compile(fnmatch.translate(pattern)).fullmatch
async with aclosing(self._iterdir(parent, dironly)) as it:
async for stat in it:
name = stat.path
if (allow_hidden or not _ishidden(name)) and match(name):
yield parent / name
async def _glob0(
self, parent: URL, basename: str, dironly: bool
) -> AsyncGenerator[URL, None]:
uri = parent / basename
try:
await self.stat(uri)
except ResourceNotFound:
return
yield uri
async def _iterdir(
self, uri: URL, dironly: bool
) -> AsyncGenerator[FileStatus, None]:
async with self.list(uri) as it:
async for stat in it:
if not dironly or stat.is_dir():
yield stat
async def _rlistdir(self, uri: URL, dironly: bool) -> AsyncGenerator[URL, None]:
async with aclosing(self._iterdir(uri, dironly)) as it:
async for stat in it:
name = stat.path
if not _ishidden(name):
x = uri / name
yield x
if stat.is_dir():
async with aclosing(self._rlistdir(x, dironly)) as it2:
async for y in it2:
yield y
async def mkdir(
self, uri: URL, *, parents: bool = False, exist_ok: bool = False
) -> None:
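        """Create a directory at ``uri``.

        With ``parents=True`` missing parent directories are created as well;
        with ``exist_ok=True`` an already existing directory is not an error.
        """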
if not exist_ok:
try:
await self.stat(uri)
except ResourceNotFound:
pass
else:
raise FileExistsError(errno.EEXIST, "File exists", str(uri))
if not parents:
parent = uri
while not parent.name and parent != parent.parent:
parent = parent.parent
parent = parent.parent
if parent != parent.parent:
try:
await self.stat(parent)
except ResourceNotFound:
raise FileNotFoundError(
errno.ENOENT, "No such directory", str(parent)
)
url = self._get_storage_url(uri)
url = url.with_query(op="MKDIRS")
auth = await self._config._api_auth()
async with self._core.request("PUT", url, auth=auth) as resp:
resp # resp.status == 201
async def create(self, uri: URL, data: Union[bytes, AsyncIterator[bytes]]) -> None:
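        """Create (or overwrite) the file at ``uri`` from ``data``."""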
url = self._get_storage_url(uri)
url = url.with_query(op="CREATE")
timeout = attr.evolve(self._core.timeout, sock_read=None)
auth = await self._config._api_auth()
async with self._core.request(
"PUT", url, data=data, timeout=timeout, auth=auth
) as resp:
resp # resp.status == 201
async def write(self, uri: URL, data: bytes, offset: int) -> None:
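        """Write ``data`` into the file at ``uri`` starting at ``offset``."""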
if not data:
raise ValueError("empty data")
url = self._get_storage_url(uri)
url = url.with_query(op="WRITE")
timeout = attr.evolve(self._core.timeout, sock_read=None)
auth = await self._config._api_auth()
headers = {"Content-Range": f"bytes {offset}-{offset + len(data) - 1}/*"}
async with self._core.request(
"PATCH", url, data=data, timeout=timeout, auth=auth, headers=headers
) as resp:
resp # resp.status == 200
async def stat(self, uri: URL) -> FileStatus:
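        """Return the FileStatus of ``uri``."""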
uri = self._normalize_uri(uri)
assert uri.host is not None
url = self._get_storage_url(uri, normalized=True)
url = url.with_query(op="GETFILESTATUS")
auth = await self._config._api_auth()
request_time = time.time()
        # NB: the storage server returns bare file names in FileStatus for
        # LISTSTATUS but full paths for GETFILESTATUS
async with self._core.request("GET", url, auth=auth) as resp:
self._set_time_diff(request_time, resp)
res = await resp.json()
return _file_status_from_api_stat(uri.host, res["FileStatus"])
async def disk_usage(
self,
cluster_name: Optional[str] = None,
org_name: Optional[str] = None,
uri: Optional[URL] = None,
) -> DiskUsageInfo:
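        """Return disk usage for a cluster/organization or an explicit ``uri``."""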
cluster_name = cluster_name or self._config.cluster_name
org_name_val = org_name or self._config.org_name
if uri:
url = self._get_storage_url(uri)
else:
url = self._get_storage_url(
self._create_disk_usage_uri(cluster_name, org_name_val), normalized=True
)
url = url.with_query(op="GETDISKUSAGE")
auth = await self._config._api_auth()
request_time = time.time()
async with self._core.request("GET", url, auth=auth) as resp:
self._set_time_diff(request_time, resp)
res = await resp.json()
return _disk_usage_from_api(cluster_name, org_name_val, uri, res)
def _create_disk_usage_uri(
self, cluster_name: Optional[str], org_name: Optional[str]
) -> URL:
if org_name:
uri = self._normalize_uri(
URL(
f"storage://{cluster_name}/{org_name}"
f"/{self._config.project_name_or_raise}"
)
)
else:
uri = self._normalize_uri(
URL(f"storage://{cluster_name}/{self._config.project_name_or_raise}")
)
assert uri.host is not None
return uri
@asyncgeneratorcontextmanager
async def open(
self, uri: URL, offset: int = 0, size: Optional[int] = None
) -> AsyncIterator[bytes]:
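        """Read the file at ``uri``, yielding chunks of bytes.

        ``offset`` and ``size`` select an optional byte sub-range.
        """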
url = self._get_storage_url(uri)
url = url.with_query(op="OPEN")
timeout = attr.evolve(self._core.timeout, sock_read=None)
auth = await self._config._api_auth()
if offset < 0:
raise ValueError("offset should be >= 0")
if size is None:
if offset:
partial = True
headers = {"Range": f"bytes={offset}-"}
else:
partial = False
headers = {}
elif size > 0:
partial = True
headers = {"Range": f"bytes={offset}-{offset + size - 1}"}
        elif size == 0:
return
else:
raise ValueError("size should be >= 0")
async with self._core.request(
"GET", url, timeout=timeout, auth=auth, headers=headers
) as resp:
if partial:
                if resp.status != HTTPStatus.PARTIAL_CONTENT:  # 206
raise RuntimeError(f"Unexpected status code {resp.status}")
rng = _parse_content_range(resp.headers.get("Content-Range"))
if rng.start != offset:
raise RuntimeError("Invalid header Content-Range")
async for data in resp.content.iter_any():
yield data
async def rm(
self,
uri: URL,
*,
recursive: bool = False,
progress: Optional[AbstractDeleteProgress] = None,
) -> None:
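        """Remove a file or, with ``recursive=True``, a whole directory tree."""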
# TODO (asvetlov): add a minor protection against deleting everything from root
# or user volume root, however force operation here should allow user to delete
# everything.
#
# Now it doesn't make sense because URL for root folder (storage:///) is not
# supported
#
# parts = path.split('/')
# if final_path == root_data_path or final_path.parent == root_data_path:
# raise ValueError("Invalid path value.")
async_progress: _AsyncAbstractDeleteProgress
queue, async_progress = queue_calls(progress)
await run_progress(
queue, self._rm(uri, recursive=recursive, progress=async_progress)
)
def check_for_server_error(self, server_message: Mapping[str, Any]) -> None:
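        # Raise OSError (the "errno" field may be a symbolic name like ENOENT)
        # or NDJSONError when the NDJSON message carries an "error" payload.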
if "error" in server_message:
err_text = server_message["error"]
os_errno = server_message.get("errno", None)
if os_errno is not None:
os_errno = errno.__dict__.get(os_errno, os_errno)
raise OSError(os_errno, err_text)
raise NDJSONError(err_text)
async def _rm(
self, uri: URL, *, recursive: bool, progress: _AsyncAbstractDeleteProgress
) -> None:
uri = self._normalize_uri(uri)
assert uri.host is not None
url = self._get_storage_url(uri, normalized=True)
url = url.with_query(op="DELETE", recursive="true" if recursive else "false")
auth = await self._config._api_auth()
base_uri = URL.build(scheme="storage", authority=uri.host)
headers = {"Accept": "application/x-ndjson"}
async with self._core.request(
"DELETE", url, headers=headers, auth=auth
) as resp:
if resp.headers.get("Content-Type", "").startswith("application/x-ndjson"):
async for line in resp.content:
server_message = json.loads(line)
self.check_for_server_error(server_message)
await progress.delete(
StorageProgressDelete(
uri=base_uri / server_message["path"].lstrip("/"),
is_dir=server_message["is_dir"],
)
)
else:
pass # Old server versions do not support delete status streaming
async def mv(self, src: URL, dst: URL) -> None:
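        """Rename (move) ``src`` to ``dst``; both must be on the same cluster."""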
src = self._normalize_uri(src)
dst = self._normalize_uri(dst)
assert src.host is not None
assert dst.host is not None
if src.host != dst.host:
raise ValueError("Cannot move cross-cluster")
url = self._get_storage_url(src)
url = url.with_query(op="RENAME", destination="/" + dst.path.lstrip("/"))
auth = await self._config._api_auth()
async with self._core.request("POST", url, auth=auth) as resp:
resp # resp.status == 204
# high-level helpers
async def upload_file(
self,
src: URL,
dst: URL,
*,
update: bool = False,
continue_: bool = False,
progress: Optional[AbstractFileProgress] = None,
) -> None:
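        """Upload the single local file ``src`` to the storage path ``dst``.

        ``update=True`` skips the upload if the remote copy is newer;
        ``continue_=True`` resumes a previously interrupted upload.
        """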
src = normalize_local_path_uri(src)
dst = self._normalize_uri(dst)
path = _extract_path(src)
try:
if not path.exists():
raise FileNotFoundError(errno.ENOENT, "No such file", str(path))
if path.is_dir():
raise IsADirectoryError(
errno.EISDIR, "Is a directory, use recursive copy", str(path)
)
except OSError as e:
if getattr(e, "winerror", None) not in (1, 87):
raise
# Ignore stat errors for device files like NUL or CON on Windows.
# See https://bugs.python.org/issue37074
offset: Optional[int] = 0
try:
dst_stat = await self.stat(dst)
if dst_stat.is_dir():
raise IsADirectoryError(errno.EISDIR, "Is a directory", str(dst))
except ResourceNotFound:
            # The target doesn't exist; look up the parent directory.
try:
dst_parent_stat = await self.stat(dst.parent)
if not dst_parent_stat.is_dir():
# parent path should be a folder
raise NotADirectoryError(
errno.ENOTDIR, "Not a directory", str(dst.parent)
)
except ResourceNotFound:
raise NotADirectoryError(
errno.ENOTDIR, "Not a directory", str(dst.parent)
)
else:
if update or continue_:
try:
src_stat = path.stat()
except OSError:
pass
else:
if S_ISREG(src_stat.st_mode):
offset = self._check_upload(
src_stat, dst_stat, update, continue_
)
if offset is None:
return
async_progress: _AsyncAbstractFileProgress
queue, async_progress = queue_calls(progress)
await run_progress(
queue, self._upload_file(path, dst, offset, progress=async_progress)
)
async def _upload_file(
self,
src_path: Path,
dst: URL,
offset: int,
*,
progress: _AsyncAbstractFileProgress,
) -> None:
src = URL(src_path.as_uri())
loop = asyncio.get_event_loop()
async with self._file_sem:
with src_path.open("rb") as stream:
size = os.stat(stream.fileno()).st_size
await progress.start(StorageProgressStart(src, dst, size))
if offset:
stream.seek(offset)
else:
chunk = await loop.run_in_executor(None, stream.read, READ_SIZE)
for retry in retries(f"Fail to upload {dst}"):
async with retry:
await self.create(dst, chunk)
offset = len(chunk)
if offset:
while True:
await progress.step(StorageProgressStep(src, dst, offset, size))
chunk = await loop.run_in_executor(None, stream.read, READ_SIZE)
if not chunk:
break
for retry in retries(f"Fail to upload {dst}"):
async with retry:
await self.write(dst, chunk, offset)
offset += len(chunk)
await progress.complete(StorageProgressComplete(src, dst, size))
async def upload_dir(
self,
src: URL,
dst: URL,
*,
update: bool = False,
continue_: bool = False,
filter: Optional[AsyncFilterFunc] = None,
ignore_file_names: AbstractSet[str] = frozenset(),
progress: Optional[AbstractRecursiveFileProgress] = None,
) -> None:
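        """Recursively upload the local directory ``src`` to ``dst``.

        ``filter`` and ``ignore_file_names`` control which entries are copied.
        """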
src = normalize_local_path_uri(src)
dst = self._normalize_uri(dst)
path = _extract_path(src).resolve()
if not path.exists():
raise FileNotFoundError(errno.ENOENT, "No such file", str(path))
if not path.is_dir():
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", str(path))
if filter is None:
filter = _always
if ignore_file_names:
filter = load_parent_ignore_files(filter, ignore_file_names, path)
async_progress: _AsyncAbstractRecursiveFileProgress
queue, async_progress = queue_calls(progress)
await run_progress(
queue,
self._upload_dir(
src,
path,
dst,
"",
update=update,
continue_=continue_,
filter=filter,
ignore_file_names=ignore_file_names,
progress=async_progress,
),
)
async def _upload_dir(
self,
src: URL,
src_path: Path,
dst: URL,
rel_path: str,
*,
update: bool,
continue_: bool,
filter: AsyncFilterFunc,
ignore_file_names: AbstractSet[str],
progress: _AsyncAbstractRecursiveFileProgress,
) -> None:
tasks = []
try:
exists = False
if update or continue_:
try:
for retry in retries(f"Fail to list {dst}"):
async with retry:
async with self.list(dst) as it:
dst_files = {
item.name: item
async for item in it
if item.is_file()
}
exists = True
except ResourceNotFound:
update = continue_ = False
if not exists:
for retry in retries(f"Fail to create {dst}"):
async with retry:
await self.mkdir(dst, exist_ok=True)
except FileExistsError:
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", str(dst))
await progress.enter(StorageProgressEnterDir(src, dst))
loop = asyncio.get_event_loop()
async with self._file_sem:
folder = await loop.run_in_executor(None, lambda: list(src_path.iterdir()))
if ignore_file_names:
for child in folder:
if child.name in ignore_file_names and child.is_file():
log.debug(f"Load ignore file {rel_path}{child.name}")
file_filter = FileFilter(filter)
file_filter.read_from_file(child, prefix=rel_path)
filter = file_filter.match
for child in folder:
name = child.name
child_rel_path = f"{rel_path}{name}"
if child.is_dir():
child_rel_path += "/"
if not await filter(child_rel_path):
log.debug(f"Skip {child_rel_path}")
continue
if child.is_file():
offset: Optional[int] = 0
if (update or continue_) and name in dst_files:
offset = self._check_upload(
child.stat(), dst_files[name], update, continue_
)
if offset is None:
continue
tasks.append(
self._upload_file(
src_path / name, dst / name, offset, progress=progress
)
)
elif child.is_dir():
tasks.append(
self._upload_dir(
src / name,
src_path / name,
dst / name,
child_rel_path,
update=update,
continue_=continue_,
filter=filter,
ignore_file_names=ignore_file_names,
progress=progress,
)
)
else:
                # This case handles uploading a non-regular file,
                # e.g. a block device or a Unix socket.
                # Coverage is temporarily skipped; the line is waiting for a champion.
await progress.fail(
StorageProgressFail(
src / name,
dst / name,
f"Cannot upload {child}, not regular file/directory",
),
) # pragma: no cover
await run_concurrently(tasks)
await progress.leave(StorageProgressLeaveDir(src, dst))
async def download_file(
self,
src: URL,
dst: URL,
*,
update: bool = False,
continue_: bool = False,
progress: Optional[AbstractFileProgress] = None,
) -> None:
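        """Download the single storage file ``src`` to the local path ``dst``.

        ``update=True`` skips the download if the local copy is newer;
        ``continue_=True`` resumes a previously interrupted download.
        """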
src = self._normalize_uri(src)
dst = normalize_local_path_uri(dst)
path = _extract_path(dst)
src_stat = await self.stat(src)
if not src_stat.is_file():
raise IsADirectoryError(errno.EISDIR, "Is a directory", str(src))
offset: Optional[int] = 0
if update or continue_:
try:
dst_stat = path.stat()
except OSError:
pass
else:
if S_ISREG(dst_stat.st_mode):
offset = self._check_download(dst_stat, src_stat, update, continue_)
if offset is None:
return
async_progress: _AsyncAbstractFileProgress
queue, async_progress = queue_calls(progress)
await run_progress(
queue,
self._download_file(
src, dst, path, src_stat.size, offset, progress=async_progress
),
)
async def _download_file(
self,
src: URL,
dst: URL,
dst_path: Path,
size: int,
offset: int,
*,
progress: _AsyncAbstractFileProgress,
) -> None:
loop = asyncio.get_event_loop()
async with self._file_sem:
await progress.start(StorageProgressStart(src, dst, size))
with dst_path.open("rb+" if offset else "wb") as stream:
if offset:
stream.seek(offset)
for retry in retries(f"Fail to download {src}"):
pos = stream.tell()
if pos >= size:
break
async with retry:
async with self.open(src, offset=pos) as it:
async for chunk in it:
pos += len(chunk)
await progress.step(
StorageProgressStep(src, dst, pos, size)
)
await loop.run_in_executor(None, stream.write, chunk)
if chunk:
retry.reset()
await progress.complete(StorageProgressComplete(src, dst, size))
async def download_dir(
self,
src: URL,
dst: URL,
*,
update: bool = False,
continue_: bool = False,
filter: Optional[AsyncFilterFunc] = None,
progress: Optional[AbstractRecursiveFileProgress] = None,
) -> None:
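        """Recursively download the storage directory ``src`` to ``dst``."""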
if filter is None:
filter = _always
src = self._normalize_uri(src)
dst = normalize_local_path_uri(dst)
path = _extract_path(dst)
async_progress: _AsyncAbstractRecursiveFileProgress
queue, async_progress = queue_calls(progress)
await run_progress(
queue,
self._download_dir(
src,
dst,
path,
"",
update=update,
continue_=continue_,
filter=filter,
progress=async_progress,
),
)
async def _download_dir(
self,
src: URL,
dst: URL,
dst_path: Path,
rel_path: str,
*,
update: bool,
continue_: bool,
filter: AsyncFilterFunc,
progress: _AsyncAbstractRecursiveFileProgress,
) -> None:
dst_path.mkdir(parents=True, exist_ok=True)
await progress.enter(StorageProgressEnterDir(src, dst))
tasks = []
if update or continue_:
loop = asyncio.get_event_loop()
async with self._file_sem:
dst_files = await loop.run_in_executor(
None,
lambda: {
item.name: item for item in dst_path.iterdir() if item.is_file()
},
)
for retry in retries(f"Fail to list {src}"):
async with retry:
async with self.list(src) as it:
folder = [item async for item in it]
for child in folder:
name = child.name
child_rel_path = f"{rel_path}{name}"
if child.is_dir():
child_rel_path += "/"
if not await filter(child_rel_path):
log.debug(f"Skip {child_rel_path}")
continue
if child.is_file():
offset: Optional[int] = 0
if (update or continue_) and name in dst_files:
offset = self._check_download(
dst_files[name].stat(), child, update, continue_
)
if offset is None:
continue
tasks.append(
self._download_file(
src / name,
dst / name,
dst_path / name,
child.size,
offset,
progress=progress,
)
)
elif child.is_dir():
tasks.append(
self._download_dir(
src / name,
dst / name,
dst_path / name,
child_rel_path,
update=update,
continue_=continue_,
filter=filter,
progress=progress,
)
)
else:
await progress.fail(
StorageProgressFail(
src / name,
dst / name,
f"Cannot download {child}, not regular file/directory",
),
) # pragma: no cover
await run_concurrently(tasks)
await progress.leave(StorageProgressLeaveDir(src, dst))
_magic_check = re.compile("(?:[*?[])")
def _has_magic(s: str) -> bool:
return _magic_check.search(s) is not None
def _ishidden(name: str) -> bool:
return name.startswith(".")
def _isrecursive(pattern: str) -> bool:
return pattern == "**"
def _file_status_from_api_ls(base_uri: URL, values: Dict[str, Any]) -> FileStatus:
path = values["path"]
try:
type = FileStatusType(values["type"])
except ValueError:
type = FileStatusType.UNKNOWN
return FileStatus(
path=path,
type=type,
size=int(values["length"]),
modification_time=int(values["modificationTime"]),
permission=Action(values["permission"]),
uri=base_uri / path,
target=values.get("target"),
)
def _file_status_from_api_stat(cluster_name: str, values: Dict[str, Any]) -> FileStatus:
base_uri = URL.build(scheme="storage", authority=cluster_name)
path = values["path"]
try:
type = FileStatusType(values["type"])
except ValueError:
type = FileStatusType.UNKNOWN
return FileStatus(
path=path,
type=type,
size=int(values["length"]),
modification_time=int(values["modificationTime"]),
permission=Action(values["permission"]),
uri=base_uri / path.lstrip("/"),
target=values.get("target"),
)
def _disk_usage_from_api(
cluster_name: str,
org_name: str,
uri: Optional[URL],
values: Dict[str, Any],
) -> DiskUsageInfo:
return DiskUsageInfo(
cluster_name=cluster_name,
org_name=org_name,
uri=uri,
total=values["total"],
used=values["used"],
free=values["free"],
)
def _parse_content_range(rng_str: Optional[str]) -> slice:
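    # Parse an HTTP Content-Range header ("bytes START-END/SIZE", where SIZE
    # may be "*") into a half-open slice(START, END + 1).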
if rng_str is None:
raise RuntimeError("Missed header Content-Range")
m = re.fullmatch(r"bytes (\d+)-(\d+)/(\d+|\*)", rng_str)
if not m:
raise RuntimeError("Invalid header Content-Range")
start = int(m[1])
end = int(m[2])
if end < start:
raise RuntimeError("Invalid header Content-Range" + rng_str)
return slice(start, end + 1)
ProgressQueueItem = Optional[Tuple[Callable[[Any], None], Any]]
async def run_progress(
queue: "asyncio.Queue[QueuedCall]", coro: Awaitable[None]
) -> None:
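    # Drive ``coro`` while draining its progress callbacks from ``queue``;
    # the ``None`` sentinel put in the ``finally`` block stops the pump loop.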
async def wrapped() -> None:
try:
await coro
finally:
# Add special marker to queue to allow loop below to exit
await queue.put(cast(QueuedCall, None))
loop = asyncio.get_event_loop()
task = loop.create_task(wrapped())
while True:
item = await queue.get()
if item is None:
break
item()
await task
async def run_concurrently(coros: Iterable[Awaitable[Any]]) -> None:
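    # Run the coroutines concurrently; on the first failure cancel the rest,
    # wait for the cancellation to finish, and re-raise.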
loop = asyncio.get_event_loop()
tasks: "Iterable[asyncio.Future[Any]]" = [loop.create_task(coro) for coro in coros] # type: ignore # noqa
if not tasks:
return
try:
done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
for task in done:
await task
except: # noqa
for task in tasks:
task.cancel()
# wait for actual cancellation, ignore all exceptions raised from tasks
if tasks:
await asyncio.wait(tasks)
raise # pragma: no cover
async def _always(path: str) -> bool:
return True
def load_parent_ignore_files(
filter: AsyncFilterFunc,
ignore_file_names: AbstractSet[str],
path: Path,
rel_path: str = "",
) -> AsyncFilterFunc:
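    # Load ignore files from the ancestor directories of ``path``, from the
    # filesystem root down, chaining each onto ``filter``.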
if path == path.parent:
return filter
rel_path = f"{path.name}/{rel_path}"
path = path.parent
filter = load_parent_ignore_files(filter, ignore_file_names, path, rel_path)
for name in ignore_file_names:
config_path = path / name
if config_path.exists():
log.debug(f"Load ignore file {str(config_path)!r}")
file_filter = FileFilter(filter)
file_filter.read_from_file(config_path, "", rel_path)
filter = file_filter.match
return filter