From 971643b99abe9457d46843c462bbd946a1186cd2 Mon Sep 17 00:00:00 2001
From: yangxuan
Date: Tue, 9 Sep 2025 15:44:04 +0800
Subject: [PATCH] enhance: Remove all annoying logs in MilvusClient

See also: #2909

Signed-off-by: yangxuan
---
 pymilvus/milvus_client/_utils.py              |  13 +-
 pymilvus/milvus_client/async_milvus_client.py | 460 +++++------------
 pymilvus/milvus_client/milvus_client.py       | 219 +++------
 3 files changed, 206 insertions(+), 486 deletions(-)
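Note: with the log-and-reraise wrappers gone, errors from the connection layer
now propagate to the caller unchanged (the removed `raise ex from ex` pattern
also chained each exception to itself). Callers that still want the old
diagnostics can log at the call site. A minimal sketch, assuming a local
Milvus at the default port; the collection name and logger setup are
illustrative, not part of this patch:

    import logging

    from pymilvus import MilvusClient
    from pymilvus.exceptions import MilvusException

    logger = logging.getLogger(__name__)
    client = MilvusClient(uri="http://localhost:19530")

    try:
        client.load_collection("my_collection")  # hypothetical collection name
    except MilvusException:
        # logging moved out of the client; do it where the error is actionable
        logger.exception("Failed to load collection: %s", "my_collection")
        raise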
diff --git a/pymilvus/milvus_client/_utils.py b/pymilvus/milvus_client/_utils.py
index bc6e1a76a..fc0f11d33 100644
--- a/pymilvus/milvus_client/_utils.py
+++ b/pymilvus/milvus_client/_utils.py
@@ -40,13 +40,6 @@ def create_connection(
     if connections.has_connection(using):
         return using
 
-    try:
-        connections.connect(
-            using, user, password, db_name, token, uri=uri, _async=use_async, **kwargs
-        )
-    except Exception as ex:
-        logger.error("Failed to create new connection using: %s", using)
-        raise ex from ex
-    else:
-        logger.debug("Created new connection using: %s", using)
-        return using
+    connections.connect(using, user, password, db_name, token, uri=uri, _async=use_async, **kwargs)
+    logger.debug("Created new connection using: %s", using)
+    return using
diff --git a/pymilvus/milvus_client/async_milvus_client.py b/pymilvus/milvus_client/async_milvus_client.py
index 70d532612..122871324 100644
--- a/pymilvus/milvus_client/async_milvus_client.py
+++ b/pymilvus/milvus_client/async_milvus_client.py
@@ -1,4 +1,3 @@
-import logging
 from typing import Dict, List, Optional, Union
 
 from pymilvus.client.abstract import AnnSearchRequest, BaseRanker
@@ -12,7 +11,6 @@
 )
 from pymilvus.exceptions import (
     DataTypeNotMatchException,
-    MilvusException,
     ParamError,
     PrimaryKeyException,
 )
@@ -26,9 +24,6 @@
 from .check import validate_param
 from .index import IndexParam, IndexParams
 
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
-
 
 class AsyncMilvusClient:
     """AsyncMilvusClient is an EXPERIMENTAL class
@@ -126,12 +121,7 @@ async def _fast_create_collection(
         conn = self._get_connection()
         if "consistency_level" not in kwargs:
             kwargs["consistency_level"] = DEFAULT_CONSISTENCY_LEVEL
-        try:
-            await conn.create_collection(collection_name, schema, timeout=timeout, **kwargs)
-            logger.debug("Successfully created collection: %s", collection_name)
-        except Exception as ex:
-            logger.error("Failed to create collection: %s", collection_name)
-            raise ex from ex
+        await conn.create_collection(collection_name, schema, timeout=timeout, **kwargs)
 
         index_params = IndexParams()
         index_params.add_index(vector_field_name, index_type="AUTOINDEX", metric_type=metric_type)
@@ -151,12 +141,7 @@ async def _create_collection_with_schema(
         conn = self._get_connection()
         if "consistency_level" not in kwargs:
             kwargs["consistency_level"] = DEFAULT_CONSISTENCY_LEVEL
-        try:
-            await conn.create_collection(collection_name, schema, timeout=timeout, **kwargs)
-            logger.debug("Successfully created collection: %s", collection_name)
-        except Exception as ex:
-            logger.error("Failed to create collection: %s", collection_name)
-            raise ex from ex
+        await conn.create_collection(collection_name, schema, timeout=timeout, **kwargs)
 
         if index_params:
             await self.create_index(collection_name, index_params, timeout=timeout)
@@ -167,7 +152,6 @@ async def drop_collection(
         self, collection_name: str, timeout: Optional[float] = None, **kwargs
     ):
         conn = self._get_connection()
         await conn.drop_collection(collection_name, timeout=timeout, **kwargs)
-        logger.debug("Successfully dropped collection: %s", collection_name)
 
     async def rename_collection(
         self,
@@ -179,27 +163,18 @@ async def rename_collection(
     ):
         conn = self._get_connection()
         await conn.rename_collection(old_name, new_name, target_db, timeout=timeout, **kwargs)
-        logger.debug("Successfully renamed collection from %s to %s", old_name, new_name)
 
     async def load_collection(
         self, collection_name: str, timeout: Optional[float] = None, **kwargs
     ):
         conn = self._get_connection()
-        try:
-            await conn.load_collection(collection_name, timeout=timeout, **kwargs)
-        except MilvusException as ex:
-            logger.error("Failed to load collection: %s", collection_name)
-            raise ex from ex
+        await conn.load_collection(collection_name, timeout=timeout, **kwargs)
 
     async def release_collection(
         self, collection_name: str, timeout: Optional[float] = None, **kwargs
     ):
         conn = self._get_connection()
-        try:
-            await conn.release_collection(collection_name, timeout=timeout, **kwargs)
-        except MilvusException as ex:
-            logger.error("Failed to load collection: %s", collection_name)
-            raise ex from ex
+        await conn.release_collection(collection_name, timeout=timeout, **kwargs)
 
     async def create_index(
         self,
@@ -224,19 +199,14 @@ async def _create_index(
         **kwargs,
     ):
         conn = self._get_connection()
-        try:
-            await conn.create_index(
-                collection_name,
-                index_param.field_name,
-                index_param.get_index_configs(),
-                timeout=timeout,
-                index_name=index_param.index_name,
-                **kwargs,
-            )
-            logger.debug("Successfully created an index on collection: %s", collection_name)
-        except Exception as ex:
-            logger.error("Failed to create an index on collection: %s", collection_name)
-            raise ex from ex
+        await conn.create_index(
+            collection_name,
+            index_param.field_name,
+            index_param.get_index_configs(),
+            timeout=timeout,
+            index_name=index_param.index_name,
+            **kwargs,
+        )
 
     async def drop_index(
         self, collection_name: str, index_name: str, timeout: Optional[float] = None, **kwargs
@@ -285,25 +255,13 @@ async def has_partition(
         self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
     ) -> bool:
         conn = self._get_connection()
-        try:
-            return await conn.has_partition(
-                collection_name, partition_name, timeout=timeout, **kwargs
-            )
-        except Exception as ex:
-            logger.error(
-                "Failed to check partition existence: %s.%s", collection_name, partition_name
-            )
-            raise ex from ex
+        return await conn.has_partition(collection_name, partition_name, timeout=timeout, **kwargs)
 
     async def list_partitions(
         self, collection_name: str, timeout: Optional[float] = None, **kwargs
     ) -> List[str]:
         conn = self._get_connection()
-        try:
-            return await conn.list_partitions(collection_name, timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to list partitions: %s", collection_name)
-            raise ex from ex
+        return await conn.list_partitions(collection_name, timeout=timeout, **kwargs)
 
     async def insert(
         self,
@@ -328,12 +286,9 @@ async def insert(
 
         conn = self._get_connection()
         # Insert into the collection.
-        try:
-            res = await conn.insert_rows(
-                collection_name, data, partition_name=partition_name, timeout=timeout
-            )
-        except Exception as ex:
-            raise ex from ex
+        res = await conn.insert_rows(
+            collection_name, data, partition_name=partition_name, timeout=timeout
+        )
         return OmitZeroDict(
             {
                 "insert_count": res.insert_count,
@@ -413,21 +368,16 @@ async def hybrid_search(
         **kwargs,
     ) -> List[List[dict]]:
         conn = self._get_connection()
-        try:
-            res = await conn.hybrid_search(
-                collection_name,
-                reqs,
-                ranker,
-                limit=limit,
-                partition_names=partition_names,
-                output_fields=output_fields,
-                timeout=timeout,
-                **kwargs,
-            )
-        except Exception as ex:
-            logger.error("Failed to hybrid search collection: %s", collection_name)
-            raise ex from ex
-        return res
+        return await conn.hybrid_search(
+            collection_name,
+            reqs,
+            ranker,
+            limit=limit,
+            partition_names=partition_names,
+            output_fields=output_fields,
+            timeout=timeout,
+            **kwargs,
+        )
 
     async def search(
         self,
@@ -443,24 +393,19 @@ async def search(
         **kwargs,
     ) -> List[List[dict]]:
         conn = self._get_connection()
-        try:
-            res = await conn.search(
-                collection_name,
-                data,
-                anns_field or "",
-                search_params or {},
-                expression=filter,
-                limit=limit,
-                output_fields=output_fields,
-                partition_names=partition_names,
-                expr_params=kwargs.pop("filter_params", {}),
-                timeout=timeout,
-                **kwargs,
-            )
-        except Exception as ex:
-            logger.error("Failed to search collection: %s", collection_name)
-            raise ex from ex
-        return res
+        return await conn.search(
+            collection_name,
+            data,
+            anns_field or "",
+            search_params or {},
+            expression=filter,
+            limit=limit,
+            output_fields=output_fields,
+            partition_names=partition_names,
+            expr_params=kwargs.pop("filter_params", {}),
+            timeout=timeout,
+            **kwargs,
+        )
 
     async def query(
         self,
@@ -484,33 +429,21 @@ async def query(
 
         conn = self._get_connection()
         if ids:
-            try:
-                schema_dict = await conn.describe_collection(
-                    collection_name, timeout=timeout, **kwargs
-                )
-            except Exception as ex:
-                logger.error("Failed to describe collection: %s", collection_name)
-                raise ex from ex
+            schema_dict = await conn.describe_collection(collection_name, timeout=timeout, **kwargs)
             filter = self._pack_pks_expr(schema_dict, ids)
 
         if not output_fields:
             output_fields = ["*"]
 
-        try:
-            res = await conn.query(
-                collection_name,
-                expr=filter,
-                output_fields=output_fields,
-                partition_names=partition_names,
-                timeout=timeout,
-                expr_params=kwargs.pop("filter_params", {}),
-                **kwargs,
-            )
-        except Exception as ex:
-            logger.error("Failed to query collection: %s", collection_name)
-            raise ex from ex
-
-        return res
+        return await conn.query(
+            collection_name,
+            expr=filter,
+            output_fields=output_fields,
+            partition_names=partition_names,
+            timeout=timeout,
+            expr_params=kwargs.pop("filter_params", {}),
+            **kwargs,
+        )
 
     async def get(
         self,
@@ -528,30 +461,20 @@ async def get(
             return []
 
         conn = self._get_connection()
-        try:
-            schema_dict = await conn.describe_collection(collection_name, timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to describe collection: %s", collection_name)
-            raise ex from ex
+        schema_dict = await conn.describe_collection(collection_name, timeout=timeout, **kwargs)
 
         if not output_fields:
             output_fields = ["*"]
 
         expr = self._pack_pks_expr(schema_dict, ids)
-        try:
-            res = await conn.query(
-                collection_name,
-                expr=expr,
-                output_fields=output_fields,
-                partition_names=partition_names,
-                timeout=timeout,
-                **kwargs,
-            )
-        except Exception as ex:
-            logger.error("Failed to get collection: %s", collection_name)
-            raise ex from ex
-
-        return res
+        return await conn.query(
+            collection_name,
+            expr=expr,
+            output_fields=output_fields,
+            partition_names=partition_names,
+            timeout=timeout,
+            **kwargs,
+        )
 
     async def delete(
         self,
@@ -591,13 +514,7 @@ async def delete(
         expr = ""
         conn = self._get_connection()
         if len(pks) > 0:
-            try:
-                schema_dict = await conn.describe_collection(
-                    collection_name, timeout=timeout, **kwargs
-                )
-            except Exception as ex:
-                logger.error("Failed to describe collection: %s", collection_name)
-                raise ex from ex
+            schema_dict = await conn.describe_collection(collection_name, timeout=timeout, **kwargs)
             expr = self._pack_pks_expr(schema_dict, pks)
         else:
             if not isinstance(filter, str):
@@ -605,20 +522,16 @@ async def delete(
             expr = filter
 
         ret_pks = []
-        try:
-            res = await conn.delete(
-                collection_name=collection_name,
-                expression=expr,
-                partition_name=partition_name,
-                expr_params=kwargs.pop("filter_params", {}),
-                timeout=timeout,
-                **kwargs,
-            )
-            if res.primary_keys:
-                ret_pks.extend(res.primary_keys)
-        except Exception as ex:
-            logger.error("Failed to delete primary keys in collection: %s", collection_name)
-            raise ex from ex
+        res = await conn.delete(
+            collection_name=collection_name,
+            expression=expr,
+            partition_name=partition_name,
+            expr_params=kwargs.pop("filter_params", {}),
+            timeout=timeout,
+            **kwargs,
+        )
+        if res.primary_keys:
+            ret_pks.extend(res.primary_keys)
 
         # compatible with deletions that returns primary keys
         if ret_pks:
@@ -630,61 +543,39 @@ async def describe_collection(
         self, collection_name: str, timeout: Optional[float] = None, **kwargs
     ) -> dict:
         conn = self._get_connection()
-        try:
-            return await conn.describe_collection(collection_name, timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to describe collection: %s", collection_name)
-            raise ex from ex
+        return await conn.describe_collection(collection_name, timeout=timeout, **kwargs)
 
     async def has_collection(
         self, collection_name: str, timeout: Optional[float] = None, **kwargs
     ) -> bool:
         conn = self._get_connection()
-        try:
-            return await conn.has_collection(collection_name, timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to check collection existence: %s", collection_name)
-            raise ex from ex
+        return await conn.has_collection(collection_name, timeout=timeout, **kwargs)
 
     async def list_collections(self, timeout: Optional[float] = None, **kwargs) -> List[str]:
         conn = self._get_connection()
-        try:
-            return await conn.list_collections(timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to list collections")
-            raise ex from ex
+        return await conn.list_collections(timeout=timeout, **kwargs)
 
     async def get_collection_stats(
         self, collection_name: str, timeout: Optional[float] = None, **kwargs
     ) -> Dict:
         conn = self._get_connection()
-        try:
-            stats = await conn.get_collection_stats(collection_name, timeout=timeout, **kwargs)
-            result = {stat.key: stat.value for stat in stats}
-            if "row_count" in result:
-                result["row_count"] = int(result["row_count"])
-        except Exception as ex:
-            logger.error("Failed to get collection stats: %s", collection_name)
-            raise ex from ex
-        else:
-            return result
+        stats = await conn.get_collection_stats(collection_name, timeout=timeout, **kwargs)
+        result = {stat.key: stat.value for stat in stats}
+        if "row_count" in result:
+            result["row_count"] = int(result["row_count"])
+        return result
 
     async def get_partition_stats(
         self, collection_name: str, partition_name: str, timeout: Optional[float] = None, **kwargs
     ) -> Dict:
         conn = self._get_connection()
-        try:
-            stats = await conn.get_partition_stats(
-                collection_name, partition_name, timeout=timeout, **kwargs
-            )
-            result = {stat.key: stat.value for stat in stats}
-            if "row_count" in result:
-                result["row_count"] = int(result["row_count"])
-        except Exception as ex:
-            logger.error("Failed to get partition stats: %s.%s", collection_name, partition_name)
-            raise ex from ex
-        else:
-            return result
+        stats = await conn.get_partition_stats(
+            collection_name, partition_name, timeout=timeout, **kwargs
+        )
+        result = {stat.key: stat.value for stat in stats}
+        if "row_count" in result:
+            result["row_count"] = int(result["row_count"])
+        return result
 
     async def get_load_state(
         self,
@@ -694,13 +585,9 @@ async def get_load_state(
         **kwargs,
    ):
         conn = self._get_connection()
-        try:
-            return await conn.get_load_state(
-                collection_name, partition_names, timeout=timeout, **kwargs
-            )
-        except Exception as ex:
-            logger.error("Failed to get load state: %s", collection_name)
-            raise ex from ex
+        return await conn.get_load_state(
+            collection_name, partition_names, timeout=timeout, **kwargs
+        )
 
     async def refresh_load(
         self,
@@ -710,47 +597,28 @@ async def refresh_load(
         **kwargs,
     ):
         conn = self._get_connection()
-        try:
-            return await conn.refresh_load(
-                collection_name, partition_names, timeout=timeout, **kwargs
-            )
-        except Exception as ex:
-            logger.error("Failed to refresh load: %s", collection_name)
-            raise ex from ex
+        return await conn.refresh_load(collection_name, partition_names, timeout=timeout, **kwargs)
 
     async def get_server_version(self, timeout: Optional[float] = None, **kwargs) -> str:
         conn = self._get_connection()
-        try:
-            return await conn.get_server_version(timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to get server version")
-            raise ex from ex
+        return await conn.get_server_version(timeout=timeout, **kwargs)
 
     async def describe_replica(
         self, collection_name: str, timeout: Optional[float] = None, **kwargs
     ):
         conn = self._get_connection()
-        try:
-            return await conn.describe_replica(collection_name, timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to describe replica: %s", collection_name)
-            raise ex from ex
+        return await conn.describe_replica(collection_name, timeout=timeout, **kwargs)
 
     async def alter_collection_properties(
         self, collection_name: str, properties: dict, timeout: Optional[float] = None, **kwargs
     ):
         conn = self._get_connection()
-        try:
-            await conn.alter_collection_properties(
-                collection_name,
-                properties=properties,
-                timeout=timeout,
-                **kwargs,
-            )
-            logger.debug("Successfully altered collection properties: %s", collection_name)
-        except Exception as ex:
-            logger.error("Failed to alter collection properties: %s", collection_name)
-            raise ex from ex
+        await conn.alter_collection_properties(
+            collection_name,
+            properties=properties,
+            timeout=timeout,
+            **kwargs,
+        )
 
     async def drop_collection_properties(
         self,
@@ -760,14 +628,9 @@ async def drop_collection_properties(
         **kwargs,
     ):
         conn = self._get_connection()
-        try:
-            await conn.drop_collection_properties(
-                collection_name, property_keys=property_keys, timeout=timeout, **kwargs
-            )
-            logger.debug("Successfully dropped collection properties: %s", collection_name)
-        except Exception as ex:
-            logger.error("Failed to drop collection properties: %s", collection_name)
-            raise ex from ex
+        await conn.drop_collection_properties(
+            collection_name, property_keys=property_keys, timeout=timeout, **kwargs
+        )
 
     async def alter_collection_field(
         self,
@@ -778,24 +641,13 @@ async def alter_collection_field(
         **kwargs,
     ):
         conn = self._get_connection()
-        try:
-            await conn.alter_collection_field(
-                collection_name,
-                field_name=field_name,
-                field_params=field_params,
-                timeout=timeout,
-                **kwargs,
-            )
-            logger.debug(
-                "Successfully altered collection field properties: %s.%s",
-                collection_name,
-                field_name,
-            )
-        except Exception as ex:
-            logger.error(
-                "Failed to alter collection field properties: %s.%s", collection_name, field_name
-            )
-            raise ex from ex
+        await conn.alter_collection_field(
+            collection_name,
+            field_name=field_name,
+            field_params=field_params,
+            timeout=timeout,
+            **kwargs,
+        )
 
     async def add_collection_field(
         self,
@@ -808,17 +660,12 @@ async def add_collection_field(
     ):
         field_schema = self.create_field_schema(field_name, data_type, desc, **kwargs)
         conn = self._get_connection()
-        try:
-            await conn.add_collection_field(
-                collection_name,
-                field_schema,
-                timeout=timeout,
-                **kwargs,
-            )
-            logger.debug("Successfully added collection field: %s.%s", collection_name, field_name)
-        except Exception as ex:
-            logger.error("Failed to add collection field: %s.%s", collection_name, field_name)
-            raise ex from ex
+        await conn.add_collection_field(
+            collection_name,
+            field_schema,
+            timeout=timeout,
+            **kwargs,
+        )
 
     @classmethod
     def create_schema(cls, **kwargs):
@@ -1008,11 +855,7 @@ async def describe_user(
         self, user_name: str, timeout: Optional[float] = None, **kwargs
     ) -> dict:
         conn = self._get_connection()
-        try:
-            res = await conn.describe_user(user_name, True, timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to describe user: %s", user_name)
-            raise ex from ex
+        res = await conn.describe_user(user_name, True, timeout=timeout, **kwargs)
         if hasattr(res, "results") and res.results:
             user_info = UserInfo(res.results)
             if user_info.groups:
@@ -1044,11 +887,7 @@ async def list_privilege_groups(
         **kwargs,
     ) -> List[Dict[str, Union[str, List[str]]]]:
         conn = self._get_connection()
-        try:
-            res = await conn.list_privilege_groups(timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to list privilege groups")
-            raise ex from ex
+        res = await conn.list_privilege_groups(timeout=timeout, **kwargs)
         ret = []
         for g in res:
             privileges = []
@@ -1172,13 +1011,7 @@ async def describe_role(
     ) -> Dict:
         conn = self._get_connection()
         db_name = kwargs.pop("db_name", "")
-        try:
-            res = await conn.select_grant_for_one_role(
-                role_name, db_name, timeout=timeout, **kwargs
-            )
-        except Exception as ex:
-            logger.error("Failed to describe role: %s", role_name)
-            raise ex from ex
+        res = await conn.select_grant_for_one_role(role_name, db_name, timeout=timeout, **kwargs)
         ret = {}
         ret["role"] = role_name
         ret["privileges"] = [dict(i) for i in res.groups]
@@ -1186,11 +1019,7 @@ async def describe_role(
 
     async def list_roles(self, timeout: Optional[float] = None, **kwargs):
         conn = self._get_connection()
-        try:
-            res = await conn.list_roles(False, timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to list roles")
-            raise ex from ex
+        res = await conn.list_roles(False, timeout=timeout, **kwargs)
         role_info = RoleInfo(res)
         return [g.role_name for g in role_info.groups]
 
@@ -1233,12 +1062,7 @@ async def transfer_replica(
 
     async def flush(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
         conn = self._get_connection()
-        try:
-            await conn.flush([collection_name], timeout=timeout, **kwargs)
-            logger.debug("Successfully flushed collection: %s", collection_name)
-        except Exception as ex:
-            logger.error("Failed to flush collection: %s", collection_name)
-            raise ex from ex
+        await conn.flush([collection_name], timeout=timeout, **kwargs)
 
     async def compact(
         self,
@@ -1248,28 +1072,16 @@ async def compact(
         **kwargs,
     ) -> int:
         conn = self._get_connection()
-        try:
-            compaction_id = await conn.compact(
-                collection_name, is_clustering=is_clustering, timeout=timeout, **kwargs
-            )
-            logger.debug("Successfully started compaction for collection: %s", collection_name)
-        except Exception as ex:
-            logger.error("Failed to compact collection: %s", collection_name)
-            raise ex from ex
-        else:
-            return compaction_id
+        return await conn.compact(
+            collection_name, is_clustering=is_clustering, timeout=timeout, **kwargs
+        )
 
     async def get_compaction_state(
         self, job_id: int, timeout: Optional[float] = None, **kwargs
     ) -> str:
         conn = self._get_connection()
-        try:
-            result = await conn.get_compaction_state(job_id, timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to get compaction state for job: %s", job_id)
-            raise ex from ex
-        else:
-            return result.state_name
+        result = await conn.get_compaction_state(job_id, timeout=timeout, **kwargs)
+        return result.state_name
 
     async def run_analyzer(
         self,
@@ -1284,18 +1096,14 @@ async def run_analyzer(
         **kwargs,
     ):
         conn = self._get_connection()
-        try:
-            return await conn.run_analyzer(
-                texts,
-                analyzer_params=analyzer_params,
-                with_hash=with_hash,
-                with_detail=with_detail,
-                collection_name=collection_name,
-                field_name=field_name,
-                analyzer_names=analyzer_names,
-                timeout=timeout,
-                **kwargs,
-            )
-        except Exception as ex:
-            logger.error("Failed to run analyzer")
-            raise ex from ex
+        return await conn.run_analyzer(
+            texts,
+            analyzer_params=analyzer_params,
+            with_hash=with_hash,
+            with_detail=with_detail,
+            collection_name=collection_name,
+            field_name=field_name,
+            analyzer_names=analyzer_names,
+            timeout=timeout,
+            **kwargs,
+        )
diff --git a/pymilvus/milvus_client/milvus_client.py b/pymilvus/milvus_client/milvus_client.py
index 4f1c0f4db..3101a42d8 100644
--- a/pymilvus/milvus_client/milvus_client.py
+++ b/pymilvus/milvus_client/milvus_client.py
@@ -135,12 +135,7 @@ def _fast_create_collection(
         conn = self._get_connection()
         if "consistency_level" not in kwargs:
             kwargs["consistency_level"] = DEFAULT_CONSISTENCY_LEVEL
-        try:
-            conn.create_collection(collection_name, schema, timeout=timeout, **kwargs)
-            logger.debug("Successfully created collection: %s", collection_name)
-        except Exception as ex:
-            logger.error("Failed to create collection: %s", collection_name)
-            raise ex from ex
+        conn.create_collection(collection_name, schema, timeout=timeout, **kwargs)
 
         index_params = IndexParams()
         index_params.add_index(vector_field_name, index_type="AUTOINDEX", metric_type=metric_type)
@@ -170,19 +165,14 @@ def _create_index(
         **kwargs,
     ):
         conn = self._get_connection()
-        try:
-            conn.create_index(
-                collection_name,
-                index_param.field_name,
-                index_param.get_index_configs(),
-                timeout=timeout,
-                index_name=index_param.index_name,
-                **kwargs,
-            )
-            logger.debug("Successfully created an index on collection: %s", collection_name)
-        except Exception as ex:
-            logger.error("Failed to create an index on collection: %s", collection_name)
-            raise ex from ex
+        conn.create_index(
+            collection_name,
+            index_param.field_name,
+            index_param.get_index_configs(),
+            timeout=timeout,
+            index_name=index_param.index_name,
+            **kwargs,
+        )
 
     def insert(
         self,
@@ -351,22 +341,16 @@ def hybrid_search(
         """
 
         conn = self._get_connection()
-        try:
-            res = conn.hybrid_search(
-                collection_name,
-                reqs,
-                ranker,
-                limit=limit,
-                partition_names=partition_names,
-                output_fields=output_fields,
-                timeout=timeout,
-                **kwargs,
-            )
-        except Exception as ex:
-            logger.error("Failed to hybrid search collection: %s", collection_name)
-            raise ex from ex
-
-        return res
+        return conn.hybrid_search(
+            collection_name,
+            reqs,
+            ranker,
+            limit=limit,
+            partition_names=partition_names,
+            output_fields=output_fields,
+            timeout=timeout,
+            **kwargs,
+        )
 
     def search(
         self,
@@ -406,26 +390,20 @@ def search(
             not included in the result data.
         """
         conn = self._get_connection()
-        try:
-            res = conn.search(
-                collection_name,
-                data,
-                anns_field or "",
-                search_params or {},
-                expression=filter,
-                limit=limit,
-                output_fields=output_fields,
-                partition_names=partition_names,
-                expr_params=kwargs.pop("filter_params", {}),
-                timeout=timeout,
-                ranker=ranker,
-                **kwargs,
-            )
-        except Exception as ex:
-            logger.error("Failed to search collection: %s", collection_name)
-            raise ex from ex
-
-        return res
+        return conn.search(
+            collection_name,
+            data,
+            anns_field or "",
+            search_params or {},
+            expression=filter,
+            limit=limit,
+            output_fields=output_fields,
+            partition_names=partition_names,
+            expr_params=kwargs.pop("filter_params", {}),
+            timeout=timeout,
+            ranker=ranker,
+            **kwargs,
+        )
 
     def query(
         self,
@@ -465,33 +443,21 @@ def query(
 
         conn = self._get_connection()
         if ids:
-            try:
-                schema_dict, _ = conn._get_schema_from_cache_or_remote(
-                    collection_name, timeout=timeout
-                )
-            except Exception as ex:
-                logger.error("Failed to describe collection: %s", collection_name)
-                raise ex from ex
+            schema_dict, _ = conn._get_schema_from_cache_or_remote(collection_name, timeout=timeout)
             filter = self._pack_pks_expr(schema_dict, ids)
 
         if not output_fields:
             output_fields = ["*"]
 
-        try:
-            res = conn.query(
-                collection_name,
-                expr=filter,
-                output_fields=output_fields,
-                partition_names=partition_names,
-                timeout=timeout,
-                expr_params=kwargs.pop("filter_params", {}),
-                **kwargs,
-            )
-        except Exception as ex:
-            logger.error("Failed to query collection: %s", collection_name)
-            raise ex from ex
-
-        return res
+        return conn.query(
+            collection_name,
+            expr=filter,
+            output_fields=output_fields,
+            partition_names=partition_names,
+            timeout=timeout,
+            expr_params=kwargs.pop("filter_params", {}),
+            **kwargs,
+        )
 
     def query_iterator(
         self,
@@ -509,11 +475,7 @@ def query_iterator(
         conn = self._get_connection()
 
         # set up schema for iterator
-        try:
-            schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to describe collection: %s", collection_name)
-            raise ex from ex
+        schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
 
         return QueryIterator(
             connection=conn,
@@ -607,19 +569,13 @@ def search_iterator(
         except ServerVersionIncompatibleException:
             # for compatibility, return search_iterator V1
             logger.warning(ExceptionsMessage.SearchIteratorV2FallbackWarning)
-        except Exception as ex:
-            raise ex from ex
 
         # following is the old code for search_iterator V1
         if filter is not None and not isinstance(filter, str):
             raise DataTypeNotMatchException(message=ExceptionsMessage.ExprType % type(filter))
 
         # set up schema for iterator
-        try:
-            schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.error("Failed to describe collection: %s", collection_name)
-            raise ex from ex
+        schema_dict = conn.describe_collection(collection_name, timeout=timeout, **kwargs)
         # if anns_field is not provided
         # if only one vector field, use to search
         # if multiple vector fields, raise exception and abort
@@ -714,30 +670,20 @@ def get(
             return []
 
         conn = self._get_connection()
-        try:
-            schema_dict, _ = conn._get_schema_from_cache_or_remote(collection_name, timeout=timeout)
-        except Exception as ex:
-            logger.error("Failed to describe collection: %s", collection_name)
-            raise ex from ex
+        schema_dict, _ = conn._get_schema_from_cache_or_remote(collection_name, timeout=timeout)
 
         if not output_fields:
            output_fields = ["*"]
 
         expr = self._pack_pks_expr(schema_dict, ids)
-        try:
-            res = conn.query(
-                collection_name,
-                expr=expr,
-                output_fields=output_fields,
-                partition_names=partition_names,
-                timeout=timeout,
-                **kwargs,
-            )
-        except Exception as ex:
-            logger.error("Failed to get collection: %s", collection_name)
-            raise ex from ex
-
-        return res
+        return conn.query(
+            collection_name,
+            expr=expr,
+            output_fields=output_fields,
+            partition_names=partition_names,
+            timeout=timeout,
+            **kwargs,
+        )
 
     def delete(
         self,
@@ -800,13 +746,7 @@ def delete(
         expr = ""
         conn = self._get_connection()
         if len(pks) > 0:
-            try:
-                schema_dict, _ = conn._get_schema_from_cache_or_remote(
-                    collection_name, timeout=timeout
-                )
-            except Exception as ex:
-                logger.error("Failed to describe collection: %s", collection_name)
-                raise ex from ex
+            schema_dict, _ = conn._get_schema_from_cache_or_remote(collection_name, timeout=timeout)
             expr = self._pack_pks_expr(schema_dict, pks)
         else:
             if not isinstance(filter, str):
@@ -814,20 +754,16 @@ def delete(
             expr = filter
 
         ret_pks = []
-        try:
-            res = conn.delete(
-                collection_name=collection_name,
-                expression=expr,
-                partition_name=partition_name,
-                expr_params=kwargs.pop("filter_params", {}),
-                timeout=timeout,
-                **kwargs,
-            )
-            if res.primary_keys:
-                ret_pks.extend(res.primary_keys)
-        except Exception as ex:
-            logger.error("Failed to delete primary keys in collection: %s", collection_name)
-            raise ex from ex
+        res = conn.delete(
+            collection_name=collection_name,
+            expression=expr,
+            partition_name=partition_name,
+            expr_params=kwargs.pop("filter_params", {}),
+            timeout=timeout,
+            **kwargs,
+        )
+        if res.primary_keys:
+            ret_pks.extend(res.primary_keys)
 
         # compatible with deletions that returns primary keys
         if ret_pks:
@@ -914,12 +850,7 @@ def _create_collection_with_schema(
         conn = self._get_connection()
         if "consistency_level" not in kwargs:
             kwargs["consistency_level"] = DEFAULT_CONSISTENCY_LEVEL
-        try:
-            conn.create_collection(collection_name, schema, timeout=timeout, **kwargs)
-            logger.debug("Successfully created collection: %s", collection_name)
-        except Exception as ex:
-            logger.error("Failed to create collection: %s", collection_name)
-            raise ex from ex
+        conn.create_collection(collection_name, schema, timeout=timeout, **kwargs)
 
         if index_params:
             self.create_index(collection_name, index_params, timeout=timeout)
@@ -959,19 +890,11 @@ def _pack_pks_expr(self, schema_dict: Dict, pks: List) -> str:
     def load_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
         """Loads the collection."""
         conn = self._get_connection()
-        try:
-            conn.load_collection(collection_name, timeout=timeout, **kwargs)
-        except MilvusException as ex:
-            logger.error("Failed to load collection: %s", collection_name)
-            raise ex from ex
+        conn.load_collection(collection_name, timeout=timeout, **kwargs)
 
     def release_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs):
         conn = self._get_connection()
-        try:
-            conn.release_collection(collection_name, timeout=timeout, **kwargs)
-        except MilvusException as ex:
-            logger.error("Failed to load collection: %s", collection_name)
-            raise ex from ex
+        conn.release_collection(collection_name, timeout=timeout, **kwargs)
 
     def get_load_state(
         self,
@@ -1596,11 +1519,7 @@ def list_privilege_groups(
             MilvusException: If anything goes wrong.
         """
         conn = self._get_connection()
-        try:
-            res = conn.list_privilege_groups(timeout=timeout, **kwargs)
-        except Exception as ex:
-            logger.exception("Failed to list privilege groups.")
-            raise ex from ex
+        res = conn.list_privilege_groups(timeout=timeout, **kwargs)
         ret = []
         for g in res.groups:
             ret.append({"privilege_group": g.privilege_group, "privileges": g.privileges})