11from typing import Any , List
22from datetime import datetime , timezone
33from nanoid import generate
4- from azure .cosmos .exceptions import CosmosResourceNotFoundError
54
65from primary .services .database_access .session_access .model import SessionDocument
76from primary .services .database_access ._utils import hash_json_string , cast_query_params
1514 SessionUpdate ,
1615 SessionSortBy ,
1716)
18-
17+ from primary .services .database_access .database_access_exceptions import (
18+ DatabaseAccessError ,
19+ )
20+ from primary .services .database_access .error_converter import raise_service_error_from_database_access
1921
2022# Util dict to handle case insensitive collation
2123CASING_FIELD_LOOKUP : dict [SessionSortBy | None , SessionSortBy ] = {SessionSortBy .TITLE_LOWER : SessionSortBy .TITLE }
@@ -43,14 +45,22 @@ def create(cls, user_id: str) -> "SessionAccess":
4345 return cls (user_id = user_id , session_container_access = session_container_access )
4446
4547 async def get_session_by_id_async (self , session_id : str ) -> SessionDocument :
46- document = await self .session_container_access .get_item_async (item_id = session_id , partition_key = self .user_id )
47- return document
48+ try :
49+ document = await self .session_container_access .get_item_async (
50+ item_id = session_id , partition_key = self .user_id
51+ )
52+ return document
53+ except DatabaseAccessError as e :
54+ raise_service_error_from_database_access (e )
4855
4956 async def get_all_sessions_metadata_for_user_async (self ) -> List [SessionMetadataWithId ]:
50- query = "SELECT * FROM c WHERE c.owner_id = @owner_id"
51- params = cast_query_params ([{"name" : "@owner_id" , "value" : self .user_id }])
52- items = await self .session_container_access .query_items_async (query = query , parameters = params )
53- return [self ._to_metadata_summary (item ) for item in items ]
57+ try :
58+ query = "SELECT * FROM c WHERE c.owner_id = @owner_id"
59+ params = cast_query_params ([{"name" : "@owner_id" , "value" : self .user_id }])
60+ items = await self .session_container_access .query_items_async (query = query , parameters = params )
61+ return [self ._to_metadata_summary (item ) for item in items ]
62+ except DatabaseAccessError as e :
63+ raise_service_error_from_database_access (e )
5464
5565 async def get_filtered_sessions_metadata_for_user_async (
5666 self ,
@@ -59,90 +69,98 @@ async def get_filtered_sessions_metadata_for_user_async(
5969 limit : int | None ,
6070 offset : int | None ,
6171 ) -> List [SessionMetadataWithId ]:
62- sort_by_lowercase = sort_by in CASING_FIELD_LOOKUP .keys ()
63- sort_by = CASING_FIELD_LOOKUP .get (sort_by , sort_by )
72+ try :
73+ sort_by_lowercase = sort_by in CASING_FIELD_LOOKUP .keys ()
74+ sort_by = CASING_FIELD_LOOKUP .get (sort_by , sort_by )
6475
65- collation_options = QueryCollationOptions (
66- sort_lowercase = sort_by_lowercase ,
67- sort_dir = sort_direction ,
68- sort_by = sort_by ,
69- offset = offset ,
70- limit = limit ,
71- )
76+ collation_options = QueryCollationOptions (
77+ sort_lowercase = sort_by_lowercase ,
78+ sort_dir = sort_direction ,
79+ sort_by = sort_by ,
80+ offset = offset ,
81+ limit = limit ,
82+ )
7283
73- query = "SELECT * from c WHERE c.owner_id = @owner_id"
74- params = cast_query_params ([{"name" : "@owner_id" , "value" : self .user_id }])
75- search_options = collation_options .to_sql_query_string ("c.metadata" )
84+ query = "SELECT * from c WHERE c.owner_id = @owner_id"
85+ params = cast_query_params ([{"name" : "@owner_id" , "value" : self .user_id }])
86+ search_options = collation_options .to_sql_query_string ("c.metadata" )
7687
77- if search_options :
78- query = f"{ query } { search_options } "
88+ if search_options :
89+ query = f"{ query } { search_options } "
7990
80- items = await self .session_container_access .query_items_async (query = query , parameters = params )
91+ items = await self .session_container_access .query_items_async (query = query , parameters = params )
8192
82- return [self ._to_metadata_summary (item ) for item in items ]
93+ return [self ._to_metadata_summary (item ) for item in items ]
94+ except DatabaseAccessError as e :
95+ raise_service_error_from_database_access (e )
8396
8497 async def get_session_metadata_async (self , session_id : str ) -> SessionMetadata :
8598 try :
8699 document = await self ._assert_ownership_async (session_id )
87- except ServiceRequestError as err :
88- raise ServiceRequestError (f"Session with id '{ session_id } ' not found." , Service .DATABASE ) from err
89-
90- return document .metadata
100+ return document .metadata
101+ except DatabaseAccessError as e :
102+ raise_service_error_from_database_access (e )
91103
92104 async def insert_session_async (self , new_session : NewSession ) -> str :
93- now = datetime .now (timezone .utc )
94- session_id = str (generate (size = 8 )) # Generate a unique session ID
95- session = SessionDocument (
96- id = session_id ,
97- owner_id = self .user_id ,
98- metadata = SessionMetadata (
99- title = new_session .title ,
100- description = new_session .description ,
101- created_at = now ,
102- updated_at = now ,
103- version = 1 ,
104- hash = hash_json_string (new_session .content ),
105- ),
106- content = new_session .content ,
107- )
108- return await self .session_container_access .insert_item_async (session )
105+ try :
106+ now = datetime .now (timezone .utc )
107+ session_id = str (generate (size = 8 )) # Generate a unique session ID
108+ session = SessionDocument (
109+ id = session_id ,
110+ owner_id = self .user_id ,
111+ metadata = SessionMetadata (
112+ title = new_session .title ,
113+ description = new_session .description ,
114+ created_at = now ,
115+ updated_at = now ,
116+ version = 1 ,
117+ hash = hash_json_string (new_session .content ),
118+ ),
119+ content = new_session .content ,
120+ )
121+ return await self .session_container_access .insert_item_async (session )
122+ except DatabaseAccessError as e :
123+ raise_service_error_from_database_access (e )
109124
110125 async def delete_session_async (self , session_id : str ) -> None :
111126 await self ._assert_ownership_async (session_id )
112127 await self .session_container_access .delete_item_async (session_id , partition_key = self .user_id )
113128
114129 async def update_session_async (self , session_id : str , session_update : SessionUpdate ) -> SessionDocument :
115- existing = await self ._assert_ownership_async (session_id )
130+ try :
131+ existing = await self ._assert_ownership_async (session_id )
116132
117- # Get all explicitly defined changes
118- document_update_dict = session_update .model_dump (exclude_unset = True , exclude = set (["id" ]))
119- metadata_update_dict : dict [str , Any ] = document_update_dict .get ("metadata" , {})
133+ # Get all explicitly defined changes
134+ document_update_dict = session_update .model_dump (exclude_unset = True , exclude = set (["id" ]))
135+ metadata_update_dict : dict [str , Any ] = document_update_dict .get ("metadata" , {})
120136
121- # Early return if there are no changes
122- if not document_update_dict and not metadata_update_dict :
123- return existing
137+ # Early return if there are no changes
138+ if not document_update_dict and not metadata_update_dict :
139+ return existing
124140
125- # Inject computed fields
126- metadata_update_dict .update ({"updated_at" : datetime .now (timezone .utc )})
127- metadata_update_dict .update ({"version" : existing .metadata .version + 1 })
141+ # Inject computed fields
142+ metadata_update_dict .update ({"updated_at" : datetime .now (timezone .utc )})
143+ metadata_update_dict .update ({"version" : existing .metadata .version + 1 })
128144
129- if session_update .content :
130- metadata_update_dict .update ({"hash" : hash_json_string (session_update .content )})
145+ if session_update .content :
146+ metadata_update_dict .update ({"hash" : hash_json_string (session_update .content )})
131147
132- updated_metadata = existing .metadata .model_copy (update = metadata_update_dict )
133- document_update_dict .update ({"metadata" : updated_metadata })
148+ updated_metadata = existing .metadata .model_copy (update = metadata_update_dict )
149+ document_update_dict .update ({"metadata" : updated_metadata })
134150
135- updated_session = existing .model_copy (update = document_update_dict )
151+ updated_session = existing .model_copy (update = document_update_dict )
136152
137- await self .session_container_access .update_item_async (session_id , updated_session )
153+ await self .session_container_access .update_item_async (session_id , updated_session )
138154
139- return updated_session
155+ return updated_session
156+ except DatabaseAccessError as e :
157+ raise_service_error_from_database_access (e )
140158
141159 async def _assert_ownership_async (self , session_id : str ) -> SessionDocument :
142160 try :
143161 session = await self .session_container_access .get_item_async (item_id = session_id , partition_key = self .user_id )
144- except CosmosResourceNotFoundError as err :
145- raise ServiceRequestError ( f"Session with id ' { session_id } ' not found." , Service . DATABASE ) from err
162+ except DatabaseAccessError as e :
163+ raise_service_error_from_database_access ( e )
146164
147165 if session .owner_id != self .user_id :
148166 raise ServiceRequestError (f"You do not have permission to access session '{ session_id } '." , Service .DATABASE )
0 commit comments