From c5c3811901b508040d4ec32961a6174b0a0d9178 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Tue, 18 Jan 2022 16:17:48 +0900 Subject: [PATCH 01/15] feat: draft for manager filebrowser requests management --- src/ai/backend/manager/api/filebrowser.py | 111 +++++++++++++ src/ai/backend/manager/api/vfolder.py | 3 + src/ai/backend/manager/models/filebrowser.py | 163 +++++++++++++++++++ src/ai/backend/manager/models/storage.py | 7 + src/ai/backend/manager/server.py | 8 + 5 files changed, 292 insertions(+) create mode 100644 src/ai/backend/manager/api/filebrowser.py create mode 100644 src/ai/backend/manager/models/filebrowser.py diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py new file mode 100644 index 000000000..64fccfeac --- /dev/null +++ b/src/ai/backend/manager/api/filebrowser.py @@ -0,0 +1,111 @@ +import asyncio +from datetime import datetime +import functools +import json +import logging +import math +from pathlib import Path +import stat +from typing import ( + Any, + Awaitable, + Callable, + Dict, + List, + Mapping, + MutableMapping, + Sequence, + Set, + TYPE_CHECKING, + Tuple, + Iterable +) +import uuid + +import aiohttp +from aiohttp import web +import aiohttp_cors +import sqlalchemy as sa +import trafaret as t + +from ai.backend.common import validators as tx +from ai.backend.common.logging import BraceStyleAdapter + +from ..models import ( + agents, + kernels, + users, groups, keypairs, + vfolders, vfolder_invitations, vfolder_permissions, + AgentStatus, + KernelStatus, + VFolderInvitationState, + VFolderOwnershipType, + VFolderPermission, + VFolderPermissionValidator, + VFolderUsageMode, + UserRole, + query_accessible_vfolders, + query_owned_dotfiles, + get_allowed_vfolder_hosts_by_group, + get_allowed_vfolder_hosts_by_user, + verify_vfolder_name, +) +from .auth import admin_required, auth_required, superadmin_required +from .exceptions import ( + VFolderCreationFailed, VFolderNotFound, VFolderAlreadyExists, VFolderOperationFailed, + GenericForbidden, GenericNotFound, InvalidAPIParameters, ServerMisconfiguredError, + BackendAgentError, InternalServerError, GroupNotFound, +) +from .manager import ( + READ_ALLOWED, ALL_ALLOWED, + server_status_required, +) +from .resource import get_watcher_info +from .utils import check_api_params + +from .context import RootContext +from .types import CORSOptions, WebMiddleware + +log = BraceStyleAdapter(logging.getLogger(__name__)) + +VFolderRow = Mapping[str, Any] +# https://127.0.0.1:6022 folder/create {'X-BackendAI-Storage-Auth-Token': '4574e5afc312cd8a3ad8e0966d1f1d82435df2f3c15200dcd7f607d7024dfe38' +@auth_required +@server_status_required(READ_ALLOWED) +async def create_or_update_filebrowser(request: web.Request) -> web.Response: + print(request) + print("*********************************") + root_ctx: RootContext = request.app['_root.context'] + + try: + async with aiohttp.ClientSession() as session: + async with session.post('http://python.org') as response: + + print("Status:", response.status) + print("Content-type:", response.headers['content-type']) + + html = await response.text() + print("Body:", html[:15], "...") + pass + except aiohttp.ClientResponseError: + raise + return request + +async def init(app: web.Application) -> None: + pass + +async def shutdown(app: web.Application) -> None: + pass + +def create_app(default_cors_options: CORSOptions) -> Tuple[web.Application, Iterable[WebMiddleware]]: + app = web.Application() + app['prefix'] = 'browser' + print("FileBrowser Server started...") + app['api_versions'] = ( 2, 3,4) + app.on_startup.append(init) + app.on_shutdown.append(shutdown) + cors = aiohttp_cors.setup(app, defaults=default_cors_options) + + cors.add(app.router.add_route('POST', r'/create', create_or_update_filebrowser)) + + return app, [] diff --git a/src/ai/backend/manager/api/vfolder.py b/src/ai/backend/manager/api/vfolder.py index 6a350e173..1ca24189c 100644 --- a/src/ai/backend/manager/api/vfolder.py +++ b/src/ai/backend/manager/api/vfolder.py @@ -321,6 +321,8 @@ async def create(request: web.Request, params: Any) -> web.Response: folder_id = uuid.uuid4() if not unmanaged_path: # Try to create actual only if vFolder is managed one + print("debug ", folder_host, root_ctx.storage_manager.split_host(folder_host)) + async with root_ctx.storage_manager.request( folder_host, 'POST', 'folder/create', json={ @@ -887,6 +889,7 @@ async def update_vfolder_options(request: web.Request, params: Any, row: VFolder })) async def mkdir(request: web.Request, params: Any, row: VFolderRow) -> web.Response: root_ctx: RootContext = request.app['_root.context'] + root_ctx.storage_manager.request folder_name = request.match_info['name'] access_key = request['keypair']['access_key'] log.info('VFOLDER.MKDIR (ak:{}, vf:{}, path:{})', access_key, folder_name, params['path']) diff --git a/src/ai/backend/manager/models/filebrowser.py b/src/ai/backend/manager/models/filebrowser.py new file mode 100644 index 000000000..98cd59b9b --- /dev/null +++ b/src/ai/backend/manager/models/filebrowser.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +import asyncio +from contextlib import asynccontextmanager as actxmgr +from contextvars import ContextVar +import itertools +import logging +from typing import ( + Any, + AsyncIterator, + Final, + Iterable, + List, + Mapping, + Sequence, + Tuple, + TypedDict, + TYPE_CHECKING, +) +from uuid import UUID + +from ai.backend.common.logging import BraceStyleAdapter +from ai.backend.common.types import HardwareMetadata + +import aiohttp +import attr +import graphene +import yarl + +from .base import ( + Item, PaginatedList, +) +from ..exceptions import InvalidArgument +if TYPE_CHECKING: + from .gql import GraphQueryContext + +__all__ = ( + 'StorageProxyInfo', + 'VolumeInfo', + 'BrowserSessionManager', + 'StorageVolume', +) + +log = BraceStyleAdapter(logging.getLogger(__name__)) + + +@attr.s(auto_attribs=True, slots=True, frozen=True) +class StorageProxyInfo: + session: aiohttp.ClientSession + secret: str + client_api_url: yarl.URL + manager_api_url: yarl.URL + + +AUTH_TOKEN_HDR: Final = 'X-BackendAI-Storage-Auth-Token' + +_ctx_volumes_cache: ContextVar[List[Tuple[str, VolumeInfo]]] = ContextVar('_ctx_volumes') + + +class VolumeInfo(TypedDict): + name: str + backend: str + path: str + fsprefix: str + capabilities: List[str] + + +class BrowserSessionManager: + + _proxies: Mapping[str, StorageProxyInfo] + + def __init__(self, storage_config: Mapping[str, Any]) -> None: + self.config = storage_config + self._proxies = {} + for proxy_name, proxy_config in self.config['proxies'].items(): + connector = aiohttp.TCPConnector(ssl=proxy_config['ssl_verify']) + session = aiohttp.ClientSession(connector=connector) + self._proxies[proxy_name] = StorageProxyInfo( + session=session, + secret=proxy_config['secret'], + client_api_url=yarl.URL(proxy_config['client_api']), + manager_api_url=yarl.URL(proxy_config['manager_api']), + ) + + async def aclose(self) -> None: + close_aws = [] + for proxy_info in self._proxies.values(): + close_aws.append(proxy_info.session.close()) + await asyncio.gather(*close_aws, return_exceptions=True) + + @staticmethod + def split_host(vfolder_host: str) -> Tuple[str, str]: + proxy_name, _, volume_name = vfolder_host.partition(':') + print("Split host result: ", proxy_name, volume_name) + return proxy_name, volume_name + + async def get_all_volumes(self) -> Iterable[Tuple[str, VolumeInfo]]: + try: + # per-asyncio-task cache + return _ctx_volumes_cache.get() + except LookupError: + pass + fetch_aws = [] + + async def _fetch( + proxy_name: str, + proxy_info: StorageProxyInfo, + ) -> Iterable[Tuple[str, VolumeInfo]]: + print("Proxy address: ", proxy_info.manager_api_url / 'volumes') + async with proxy_info.session.request( + 'GET', proxy_info.manager_api_url / 'volumes', + raise_for_status=True, + headers={AUTH_TOKEN_HDR: proxy_info.secret}, + ) as resp: + reply = await resp.json() + print("volume_Data: ", reply) + return ((proxy_name, volume_data) for volume_data in reply['volumes']) + + for proxy_name, proxy_info in self._proxies.items(): + fetch_aws.append(_fetch(proxy_name, proxy_info)) + results = [*itertools.chain(*await asyncio.gather(*fetch_aws))] + _ctx_volumes_cache.set(results) + return results + + async def get_mount_path(self, vfolder_host: str, vfolder_id: UUID) -> str: + async with self.request( + vfolder_host, 'GET', 'folder/mount', + json={ + 'volume': self.split_host(vfolder_host)[1], + 'vfid': str(vfolder_id), + }, + ) as (_, resp): + reply = await resp.json() + return reply['path'] + + @actxmgr + async def request( + self, + vfolder_host_or_proxy_name: str, + method: str, + request_relpath: str, + /, + *args, + **kwargs, + ) -> AsyncIterator[Tuple[yarl.URL, aiohttp.ClientResponse]]: + proxy_name, _ = self.split_host(vfolder_host_or_proxy_name) + try: + proxy_info = self._proxies[proxy_name] + except KeyError: + raise InvalidArgument('There is no such storage proxy', proxy_name) + headers = kwargs.pop('headers', {}) + headers[AUTH_TOKEN_HDR] = proxy_info.secret + print("sp requests: ", method, proxy_info.manager_api_url, request_relpath, headers) + print("client api: ", proxy_info.client_api_url) + print('args ', args, kwargs) + async with proxy_info.session.request( + method, proxy_info.manager_api_url / request_relpath, + *args, + headers=headers, + **kwargs, + ) as client_resp: + print("client response: ", proxy_info.client_api_url, client_resp) + yield proxy_info.client_api_url, client_resp diff --git a/src/ai/backend/manager/models/storage.py b/src/ai/backend/manager/models/storage.py index 75edaf2a8..0d59c87da 100644 --- a/src/ai/backend/manager/models/storage.py +++ b/src/ai/backend/manager/models/storage.py @@ -91,6 +91,7 @@ async def aclose(self) -> None: @staticmethod def split_host(vfolder_host: str) -> Tuple[str, str]: proxy_name, _, volume_name = vfolder_host.partition(':') + print("Debug. Split host result: ", proxy_name, volume_name) return proxy_name, volume_name async def get_all_volumes(self) -> Iterable[Tuple[str, VolumeInfo]]: @@ -105,12 +106,14 @@ async def _fetch( proxy_name: str, proxy_info: StorageProxyInfo, ) -> Iterable[Tuple[str, VolumeInfo]]: + print("Debug. Proxy address: ", proxy_info.manager_api_url / 'volumes') async with proxy_info.session.request( 'GET', proxy_info.manager_api_url / 'volumes', raise_for_status=True, headers={AUTH_TOKEN_HDR: proxy_info.secret}, ) as resp: reply = await resp.json() + print("Debug. volume_Data: ", reply) return ((proxy_name, volume_data) for volume_data in reply['volumes']) for proxy_name, proxy_info in self._proxies.items(): @@ -147,12 +150,16 @@ async def request( raise InvalidArgument('There is no such storage proxy', proxy_name) headers = kwargs.pop('headers', {}) headers[AUTH_TOKEN_HDR] = proxy_info.secret + print("Debug. sp requests: ", method, proxy_info.manager_api_url, request_relpath, headers) + print("client api: ", proxy_info.client_api_url) + print('args ', args, kwargs) async with proxy_info.session.request( method, proxy_info.manager_api_url / request_relpath, *args, headers=headers, **kwargs, ) as client_resp: + print("Debug. client response: ", proxy_info.client_api_url, client_resp) yield proxy_info.client_api_url, client_resp diff --git a/src/ai/backend/manager/server.py b/src/ai/backend/manager/server.py index 93e6b9739..4265b93df 100644 --- a/src/ai/backend/manager/server.py +++ b/src/ai/backend/manager/server.py @@ -348,6 +348,13 @@ async def storage_manager_ctx(root_ctx: RootContext) -> AsyncIterator[None]: yield await root_ctx.storage_manager.aclose() +@actxmgr +async def filebrowser_ctx(root_ctx: RootContext) -> AsyncIterator[None]: + raw_vol_config = await root_ctx.shared_config.etcd.get_prefix('volumes') + config = volume_config_iv.check(raw_vol_config) + root_ctx.browser_manager = BrowserSessionManager(config) + yield + await root_ctx.browser_manager.aclose() @actxmgr async def hook_plugin_ctx(root_ctx: RootContext) -> AsyncIterator[None]: @@ -576,6 +583,7 @@ async def server_main( '.domainconfig', '.groupconfig', '.logs', + '.filebrowser', ] root_app = build_root_app(pidx, _args[0], subapp_pkgs=subapp_pkgs) root_ctx: RootContext = root_app['_root.context'] From bee80c94df9780c78f09595c7108b746a50dc3e8 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Thu, 20 Jan 2022 17:42:08 +0900 Subject: [PATCH 02/15] fix: add vfid for vfolder name in requests --- src/ai/backend/manager/api/filebrowser.py | 52 +++++++++++++++----- src/ai/backend/manager/models/filebrowser.py | 29 +---------- 2 files changed, 40 insertions(+), 41 deletions(-) diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py index 64fccfeac..2f5364108 100644 --- a/src/ai/backend/manager/api/filebrowser.py +++ b/src/ai/backend/manager/api/filebrowser.py @@ -69,24 +69,51 @@ log = BraceStyleAdapter(logging.getLogger(__name__)) VFolderRow = Mapping[str, Any] -# https://127.0.0.1:6022 folder/create {'X-BackendAI-Storage-Auth-Token': '4574e5afc312cd8a3ad8e0966d1f1d82435df2f3c15200dcd7f607d7024dfe38' + +async def get_vfid(root_ctx: RootContext, name: str) -> str: + async with root_ctx.db.begin() as conn: + query = ( + sa.select([vfolders.c.id]) + .select_from(vfolders) + .where(vfolders.c.name == name) + ) + folder_id = await conn.scalar(query) + + query = (sa.delete(vfolders).where(vfolders.c.id == folder_id)) + + return folder_id.hex + + @auth_required @server_status_required(READ_ALLOWED) async def create_or_update_filebrowser(request: web.Request) -> web.Response: + json = await request.json() print(request) - print("*********************************") + root_ctx: RootContext = request.app['_root.context'] + access_key = request['keypair']['access_key'] + print(access_key) - try: - async with aiohttp.ClientSession() as session: - async with session.post('http://python.org') as response: + vfolders = [] + for vfolder_name in json['vfolders']: + vfolders.append( + { + "name":vfolder_name, + "vfid": await get_vfid(root_ctx, vfolder_name) + + } + ) + - print("Status:", response.status) - print("Content-type:", response.headers['content-type']) + print("Vfolder name with vfid", vfolders) - html = await response.text() - print("Body:", html[:15], "...") - pass + try: + async with root_ctx.storage_manager.request('local:volume1', 'POST', 'browser/create', + + json = { "vfolders": vfolders }, + raise_for_status=True, + ): + pass except aiohttp.ClientResponseError: raise return request @@ -101,11 +128,10 @@ def create_app(default_cors_options: CORSOptions) -> Tuple[web.Application, Iter app = web.Application() app['prefix'] = 'browser' print("FileBrowser Server started...") - app['api_versions'] = ( 2, 3,4) + app['api_versions'] = (2, 3, ) app.on_startup.append(init) app.on_shutdown.append(shutdown) cors = aiohttp_cors.setup(app, defaults=default_cors_options) - + cors.add(app.router.add_route('POST', r'/create', create_or_update_filebrowser)) - return app, [] diff --git a/src/ai/backend/manager/models/filebrowser.py b/src/ai/backend/manager/models/filebrowser.py index 98cd59b9b..dc14edca2 100644 --- a/src/ai/backend/manager/models/filebrowser.py +++ b/src/ai/backend/manager/models/filebrowser.py @@ -94,34 +94,6 @@ def split_host(vfolder_host: str) -> Tuple[str, str]: print("Split host result: ", proxy_name, volume_name) return proxy_name, volume_name - async def get_all_volumes(self) -> Iterable[Tuple[str, VolumeInfo]]: - try: - # per-asyncio-task cache - return _ctx_volumes_cache.get() - except LookupError: - pass - fetch_aws = [] - - async def _fetch( - proxy_name: str, - proxy_info: StorageProxyInfo, - ) -> Iterable[Tuple[str, VolumeInfo]]: - print("Proxy address: ", proxy_info.manager_api_url / 'volumes') - async with proxy_info.session.request( - 'GET', proxy_info.manager_api_url / 'volumes', - raise_for_status=True, - headers={AUTH_TOKEN_HDR: proxy_info.secret}, - ) as resp: - reply = await resp.json() - print("volume_Data: ", reply) - return ((proxy_name, volume_data) for volume_data in reply['volumes']) - - for proxy_name, proxy_info in self._proxies.items(): - fetch_aws.append(_fetch(proxy_name, proxy_info)) - results = [*itertools.chain(*await asyncio.gather(*fetch_aws))] - _ctx_volumes_cache.set(results) - return results - async def get_mount_path(self, vfolder_host: str, vfolder_id: UUID) -> str: async with self.request( vfolder_host, 'GET', 'folder/mount', @@ -146,6 +118,7 @@ async def request( proxy_name, _ = self.split_host(vfolder_host_or_proxy_name) try: proxy_info = self._proxies[proxy_name] + print("proxy info ", proxy_info) except KeyError: raise InvalidArgument('There is no such storage proxy', proxy_name) headers = kwargs.pop('headers', {}) From 6ffd6fc1085001a931577d8d83f289a06708477c Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Fri, 21 Jan 2022 18:33:30 +0900 Subject: [PATCH 03/15] feat: add support for File Browser on manager side --- src/ai/backend/manager/api/filebrowser.py | 152 ++++++++----------- src/ai/backend/manager/models/filebrowser.py | 136 ----------------- src/ai/backend/manager/models/storage.py | 6 - 3 files changed, 66 insertions(+), 228 deletions(-) delete mode 100644 src/ai/backend/manager/models/filebrowser.py diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py index 2f5364108..87930532a 100644 --- a/src/ai/backend/manager/api/filebrowser.py +++ b/src/ai/backend/manager/api/filebrowser.py @@ -1,75 +1,26 @@ -import asyncio -from datetime import datetime -import functools -import json import logging -import math -from pathlib import Path -import stat -from typing import ( - Any, - Awaitable, - Callable, - Dict, - List, - Mapping, - MutableMapping, - Sequence, - Set, - TYPE_CHECKING, - Tuple, - Iterable -) -import uuid +from typing import Any, Iterable, Mapping, Tuple import aiohttp -from aiohttp import web import aiohttp_cors import sqlalchemy as sa import trafaret as t - -from ai.backend.common import validators as tx from ai.backend.common.logging import BraceStyleAdapter +from aiohttp import web -from ..models import ( - agents, - kernels, - users, groups, keypairs, - vfolders, vfolder_invitations, vfolder_permissions, - AgentStatus, - KernelStatus, - VFolderInvitationState, - VFolderOwnershipType, - VFolderPermission, - VFolderPermissionValidator, - VFolderUsageMode, - UserRole, - query_accessible_vfolders, - query_owned_dotfiles, - get_allowed_vfolder_hosts_by_group, - get_allowed_vfolder_hosts_by_user, - verify_vfolder_name, -) -from .auth import admin_required, auth_required, superadmin_required -from .exceptions import ( - VFolderCreationFailed, VFolderNotFound, VFolderAlreadyExists, VFolderOperationFailed, - GenericForbidden, GenericNotFound, InvalidAPIParameters, ServerMisconfiguredError, - BackendAgentError, InternalServerError, GroupNotFound, -) -from .manager import ( - READ_ALLOWED, ALL_ALLOWED, - server_status_required, -) -from .resource import get_watcher_info -from .utils import check_api_params - +from ..exceptions import InvalidArgument +from ..models import vfolders +from .auth import auth_required from .context import RootContext +from .manager import READ_ALLOWED, server_status_required from .types import CORSOptions, WebMiddleware +from .utils import check_api_params log = BraceStyleAdapter(logging.getLogger(__name__)) VFolderRow = Mapping[str, Any] + async def get_vfid(root_ctx: RootContext, name: str) -> str: async with root_ctx.db.begin() as conn: query = ( @@ -79,59 +30,88 @@ async def get_vfid(root_ctx: RootContext, name: str) -> str: ) folder_id = await conn.scalar(query) - query = (sa.delete(vfolders).where(vfolders.c.id == folder_id)) + query = sa.delete(vfolders).where(vfolders.c.id == folder_id) return folder_id.hex +async def get_volume(root_ctx: RootContext, vfid: str) -> str: + async with root_ctx.db.begin() as conn: + query = ( + sa.select([vfolders.c.host]) + .select_from(vfolders) + .where(vfolders.c.id == vfid) + ) + host = await conn.scalar(query) + return host + + @auth_required @server_status_required(READ_ALLOWED) -async def create_or_update_filebrowser(request: web.Request) -> web.Response: - json = await request.json() - print(request) - - root_ctx: RootContext = request.app['_root.context'] - access_key = request['keypair']['access_key'] - print(access_key) - +@check_api_params( + t.Dict( + { + t.Key("vfolders"): t.List(t.String), + }, + ), +) +async def create_or_update_filebrowser( + request: web.Request, params: Any, +) -> web.Response: + + root_ctx: RootContext = request.app["_root.context"] + vfolders = [] - for vfolder_name in json['vfolders']: + for vfolder_name in params["vfolders"]: vfolders.append( - { - "name":vfolder_name, - "vfid": await get_vfid(root_ctx, vfolder_name) - - } + {"name": vfolder_name, "vfid": await get_vfid(root_ctx, vfolder_name)}, ) + host = await get_volume(root_ctx, await get_vfid(root_ctx, vfolder_name)) + + proxy_name, _ = root_ctx.storage_manager.split_host(host) + + try: + proxy_info = root_ctx.storage_manager._proxies[proxy_name] + except KeyError: + raise InvalidArgument("There is no such storage proxy", proxy_name) - print("Vfolder name with vfid", vfolders) + headers = {} + headers["X-BackendAI-Storage-Auth-Token"] = proxy_info.secret - try: - async with root_ctx.storage_manager.request('local:volume1', 'POST', 'browser/create', - - json = { "vfolders": vfolders }, - raise_for_status=True, - ): - pass + try: + async with proxy_info.session.request( + "POST", + proxy_info.manager_api_url / "browser/create", + headers=headers, + json={"vfolders": vfolders}, + ) as client_resp: + return web.json_response(await client_resp.json()) except aiohttp.ClientResponseError: raise - return request + async def init(app: web.Application) -> None: pass + async def shutdown(app: web.Application) -> None: pass -def create_app(default_cors_options: CORSOptions) -> Tuple[web.Application, Iterable[WebMiddleware]]: + +def create_app( + default_cors_options: CORSOptions, +) -> Tuple[web.Application, Iterable[WebMiddleware]]: app = web.Application() - app['prefix'] = 'browser' - print("FileBrowser Server started...") - app['api_versions'] = (2, 3, ) + app["prefix"] = "browser" + app["api_versions"] = ( + 2, + 3, + 4, + ) app.on_startup.append(init) app.on_shutdown.append(shutdown) cors = aiohttp_cors.setup(app, defaults=default_cors_options) - cors.add(app.router.add_route('POST', r'/create', create_or_update_filebrowser)) + cors.add(app.router.add_route("POST", r"/create", create_or_update_filebrowser)) return app, [] diff --git a/src/ai/backend/manager/models/filebrowser.py b/src/ai/backend/manager/models/filebrowser.py deleted file mode 100644 index dc14edca2..000000000 --- a/src/ai/backend/manager/models/filebrowser.py +++ /dev/null @@ -1,136 +0,0 @@ -from __future__ import annotations - -import asyncio -from contextlib import asynccontextmanager as actxmgr -from contextvars import ContextVar -import itertools -import logging -from typing import ( - Any, - AsyncIterator, - Final, - Iterable, - List, - Mapping, - Sequence, - Tuple, - TypedDict, - TYPE_CHECKING, -) -from uuid import UUID - -from ai.backend.common.logging import BraceStyleAdapter -from ai.backend.common.types import HardwareMetadata - -import aiohttp -import attr -import graphene -import yarl - -from .base import ( - Item, PaginatedList, -) -from ..exceptions import InvalidArgument -if TYPE_CHECKING: - from .gql import GraphQueryContext - -__all__ = ( - 'StorageProxyInfo', - 'VolumeInfo', - 'BrowserSessionManager', - 'StorageVolume', -) - -log = BraceStyleAdapter(logging.getLogger(__name__)) - - -@attr.s(auto_attribs=True, slots=True, frozen=True) -class StorageProxyInfo: - session: aiohttp.ClientSession - secret: str - client_api_url: yarl.URL - manager_api_url: yarl.URL - - -AUTH_TOKEN_HDR: Final = 'X-BackendAI-Storage-Auth-Token' - -_ctx_volumes_cache: ContextVar[List[Tuple[str, VolumeInfo]]] = ContextVar('_ctx_volumes') - - -class VolumeInfo(TypedDict): - name: str - backend: str - path: str - fsprefix: str - capabilities: List[str] - - -class BrowserSessionManager: - - _proxies: Mapping[str, StorageProxyInfo] - - def __init__(self, storage_config: Mapping[str, Any]) -> None: - self.config = storage_config - self._proxies = {} - for proxy_name, proxy_config in self.config['proxies'].items(): - connector = aiohttp.TCPConnector(ssl=proxy_config['ssl_verify']) - session = aiohttp.ClientSession(connector=connector) - self._proxies[proxy_name] = StorageProxyInfo( - session=session, - secret=proxy_config['secret'], - client_api_url=yarl.URL(proxy_config['client_api']), - manager_api_url=yarl.URL(proxy_config['manager_api']), - ) - - async def aclose(self) -> None: - close_aws = [] - for proxy_info in self._proxies.values(): - close_aws.append(proxy_info.session.close()) - await asyncio.gather(*close_aws, return_exceptions=True) - - @staticmethod - def split_host(vfolder_host: str) -> Tuple[str, str]: - proxy_name, _, volume_name = vfolder_host.partition(':') - print("Split host result: ", proxy_name, volume_name) - return proxy_name, volume_name - - async def get_mount_path(self, vfolder_host: str, vfolder_id: UUID) -> str: - async with self.request( - vfolder_host, 'GET', 'folder/mount', - json={ - 'volume': self.split_host(vfolder_host)[1], - 'vfid': str(vfolder_id), - }, - ) as (_, resp): - reply = await resp.json() - return reply['path'] - - @actxmgr - async def request( - self, - vfolder_host_or_proxy_name: str, - method: str, - request_relpath: str, - /, - *args, - **kwargs, - ) -> AsyncIterator[Tuple[yarl.URL, aiohttp.ClientResponse]]: - proxy_name, _ = self.split_host(vfolder_host_or_proxy_name) - try: - proxy_info = self._proxies[proxy_name] - print("proxy info ", proxy_info) - except KeyError: - raise InvalidArgument('There is no such storage proxy', proxy_name) - headers = kwargs.pop('headers', {}) - headers[AUTH_TOKEN_HDR] = proxy_info.secret - print("sp requests: ", method, proxy_info.manager_api_url, request_relpath, headers) - print("client api: ", proxy_info.client_api_url) - print('args ', args, kwargs) - async with proxy_info.session.request( - method, proxy_info.manager_api_url / request_relpath, - *args, - headers=headers, - **kwargs, - ) as client_resp: - print("client response: ", proxy_info.client_api_url, client_resp) - yield proxy_info.client_api_url, client_resp diff --git a/src/ai/backend/manager/models/storage.py b/src/ai/backend/manager/models/storage.py index 0d59c87da..c44c37854 100644 --- a/src/ai/backend/manager/models/storage.py +++ b/src/ai/backend/manager/models/storage.py @@ -106,14 +106,12 @@ async def _fetch( proxy_name: str, proxy_info: StorageProxyInfo, ) -> Iterable[Tuple[str, VolumeInfo]]: - print("Debug. Proxy address: ", proxy_info.manager_api_url / 'volumes') async with proxy_info.session.request( 'GET', proxy_info.manager_api_url / 'volumes', raise_for_status=True, headers={AUTH_TOKEN_HDR: proxy_info.secret}, ) as resp: reply = await resp.json() - print("Debug. volume_Data: ", reply) return ((proxy_name, volume_data) for volume_data in reply['volumes']) for proxy_name, proxy_info in self._proxies.items(): @@ -150,16 +148,12 @@ async def request( raise InvalidArgument('There is no such storage proxy', proxy_name) headers = kwargs.pop('headers', {}) headers[AUTH_TOKEN_HDR] = proxy_info.secret - print("Debug. sp requests: ", method, proxy_info.manager_api_url, request_relpath, headers) - print("client api: ", proxy_info.client_api_url) - print('args ', args, kwargs) async with proxy_info.session.request( method, proxy_info.manager_api_url / request_relpath, *args, headers=headers, **kwargs, ) as client_resp: - print("Debug. client response: ", proxy_info.client_api_url, client_resp) yield proxy_info.client_api_url, client_resp From 8b5132fa038ead348cd57689a3ea1098a1cf7725 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Fri, 21 Jan 2022 20:39:25 +0900 Subject: [PATCH 04/15] fix: add 510.feature news fragment --- changes/510.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/510.feature diff --git a/changes/510.feature b/changes/510.feature new file mode 100644 index 000000000..324a1271b --- /dev/null +++ b/changes/510.feature @@ -0,0 +1 @@ +Implementation of Manager Facing API for File Browser in Storage Proxy. From 7bea3f187bfd2c1c90084085b43527dea941d803 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Fri, 21 Jan 2022 20:43:31 +0900 Subject: [PATCH 05/15] fix: update news fragment --- changes/{510.feature => 521.feature} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changes/{510.feature => 521.feature} (100%) diff --git a/changes/510.feature b/changes/521.feature similarity index 100% rename from changes/510.feature rename to changes/521.feature From 7bd094c264754d86a24f0a4f6262cf981c393279 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Fri, 21 Jan 2022 20:51:07 +0900 Subject: [PATCH 06/15] fix: accidental file edits --- src/ai/backend/manager/api/vfolder.py | 3 --- src/ai/backend/manager/models/storage.py | 1 - src/ai/backend/manager/server.py | 8 -------- 3 files changed, 12 deletions(-) diff --git a/src/ai/backend/manager/api/vfolder.py b/src/ai/backend/manager/api/vfolder.py index 1ca24189c..6a350e173 100644 --- a/src/ai/backend/manager/api/vfolder.py +++ b/src/ai/backend/manager/api/vfolder.py @@ -321,8 +321,6 @@ async def create(request: web.Request, params: Any) -> web.Response: folder_id = uuid.uuid4() if not unmanaged_path: # Try to create actual only if vFolder is managed one - print("debug ", folder_host, root_ctx.storage_manager.split_host(folder_host)) - async with root_ctx.storage_manager.request( folder_host, 'POST', 'folder/create', json={ @@ -889,7 +887,6 @@ async def update_vfolder_options(request: web.Request, params: Any, row: VFolder })) async def mkdir(request: web.Request, params: Any, row: VFolderRow) -> web.Response: root_ctx: RootContext = request.app['_root.context'] - root_ctx.storage_manager.request folder_name = request.match_info['name'] access_key = request['keypair']['access_key'] log.info('VFOLDER.MKDIR (ak:{}, vf:{}, path:{})', access_key, folder_name, params['path']) diff --git a/src/ai/backend/manager/models/storage.py b/src/ai/backend/manager/models/storage.py index c44c37854..75edaf2a8 100644 --- a/src/ai/backend/manager/models/storage.py +++ b/src/ai/backend/manager/models/storage.py @@ -91,7 +91,6 @@ async def aclose(self) -> None: @staticmethod def split_host(vfolder_host: str) -> Tuple[str, str]: proxy_name, _, volume_name = vfolder_host.partition(':') - print("Debug. Split host result: ", proxy_name, volume_name) return proxy_name, volume_name async def get_all_volumes(self) -> Iterable[Tuple[str, VolumeInfo]]: diff --git a/src/ai/backend/manager/server.py b/src/ai/backend/manager/server.py index 83d11e975..aa16a67bb 100644 --- a/src/ai/backend/manager/server.py +++ b/src/ai/backend/manager/server.py @@ -349,13 +349,6 @@ async def storage_manager_ctx(root_ctx: RootContext) -> AsyncIterator[None]: yield await root_ctx.storage_manager.aclose() -@actxmgr -async def filebrowser_ctx(root_ctx: RootContext) -> AsyncIterator[None]: - raw_vol_config = await root_ctx.shared_config.etcd.get_prefix('volumes') - config = volume_config_iv.check(raw_vol_config) - root_ctx.browser_manager = BrowserSessionManager(config) - yield - await root_ctx.browser_manager.aclose() @actxmgr async def hook_plugin_ctx(root_ctx: RootContext) -> AsyncIterator[None]: @@ -587,7 +580,6 @@ async def server_main( '.domainconfig', '.groupconfig', '.logs', - '.filebrowser', ] root_app = build_root_app(pidx, _args[0], subapp_pkgs=subapp_pkgs) root_ctx: RootContext = root_app['_root.context'] From 2301ffd867aa2774aa8d0230538e5a4110054a49 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Sun, 23 Jan 2022 20:54:23 +0900 Subject: [PATCH 07/15] fix: add browser destroy command --- src/ai/backend/manager/api/filebrowser.py | 41 ++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py index 87930532a..acbce1689 100644 --- a/src/ai/backend/manager/api/filebrowser.py +++ b/src/ai/backend/manager/api/filebrowser.py @@ -56,7 +56,8 @@ async def get_volume(root_ctx: RootContext, vfid: str) -> str: ), ) async def create_or_update_filebrowser( - request: web.Request, params: Any, + request: web.Request, + params: Any, ) -> web.Response: root_ctx: RootContext = request.app["_root.context"] @@ -91,6 +92,42 @@ async def create_or_update_filebrowser( raise +@auth_required +@server_status_required(READ_ALLOWED) +async def destroy_filebrowser( + request: web.Request, + params: Any, +) -> web.Response: + + root_ctx: RootContext = request.app["_root.context"] + + vfolders = [] + for vfolder_name in params["vfolders"]: + vfolders.append( + {"name": vfolder_name, "vfid": await get_vfid(root_ctx, vfolder_name)}, + ) + + host = await get_volume(root_ctx, await get_vfid(root_ctx, vfolder_name)) + + proxy_name, _ = root_ctx.storage_manager.split_host(host) + + try: + proxy_info = root_ctx.storage_manager._proxies[proxy_name] + except KeyError: + raise InvalidArgument("There is no such storage proxy", proxy_name) + + headers = {} + headers["X-BackendAI-Storage-Auth-Token"] = proxy_info.secret + + try: + async with proxy_info.session.request( + "POST", proxy_info.manager_api_url / "browser/destroy", headers=headers, + ) as client_resp: + return web.json_response(await client_resp.json()) + except aiohttp.ClientResponseError: + raise + + async def init(app: web.Application) -> None: pass @@ -114,4 +151,6 @@ def create_app( cors = aiohttp_cors.setup(app, defaults=default_cors_options) cors.add(app.router.add_route("POST", r"/create", create_or_update_filebrowser)) + cors.add(app.router.add_route("POST", r"/destroy", destroy_filebrowser)) + return app, [] From c2914a67132f0f6f8e935461da49626ce00d86f7 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Mon, 24 Jan 2022 10:22:29 +0900 Subject: [PATCH 08/15] fix: filebrowser launch and add destory function --- src/ai/backend/manager/api/filebrowser.py | 20 +- src/ai/backend/manager/server.py | 524 ++++++++++++---------- 2 files changed, 297 insertions(+), 247 deletions(-) diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py index acbce1689..fdcd857bd 100644 --- a/src/ai/backend/manager/api/filebrowser.py +++ b/src/ai/backend/manager/api/filebrowser.py @@ -96,20 +96,10 @@ async def create_or_update_filebrowser( @server_status_required(READ_ALLOWED) async def destroy_filebrowser( request: web.Request, - params: Any, ) -> web.Response: - root_ctx: RootContext = request.app["_root.context"] - vfolders = [] - for vfolder_name in params["vfolders"]: - vfolders.append( - {"name": vfolder_name, "vfid": await get_vfid(root_ctx, vfolder_name)}, - ) - - host = await get_volume(root_ctx, await get_vfid(root_ctx, vfolder_name)) - - proxy_name, _ = root_ctx.storage_manager.split_host(host) + proxy_name, _ = root_ctx.storage_manager.split_host("local: volume1") try: proxy_info = root_ctx.storage_manager._proxies[proxy_name] @@ -121,8 +111,11 @@ async def destroy_filebrowser( try: async with proxy_info.session.request( - "POST", proxy_info.manager_api_url / "browser/destroy", headers=headers, + "DELETE", + proxy_info.manager_api_url / "browser/destroy", + headers=headers, ) as client_resp: + print("real url ", client_resp.real_url()) return web.json_response(await client_resp.json()) except aiohttp.ClientResponseError: raise @@ -149,8 +142,7 @@ def create_app( app.on_startup.append(init) app.on_shutdown.append(shutdown) cors = aiohttp_cors.setup(app, defaults=default_cors_options) - cors.add(app.router.add_route("POST", r"/create", create_or_update_filebrowser)) - cors.add(app.router.add_route("POST", r"/destroy", destroy_filebrowser)) + cors.add(app.router.add_route("DELETE", "/destroy", destroy_filebrowser)) return app, [] diff --git a/src/ai/backend/manager/server.py b/src/ai/backend/manager/server.py index aa16a67bb..ce97137da 100644 --- a/src/ai/backend/manager/server.py +++ b/src/ai/backend/manager/server.py @@ -1,16 +1,19 @@ from __future__ import annotations import asyncio -from contextlib import asynccontextmanager as actxmgr, closing -from datetime import datetime import functools +import grp import importlib import logging import os -import pwd, grp +import pwd import ssl import sys import traceback +from contextlib import asynccontextmanager as actxmgr +from contextlib import closing +from datetime import datetime +from pathlib import Path from typing import ( Any, AsyncIterator, @@ -23,50 +26,41 @@ cast, ) -from aiohttp import web import aiohttp_cors +import aiomonitor import aiotools import click -from pathlib import Path -from setproctitle import setproctitle -import aiomonitor - from ai.backend.common import redis from ai.backend.common.cli import LazyGroup from ai.backend.common.events import EventDispatcher, EventProducer -from ai.backend.common.utils import env_info -from ai.backend.common.logging import Logger, BraceStyleAdapter -from ai.backend.common.plugin.hook import HookPluginContext, ALL_COMPLETED, PASSED +from ai.backend.common.logging import BraceStyleAdapter, Logger +from ai.backend.common.plugin.hook import ALL_COMPLETED, PASSED, HookPluginContext from ai.backend.common.plugin.monitor import ( + INCREMENT, ErrorPluginContext, StatsPluginContext, - INCREMENT, ) +from ai.backend.common.utils import env_info +from aiohttp import web +from setproctitle import setproctitle from . import __version__ from .api.context import RootContext from .api.exceptions import ( BackendError, - MethodNotAllowed, - GenericNotFound, GenericBadRequest, + GenericNotFound, InternalServerError, InvalidAPIParameters, + MethodNotAllowed, ) from .api.manager import ManagerStatus -from .api.types import ( - AppCreator, - WebRequestHandler, WebMiddleware, - CleanupContext, -) +from .api.types import AppCreator, CleanupContext, WebMiddleware, WebRequestHandler from .background import BackgroundTaskManager -from .config import ( - LocalConfig, - SharedConfig, - load as load_config, - volume_config_iv, -) -from .defs import REDIS_STAT_DB, REDIS_LIVE_DB, REDIS_IMAGE_DB, REDIS_STREAM_DB +from .config import LocalConfig, SharedConfig +from .config import load as load_config +from .config import volume_config_iv +from .defs import REDIS_IMAGE_DB, REDIS_LIVE_DB, REDIS_STAT_DB, REDIS_STREAM_DB from .exceptions import InvalidArgument from .idle import create_idle_checkers from .models.storage import StorageSessionManager @@ -75,69 +69,65 @@ from .registry import AgentRegistry from .scheduler.dispatcher import SchedulerDispatcher -VALID_VERSIONS: Final = frozenset([ - # 'v1.20160915', # deprecated - # 'v2.20170315', # deprecated - # 'v3.20170615', # deprecated - - # authentication changed not to use request bodies - 'v4.20181215', - - # added & enabled streaming-execute API - 'v4.20190115', - - # changed resource/image formats - 'v4.20190315', - - # added user mgmt and ID/password authentication - # added domain/group/scaling-group - # added domain/group/scaling-group ref. fields to user/keypair/vfolder objects - 'v4.20190615', - - # added mount_map parameter when creating kernel - # changed GraphQL query structures for multi-container bundled sessions - 'v5.20191215', - - # rewrote vfolder upload/download APIs to migrate to external storage proxies - 'v6.20200815', - - # added standard-compliant /admin/gql endpoint - # deprecated /admin/graphql endpoint (still present for backward compatibility) - # added "groups_by_name" GQL query - # added "filter" and "order" arg to all paginated GQL queries with their own expression mini-langs - # removed "order_key" and "order_asc" arguments from all paginated GQL queries (never used!) - 'v6.20210815', -]) +VALID_VERSIONS: Final = frozenset( + [ + # 'v1.20160915', # deprecated + # 'v2.20170315', # deprecated + # 'v3.20170615', # deprecated + # authentication changed not to use request bodies + "v4.20181215", + # added & enabled streaming-execute API + "v4.20190115", + # changed resource/image formats + "v4.20190315", + # added user mgmt and ID/password authentication + # added domain/group/scaling-group + # added domain/group/scaling-group ref. fields to user/keypair/vfolder objects + "v4.20190615", + # added mount_map parameter when creating kernel + # changed GraphQL query structures for multi-container bundled sessions + "v5.20191215", + # rewrote vfolder upload/download APIs to migrate to external storage proxies + "v6.20200815", + # added standard-compliant /admin/gql endpoint + # deprecated /admin/graphql endpoint (still present for backward compatibility) + # added "groups_by_name" GQL query + # added "filter" and "order" arg to all paginated GQL queries + # with their own expression mini-langs + # removed "order_key" and "order_asc" arguments from all paginated GQL queries (never used!) + "v6.20210815", + ], +) LATEST_REV_DATES: Final = { - 1: '20160915', - 2: '20170915', - 3: '20181215', - 4: '20190615', - 5: '20191215', - 6: '20210815', + 1: "20160915", + 2: "20170915", + 3: "20181215", + 4: "20190615", + 5: "20191215", + 6: "20210815", } -LATEST_API_VERSION: Final = 'v6.20210815' +LATEST_API_VERSION: Final = "v6.20210815" log = BraceStyleAdapter(logging.getLogger(__name__)) PUBLIC_INTERFACES: Final = [ - 'pidx', - 'background_task_manager', - 'local_config', - 'shared_config', - 'db', - 'registry', - 'redis_live', - 'redis_stat', - 'redis_image', - 'redis_stream', - 'event_dispatcher', - 'event_producer', - 'idle_checkers', - 'storage_manager', - 'stats_monitor', - 'error_monitor', - 'hook_plugin_ctx', + "pidx", + "background_task_manager", + "local_config", + "shared_config", + "db", + "registry", + "redis_live", + "redis_stat", + "redis_image", + "redis_stream", + "event_dispatcher", + "event_producer", + "idle_checkers", + "storage_manager", + "stats_monitor", + "error_monitor", + "hook_plugin_ctx", ] public_interface_objs: MutableMapping[str, Any] = {} @@ -147,98 +137,115 @@ async def hello(request: web.Request) -> web.Response: """ Returns the API version number. """ - return web.json_response({ - 'version': LATEST_API_VERSION, - 'manager': __version__, - }) + return web.json_response( + { + "version": LATEST_API_VERSION, + "manager": __version__, + }, + ) async def on_prepare(request: web.Request, response: web.StreamResponse) -> None: - response.headers['Server'] = 'BackendAI' + response.headers["Server"] = "BackendAI" @web.middleware -async def api_middleware(request: web.Request, - handler: WebRequestHandler) -> web.StreamResponse: +async def api_middleware( + request: web.Request, + handler: WebRequestHandler, +) -> web.StreamResponse: _handler = handler - method_override = request.headers.get('X-Method-Override', None) + method_override = request.headers.get("X-Method-Override", None) if method_override: request = request.clone(method=method_override) new_match_info = await request.app.router.resolve(request) if new_match_info is None: - raise InternalServerError('No matching method handler found') + raise InternalServerError("No matching method handler found") _handler = new_match_info.handler request._match_info = new_match_info # type: ignore # this is a hack ex = request.match_info.http_exception if ex is not None: # handled by exception_middleware raise ex - new_api_version = request.headers.get('X-BackendAI-Version') - legacy_api_version = request.headers.get('X-Sorna-Version') + new_api_version = request.headers.get("X-BackendAI-Version") + legacy_api_version = request.headers.get("X-Sorna-Version") api_version = new_api_version or legacy_api_version try: if api_version is None: - path_major_version = int(request.match_info.get('version', 5)) + path_major_version = int(request.match_info.get("version", 5)) revision_date = LATEST_REV_DATES[path_major_version] - request['api_version'] = (path_major_version, revision_date) + request["api_version"] = (path_major_version, revision_date) elif api_version in VALID_VERSIONS: - hdr_major_version, revision_date = api_version.split('.', maxsplit=1) - request['api_version'] = (int(hdr_major_version[1:]), revision_date) + hdr_major_version, revision_date = api_version.split(".", maxsplit=1) + request["api_version"] = (int(hdr_major_version[1:]), revision_date) else: - return GenericBadRequest('Unsupported API version.') + return GenericBadRequest("Unsupported API version.") except (ValueError, KeyError): - return GenericBadRequest('Unsupported API version.') - resp = (await _handler(request)) + return GenericBadRequest("Unsupported API version.") + resp = await _handler(request) return resp @web.middleware -async def exception_middleware(request: web.Request, - handler: WebRequestHandler) -> web.StreamResponse: - root_ctx: RootContext = request.app['_root.context'] +async def exception_middleware( + request: web.Request, + handler: WebRequestHandler, +) -> web.StreamResponse: + root_ctx: RootContext = request.app["_root.context"] error_monitor = root_ctx.error_monitor stats_monitor = root_ctx.stats_monitor try: - await stats_monitor.report_metric(INCREMENT, 'ai.backend.manager.api.requests') - resp = (await handler(request)) + await stats_monitor.report_metric(INCREMENT, "ai.backend.manager.api.requests") + resp = await handler(request) except InvalidArgument as ex: if len(ex.args) > 1: - raise InvalidAPIParameters(f"{ex.args[0]}: {', '.join(map(str, ex.args[1:]))}") + raise InvalidAPIParameters( + f"{ex.args[0]}: {', '.join(map(str, ex.args[1:]))}", + ) elif len(ex.args) == 1: raise InvalidAPIParameters(ex.args[0]) else: raise InvalidAPIParameters() except BackendError as ex: if ex.status_code == 500: - log.warning('Internal server error raised inside handlers') + log.warning("Internal server error raised inside handlers") await error_monitor.capture_exception() - await stats_monitor.report_metric(INCREMENT, 'ai.backend.manager.api.failures') - await stats_monitor.report_metric(INCREMENT, f'ai.backend.manager.api.status.{ex.status_code}') + await stats_monitor.report_metric(INCREMENT, "ai.backend.manager.api.failures") + await stats_monitor.report_metric( + INCREMENT, + f"ai.backend.manager.api.status.{ex.status_code}", + ) raise except web.HTTPException as ex: - await stats_monitor.report_metric(INCREMENT, 'ai.backend.manager.api.failures') - await stats_monitor.report_metric(INCREMENT, f'ai.backend.manager.api.status.{ex.status_code}') + await stats_monitor.report_metric(INCREMENT, "ai.backend.manager.api.failures") + await stats_monitor.report_metric( + INCREMENT, + f"ai.backend.manager.api.status.{ex.status_code}", + ) if ex.status_code == 404: raise GenericNotFound(f"Unknown URL path: {request.path}") if ex.status_code == 405: concrete_ex = cast(web.HTTPMethodNotAllowed, ex) raise MethodNotAllowed(concrete_ex.method, concrete_ex.allowed_methods) - log.warning('Bad request: {0!r}', ex) + log.warning("Bad request: {0!r}", ex) raise GenericBadRequest except asyncio.CancelledError as e: # The server is closing or the client has disconnected in the middle of # request. Atomic requests are still executed to their ends. - log.debug('Request cancelled ({0} {1})', request.method, request.rel_url) + log.debug("Request cancelled ({0} {1})", request.method, request.rel_url) raise e except Exception as e: await error_monitor.capture_exception() - log.exception('Uncaught exception in HTTP request handlers {0!r}', e) - if root_ctx.local_config['debug']['enabled']: + log.exception("Uncaught exception in HTTP request handlers {0!r}", e) + if root_ctx.local_config["debug"]["enabled"]: raise InternalServerError(traceback.format_exc()) else: raise InternalServerError() else: - await stats_monitor.report_metric(INCREMENT, f'ai.backend.manager.api.status.{resp.status}') + await stats_monitor.report_metric( + INCREMENT, + f"ai.backend.manager.api.status.{resp.status}", + ) return resp @@ -246,10 +253,10 @@ async def exception_middleware(request: web.Request, async def shared_config_ctx(root_ctx: RootContext) -> AsyncIterator[None]: # populate public interfaces root_ctx.shared_config = SharedConfig( - root_ctx.local_config['etcd']['addr'], - root_ctx.local_config['etcd']['user'], - root_ctx.local_config['etcd']['password'], - root_ctx.local_config['etcd']['namespace'], + root_ctx.local_config["etcd"]["addr"], + root_ctx.local_config["etcd"]["user"], + root_ctx.local_config["etcd"]["password"], + root_ctx.local_config["etcd"]["namespace"], ) await root_ctx.shared_config.reload() yield @@ -258,14 +265,16 @@ async def shared_config_ctx(root_ctx: RootContext) -> AsyncIterator[None]: @actxmgr async def webapp_plugin_ctx(root_app: web.Application) -> AsyncIterator[None]: - root_ctx: RootContext = root_app['_root.context'] + root_ctx: RootContext = root_app["_root.context"] plugin_ctx = WebappPluginContext(root_ctx.shared_config.etcd, root_ctx.local_config) await plugin_ctx.init() root_ctx.webapp_plugin_ctx = plugin_ctx for plugin_name, plugin_instance in plugin_ctx.plugins.items(): if root_ctx.pidx == 0: - log.info('Loading webapp plugin: {0}', plugin_name) - subapp, global_middlewares = await plugin_instance.create_app(root_ctx.cors_options) + log.info("Loading webapp plugin: {0}", plugin_name) + subapp, global_middlewares = await plugin_instance.create_app( + root_ctx.cors_options, + ) _init_subapp(plugin_name, root_app, subapp, global_middlewares) yield await plugin_ctx.cleanup() @@ -275,26 +284,37 @@ async def webapp_plugin_ctx(root_app: web.Application) -> AsyncIterator[None]: async def manager_status_ctx(root_ctx: RootContext) -> AsyncIterator[None]: if root_ctx.pidx == 0: mgr_status = await root_ctx.shared_config.get_manager_status() - if mgr_status is None or mgr_status not in (ManagerStatus.RUNNING, ManagerStatus.FROZEN): + if mgr_status is None or mgr_status not in ( + ManagerStatus.RUNNING, + ManagerStatus.FROZEN, + ): # legacy transition: we now have only RUNNING or FROZEN for HA setup. await root_ctx.shared_config.update_manager_status(ManagerStatus.RUNNING) mgr_status = ManagerStatus.RUNNING - log.info('Manager status: {}', mgr_status) - tz = root_ctx.shared_config['system']['timezone'] - log.info('Configured timezone: {}', tz.tzname(datetime.now())) + log.info("Manager status: {}", mgr_status) + tz = root_ctx.shared_config["system"]["timezone"] + log.info("Configured timezone: {}", tz.tzname(datetime.now())) yield @actxmgr async def redis_ctx(root_ctx: RootContext) -> AsyncIterator[None]: - root_ctx.redis_live = redis.get_redis_object(root_ctx.shared_config.data['redis'], db=REDIS_LIVE_DB) - root_ctx.redis_stat = redis.get_redis_object(root_ctx.shared_config.data['redis'], db=REDIS_STAT_DB) + root_ctx.redis_live = redis.get_redis_object( + root_ctx.shared_config.data["redis"], + db=REDIS_LIVE_DB, + ) + root_ctx.redis_stat = redis.get_redis_object( + root_ctx.shared_config.data["redis"], + db=REDIS_STAT_DB, + ) root_ctx.redis_image = redis.get_redis_object( - root_ctx.shared_config.data['redis'], db=REDIS_IMAGE_DB, + root_ctx.shared_config.data["redis"], + db=REDIS_IMAGE_DB, ) root_ctx.redis_stream = redis.get_redis_object( - root_ctx.shared_config.data['redis'], db=REDIS_STREAM_DB, + root_ctx.shared_config.data["redis"], + db=REDIS_STREAM_DB, ) yield await root_ctx.redis_stream.close() @@ -314,14 +334,14 @@ async def database_ctx(root_ctx: RootContext) -> AsyncIterator[None]: async def event_dispatcher_ctx(root_ctx: RootContext) -> AsyncIterator[None]: root_ctx.event_producer = await EventProducer.new( - root_ctx.shared_config.data['redis'], + root_ctx.shared_config.data["redis"], db=REDIS_STREAM_DB, ) root_ctx.event_dispatcher = await EventDispatcher.new( - root_ctx.shared_config.data['redis'], + root_ctx.shared_config.data["redis"], db=REDIS_STREAM_DB, - log_events=root_ctx.local_config['debug']['log-events'], - node_id=root_ctx.local_config['manager']['id'], + log_events=root_ctx.local_config["debug"]["log-events"], + node_id=root_ctx.local_config["manager"]["id"], ) yield await root_ctx.event_dispatcher.close() @@ -343,7 +363,7 @@ async def idle_checker_ctx(root_ctx: RootContext) -> AsyncIterator[None]: @actxmgr async def storage_manager_ctx(root_ctx: RootContext) -> AsyncIterator[None]: - raw_vol_config = await root_ctx.shared_config.etcd.get_prefix('volumes') + raw_vol_config = await root_ctx.shared_config.etcd.get_prefix("volumes") config = volume_config_iv.check(raw_vol_config) root_ctx.storage_manager = StorageSessionManager(config) yield @@ -356,12 +376,12 @@ async def hook_plugin_ctx(root_ctx: RootContext) -> AsyncIterator[None]: root_ctx.hook_plugin_ctx = ctx await ctx.init() hook_result = await ctx.dispatch( - 'ACTIVATE_MANAGER', + "ACTIVATE_MANAGER", (), return_when=ALL_COMPLETED, ) if hook_result.status != PASSED: - raise RuntimeError('Could not activate the manager instance.') + raise RuntimeError("Could not activate the manager instance.") yield await ctx.cleanup() @@ -387,8 +407,10 @@ async def agent_registry_ctx(root_ctx: RootContext) -> AsyncIterator[None]: @actxmgr async def sched_dispatcher_ctx(root_ctx: RootContext) -> AsyncIterator[None]: sched_dispatcher = await SchedulerDispatcher.new( - root_ctx.local_config, root_ctx.shared_config, - root_ctx.event_dispatcher, root_ctx.event_producer, + root_ctx.local_config, + root_ctx.shared_config, + root_ctx.event_dispatcher, + root_ctx.event_producer, root_ctx.registry, ) yield @@ -399,7 +421,7 @@ async def sched_dispatcher_ctx(root_ctx: RootContext) -> AsyncIterator[None]: async def monitoring_ctx(root_ctx: RootContext) -> AsyncIterator[None]: ectx = ErrorPluginContext(root_ctx.shared_config.etcd, root_ctx.local_config) sctx = StatsPluginContext(root_ctx.shared_config.etcd, root_ctx.local_config) - await ectx.init(context={'_root.context': root_ctx}) + await ectx.init(context={"_root.context": root_ctx}) await sctx.init() root_ctx.error_monitor = ectx root_ctx.stats_monitor = sctx @@ -409,18 +431,19 @@ async def monitoring_ctx(root_ctx: RootContext) -> AsyncIterator[None]: class background_task_ctx: - def __init__(self, root_ctx: RootContext) -> None: self.root_ctx = root_ctx async def __aenter__(self) -> None: - self.root_ctx.background_task_manager = BackgroundTaskManager(self.root_ctx.event_producer) + self.root_ctx.background_task_manager = BackgroundTaskManager( + self.root_ctx.event_producer, + ) async def __aexit__(self, *exc_info) -> None: pass async def shutdown(self) -> None: - if hasattr(self.root_ctx, 'background_task_manager'): + if hasattr(self.root_ctx, "background_task_manager"): await self.root_ctx.background_task_manager.shutdown() @@ -429,18 +452,20 @@ def handle_loop_error( loop: asyncio.AbstractEventLoop, context: Mapping[str, Any], ) -> None: - exception = context.get('exception') - msg = context.get('message', '(empty message)') + exception = context.get("exception") + msg = context.get("message", "(empty message)") if exception is not None: if sys.exc_info()[0] is not None: - log.exception('Error inside event loop: {0}', msg) - if (error_monitor := getattr(root_ctx, 'error_monitor', None)) is not None: + log.exception("Error inside event loop: {0}", msg) + if (error_monitor := getattr(root_ctx, "error_monitor", None)) is not None: loop.create_task(error_monitor.capture_exception()) else: exc_info = (type(exception), exception, exception.__traceback__) - log.error('Error inside event loop: {0}', msg, exc_info=exc_info) - if (error_monitor := getattr(root_ctx, 'error_monitor', None)) is not None: - loop.create_task(error_monitor.capture_exception(exc_instance=exception)) + log.error("Error inside event loop: {0}", msg, exc_info=exc_info) + if (error_monitor := getattr(root_ctx, "error_monitor", None)) is not None: + loop.create_task( + error_monitor.capture_exception(exc_instance=exception), + ) def _init_subapp( @@ -454,51 +479,60 @@ def _init_subapp( async def _set_root_ctx(subapp: web.Application): # Allow subapp's access to the root app properties. # These are the public APIs exposed to plugins as well. - subapp['_root.context'] = root_app['_root.context'] + subapp["_root.context"] = root_app["_root.context"] # We must copy the public interface prior to all user-defined startup signal handlers. subapp.on_startup.insert(0, _set_root_ctx) - prefix = subapp.get('prefix', pkg_name.split('.')[-1].replace('_', '-')) - root_app.add_subapp('/' + prefix, subapp) + prefix = subapp.get("prefix", pkg_name.split(".")[-1].replace("_", "-")) + root_app.add_subapp("/" + prefix, subapp) root_app.middlewares.extend(global_middlewares) -def init_subapp(pkg_name: str, root_app: web.Application, create_subapp: AppCreator) -> None: - root_ctx: RootContext = root_app['_root.context'] +def init_subapp( + pkg_name: str, + root_app: web.Application, + create_subapp: AppCreator, +) -> None: + root_ctx: RootContext = root_app["_root.context"] subapp, global_middlewares = create_subapp(root_ctx.cors_options) _init_subapp(pkg_name, root_app, subapp, global_middlewares) def build_root_app( pidx: int, - local_config: LocalConfig, *, + local_config: LocalConfig, + *, cleanup_contexts: Sequence[CleanupContext] = None, subapp_pkgs: Sequence[str] = None, scheduler_opts: Mapping[str, Any] = None, ) -> web.Application: public_interface_objs.clear() - app = web.Application(middlewares=[ - exception_middleware, - api_middleware, - ]) + app = web.Application( + middlewares=[ + exception_middleware, + api_middleware, + ], + ) root_ctx = RootContext() global_exception_handler = functools.partial(handle_loop_error, root_ctx) loop = asyncio.get_running_loop() loop.set_exception_handler(global_exception_handler) - app['_root.context'] = root_ctx + app["_root.context"] = root_ctx root_ctx.local_config = local_config root_ctx.pidx = pidx root_ctx.cors_options = { - '*': aiohttp_cors.ResourceOptions( + "*": aiohttp_cors.ResourceOptions( allow_credentials=False, - expose_headers="*", allow_headers="*"), + expose_headers="*", + allow_headers="*", + ), } default_scheduler_opts = { - 'limit': 2048, - 'close_timeout': 30, - 'exception_handler': global_exception_handler, + "limit": 2048, + "close_timeout": 30, + "exception_handler": global_exception_handler, } - app['scheduler_opts'] = { + app["scheduler_opts"] = { **default_scheduler_opts, **(scheduler_opts if scheduler_opts is not None else {}), } @@ -519,26 +553,33 @@ def build_root_app( background_task_ctx, ] - async def _cleanup_context_wrapper(cctx, app: web.Application) -> AsyncIterator[None]: + async def _cleanup_context_wrapper( + cctx, + app: web.Application, + ) -> AsyncIterator[None]: # aiohttp's cleanup contexts are just async generators, not async context managers. - cctx_instance = cctx(app['_root.context']) - app['_cctx_instances'].append(cctx_instance) + cctx_instance = cctx(app["_root.context"]) + app["_cctx_instances"].append(cctx_instance) try: async with cctx_instance: yield except Exception as e: exc_info = (type(e), e, e.__traceback__) - log.error('Error initializing cleanup_contexts: {0}', cctx.__name__, exc_info=exc_info) + log.error( + "Error initializing cleanup_contexts: {0}", + cctx.__name__, + exc_info=exc_info, + ) async def _call_cleanup_context_shutdown_handlers(app: web.Application) -> None: - for cctx in app['_cctx_instances']: - if hasattr(cctx, 'shutdown'): + for cctx in app["_cctx_instances"]: + if hasattr(cctx, "shutdown"): try: await cctx.shutdown() except Exception: log.exception("error while shutting down a cleanup context") - app['_cctx_instances'] = [] + app["_cctx_instances"] = [] app.on_shutdown.append(_call_cleanup_context_shutdown_handlers) for cleanup_ctx in cleanup_contexts: app.cleanup_ctx.append( @@ -546,15 +587,15 @@ async def _call_cleanup_context_shutdown_handlers(app: web.Application) -> None: ) cors = aiohttp_cors.setup(app, defaults=root_ctx.cors_options) # should be done in create_app() in other modules. - cors.add(app.router.add_route('GET', r'', hello)) - cors.add(app.router.add_route('GET', r'/', hello)) + cors.add(app.router.add_route("GET", r"", hello)) + cors.add(app.router.add_route("GET", r"/", hello)) if subapp_pkgs is None: subapp_pkgs = [] for pkg_name in subapp_pkgs: if pidx == 0: - log.info('Loading module: {0}', pkg_name[1:]) - subapp_mod = importlib.import_module(pkg_name, 'ai.backend.manager.api') - init_subapp(pkg_name, app, getattr(subapp_mod, 'create_app')) + log.info("Loading module: {0}", pkg_name[1:]) + subapp_mod = importlib.import_module(pkg_name, "ai.backend.manager.api") + init_subapp(pkg_name, app, getattr(subapp_mod, "create_app")) return app @@ -565,30 +606,34 @@ async def server_main( _args: List[Any], ) -> AsyncIterator[None]: subapp_pkgs = [ - '.etcd', '.events', - '.auth', '.ratelimit', - '.vfolder', '.admin', - '.session', - '.stream', - '.manager', - '.resource', - '.scaling_group', - '.cluster_template', - '.session_template', - '.image', - '.userconfig', - '.domainconfig', - '.groupconfig', - '.logs', + ".etcd", + ".events", + ".auth", + ".ratelimit", + ".vfolder", + ".filebrowser", + ".admin", + ".session", + ".stream", + ".manager", + ".resource", + ".scaling_group", + ".cluster_template", + ".session_template", + ".image", + ".userconfig", + ".domainconfig", + ".groupconfig", + ".logs", ] root_app = build_root_app(pidx, _args[0], subapp_pkgs=subapp_pkgs) - root_ctx: RootContext = root_app['_root.context'] + root_ctx: RootContext = root_app["_root.context"] # Start aiomonitor. # Port is set by config (default=50001). m = aiomonitor.Monitor( loop, - port=root_ctx.local_config['manager']['aiomonitor-port'] + pidx, + port=root_ctx.local_config["manager"]["aiomonitor-port"] + pidx, console_enabled=False, ) m.prompt = f"monitor (manager[{pidx}@{os.getpid()}]) >>> " @@ -602,16 +647,16 @@ async def server_main( webapp_plugin_ctx(root_app), ): ssl_ctx = None - if root_ctx.local_config['manager']['ssl-enabled']: + if root_ctx.local_config["manager"]["ssl-enabled"]: ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_ctx.load_cert_chain( - str(root_ctx.local_config['manager']['ssl-cert']), - str(root_ctx.local_config['manager']['ssl-privkey']), + str(root_ctx.local_config["manager"]["ssl-cert"]), + str(root_ctx.local_config["manager"]["ssl-privkey"]), ) runner = web.AppRunner(root_app, keepalive_timeout=30.0) await runner.setup() - service_addr = root_ctx.local_config['manager']['service-addr'] + service_addr = root_ctx.local_config["manager"]["service-addr"] site = web.TCPSite( runner, str(service_addr.host), @@ -623,21 +668,24 @@ async def server_main( await site.start() if os.geteuid() == 0: - uid = root_ctx.local_config['manager']['user'] - gid = root_ctx.local_config['manager']['group'] - os.setgroups([ - g.gr_gid for g in grp.getgrall() - if pwd.getpwuid(uid).pw_name in g.gr_mem - ]) + uid = root_ctx.local_config["manager"]["user"] + gid = root_ctx.local_config["manager"]["group"] + os.setgroups( + [ + g.gr_gid + for g in grp.getgrall() + if pwd.getpwuid(uid).pw_name in g.gr_mem + ], + ) os.setgid(gid) os.setuid(uid) - log.info('changed process uid and gid to {}:{}', uid, gid) - log.info('started handling API requests at {}', service_addr) + log.info("changed process uid and gid to {}:{}", uid, gid) + log.info("started handling API requests at {}", service_addr) try: yield finally: - log.info('shutting down...') + log.info("shutting down...") await runner.cleanup() @@ -649,7 +697,7 @@ async def server_main_logwrapper( ) -> AsyncIterator[None]: setproctitle(f"backend.ai: manager worker-{pidx}") log_endpoint = _args[1] - logger = Logger(_args[0]['logging'], is_master=False, log_endpoint=log_endpoint) + logger = Logger(_args[0]["logging"], is_master=False, log_endpoint=log_endpoint) try: with logger: async with server_main(loop, pidx, _args): @@ -659,10 +707,19 @@ async def server_main_logwrapper( @click.group(invoke_without_command=True) -@click.option('-f', '--config-path', '--config', type=Path, default=None, - help='The config file path. (default: ./manager.toml and /etc/backend.ai/manager.toml)') -@click.option('--debug', is_flag=True, - help='Enable the debug mode and override the global log level to DEBUG.') +@click.option( + "-f", + "--config-path", + "--config", + type=Path, + default=None, + help="The config file path. (default: ./manager.toml and /etc/backend.ai/manager.toml)", +) +@click.option( + "--debug", + is_flag=True, + help="Enable the debug mode and override the global log level to DEBUG.", +) @click.pass_context def main(ctx: click.Context, config_path: Path, debug: bool) -> None: """ @@ -672,44 +729,45 @@ def main(ctx: click.Context, config_path: Path, debug: bool) -> None: cfg = load_config(config_path, debug) if ctx.invoked_subcommand is None: - cfg['manager']['pid-file'].write_text(str(os.getpid())) - log_sockpath = Path(f'/tmp/backend.ai/ipc/manager-logger-{os.getpid()}.sock') + cfg["manager"]["pid-file"].write_text(str(os.getpid())) + log_sockpath = Path(f"/tmp/backend.ai/ipc/manager-logger-{os.getpid()}.sock") log_sockpath.parent.mkdir(parents=True, exist_ok=True) - log_endpoint = f'ipc://{log_sockpath}' + log_endpoint = f"ipc://{log_sockpath}" try: - logger = Logger(cfg['logging'], is_master=True, log_endpoint=log_endpoint) + logger = Logger(cfg["logging"], is_master=True, log_endpoint=log_endpoint) with logger: - ns = cfg['etcd']['namespace'] + ns = cfg["etcd"]["namespace"] setproctitle(f"backend.ai: manager {ns}") - log.info('Backend.AI Manager {0}', __version__) - log.info('runtime: {0}', env_info()) - log_config = logging.getLogger('ai.backend.manager.config') - log_config.debug('debug mode enabled.') - if cfg['manager']['event-loop'] == 'uvloop': + log.info("Backend.AI Manager {0}", __version__) + log.info("runtime: {0}", env_info()) + log_config = logging.getLogger("ai.backend.manager.config") + log_config.debug("debug mode enabled.") + if cfg["manager"]["event-loop"] == "uvloop": import uvloop + uvloop.install() - log.info('Using uvloop as the event loop backend') + log.info("Using uvloop as the event loop backend") try: aiotools.start_server( server_main_logwrapper, - num_workers=cfg['manager']['num-proc'], + num_workers=cfg["manager"]["num-proc"], args=(cfg, log_endpoint), ) finally: - log.info('terminated.') + log.info("terminated.") finally: - if cfg['manager']['pid-file'].is_file(): + if cfg["manager"]["pid-file"].is_file(): # check is_file() to prevent deleting /dev/null! - cfg['manager']['pid-file'].unlink() + cfg["manager"]["pid-file"].unlink() else: # Click is going to invoke a subcommand. pass -@main.group(cls=LazyGroup, import_name='ai.backend.manager.api.auth:cli') +@main.group(cls=LazyGroup, import_name="ai.backend.manager.api.auth:cli") def auth() -> None: pass -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) From 7c58cdc449a83f1eceb1896b8a8af33221bd1f97 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Tue, 25 Jan 2022 15:46:19 +0900 Subject: [PATCH 09/15] fix: add filebrowser destroy feature --- src/ai/backend/manager/api/filebrowser.py | 59 ++++++++++++++--------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py index fdcd857bd..34ed73c8c 100644 --- a/src/ai/backend/manager/api/filebrowser.py +++ b/src/ai/backend/manager/api/filebrowser.py @@ -63,13 +63,14 @@ async def create_or_update_filebrowser( root_ctx: RootContext = request.app["_root.context"] vfolders = [] + + # Search for vfid based on vfolder name. And then get relevant host address and volume. for vfolder_name in params["vfolders"]: vfolders.append( {"name": vfolder_name, "vfid": await get_vfid(root_ctx, vfolder_name)}, ) host = await get_volume(root_ctx, await get_vfid(root_ctx, vfolder_name)) - proxy_name, _ = root_ctx.storage_manager.split_host(host) try: @@ -94,31 +95,43 @@ async def create_or_update_filebrowser( @auth_required @server_status_required(READ_ALLOWED) +@check_api_params( + t.Dict( + { + t.Key("container_id"): t.String, + }, + ), +) async def destroy_filebrowser( request: web.Request, + params: Any, ) -> web.Response: root_ctx: RootContext = request.app["_root.context"] - - proxy_name, _ = root_ctx.storage_manager.split_host("local: volume1") - - try: - proxy_info = root_ctx.storage_manager._proxies[proxy_name] - except KeyError: - raise InvalidArgument("There is no such storage proxy", proxy_name) - - headers = {} - headers["X-BackendAI-Storage-Auth-Token"] = proxy_info.secret - - try: - async with proxy_info.session.request( - "DELETE", - proxy_info.manager_api_url / "browser/destroy", - headers=headers, - ) as client_resp: - print("real url ", client_resp.real_url()) - return web.json_response(await client_resp.json()) - except aiohttp.ClientResponseError: - raise + container_id = params["container_id"] + + volumes = await root_ctx.storage_manager.get_all_volumes() + + # search for volume among available volumes which has file browser container id in order to destroy + for volume in volumes: + proxy_name = volume[0] + try: + proxy_info = root_ctx.storage_manager._proxies[proxy_name] + except KeyError: + raise InvalidArgument("There is no such storage proxy", proxy_name) + + headers = {} + headers["X-BackendAI-Storage-Auth-Token"] = proxy_info.secret + auth_token = proxy_info.secret + try: + async with proxy_info.session.request( + "DELETE", + proxy_info.manager_api_url / "browser/destroy", + headers=headers, + json={"container_id": container_id, "auth_token": auth_token}, + ) as client_resp: + return web.json_response(await client_resp.json()) + except aiohttp.ClientResponseError: + raise async def init(app: web.Application) -> None: @@ -143,6 +156,6 @@ def create_app( app.on_shutdown.append(shutdown) cors = aiohttp_cors.setup(app, defaults=default_cors_options) cors.add(app.router.add_route("POST", r"/create", create_or_update_filebrowser)) - cors.add(app.router.add_route("DELETE", "/destroy", destroy_filebrowser)) + cors.add(app.router.add_route("DELETE", r"/destroy", destroy_filebrowser)) return app, [] From d22d954509bcc9051cf433ac7390543a42e75ba9 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Tue, 25 Jan 2022 16:17:48 +0900 Subject: [PATCH 10/15] fix: mypy error --- src/ai/backend/manager/api/filebrowser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py index 34ed73c8c..81f23adb5 100644 --- a/src/ai/backend/manager/api/filebrowser.py +++ b/src/ai/backend/manager/api/filebrowser.py @@ -131,7 +131,7 @@ async def destroy_filebrowser( ) as client_resp: return web.json_response(await client_resp.json()) except aiohttp.ClientResponseError: - raise + return web.json_response({"status": 'fail'}) async def init(app: web.Application) -> None: From c2f61d528df08007e133a77edcdf5c3a5014ad4e Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Tue, 25 Jan 2022 16:21:31 +0900 Subject: [PATCH 11/15] fix: mypy error with return statement --- src/ai/backend/manager/api/filebrowser.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py index 81f23adb5..1fa904bb1 100644 --- a/src/ai/backend/manager/api/filebrowser.py +++ b/src/ai/backend/manager/api/filebrowser.py @@ -122,16 +122,14 @@ async def destroy_filebrowser( headers = {} headers["X-BackendAI-Storage-Auth-Token"] = proxy_info.secret auth_token = proxy_info.secret - try: - async with proxy_info.session.request( - "DELETE", - proxy_info.manager_api_url / "browser/destroy", - headers=headers, - json={"container_id": container_id, "auth_token": auth_token}, - ) as client_resp: - return web.json_response(await client_resp.json()) - except aiohttp.ClientResponseError: - return web.json_response({"status": 'fail'}) + + async with proxy_info.session.request( + "DELETE", + proxy_info.manager_api_url / "browser/destroy", + headers=headers, + json={"container_id": container_id, "auth_token": auth_token}, + ) as client_resp: + return web.json_response(await client_resp.json()) async def init(app: web.Application) -> None: From b152e7bcdfc49956121803bfeb338b2feb131fd1 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Tue, 25 Jan 2022 16:33:12 +0900 Subject: [PATCH 12/15] fix: mypy return statement --- src/ai/backend/manager/api/filebrowser.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py index 1fa904bb1..e431a7563 100644 --- a/src/ai/backend/manager/api/filebrowser.py +++ b/src/ai/backend/manager/api/filebrowser.py @@ -122,14 +122,17 @@ async def destroy_filebrowser( headers = {} headers["X-BackendAI-Storage-Auth-Token"] = proxy_info.secret auth_token = proxy_info.secret - - async with proxy_info.session.request( - "DELETE", - proxy_info.manager_api_url / "browser/destroy", - headers=headers, - json={"container_id": container_id, "auth_token": auth_token}, - ) as client_resp: - return web.json_response(await client_resp.json()) + try: + async with proxy_info.session.request( + "DELETE", + proxy_info.manager_api_url / "browser/destroy", + headers=headers, + json={"container_id": container_id, "auth_token": auth_token}, + ) as client_resp: + return web.json_response(await client_resp.json()) + except aiohttp.ClientResponseError: + raise + return web.json_response({"status": "fail"}) async def init(app: web.Application) -> None: From d2a325b5a6096faba5b1c615b049f8118c0ae1a8 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Tue, 22 Feb 2022 17:25:00 +0900 Subject: [PATCH 13/15] fix: to resolve the git feedback --- src/ai/backend/manager/api/filebrowser.py | 12 +- src/ai/backend/manager/server.py | 525 ++++++++++------------ 2 files changed, 239 insertions(+), 298 deletions(-) diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py index e431a7563..3b0502cd6 100644 --- a/src/ai/backend/manager/api/filebrowser.py +++ b/src/ai/backend/manager/api/filebrowser.py @@ -22,7 +22,7 @@ async def get_vfid(root_ctx: RootContext, name: str) -> str: - async with root_ctx.db.begin() as conn: + async with root_ctx.db.begin_readonly() as conn: query = ( sa.select([vfolders.c.id]) .select_from(vfolders) @@ -30,13 +30,11 @@ async def get_vfid(root_ctx: RootContext, name: str) -> str: ) folder_id = await conn.scalar(query) - query = sa.delete(vfolders).where(vfolders.c.id == folder_id) - return folder_id.hex async def get_volume(root_ctx: RootContext, vfid: str) -> str: - async with root_ctx.db.begin() as conn: + async with root_ctx.db.begin_readonly() as conn: query = ( sa.select([vfolders.c.host]) .select_from(vfolders) @@ -84,7 +82,7 @@ async def create_or_update_filebrowser( try: async with proxy_info.session.request( "POST", - proxy_info.manager_api_url / "browser/create", + proxy_info.manager_api_url / "storage/filebrowser/create", headers=headers, json={"vfolders": vfolders}, ) as client_resp: @@ -125,7 +123,7 @@ async def destroy_filebrowser( try: async with proxy_info.session.request( "DELETE", - proxy_info.manager_api_url / "browser/destroy", + proxy_info.manager_api_url / "storage/filebrowser/destroy", headers=headers, json={"container_id": container_id, "auth_token": auth_token}, ) as client_resp: @@ -147,7 +145,7 @@ def create_app( default_cors_options: CORSOptions, ) -> Tuple[web.Application, Iterable[WebMiddleware]]: app = web.Application() - app["prefix"] = "browser" + app["prefix"] = "storage/filebrowser" app["api_versions"] = ( 2, 3, diff --git a/src/ai/backend/manager/server.py b/src/ai/backend/manager/server.py index ce97137da..db7fa8872 100644 --- a/src/ai/backend/manager/server.py +++ b/src/ai/backend/manager/server.py @@ -1,19 +1,16 @@ from __future__ import annotations import asyncio +from contextlib import asynccontextmanager as actxmgr, closing +from datetime import datetime import functools -import grp import importlib import logging import os -import pwd +import pwd, grp import ssl import sys import traceback -from contextlib import asynccontextmanager as actxmgr -from contextlib import closing -from datetime import datetime -from pathlib import Path from typing import ( Any, AsyncIterator, @@ -26,41 +23,50 @@ cast, ) +from aiohttp import web import aiohttp_cors -import aiomonitor import aiotools import click +from pathlib import Path +from setproctitle import setproctitle +import aiomonitor + from ai.backend.common import redis from ai.backend.common.cli import LazyGroup from ai.backend.common.events import EventDispatcher, EventProducer -from ai.backend.common.logging import BraceStyleAdapter, Logger -from ai.backend.common.plugin.hook import ALL_COMPLETED, PASSED, HookPluginContext +from ai.backend.common.utils import env_info +from ai.backend.common.logging import Logger, BraceStyleAdapter +from ai.backend.common.plugin.hook import HookPluginContext, ALL_COMPLETED, PASSED from ai.backend.common.plugin.monitor import ( - INCREMENT, ErrorPluginContext, StatsPluginContext, + INCREMENT, ) -from ai.backend.common.utils import env_info -from aiohttp import web -from setproctitle import setproctitle from . import __version__ from .api.context import RootContext from .api.exceptions import ( BackendError, - GenericBadRequest, + MethodNotAllowed, GenericNotFound, + GenericBadRequest, InternalServerError, InvalidAPIParameters, - MethodNotAllowed, ) from .api.manager import ManagerStatus -from .api.types import AppCreator, CleanupContext, WebMiddleware, WebRequestHandler +from .api.types import ( + AppCreator, + WebRequestHandler, WebMiddleware, + CleanupContext, +) from .background import BackgroundTaskManager -from .config import LocalConfig, SharedConfig -from .config import load as load_config -from .config import volume_config_iv -from .defs import REDIS_IMAGE_DB, REDIS_LIVE_DB, REDIS_STAT_DB, REDIS_STREAM_DB +from .config import ( + LocalConfig, + SharedConfig, + load as load_config, + volume_config_iv, +) +from .defs import REDIS_STAT_DB, REDIS_LIVE_DB, REDIS_IMAGE_DB, REDIS_STREAM_DB from .exceptions import InvalidArgument from .idle import create_idle_checkers from .models.storage import StorageSessionManager @@ -69,65 +75,69 @@ from .registry import AgentRegistry from .scheduler.dispatcher import SchedulerDispatcher -VALID_VERSIONS: Final = frozenset( - [ - # 'v1.20160915', # deprecated - # 'v2.20170315', # deprecated - # 'v3.20170615', # deprecated - # authentication changed not to use request bodies - "v4.20181215", - # added & enabled streaming-execute API - "v4.20190115", - # changed resource/image formats - "v4.20190315", - # added user mgmt and ID/password authentication - # added domain/group/scaling-group - # added domain/group/scaling-group ref. fields to user/keypair/vfolder objects - "v4.20190615", - # added mount_map parameter when creating kernel - # changed GraphQL query structures for multi-container bundled sessions - "v5.20191215", - # rewrote vfolder upload/download APIs to migrate to external storage proxies - "v6.20200815", - # added standard-compliant /admin/gql endpoint - # deprecated /admin/graphql endpoint (still present for backward compatibility) - # added "groups_by_name" GQL query - # added "filter" and "order" arg to all paginated GQL queries - # with their own expression mini-langs - # removed "order_key" and "order_asc" arguments from all paginated GQL queries (never used!) - "v6.20210815", - ], -) +VALID_VERSIONS: Final = frozenset([ + # 'v1.20160915', # deprecated + # 'v2.20170315', # deprecated + # 'v3.20170615', # deprecated + + # authentication changed not to use request bodies + 'v4.20181215', + + # added & enabled streaming-execute API + 'v4.20190115', + + # changed resource/image formats + 'v4.20190315', + + # added user mgmt and ID/password authentication + # added domain/group/scaling-group + # added domain/group/scaling-group ref. fields to user/keypair/vfolder objects + 'v4.20190615', + + # added mount_map parameter when creating kernel + # changed GraphQL query structures for multi-container bundled sessions + 'v5.20191215', + + # rewrote vfolder upload/download APIs to migrate to external storage proxies + 'v6.20200815', + + # added standard-compliant /admin/gql endpoint + # deprecated /admin/graphql endpoint (still present for backward compatibility) + # added "groups_by_name" GQL query + # added "filter" and "order" arg to all paginated GQL queries with their own expression mini-langs + # removed "order_key" and "order_asc" arguments from all paginated GQL queries (never used!) + 'v6.20210815', +]) LATEST_REV_DATES: Final = { - 1: "20160915", - 2: "20170915", - 3: "20181215", - 4: "20190615", - 5: "20191215", - 6: "20210815", + 1: '20160915', + 2: '20170915', + 3: '20181215', + 4: '20190615', + 5: '20191215', + 6: '20210815', } -LATEST_API_VERSION: Final = "v6.20210815" +LATEST_API_VERSION: Final = 'v6.20210815' log = BraceStyleAdapter(logging.getLogger(__name__)) PUBLIC_INTERFACES: Final = [ - "pidx", - "background_task_manager", - "local_config", - "shared_config", - "db", - "registry", - "redis_live", - "redis_stat", - "redis_image", - "redis_stream", - "event_dispatcher", - "event_producer", - "idle_checkers", - "storage_manager", - "stats_monitor", - "error_monitor", - "hook_plugin_ctx", + 'pidx', + 'background_task_manager', + 'local_config', + 'shared_config', + 'db', + 'registry', + 'redis_live', + 'redis_stat', + 'redis_image', + 'redis_stream', + 'event_dispatcher', + 'event_producer', + 'idle_checkers', + 'storage_manager', + 'stats_monitor', + 'error_monitor', + 'hook_plugin_ctx', ] public_interface_objs: MutableMapping[str, Any] = {} @@ -137,115 +147,98 @@ async def hello(request: web.Request) -> web.Response: """ Returns the API version number. """ - return web.json_response( - { - "version": LATEST_API_VERSION, - "manager": __version__, - }, - ) + return web.json_response({ + 'version': LATEST_API_VERSION, + 'manager': __version__, + }) async def on_prepare(request: web.Request, response: web.StreamResponse) -> None: - response.headers["Server"] = "BackendAI" + response.headers['Server'] = 'BackendAI' @web.middleware -async def api_middleware( - request: web.Request, - handler: WebRequestHandler, -) -> web.StreamResponse: +async def api_middleware(request: web.Request, + handler: WebRequestHandler) -> web.StreamResponse: _handler = handler - method_override = request.headers.get("X-Method-Override", None) + method_override = request.headers.get('X-Method-Override', None) if method_override: request = request.clone(method=method_override) new_match_info = await request.app.router.resolve(request) if new_match_info is None: - raise InternalServerError("No matching method handler found") + raise InternalServerError('No matching method handler found') _handler = new_match_info.handler request._match_info = new_match_info # type: ignore # this is a hack ex = request.match_info.http_exception if ex is not None: # handled by exception_middleware raise ex - new_api_version = request.headers.get("X-BackendAI-Version") - legacy_api_version = request.headers.get("X-Sorna-Version") + new_api_version = request.headers.get('X-BackendAI-Version') + legacy_api_version = request.headers.get('X-Sorna-Version') api_version = new_api_version or legacy_api_version try: if api_version is None: - path_major_version = int(request.match_info.get("version", 5)) + path_major_version = int(request.match_info.get('version', 5)) revision_date = LATEST_REV_DATES[path_major_version] - request["api_version"] = (path_major_version, revision_date) + request['api_version'] = (path_major_version, revision_date) elif api_version in VALID_VERSIONS: - hdr_major_version, revision_date = api_version.split(".", maxsplit=1) - request["api_version"] = (int(hdr_major_version[1:]), revision_date) + hdr_major_version, revision_date = api_version.split('.', maxsplit=1) + request['api_version'] = (int(hdr_major_version[1:]), revision_date) else: - return GenericBadRequest("Unsupported API version.") + return GenericBadRequest('Unsupported API version.') except (ValueError, KeyError): - return GenericBadRequest("Unsupported API version.") - resp = await _handler(request) + return GenericBadRequest('Unsupported API version.') + resp = (await _handler(request)) return resp @web.middleware -async def exception_middleware( - request: web.Request, - handler: WebRequestHandler, -) -> web.StreamResponse: - root_ctx: RootContext = request.app["_root.context"] +async def exception_middleware(request: web.Request, + handler: WebRequestHandler) -> web.StreamResponse: + root_ctx: RootContext = request.app['_root.context'] error_monitor = root_ctx.error_monitor stats_monitor = root_ctx.stats_monitor try: - await stats_monitor.report_metric(INCREMENT, "ai.backend.manager.api.requests") - resp = await handler(request) + await stats_monitor.report_metric(INCREMENT, 'ai.backend.manager.api.requests') + resp = (await handler(request)) except InvalidArgument as ex: if len(ex.args) > 1: - raise InvalidAPIParameters( - f"{ex.args[0]}: {', '.join(map(str, ex.args[1:]))}", - ) + raise InvalidAPIParameters(f"{ex.args[0]}: {', '.join(map(str, ex.args[1:]))}") elif len(ex.args) == 1: raise InvalidAPIParameters(ex.args[0]) else: raise InvalidAPIParameters() except BackendError as ex: if ex.status_code == 500: - log.warning("Internal server error raised inside handlers") + log.warning('Internal server error raised inside handlers') await error_monitor.capture_exception() - await stats_monitor.report_metric(INCREMENT, "ai.backend.manager.api.failures") - await stats_monitor.report_metric( - INCREMENT, - f"ai.backend.manager.api.status.{ex.status_code}", - ) + await stats_monitor.report_metric(INCREMENT, 'ai.backend.manager.api.failures') + await stats_monitor.report_metric(INCREMENT, f'ai.backend.manager.api.status.{ex.status_code}') raise except web.HTTPException as ex: - await stats_monitor.report_metric(INCREMENT, "ai.backend.manager.api.failures") - await stats_monitor.report_metric( - INCREMENT, - f"ai.backend.manager.api.status.{ex.status_code}", - ) + await stats_monitor.report_metric(INCREMENT, 'ai.backend.manager.api.failures') + await stats_monitor.report_metric(INCREMENT, f'ai.backend.manager.api.status.{ex.status_code}') if ex.status_code == 404: raise GenericNotFound(f"Unknown URL path: {request.path}") if ex.status_code == 405: concrete_ex = cast(web.HTTPMethodNotAllowed, ex) raise MethodNotAllowed(concrete_ex.method, concrete_ex.allowed_methods) - log.warning("Bad request: {0!r}", ex) + log.warning('Bad request: {0!r}', ex) raise GenericBadRequest except asyncio.CancelledError as e: # The server is closing or the client has disconnected in the middle of # request. Atomic requests are still executed to their ends. - log.debug("Request cancelled ({0} {1})", request.method, request.rel_url) + log.debug('Request cancelled ({0} {1})', request.method, request.rel_url) raise e except Exception as e: await error_monitor.capture_exception() - log.exception("Uncaught exception in HTTP request handlers {0!r}", e) - if root_ctx.local_config["debug"]["enabled"]: + log.exception('Uncaught exception in HTTP request handlers {0!r}', e) + if root_ctx.local_config['debug']['enabled']: raise InternalServerError(traceback.format_exc()) else: raise InternalServerError() else: - await stats_monitor.report_metric( - INCREMENT, - f"ai.backend.manager.api.status.{resp.status}", - ) + await stats_monitor.report_metric(INCREMENT, f'ai.backend.manager.api.status.{resp.status}') return resp @@ -253,10 +246,10 @@ async def exception_middleware( async def shared_config_ctx(root_ctx: RootContext) -> AsyncIterator[None]: # populate public interfaces root_ctx.shared_config = SharedConfig( - root_ctx.local_config["etcd"]["addr"], - root_ctx.local_config["etcd"]["user"], - root_ctx.local_config["etcd"]["password"], - root_ctx.local_config["etcd"]["namespace"], + root_ctx.local_config['etcd']['addr'], + root_ctx.local_config['etcd']['user'], + root_ctx.local_config['etcd']['password'], + root_ctx.local_config['etcd']['namespace'], ) await root_ctx.shared_config.reload() yield @@ -265,16 +258,14 @@ async def shared_config_ctx(root_ctx: RootContext) -> AsyncIterator[None]: @actxmgr async def webapp_plugin_ctx(root_app: web.Application) -> AsyncIterator[None]: - root_ctx: RootContext = root_app["_root.context"] + root_ctx: RootContext = root_app['_root.context'] plugin_ctx = WebappPluginContext(root_ctx.shared_config.etcd, root_ctx.local_config) await plugin_ctx.init() root_ctx.webapp_plugin_ctx = plugin_ctx for plugin_name, plugin_instance in plugin_ctx.plugins.items(): if root_ctx.pidx == 0: - log.info("Loading webapp plugin: {0}", plugin_name) - subapp, global_middlewares = await plugin_instance.create_app( - root_ctx.cors_options, - ) + log.info('Loading webapp plugin: {0}', plugin_name) + subapp, global_middlewares = await plugin_instance.create_app(root_ctx.cors_options) _init_subapp(plugin_name, root_app, subapp, global_middlewares) yield await plugin_ctx.cleanup() @@ -284,37 +275,26 @@ async def webapp_plugin_ctx(root_app: web.Application) -> AsyncIterator[None]: async def manager_status_ctx(root_ctx: RootContext) -> AsyncIterator[None]: if root_ctx.pidx == 0: mgr_status = await root_ctx.shared_config.get_manager_status() - if mgr_status is None or mgr_status not in ( - ManagerStatus.RUNNING, - ManagerStatus.FROZEN, - ): + if mgr_status is None or mgr_status not in (ManagerStatus.RUNNING, ManagerStatus.FROZEN): # legacy transition: we now have only RUNNING or FROZEN for HA setup. await root_ctx.shared_config.update_manager_status(ManagerStatus.RUNNING) mgr_status = ManagerStatus.RUNNING - log.info("Manager status: {}", mgr_status) - tz = root_ctx.shared_config["system"]["timezone"] - log.info("Configured timezone: {}", tz.tzname(datetime.now())) + log.info('Manager status: {}', mgr_status) + tz = root_ctx.shared_config['system']['timezone'] + log.info('Configured timezone: {}', tz.tzname(datetime.now())) yield @actxmgr async def redis_ctx(root_ctx: RootContext) -> AsyncIterator[None]: - root_ctx.redis_live = redis.get_redis_object( - root_ctx.shared_config.data["redis"], - db=REDIS_LIVE_DB, - ) - root_ctx.redis_stat = redis.get_redis_object( - root_ctx.shared_config.data["redis"], - db=REDIS_STAT_DB, - ) + root_ctx.redis_live = redis.get_redis_object(root_ctx.shared_config.data['redis'], db=REDIS_LIVE_DB) + root_ctx.redis_stat = redis.get_redis_object(root_ctx.shared_config.data['redis'], db=REDIS_STAT_DB) root_ctx.redis_image = redis.get_redis_object( - root_ctx.shared_config.data["redis"], - db=REDIS_IMAGE_DB, + root_ctx.shared_config.data['redis'], db=REDIS_IMAGE_DB, ) root_ctx.redis_stream = redis.get_redis_object( - root_ctx.shared_config.data["redis"], - db=REDIS_STREAM_DB, + root_ctx.shared_config.data['redis'], db=REDIS_STREAM_DB, ) yield await root_ctx.redis_stream.close() @@ -334,14 +314,14 @@ async def database_ctx(root_ctx: RootContext) -> AsyncIterator[None]: async def event_dispatcher_ctx(root_ctx: RootContext) -> AsyncIterator[None]: root_ctx.event_producer = await EventProducer.new( - root_ctx.shared_config.data["redis"], + root_ctx.shared_config.data['redis'], db=REDIS_STREAM_DB, ) root_ctx.event_dispatcher = await EventDispatcher.new( - root_ctx.shared_config.data["redis"], + root_ctx.shared_config.data['redis'], db=REDIS_STREAM_DB, - log_events=root_ctx.local_config["debug"]["log-events"], - node_id=root_ctx.local_config["manager"]["id"], + log_events=root_ctx.local_config['debug']['log-events'], + node_id=root_ctx.local_config['manager']['id'], ) yield await root_ctx.event_dispatcher.close() @@ -363,7 +343,7 @@ async def idle_checker_ctx(root_ctx: RootContext) -> AsyncIterator[None]: @actxmgr async def storage_manager_ctx(root_ctx: RootContext) -> AsyncIterator[None]: - raw_vol_config = await root_ctx.shared_config.etcd.get_prefix("volumes") + raw_vol_config = await root_ctx.shared_config.etcd.get_prefix('volumes') config = volume_config_iv.check(raw_vol_config) root_ctx.storage_manager = StorageSessionManager(config) yield @@ -376,12 +356,12 @@ async def hook_plugin_ctx(root_ctx: RootContext) -> AsyncIterator[None]: root_ctx.hook_plugin_ctx = ctx await ctx.init() hook_result = await ctx.dispatch( - "ACTIVATE_MANAGER", + 'ACTIVATE_MANAGER', (), return_when=ALL_COMPLETED, ) if hook_result.status != PASSED: - raise RuntimeError("Could not activate the manager instance.") + raise RuntimeError('Could not activate the manager instance.') yield await ctx.cleanup() @@ -407,10 +387,8 @@ async def agent_registry_ctx(root_ctx: RootContext) -> AsyncIterator[None]: @actxmgr async def sched_dispatcher_ctx(root_ctx: RootContext) -> AsyncIterator[None]: sched_dispatcher = await SchedulerDispatcher.new( - root_ctx.local_config, - root_ctx.shared_config, - root_ctx.event_dispatcher, - root_ctx.event_producer, + root_ctx.local_config, root_ctx.shared_config, + root_ctx.event_dispatcher, root_ctx.event_producer, root_ctx.registry, ) yield @@ -421,7 +399,7 @@ async def sched_dispatcher_ctx(root_ctx: RootContext) -> AsyncIterator[None]: async def monitoring_ctx(root_ctx: RootContext) -> AsyncIterator[None]: ectx = ErrorPluginContext(root_ctx.shared_config.etcd, root_ctx.local_config) sctx = StatsPluginContext(root_ctx.shared_config.etcd, root_ctx.local_config) - await ectx.init(context={"_root.context": root_ctx}) + await ectx.init(context={'_root.context': root_ctx}) await sctx.init() root_ctx.error_monitor = ectx root_ctx.stats_monitor = sctx @@ -431,19 +409,18 @@ async def monitoring_ctx(root_ctx: RootContext) -> AsyncIterator[None]: class background_task_ctx: + def __init__(self, root_ctx: RootContext) -> None: self.root_ctx = root_ctx async def __aenter__(self) -> None: - self.root_ctx.background_task_manager = BackgroundTaskManager( - self.root_ctx.event_producer, - ) + self.root_ctx.background_task_manager = BackgroundTaskManager(self.root_ctx.event_producer) async def __aexit__(self, *exc_info) -> None: pass async def shutdown(self) -> None: - if hasattr(self.root_ctx, "background_task_manager"): + if hasattr(self.root_ctx, 'background_task_manager'): await self.root_ctx.background_task_manager.shutdown() @@ -452,20 +429,18 @@ def handle_loop_error( loop: asyncio.AbstractEventLoop, context: Mapping[str, Any], ) -> None: - exception = context.get("exception") - msg = context.get("message", "(empty message)") + exception = context.get('exception') + msg = context.get('message', '(empty message)') if exception is not None: if sys.exc_info()[0] is not None: - log.exception("Error inside event loop: {0}", msg) - if (error_monitor := getattr(root_ctx, "error_monitor", None)) is not None: + log.exception('Error inside event loop: {0}', msg) + if (error_monitor := getattr(root_ctx, 'error_monitor', None)) is not None: loop.create_task(error_monitor.capture_exception()) else: exc_info = (type(exception), exception, exception.__traceback__) - log.error("Error inside event loop: {0}", msg, exc_info=exc_info) - if (error_monitor := getattr(root_ctx, "error_monitor", None)) is not None: - loop.create_task( - error_monitor.capture_exception(exc_instance=exception), - ) + log.error('Error inside event loop: {0}', msg, exc_info=exc_info) + if (error_monitor := getattr(root_ctx, 'error_monitor', None)) is not None: + loop.create_task(error_monitor.capture_exception(exc_instance=exception)) def _init_subapp( @@ -479,60 +454,51 @@ def _init_subapp( async def _set_root_ctx(subapp: web.Application): # Allow subapp's access to the root app properties. # These are the public APIs exposed to plugins as well. - subapp["_root.context"] = root_app["_root.context"] + subapp['_root.context'] = root_app['_root.context'] # We must copy the public interface prior to all user-defined startup signal handlers. subapp.on_startup.insert(0, _set_root_ctx) - prefix = subapp.get("prefix", pkg_name.split(".")[-1].replace("_", "-")) - root_app.add_subapp("/" + prefix, subapp) + prefix = subapp.get('prefix', pkg_name.split('.')[-1].replace('_', '-')) + root_app.add_subapp('/' + prefix, subapp) root_app.middlewares.extend(global_middlewares) -def init_subapp( - pkg_name: str, - root_app: web.Application, - create_subapp: AppCreator, -) -> None: - root_ctx: RootContext = root_app["_root.context"] +def init_subapp(pkg_name: str, root_app: web.Application, create_subapp: AppCreator) -> None: + root_ctx: RootContext = root_app['_root.context'] subapp, global_middlewares = create_subapp(root_ctx.cors_options) _init_subapp(pkg_name, root_app, subapp, global_middlewares) def build_root_app( pidx: int, - local_config: LocalConfig, - *, + local_config: LocalConfig, *, cleanup_contexts: Sequence[CleanupContext] = None, subapp_pkgs: Sequence[str] = None, scheduler_opts: Mapping[str, Any] = None, ) -> web.Application: public_interface_objs.clear() - app = web.Application( - middlewares=[ - exception_middleware, - api_middleware, - ], - ) + app = web.Application(middlewares=[ + exception_middleware, + api_middleware, + ]) root_ctx = RootContext() global_exception_handler = functools.partial(handle_loop_error, root_ctx) loop = asyncio.get_running_loop() loop.set_exception_handler(global_exception_handler) - app["_root.context"] = root_ctx + app['_root.context'] = root_ctx root_ctx.local_config = local_config root_ctx.pidx = pidx root_ctx.cors_options = { - "*": aiohttp_cors.ResourceOptions( + '*': aiohttp_cors.ResourceOptions( allow_credentials=False, - expose_headers="*", - allow_headers="*", - ), + expose_headers="*", allow_headers="*"), } default_scheduler_opts = { - "limit": 2048, - "close_timeout": 30, - "exception_handler": global_exception_handler, + 'limit': 2048, + 'close_timeout': 30, + 'exception_handler': global_exception_handler, } - app["scheduler_opts"] = { + app['scheduler_opts'] = { **default_scheduler_opts, **(scheduler_opts if scheduler_opts is not None else {}), } @@ -553,33 +519,26 @@ def build_root_app( background_task_ctx, ] - async def _cleanup_context_wrapper( - cctx, - app: web.Application, - ) -> AsyncIterator[None]: + async def _cleanup_context_wrapper(cctx, app: web.Application) -> AsyncIterator[None]: # aiohttp's cleanup contexts are just async generators, not async context managers. - cctx_instance = cctx(app["_root.context"]) - app["_cctx_instances"].append(cctx_instance) + cctx_instance = cctx(app['_root.context']) + app['_cctx_instances'].append(cctx_instance) try: async with cctx_instance: yield except Exception as e: exc_info = (type(e), e, e.__traceback__) - log.error( - "Error initializing cleanup_contexts: {0}", - cctx.__name__, - exc_info=exc_info, - ) + log.error('Error initializing cleanup_contexts: {0}', cctx.__name__, exc_info=exc_info) async def _call_cleanup_context_shutdown_handlers(app: web.Application) -> None: - for cctx in app["_cctx_instances"]: - if hasattr(cctx, "shutdown"): + for cctx in app['_cctx_instances']: + if hasattr(cctx, 'shutdown'): try: await cctx.shutdown() except Exception: log.exception("error while shutting down a cleanup context") - app["_cctx_instances"] = [] + app['_cctx_instances'] = [] app.on_shutdown.append(_call_cleanup_context_shutdown_handlers) for cleanup_ctx in cleanup_contexts: app.cleanup_ctx.append( @@ -587,15 +546,15 @@ async def _call_cleanup_context_shutdown_handlers(app: web.Application) -> None: ) cors = aiohttp_cors.setup(app, defaults=root_ctx.cors_options) # should be done in create_app() in other modules. - cors.add(app.router.add_route("GET", r"", hello)) - cors.add(app.router.add_route("GET", r"/", hello)) + cors.add(app.router.add_route('GET', r'', hello)) + cors.add(app.router.add_route('GET', r'/', hello)) if subapp_pkgs is None: subapp_pkgs = [] for pkg_name in subapp_pkgs: if pidx == 0: - log.info("Loading module: {0}", pkg_name[1:]) - subapp_mod = importlib.import_module(pkg_name, "ai.backend.manager.api") - init_subapp(pkg_name, app, getattr(subapp_mod, "create_app")) + log.info('Loading module: {0}', pkg_name[1:]) + subapp_mod = importlib.import_module(pkg_name, 'ai.backend.manager.api') + init_subapp(pkg_name, app, getattr(subapp_mod, 'create_app')) return app @@ -606,34 +565,31 @@ async def server_main( _args: List[Any], ) -> AsyncIterator[None]: subapp_pkgs = [ - ".etcd", - ".events", - ".auth", - ".ratelimit", - ".vfolder", - ".filebrowser", - ".admin", - ".session", - ".stream", - ".manager", - ".resource", - ".scaling_group", - ".cluster_template", - ".session_template", - ".image", - ".userconfig", - ".domainconfig", - ".groupconfig", - ".logs", + '.etcd', '.events', + '.auth', '.ratelimit', + '.vfolder', '.admin', + '.filebrowser', + '.session', + '.stream', + '.manager', + '.resource', + '.scaling_group', + '.cluster_template', + '.session_template', + '.image', + '.userconfig', + '.domainconfig', + '.groupconfig', + '.logs', ] root_app = build_root_app(pidx, _args[0], subapp_pkgs=subapp_pkgs) - root_ctx: RootContext = root_app["_root.context"] + root_ctx: RootContext = root_app['_root.context'] # Start aiomonitor. # Port is set by config (default=50001). m = aiomonitor.Monitor( loop, - port=root_ctx.local_config["manager"]["aiomonitor-port"] + pidx, + port=root_ctx.local_config['manager']['aiomonitor-port'] + pidx, console_enabled=False, ) m.prompt = f"monitor (manager[{pidx}@{os.getpid()}]) >>> " @@ -647,16 +603,16 @@ async def server_main( webapp_plugin_ctx(root_app), ): ssl_ctx = None - if root_ctx.local_config["manager"]["ssl-enabled"]: + if root_ctx.local_config['manager']['ssl-enabled']: ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_ctx.load_cert_chain( - str(root_ctx.local_config["manager"]["ssl-cert"]), - str(root_ctx.local_config["manager"]["ssl-privkey"]), + str(root_ctx.local_config['manager']['ssl-cert']), + str(root_ctx.local_config['manager']['ssl-privkey']), ) runner = web.AppRunner(root_app, keepalive_timeout=30.0) await runner.setup() - service_addr = root_ctx.local_config["manager"]["service-addr"] + service_addr = root_ctx.local_config['manager']['service-addr'] site = web.TCPSite( runner, str(service_addr.host), @@ -668,24 +624,21 @@ async def server_main( await site.start() if os.geteuid() == 0: - uid = root_ctx.local_config["manager"]["user"] - gid = root_ctx.local_config["manager"]["group"] - os.setgroups( - [ - g.gr_gid - for g in grp.getgrall() - if pwd.getpwuid(uid).pw_name in g.gr_mem - ], - ) + uid = root_ctx.local_config['manager']['user'] + gid = root_ctx.local_config['manager']['group'] + os.setgroups([ + g.gr_gid for g in grp.getgrall() + if pwd.getpwuid(uid).pw_name in g.gr_mem + ]) os.setgid(gid) os.setuid(uid) - log.info("changed process uid and gid to {}:{}", uid, gid) - log.info("started handling API requests at {}", service_addr) + log.info('changed process uid and gid to {}:{}', uid, gid) + log.info('started handling API requests at {}', service_addr) try: yield finally: - log.info("shutting down...") + log.info('shutting down...') await runner.cleanup() @@ -697,7 +650,7 @@ async def server_main_logwrapper( ) -> AsyncIterator[None]: setproctitle(f"backend.ai: manager worker-{pidx}") log_endpoint = _args[1] - logger = Logger(_args[0]["logging"], is_master=False, log_endpoint=log_endpoint) + logger = Logger(_args[0]['logging'], is_master=False, log_endpoint=log_endpoint) try: with logger: async with server_main(loop, pidx, _args): @@ -707,19 +660,10 @@ async def server_main_logwrapper( @click.group(invoke_without_command=True) -@click.option( - "-f", - "--config-path", - "--config", - type=Path, - default=None, - help="The config file path. (default: ./manager.toml and /etc/backend.ai/manager.toml)", -) -@click.option( - "--debug", - is_flag=True, - help="Enable the debug mode and override the global log level to DEBUG.", -) +@click.option('-f', '--config-path', '--config', type=Path, default=None, + help='The config file path. (default: ./manager.toml and /etc/backend.ai/manager.toml)') +@click.option('--debug', is_flag=True, + help='Enable the debug mode and override the global log level to DEBUG.') @click.pass_context def main(ctx: click.Context, config_path: Path, debug: bool) -> None: """ @@ -729,45 +673,44 @@ def main(ctx: click.Context, config_path: Path, debug: bool) -> None: cfg = load_config(config_path, debug) if ctx.invoked_subcommand is None: - cfg["manager"]["pid-file"].write_text(str(os.getpid())) - log_sockpath = Path(f"/tmp/backend.ai/ipc/manager-logger-{os.getpid()}.sock") + cfg['manager']['pid-file'].write_text(str(os.getpid())) + log_sockpath = Path(f'/tmp/backend.ai/ipc/manager-logger-{os.getpid()}.sock') log_sockpath.parent.mkdir(parents=True, exist_ok=True) - log_endpoint = f"ipc://{log_sockpath}" + log_endpoint = f'ipc://{log_sockpath}' try: - logger = Logger(cfg["logging"], is_master=True, log_endpoint=log_endpoint) + logger = Logger(cfg['logging'], is_master=True, log_endpoint=log_endpoint) with logger: - ns = cfg["etcd"]["namespace"] + ns = cfg['etcd']['namespace'] setproctitle(f"backend.ai: manager {ns}") - log.info("Backend.AI Manager {0}", __version__) - log.info("runtime: {0}", env_info()) - log_config = logging.getLogger("ai.backend.manager.config") - log_config.debug("debug mode enabled.") - if cfg["manager"]["event-loop"] == "uvloop": + log.info('Backend.AI Manager {0}', __version__) + log.info('runtime: {0}', env_info()) + log_config = logging.getLogger('ai.backend.manager.config') + log_config.debug('debug mode enabled.') + if cfg['manager']['event-loop'] == 'uvloop': import uvloop - uvloop.install() - log.info("Using uvloop as the event loop backend") + log.info('Using uvloop as the event loop backend') try: aiotools.start_server( server_main_logwrapper, - num_workers=cfg["manager"]["num-proc"], + num_workers=cfg['manager']['num-proc'], args=(cfg, log_endpoint), ) finally: - log.info("terminated.") + log.info('terminated.') finally: - if cfg["manager"]["pid-file"].is_file(): + if cfg['manager']['pid-file'].is_file(): # check is_file() to prevent deleting /dev/null! - cfg["manager"]["pid-file"].unlink() + cfg['manager']['pid-file'].unlink() else: # Click is going to invoke a subcommand. pass -@main.group(cls=LazyGroup, import_name="ai.backend.manager.api.auth:cli") +@main.group(cls=LazyGroup, import_name='ai.backend.manager.api.auth:cli') def auth() -> None: pass -if __name__ == "__main__": +if __name__ == '__main__': sys.exit(main()) From 57fc4a77bc8d222461f1094879cbeab0653e7d91 Mon Sep 17 00:00:00 2001 From: Sergey Leksikov Date: Fri, 25 Mar 2022 19:44:07 +0900 Subject: [PATCH 14/15] fix: fix style --- src/ai/backend/manager/api/filebrowser.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py index 3b0502cd6..996bd45cd 100644 --- a/src/ai/backend/manager/api/filebrowser.py +++ b/src/ai/backend/manager/api/filebrowser.py @@ -78,7 +78,6 @@ async def create_or_update_filebrowser( headers = {} headers["X-BackendAI-Storage-Auth-Token"] = proxy_info.secret - try: async with proxy_info.session.request( "POST", From 11c91fc08104e7fc48fab6091e9f0f9107b83496 Mon Sep 17 00:00:00 2001 From: lexikovs Date: Tue, 12 Apr 2022 17:53:33 +0900 Subject: [PATCH 15/15] fix: add support for argument option host volume --- src/ai/backend/manager/api/filebrowser.py | 31 +++++++---------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/src/ai/backend/manager/api/filebrowser.py b/src/ai/backend/manager/api/filebrowser.py index 996bd45cd..99992febe 100644 --- a/src/ai/backend/manager/api/filebrowser.py +++ b/src/ai/backend/manager/api/filebrowser.py @@ -21,34 +21,24 @@ VFolderRow = Mapping[str, Any] -async def get_vfid(root_ctx: RootContext, name: str) -> str: +async def get_vfid(root_ctx: RootContext, host: str, name: str) -> str: async with root_ctx.db.begin_readonly() as conn: query = ( sa.select([vfolders.c.id]) .select_from(vfolders) - .where(vfolders.c.name == name) + .where(vfolders.c.host == host and vfolders.c.name == name) ) folder_id = await conn.scalar(query) return folder_id.hex -async def get_volume(root_ctx: RootContext, vfid: str) -> str: - async with root_ctx.db.begin_readonly() as conn: - query = ( - sa.select([vfolders.c.host]) - .select_from(vfolders) - .where(vfolders.c.id == vfid) - ) - host = await conn.scalar(query) - return host - - @auth_required @server_status_required(READ_ALLOWED) @check_api_params( t.Dict( { + t.Key("host"): t.String, t.Key("vfolders"): t.List(t.String), }, ), @@ -59,23 +49,20 @@ async def create_or_update_filebrowser( ) -> web.Response: root_ctx: RootContext = request.app["_root.context"] - vfolders = [] - - # Search for vfid based on vfolder name. And then get relevant host address and volume. + host = params["host"] for vfolder_name in params["vfolders"]: vfolders.append( - {"name": vfolder_name, "vfid": await get_vfid(root_ctx, vfolder_name)}, + { + "name": vfolder_name, + "vfid": await get_vfid(root_ctx, host, vfolder_name), + }, ) - - host = await get_volume(root_ctx, await get_vfid(root_ctx, vfolder_name)) proxy_name, _ = root_ctx.storage_manager.split_host(host) - try: proxy_info = root_ctx.storage_manager._proxies[proxy_name] except KeyError: raise InvalidArgument("There is no such storage proxy", proxy_name) - headers = {} headers["X-BackendAI-Storage-Auth-Token"] = proxy_info.secret try: @@ -83,7 +70,7 @@ async def create_or_update_filebrowser( "POST", proxy_info.manager_api_url / "storage/filebrowser/create", headers=headers, - json={"vfolders": vfolders}, + json={"host": host, "vfolders": vfolders}, ) as client_resp: return web.json_response(await client_resp.json()) except aiohttp.ClientResponseError: