Skip to content

Commit 7d1d24c

Browse files
committed
feat: add cloud storage via rclone
1 parent 4685981 commit 7d1d24c

File tree

7 files changed

+145
-63
lines changed

7 files changed

+145
-63
lines changed

bases/renku_data_services/data_api/app.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
142142
nb_config=config.nb_config,
143143
internal_gitlab_authenticator=config.gitlab_authenticator,
144144
git_repo=config.git_repositories_repo,
145+
rp_repo=config.rp_repo,
145146
)
146147
notebooks_new = NotebooksNewBP(
147148
name="notebooks",

components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,15 @@
66
class ICloudStorageRequest(Protocol):
77
"""The abstract class for cloud storage."""
88

9-
exists: bool
109
mount_folder: str
11-
source_folder: str
12-
bucket: str
10+
source_path: str
1311

1412
def get_manifest_patch(
1513
self,
1614
base_name: str,
1715
namespace: str,
18-
labels: dict[str, str] = {},
19-
annotations: dict[str, str] = {},
16+
labels: dict[str, str] | None = None,
17+
annotations: dict[str, str] | None = None,
2018
) -> list[dict[str, Any]]:
2119
"""The patches applied to a jupyter server to insert the storage in the session."""
2220
...

components/renku_data_services/notebooks/api/schemas/cloud_storage.py

Lines changed: 78 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22

33
from configparser import ConfigParser
44
from io import StringIO
5-
from pathlib import Path
6-
from typing import Any, Optional, Self
5+
from pathlib import PurePosixPath
6+
from typing import Any, Final, Optional, Self
77

8+
from kubernetes import client
89
from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema
910

1011
from renku_data_services.base_models import APIUser
1112
from renku_data_services.notebooks.api.classes.cloud_storage import ICloudStorageRequest
1213
from renku_data_services.notebooks.config import _NotebooksConfig
1314

15+
_sanitize_for_serialization = client.ApiClient().sanitize_for_serialization
16+
1417

1518
class RCloneStorageRequest(Schema):
1619
"""Request for RClone based storage."""
@@ -36,6 +39,8 @@ def validate_storage(self, data: dict, **kwargs: dict) -> None:
3639
class RCloneStorage(ICloudStorageRequest):
3740
"""RClone based storage."""
3841

42+
pvc_secret_annotation_name: Final[str] = "csi-rclone.dev/secretName"
43+
3944
def __init__(
4045
self,
4146
source_path: str,
@@ -60,7 +65,7 @@ async def storage_from_schema(
6065
user: APIUser,
6166
internal_gitlab_user: APIUser,
6267
project_id: int,
63-
work_dir: Path,
68+
work_dir: PurePosixPath,
6469
config: _NotebooksConfig,
6570
) -> Self:
6671
"""Create storage object from request."""
@@ -92,8 +97,73 @@ async def storage_from_schema(
9297
await config.storage_validator.validate_storage_configuration(configuration, source_path)
9398
return cls(source_path, configuration, readonly, mount_folder, name, config)
9499

100+
def pvc(
101+
self,
102+
base_name: str,
103+
namespace: str,
104+
labels: dict[str, str] | None = None,
105+
annotations: dict[str, str] | None = None,
106+
) -> client.V1PersistentVolumeClaim:
107+
"""The PVC for mounting cloud storage."""
108+
return client.V1PersistentVolumeClaim(
109+
metadata=client.V1ObjectMeta(
110+
name=base_name,
111+
namespace=namespace,
112+
annotations={self.pvc_secret_annotation_name: base_name} | (annotations or {}),
113+
labels={"name": base_name} | (labels or {}),
114+
),
115+
spec=client.V1PersistentVolumeClaimSpec(
116+
access_modes=["ReadOnlyMany" if self.readonly else "ReadWriteMany"],
117+
resources=client.V1VolumeResourceRequirements(requests={"storage": "10Gi"}),
118+
storage_class_name=self.config.cloud_storage.storage_class,
119+
),
120+
)
121+
122+
def volume_mount(self, base_name: str) -> client.V1VolumeMount:
123+
"""The volume mount for cloud storage."""
124+
return client.V1VolumeMount(
125+
mount_path=self.mount_folder,
126+
name=base_name,
127+
read_only=self.readonly,
128+
)
129+
130+
def volume(self, base_name: str) -> client.V1Volume:
131+
"""The volume entry for the statefulset specification."""
132+
return client.V1Volume(
133+
name=base_name,
134+
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
135+
claim_name=base_name, read_only=self.readonly
136+
),
137+
)
138+
139+
def secret(
140+
self,
141+
base_name: str,
142+
namespace: str,
143+
labels: dict[str, str] | None = None,
144+
annotations: dict[str, str] | None = None,
145+
) -> client.V1Secret:
146+
"""The secret containing the configuration for the rclone csi driver."""
147+
return client.V1Secret(
148+
metadata=client.V1ObjectMeta(
149+
name=base_name,
150+
namespace=namespace,
151+
annotations=annotations,
152+
labels={"name": base_name} | (labels or {}),
153+
),
154+
string_data={
155+
"remote": self.name or base_name,
156+
"remotePath": self.source_path,
157+
"configData": self.config_string(self.name or base_name),
158+
},
159+
)
160+
95161
def get_manifest_patch(
96-
self, base_name: str, namespace: str, labels: dict = {}, annotations: dict = {}
162+
self,
163+
base_name: str,
164+
namespace: str,
165+
labels: dict[str, str] | None = None,
166+
annotations: dict[str, str] | None = None,
97167
) -> list[dict[str, Any]]:
98168
"""Get server manifest patch."""
99169
patches = []
@@ -104,57 +174,22 @@ def get_manifest_patch(
104174
{
105175
"op": "add",
106176
"path": f"/{base_name}-pv",
107-
"value": {
108-
"apiVersion": "v1",
109-
"kind": "PersistentVolumeClaim",
110-
"metadata": {
111-
"name": base_name,
112-
"labels": {"name": base_name},
113-
},
114-
"spec": {
115-
"accessModes": ["ReadOnlyMany" if self.readonly else "ReadWriteMany"],
116-
"resources": {"requests": {"storage": "10Gi"}},
117-
"storageClassName": self.config.cloud_storage.storage_class,
118-
},
119-
},
177+
"value": _sanitize_for_serialization(self.pvc(base_name, namespace, labels, annotations)),
120178
},
121179
{
122180
"op": "add",
123181
"path": f"/{base_name}-secret",
124-
"value": {
125-
"apiVersion": "v1",
126-
"kind": "Secret",
127-
"metadata": {
128-
"name": base_name,
129-
"labels": {"name": base_name},
130-
},
131-
"type": "Opaque",
132-
"stringData": {
133-
"remote": self.name or base_name,
134-
"remotePath": self.source_path,
135-
"configData": self.config_string(self.name or base_name),
136-
},
137-
},
182+
"value": _sanitize_for_serialization(self.secret(base_name, namespace, labels, annotations)),
138183
},
139184
{
140185
"op": "add",
141186
"path": "/statefulset/spec/template/spec/containers/0/volumeMounts/-",
142-
"value": {
143-
"mountPath": self.mount_folder,
144-
"name": base_name,
145-
"readOnly": self.readonly,
146-
},
187+
"value": _sanitize_for_serialization(self.volume_mount(base_name)),
147188
},
148189
{
149190
"op": "add",
150191
"path": "/statefulset/spec/template/spec/volumes/-",
151-
"value": {
152-
"name": base_name,
153-
"persistentVolumeClaim": {
154-
"claimName": base_name,
155-
"readOnly": self.readonly,
156-
},
157-
},
192+
"value": _sanitize_for_serialization(self.volume(base_name)),
158193
},
159194
],
160195
}

components/renku_data_services/notebooks/blueprints.py

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from dataclasses import dataclass
88
from datetime import UTC, datetime
99
from math import floor
10-
from pathlib import Path
10+
from pathlib import PurePosixPath
1111
from typing import Any
1212
from urllib.parse import urljoin, urlparse
1313

@@ -55,6 +55,7 @@
5555
Authentication,
5656
AuthenticationType,
5757
Culling,
58+
DataSource,
5859
ExtraContainer,
5960
ExtraVolume,
6061
ExtraVolumeMount,
@@ -64,7 +65,8 @@
6465
Resources,
6566
SecretAsVolume,
6667
SecretAsVolumeItem,
67-
SecretRef,
68+
SecretRefKey,
69+
SecretRefWhole,
6870
Session,
6971
SessionEnvItem,
7072
State,
@@ -86,6 +88,7 @@
8688
from renku_data_services.project.db import ProjectRepository
8789
from renku_data_services.repositories.db import GitRepositoriesRepository
8890
from renku_data_services.session.db import SessionRepository
91+
from renku_data_services.storage.db import StorageV2Repository
8992

9093

9194
@dataclass(kw_only=True)
@@ -415,7 +418,7 @@ async def launch_notebook_helper(
415418
if lfs_auto_fetch is not None:
416419
parsed_server_options.lfs_auto_fetch = lfs_auto_fetch
417420

418-
image_work_dir = image_repo.image_workdir(parsed_image) or Path("/")
421+
image_work_dir = image_repo.image_workdir(parsed_image) or PurePosixPath("/")
419422
mount_path = image_work_dir / "work"
420423

421424
server_work_dir = mount_path / gl_project_path
@@ -430,7 +433,7 @@ async def launch_notebook_helper(
430433
cstorage.model_dump(),
431434
user=user,
432435
project_id=gl_project_id,
433-
work_dir=server_work_dir.absolute(),
436+
work_dir=server_work_dir,
434437
config=nb_config,
435438
internal_gitlab_user=internal_gitlab_user,
436439
)
@@ -774,6 +777,7 @@ class NotebooksNewBP(CustomBlueprint):
774777
project_repo: ProjectRepository
775778
session_repo: SessionRepository
776779
rp_repo: ResourcePoolRepository
780+
storage_repo: StorageV2Repository
777781

778782
def start(self) -> BlueprintFactoryResponse:
779783
"""Start a session with the new operator."""
@@ -805,17 +809,49 @@ async def _handler(
805809
parsed_server_options = await self.nb_config.crc_validator.validate_class_storage(
806810
user, resource_class_id, body.disk_storage
807811
)
808-
work_dir = Path("/home/jovyan/work")
812+
work_dir = environment.working_directory
809813
user_secrets: K8sUserSecrets | None = None
810814
# if body.user_secrets:
811815
# user_secrets = K8sUserSecrets(
812816
# name=server_name,
813817
# user_secret_ids=body.user_secrets.user_secret_ids,
814818
# mount_path=body.user_secrets.mount_path,
815819
# )
816-
cloud_storage: list[RCloneStorage] = []
820+
cloud_storages_db = await self.storage_repo.get_storage(
821+
user=user, project_id=project.id, include_secrets=True
822+
)
823+
cloud_storage: dict[str, RCloneStorage] = {
824+
str(s.storage_id): RCloneStorage(
825+
source_path=s.source_path,
826+
mount_folder=(work_dir / s.target_path).as_posix(),
827+
configuration=s.configuration.model_dump(mode="python"),
828+
readonly=s.readonly,
829+
config=self.nb_config,
830+
name=s.name,
831+
)
832+
for s in cloud_storages_db
833+
}
834+
cloud_storage_request: dict[str, RCloneStorage] = {
835+
s.storage_id: RCloneStorage(
836+
source_path=s.source_path,
837+
mount_folder=(work_dir / s.target_path).as_posix(),
838+
configuration=s.configuration,
839+
readonly=s.readonly,
840+
config=self.nb_config,
841+
name=None,
842+
)
843+
for s in body.cloudstorage or []
844+
if s.storage_id is not None
845+
}
846+
# NOTE: Check the cloud storage in the request body and if any match
847+
# then overwrite the projects cloud storages
848+
# NOTE: Cloud storages in the session launch request body that are not form the DB are ignored
849+
for csr_id, csr in cloud_storage_request.items():
850+
if csr_id in cloud_storage:
851+
cloud_storage[csr_id] = csr
817852
# repositories = [Repository(i.url, branch=i.branch, commit_sha=i.commit_sha) for i in body.repositories]
818853
repositories = [Repository(url=i) for i in project.repositories]
854+
secrets_to_create: list[V1Secret] = []
819855
server = Renku2UserServer(
820856
user=user,
821857
image=image,
@@ -825,7 +861,7 @@ async def _handler(
825861
server_options=parsed_server_options,
826862
environment_variables={},
827863
user_secrets=user_secrets,
828-
cloudstorage=cloud_storage,
864+
cloudstorage=[i for i in cloud_storage.values()],
829865
k8s_client=self.nb_config.k8s_v2_client,
830866
workspace_mount_path=work_dir,
831867
work_dir=work_dir,
@@ -835,6 +871,14 @@ async def _handler(
835871
is_image_private=False,
836872
internal_gitlab_user=internal_gitlab_user,
837873
)
874+
# Generate the cloud starge secrets
875+
data_sources: list[DataSource] = []
876+
for ics, cs in enumerate(cloud_storage.values()):
877+
secret_name = f"{server_name}-ds-{ics}"
878+
secrets_to_create.append(cs.secret(secret_name, server.k8s_client.preferred_namespace))
879+
data_sources.append(
880+
DataSource(mountPath=cs.mount_folder, secretRef=SecretRefWhole(name=secret_name, adopt=True))
881+
)
838882
cert_init, cert_vols = init_containers.certificates_container(self.nb_config)
839883
session_init_containers = [InitContainer.model_validate(self.nb_config.k8s_v2_client.sanitize(cert_init))]
840884
extra_volumes = [ExtraVolume.model_validate(self.nb_config.k8s_v2_client.sanitize(i)) for i in cert_vols]
@@ -868,7 +912,6 @@ async def _handler(
868912
metadata=Metadata(name=server_name, annotations=annotations),
869913
spec=AmaltheaSessionSpec(
870914
codeRepositories=[],
871-
dataSources=[],
872915
hibernated=False,
873916
session=Session(
874917
image=image,
@@ -915,13 +958,14 @@ async def _handler(
915958
type=AuthenticationType.oauth2proxy
916959
if isinstance(user, AuthenticatedAPIUser)
917960
else AuthenticationType.token,
918-
secretRef=SecretRef(name=server_name, key="auth", adopt=True),
961+
secretRef=SecretRefKey(name=server_name, key="auth", adopt=True),
919962
extraVolumeMounts=[
920963
ExtraVolumeMount(name="renku-authorized-emails", mountPath="/authorized_emails")
921964
]
922965
if isinstance(user, AuthenticatedAPIUser)
923966
else [],
924967
),
968+
dataSources=data_sources,
925969
),
926970
)
927971
parsed_proxy_url = urlparse(urljoin(server.server_url + "/", "oauth2"))
@@ -952,12 +996,14 @@ async def _handler(
952996
"verbose": True,
953997
}
954998
)
955-
secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)
956-
secret = await self.nb_config.k8s_v2_client.create_secret(secret)
999+
secrets_to_create.append(V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data))
1000+
for s in secrets_to_create:
1001+
await self.nb_config.k8s_v2_client.create_secret(s)
9571002
try:
9581003
manifest = await self.nb_config.k8s_v2_client.create_server(manifest, user.id)
9591004
except Exception:
960-
await self.nb_config.k8s_v2_client.delete_secret(secret.metadata.name)
1005+
for s in secrets_to_create:
1006+
await self.nb_config.k8s_v2_client.delete_secret(s.metadata.name)
9611007
raise errors.ProgrammingError(message="Could not start the amalthea session")
9621008

9631009
return json(manifest.as_apispec().model_dump(mode="json", exclude_none=True), 201)
@@ -1074,6 +1120,6 @@ async def _handler(
10741120
query: apispec.SessionsSessionIdLogsGetParametersQuery,
10751121
) -> HTTPResponse:
10761122
logs = await self.nb_config.k8s_v2_client.get_server_logs(session_id, user.id, query.max_lines)
1077-
return json(apispec.SessionLogsResponse.model_validate(logs).model_dump_json(exclude_none=True))
1123+
return json(apispec.SessionLogsResponse.model_validate(logs).model_dump(exclude_none=True))
10781124

10791125
return "/sessions/<session_id>/logs", ["GET"], _handler

components/renku_data_services/notebooks/cr_amalthea_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# generated by datamodel-codegen:
22
# filename: <stdin>
3-
# timestamp: 2024-09-04T22:45:28+00:00
3+
# timestamp: 2024-09-04T21:22:45+00:00
44

55
from __future__ import annotations
66

0 commit comments

Comments
 (0)