refresh drives config credentials using a timer #78
Changes from all commits: 8ad9505, 5100784, 5165ae0, b6ba60c
@@ -25,9 +25,14 @@
 import re

+from tornado.ioloop import PeriodicCallback
+
 # constant used as suffix to deal with directory objects
 EMPTY_DIR_SUFFIX = '/.jupyter_drives_fix_dir'

+# refresh interval: 15 minutes (tornado's PeriodicCallback takes milliseconds)
+CREDENTIALS_REFRESH = 15 * 60 * 1000
+
 class JupyterDrivesManager():
     """
     Jupyter-drives manager class.
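For context (illustrative, not part of the diff): tornado's PeriodicCallback takes its interval in milliseconds and fires the callback on the running IOLoop, which is why the constant above is written as 15 * 60 * 1000.

    from tornado.ioloop import PeriodicCallback

    def refresh():
        print("refreshing credentials...")

    # fire refresh() every 15 minutes on the current IOLoop;
    # the Jupyter server already runs a loop, so start() is enough
    timer = PeriodicCallback(refresh, 15 * 60 * 1000)
    timer.start()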
@@ -46,21 +51,12 @@ def __init__(self, config: traitlets.config.Config) -> None:
         self._client = httpx.AsyncClient()
         self._content_managers = {}
         self._max_files_listed = 1025
+        self._drives = None

         # instantiate fsspec file system
         self._file_system = fsspec.filesystem(self._config.provider, asynchronous=True)

-        # initiate aiobotocore session if we are dealing with S3 drives
-        if self._config.provider == 's3':
-            if self._config.access_key_id and self._config.secret_access_key:
-                self._s3_clients = {}
-                self._s3_session = get_session()
-                self._file_system = s3fs.S3FileSystem(anon=False, asynchronous=True, key=self._config.access_key_id, secret=self._config.secret_access_key, token=self._config.session_token)
-            else:
-                raise tornado.web.HTTPError(
-                    status_code=httpx.codes.BAD_REQUEST,
-                    reason="No credentials specified. Please set them in your user jupyter_server_config file.",
-                )
+        self._initialize_credentials_refresh()

     @property
     def base_api_url(self) -> str:
@@ -81,6 +77,83 @@ def per_page_argument(self) -> Optional[Tuple[str, int]]:
         """
         return ("per_page", 100)

+    def _initialize_credentials_refresh(self):
+        self._drives_refresh_callback()
+        if not self._config.credentials_already_set:
+            self._drives_refresh_timer = PeriodicCallback(
+                self._drives_refresh_callback, CREDENTIALS_REFRESH
+            )
+            self._drives_refresh_timer.start()
+
+    def _drives_refresh_callback(self):
+        self._config.load_credentials()
+        self._initialize_s3_file_system()
+        self._initialize_drives()
+        self._initialize_content_managers()
+
+    def _initialize_s3_file_system(self):
+        # initiate aiobotocore session if we are dealing with S3 drives
+        if self._config.provider == 's3':
+            if self._config.access_key_id and self._config.secret_access_key:
+                self._s3_session = get_session()
+                self._file_system = s3fs.S3FileSystem(
+                    anon=False,
+                    asynchronous=True,
+                    key=self._config.access_key_id,
+                    secret=self._config.secret_access_key,
+                    token=self._config.session_token,
+                )
+            else:
+                raise tornado.web.HTTPError(
+                    status_code=httpx.codes.BAD_REQUEST,
+                    reason="No credentials specified. Please set them in your user jupyter_server_config file.",
+                )

Review comment (on the S3FileSystem construction): According to the s3fs page and this GitHub issue, nsidc/earthaccess#765, it may not work.

Reply: From my understanding it should ideally work; however, with the current implementation we explicitly provide the access key and token, so from my understanding s3fs will respect these values and won't refresh them unless we do it explicitly again.
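A minimal sketch (not part of this PR, responding to the refresh concern above): when credentials come from a botocore-managed source such as an IAM role, omitting the static key/secret/token lets botocore's credential chain resolve and refresh them automatically, which would make a manual refresh timer unnecessary for those setups.

    import s3fs

    # No explicit key/secret/token: s3fs falls back to botocore's credential
    # chain (environment, shared config, instance profile), which handles
    # refreshing expiring credentials on its own.
    fs = s3fs.S3FileSystem(anon=False, asynchronous=True)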
+
+    def _initialize_drives(self):
+        if self._config.provider == "s3":
+            S3Drive = get_driver(Provider.S3)
+            self._drives = [S3Drive(self._config.access_key_id, self._config.secret_access_key, True, None, None, None, self._config.session_token)]
+        elif self._config.provider == 'gcs':
+            GCSDrive = get_driver(Provider.GOOGLE_STORAGE)
+            self._drives = [GCSDrive(self._config.access_key_id, self._config.secret_access_key)]  # verify credentials needed
+
+    def _initialize_content_managers(self):
+        for drive_name, content_manager in self._content_managers.items():
+            self._initialize_content_manager(drive_name, content_manager["provider"], content_manager["location"])
+
+    def _initialize_content_manager(self, drive_name, provider, region=None):
+        try:
+            if provider == 's3':
+                if self._config.session_token is None:
+                    configuration = {
+                        "aws_access_key_id": self._config.access_key_id,
+                        "aws_secret_access_key": self._config.secret_access_key,
+                        "aws_region": region,
+                    }
+                else:
+                    configuration = {
+                        "aws_access_key_id": self._config.access_key_id,
+                        "aws_secret_access_key": self._config.secret_access_key,
+                        "aws_session_token": self._config.session_token,
+                        "aws_region": region,
+                    }
+                store = obs.store.S3Store.from_url("s3://" + drive_name + "/", config=configuration)
+            elif provider == 'gcs':
+                store = obs.store.GCSStore.from_url("gs://" + drive_name + "/", config={})  # add gcs config
+            elif provider == 'http':
+                store = obs.store.HTTPStore.from_url(drive_name, client_options={})  # add http client config
+
+            self._content_managers[drive_name] = {
+                "store": store,
+                "location": region,
+                "provider": provider,
+            }
+        except Exception as e:
+            raise tornado.web.HTTPError(
+                status_code=httpx.codes.BAD_REQUEST,
+                reason=f"The following error occurred when initializing the content manager: {e}",
+            )
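A small refactor sketch (not in the PR): the two configuration branches in _initialize_content_manager differ only in aws_session_token, so the dict could be built once and extended conditionally.

    configuration = {
        "aws_access_key_id": self._config.access_key_id,
        "aws_secret_access_key": self._config.secret_access_key,
        "aws_region": region,
    }
    # only include the session token when one is configured
    if self._config.session_token is not None:
        configuration["aws_session_token"] = self._config.session_token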

     def set_listing_limit(self, new_limit):
         """Set new limit for listing.
@@ -105,23 +178,21 @@ async def list_drives(self):
         """
         data = []
-        if self._config.access_key_id and self._config.secret_access_key:
-            if self._config.provider == "s3":
-                S3Drive = get_driver(Provider.S3)
-                drives = [S3Drive(self._config.access_key_id, self._config.secret_access_key, True, None, None, None, self._config.session_token)]
-            elif self._config.provider == 'gcs':
-                GCSDrive = get_driver(Provider.GOOGLE_STORAGE)
-                drives = [GCSDrive(self._config.access_key_id, self._config.secret_access_key)]  # verify credentials needed
-        else:
+        if self._drives is None:
             raise tornado.web.HTTPError(
                 status_code=httpx.codes.NOT_IMPLEMENTED,
                 reason="Listing drives not supported for given provider.",
             )

         results = []
-        for drive in drives:
-            results += drive.list_containers()
+        for drive in self._drives:
+            try:
+                results += drive.list_containers()
+            except Exception as e:
+                raise tornado.web.HTTPError(
+                    status_code=httpx.codes.BAD_REQUEST,
+                    reason=f"The following error occurred when listing drives: {e}",
+                )

         for result in results:
             data.append(

Review comment (on the switch to self._drives): We are changing the behavior of the list_drives function. Is this ok?

Reply: Verified that this is okay; also, I believe the drive list is meant to be stored for subsequent calls, so it should be an improvement. The only callout is whether the connections get correctly disposed of when we refresh to get a new instance and discard the old one.
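A hedged sketch for the disposal callout above (not in the PR): fsspec caches filesystem instances keyed by their constructor arguments, so replacing self._file_system with a freshly-credentialed instance leaves the old one alive in fsspec's cache. Clearing the cache before re-creating the filesystem releases stale instances; clear_instance_cache() is a standard fsspec AbstractFileSystem classmethod that s3fs inherits.

    import s3fs

    def _initialize_s3_file_system(self):
        # drop cached instances that were built with the old credentials
        s3fs.S3FileSystem.clear_instance_cache()
        ...  # then construct the new S3FileSystem as above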
@@ -150,42 +221,10 @@ async def mount_drive(self, drive_name, provider):
         Args:
             drive_name: name of drive to mount
         """
-        try:
-            # check if content manager doesn't already exist
-            if drive_name not in self._content_managers or self._content_managers[drive_name] is None:
-                if provider == 's3':
-                    # get region of drive
-                    region = await self._get_drive_location(drive_name)
-                    if self._config.session_token is None:
-                        configuration = {
-                            "aws_access_key_id": self._config.access_key_id,
-                            "aws_secret_access_key": self._config.secret_access_key,
-                            "aws_region": region
-                        }
-                    else:
-                        configuration = {
-                            "aws_access_key_id": self._config.access_key_id,
-                            "aws_secret_access_key": self._config.secret_access_key,
-                            "aws_session_token": self._config.session_token,
-                            "aws_region": region
-                        }
-                    store = obs.store.S3Store.from_url("s3://" + drive_name + "/", config=configuration)
-                elif provider == 'gcs':
-                    store = obs.store.GCSStore.from_url("gs://" + drive_name + "/", config={})  # add gcs config
-                elif provider == 'http':
-                    store = obs.store.HTTPStore.from_url(drive_name, client_options={})  # add http client config
-
-                self._content_managers[drive_name] = {
-                    "store": store,
-                    "location": region
-                }
-            else:
-                raise tornado.web.HTTPError(
-                    status_code=httpx.codes.CONFLICT,
-                    reason="Drive already mounted."
-                )
+        try:
+            region = None  # gcs/http drives have no region lookup; avoids an unbound name below
+            if provider == 's3':
+                region = await self._get_drive_location(drive_name)
+            self._initialize_content_manager(drive_name, provider, region)
         except Exception as e:
             raise tornado.web.HTTPError(
                 status_code=httpx.codes.BAD_REQUEST,
Review comment (on _initialize_credentials_refresh): Is JupyterDrivesManager initialized multiple times? That would need some deeper refactoring to make sure multiple refresh callbacks are not invoked, one per instance of JupyterDrivesManager.

Reply: JupyterDrivesManager gets initialized only once.
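A hedged sketch of the guard discussed above (not in the PR): even though the manager is constructed once today, storing the timer at class level would make the setup idempotent and prevent duplicate refresh callbacks if a second instance were ever created.

    class JupyterDrivesManager():
        _drives_refresh_timer = None  # class-level: shared across all instances

        def _initialize_credentials_refresh(self):
            self._drives_refresh_callback()
            if (JupyterDrivesManager._drives_refresh_timer is None
                    and not self._config.credentials_already_set):
                JupyterDrivesManager._drives_refresh_timer = PeriodicCallback(
                    self._drives_refresh_callback, CREDENTIALS_REFRESH
                )
                JupyterDrivesManager._drives_refresh_timer.start()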