From bf7e159cedad0ad0f19637e8a82a7ddd9c637c4a Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Tue, 12 Aug 2025 13:38:03 +0200 Subject: [PATCH 01/14] Add support for streams --- cognite/client/_api/data_modeling/__init__.py | 2 + cognite/client/_api/data_modeling/streams.py | 176 ++++++++++++++++++ .../data_classes/data_modeling/_validation.py | 2 +- .../data_classes/data_modeling/streams.py | 167 +++++++++++++++++ .../test_data_modeling/test_streams.py | 42 +++++ 5 files changed, 388 insertions(+), 1 deletion(-) create mode 100644 cognite/client/_api/data_modeling/streams.py create mode 100644 cognite/client/data_classes/data_modeling/streams.py create mode 100644 tests/tests_integration/test_api/test_data_modeling/test_streams.py diff --git a/cognite/client/_api/data_modeling/__init__.py b/cognite/client/_api/data_modeling/__init__.py index 2c0ed3782a..4ec09dfb17 100644 --- a/cognite/client/_api/data_modeling/__init__.py +++ b/cognite/client/_api/data_modeling/__init__.py @@ -8,6 +8,7 @@ from cognite.client._api.data_modeling.instances import InstancesAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api_client import APIClient @@ -26,3 +27,4 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client self.instances = InstancesAPI(config, api_version, cognite_client) self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client) self.statistics = StatisticsAPI(config, api_version, cognite_client) + self.streams = StreamsAPI(config, api_version, cognite_client) diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py new file mode 100644 index 0000000000..8b784b57c8 --- /dev/null +++ b/cognite/client/_api/data_modeling/streams.py @@ -0,0 +1,176 @@ 
+from __future__ import annotations + +from collections.abc import Iterator, Sequence +from typing import TYPE_CHECKING, overload + +from cognite.client._api_client import APIClient +from cognite.client._constants import DEFAULT_LIMIT_READ +from cognite.client.data_classes.data_modeling.streams import Stream, StreamApply, StreamList +from cognite.client.utils._identifier import Identifier + +if TYPE_CHECKING: + from cognite.client import CogniteClient + from cognite.client.config import ClientConfig + + +class StreamsAPI(APIClient): + _RESOURCE_PATH = "/streams" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self._CREATE_LIMIT = 1 + self.__alpha_headers = { + "cdf-version": "alpha", + } + + @overload + def __call__( + self, + chunk_size: None = None, + limit: int | None = None, + ) -> Iterator[Stream]: ... + + @overload + def __call__( + self, + chunk_size: int, + limit: int | None = None, + ) -> Iterator[StreamList]: ... + + def __call__( + self, + chunk_size: int | None = None, + limit: int | None = None, + ) -> Iterator[Stream] | Iterator[StreamList]: + """Iterate over streams + + Fetches streams as they are iterated over, so you keep a limited number of streams in memory. + + Args: + chunk_size (int | None): Number of streams to return in each chunk. Defaults to yielding one stream a time. + limit (int | None): Maximum number of streams to return. Defaults to returning all items. + + Returns: + Iterator[Stream] | Iterator[StreamList]: yields Stream one by one if chunk_size is not specified, else StreamList objects. 
+ """ + return self._list_generator( + list_cls=StreamList, + resource_cls=Stream, + method="GET", + chunk_size=chunk_size, + limit=limit, + headers=self.__alpha_headers, + ) + + def __iter__(self) -> Iterator[Stream]: + """Iterate over streams + + Fetches streams as they are iterated over, so you keep a limited number of streams in memory. + + Returns: + Iterator[Stream]: yields Streams one by one. + """ + return self() + + def retrieve(self, external_id: str) -> Stream | None: + """`Retrieve a stream. `_ + + Args: + external_id (str): No description. + + Returns: + Stream | None: Requested stream or None if it does not exist. + + Examples: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> res = client.data_modeling.streams.retrieve(streams='myStream') + + Get multiple streams by id: + + >>> res = client.data_modeling.streams.retrieve(streams=["MyStream", "MyAwesomeStream", "MyOtherStream"]) + + """ + identifier = Identifier.load(external_id=external_id) + return self._retrieve(identifier=identifier, cls=Stream, headers=self.__alpha_headers) + + def delete(self, external_id: str) -> None: + """`Delete one or more streams `_ + + Args: + external_id (str): ID of streams. + Examples: + + Delete streams by id: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> client.data_modeling.streams.delete(streams=["myStream", "myOtherStream"]) + """ + self._delete(url_path=f"{self._RESOURCE_PATH}/{external_id}", headers=self.__alpha_headers) + + def list(self, limit: int | None = DEFAULT_LIMIT_READ) -> StreamList: + """`List streams `_ + + Args: + limit (int | None): Maximum number of streams to return. Defaults to 10. Set to -1, float("inf") or None to return all items. 
+ + Returns: + StreamList: List of requested streams + + Examples: + + List up to 5 streams: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> stream_list = client.data_modeling.streams.list(limit=5) + + Iterate over streams: + + >>> for stream in client.data_modeling.streams: + ... stream # do something with the stream + + Iterate over chunks of streams to reduce memory load: + + >>> for stream_list in client.data_modeling.streams(chunk_size=2500): + ... stream_list # do something with the streams + """ + return self._list( + list_cls=StreamList, resource_cls=Stream, method="GET", limit=limit, headers=self.__alpha_headers + ) + + @overload + def apply(self, streams: Sequence[StreamApply]) -> StreamList: ... + + @overload + def apply(self, streams: StreamApply) -> Stream: ... + + def apply(self, streams: StreamApply | Sequence[StreamApply]) -> Stream | StreamList: + """`Create or patch one or more streams. `_ + + Args: + streams (StreamApply | Sequence[StreamApply]): Stream or streams to create or update. + + Returns: + Stream | StreamList: Created stream(s) + + Examples: + + Create new streams: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.streams import StreamApply, StreamSettings, StreamTemplate + >>> client = CogniteClient() + >>> streams = [StreamApply(external_id="myStream", settings=StreamSettings(template=StreamTemplate(name="MutableTestStream"))), + ... 
StreamApply(external_id="myOtherStream", settings=StreamSettings(template=StreamTemplate(name="MutableLiveData")))] + >>> res = client.data_modeling.streams.apply(streams) + """ + return self._create_multiple( + list_cls=StreamList, + resource_cls=Stream, + items=streams, + input_resource_cls=StreamApply, + headers=self.__alpha_headers, + ) diff --git a/cognite/client/data_classes/data_modeling/_validation.py b/cognite/client/data_classes/data_modeling/_validation.py index 10049a0a0d..cb03c6d02c 100644 --- a/cognite/client/data_classes/data_modeling/_validation.py +++ b/cognite/client/data_classes/data_modeling/_validation.py @@ -43,7 +43,7 @@ ) -def validate_data_modeling_identifier(space: str | None, external_id: str | None = None) -> None: +def validate_data_modeling_identifier(space: str, external_id: str | None = None) -> None: if space and space in RESERVED_SPACE_IDS: raise ValueError(f"The space ID: {space!r} is reserved. Please use another ID.") if external_id and external_id in RESERVED_EXTERNAL_IDS: diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py new file mode 100644 index 0000000000..6b06bbb741 --- /dev/null +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from typing import TYPE_CHECKING, Any, Literal + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteObject, + CogniteResourceList, + WriteableCogniteResource, + WriteableCogniteResourceList, +) +from cognite.client.utils._text import convert_all_keys_to_camel_case_recursive + +if TYPE_CHECKING: + from cognite.client import CogniteClient + +TemplateName = Literal[ + "ImmutableTestStream", + "ImmutableDataStaging", + "ImmutableNormalizedData", + "ImmutableArchive", + "MutableTestStream", + "MutableLiveData", +] + + +@dataclass +class StreamTemplate(CogniteObject): + """A template 
for a stream. + + Args: + name (str): The name of the stream template. + """ + + name: TemplateName + + +@dataclass +class StreamSettings(CogniteObject): + template: StreamTemplate + + +class StreamApply(WriteableCogniteResource): + """A stream of records. This is the write version. + + Args: + external_id (str): Textual description of the stream + settings (StreamSettings): The settings for the stream, including the template. + """ + + def __init__(self, external_id: str, settings: StreamSettings) -> None: + self.external_id = external_id + self.settings = settings + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + settings = resource["settings"] + template = settings["template"] + return cls( + external_id=resource["externalId"], + settings=StreamSettings( + template=StreamTemplate( + name=template["name"], + ), + ), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + dumped = { + "external_id": self.external_id, + "settings": { + "template": {"name": self.settings.template.name}, + }, + } + if camel_case: + return convert_all_keys_to_camel_case_recursive(dumped) + return dumped + + def as_write(self) -> StreamApply: + """Returns this SpaceApply instance.""" + return self + + +class Stream(WriteableCogniteResource): + """A stream of records. 
This is the read version.""" + + def __init__( + self, + external_id: str, + created_time: datetime, + type: Literal["Immutable", "Mutable"], + created_from_template: TemplateName, + ) -> None: + self.external_id = external_id + self.created_time = created_time + self.type = type + self.created_from_template = created_from_template + + def as_apply(self) -> StreamApply: + return StreamApply( + external_id=self.external_id, + settings=StreamSettings(template=StreamTemplate(name=self.created_from_template)), + ) + + def as_write(self) -> StreamApply: + return self.as_apply() + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls( + external_id=resource["externalId"], + created_time=datetime.fromtimestamp(resource["createdTime"] / 1000.0), + type=resource["type"], + created_from_template=resource["createdFromTemplate"], + ) + + +class StreamApplyList(CogniteResourceList[StreamApply]): + _RESOURCE = StreamApply + + def as_ids(self) -> list[str]: + """ + Converts all the spaces to a space id list. + + Returns: + list[str]: A list of space ids. + """ + return [item.external_id for item in self] + + +class StreamList(WriteableCogniteResourceList[StreamApply, Stream]): + """A list of Stream objects.""" + + _RESOURCE = Stream + + def as_ids(self) -> list[str]: + """ + Converts all the spaces to a space id list. + + Returns: + list[str]: A list of space ids. + """ + return [item.external_id for item in self] + + def as_apply(self) -> StreamApplyList: + """ + Converts all the spaces to a space apply list. + + Returns: + StreamApplyList: A list of space applies. + """ + return StreamApplyList( + resources=[item.as_apply() for item in self], + cognite_client=self._get_cognite_client(), + ) + + def as_write(self) -> StreamApplyList: + """ + Converts all the spaces to a space apply list. + + Returns: + StreamApplyList: A list of space applies. 
+ """ + return self.as_apply() diff --git a/tests/tests_integration/test_api/test_data_modeling/test_streams.py b/tests/tests_integration/test_api/test_data_modeling/test_streams.py new file mode 100644 index 0000000000..ba72ed4a19 --- /dev/null +++ b/tests/tests_integration/test_api/test_data_modeling/test_streams.py @@ -0,0 +1,42 @@ +import string + +import pytest + +from cognite.client import CogniteClient +from cognite.client.data_classes.data_modeling.streams import Stream, StreamApply, StreamSettings, StreamTemplate +from cognite.client.utils._text import random_string + + +@pytest.fixture +def persisted_stream(cognite_client: CogniteClient) -> Stream: + external_id = "python-sdk-test-stream-persistent" + stream = cognite_client.data_modeling.streams.retrieve(external_id=external_id) + if stream is None: + stream = cognite_client.data_modeling.streams.apply( + StreamApply( + external_id=external_id, settings=StreamSettings(template=StreamTemplate(name="MutableTestStream")) + ) + ) + return stream + + +class TestStreamsAPI: + def test_list(self, cognite_client: CogniteClient, persisted_stream: Stream) -> None: + streams = cognite_client.data_modeling.streams.list(limit=-1) + assert any(s.external_id == persisted_stream.external_id for s in streams) + + def test_retrieve(self, cognite_client: CogniteClient, persisted_stream: Stream) -> None: + retrieved = cognite_client.data_modeling.streams.retrieve(persisted_stream.external_id) + assert retrieved is not None + assert retrieved.external_id == persisted_stream.external_id + + def test_delete(self, cognite_client: CogniteClient, persisted_stream: Stream) -> None: + external_id = f"python-sdk-test-stream-{random_string(10, string.ascii_lowercase)}" + cognite_client.data_modeling.streams.apply( + StreamApply( + external_id=external_id, + settings=StreamSettings(template=StreamTemplate(name="MutableTestStream")), + ) + ) + cognite_client.data_modeling.streams.delete(external_id) + assert 
cognite_client.data_modeling.streams.retrieve(external_id) is None From 5ee67ff6b2c46c0f9dad3e23e74d3c791e04eeb4 Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Tue, 12 Aug 2025 16:18:19 +0200 Subject: [PATCH 02/14] Add basic support for records --- cognite/client/_api/data_modeling/__init__.py | 2 + cognite/client/_api/data_modeling/records.py | 83 ++++++ .../data_classes/data_modeling/records.py | 255 ++++++++++++++++++ .../test_api/test_data_modeling/conftest.py | 14 + .../test_data_modeling/test_records.py | 132 +++++++++ .../test_data_modeling/test_streams.py | 15 -- 6 files changed, 486 insertions(+), 15 deletions(-) create mode 100644 cognite/client/_api/data_modeling/records.py create mode 100644 cognite/client/data_classes/data_modeling/records.py create mode 100644 tests/tests_integration/test_api/test_data_modeling/test_records.py diff --git a/cognite/client/_api/data_modeling/__init__.py b/cognite/client/_api/data_modeling/__init__.py index 4ec09dfb17..b4fb498b1a 100644 --- a/cognite/client/_api/data_modeling/__init__.py +++ b/cognite/client/_api/data_modeling/__init__.py @@ -6,6 +6,7 @@ from cognite.client._api.data_modeling.data_models import DataModelsAPI from cognite.client._api.data_modeling.graphql import DataModelingGraphQLAPI from cognite.client._api.data_modeling.instances import InstancesAPI +from cognite.client._api.data_modeling.records import RecordsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI from cognite.client._api.data_modeling.streams import StreamsAPI @@ -28,3 +29,4 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client) self.statistics = StatisticsAPI(config, api_version, cognite_client) self.streams = StreamsAPI(config, api_version, cognite_client) + self.records = RecordsAPI(config, api_version, cognite_client) diff --git 
a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py new file mode 100644 index 0000000000..49a903b204 --- /dev/null +++ b/cognite/client/_api/data_modeling/records.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +from collections.abc import Sequence +from typing import TYPE_CHECKING + +from cognite.client._api_client import APIClient +from cognite.client._constants import DEFAULT_LIMIT_READ +from cognite.client.data_classes.data_modeling.records import ( + LastUpdatedRange, + RecordId, + RecordIngest, + RecordList, + RecordListWithCursor, +) +from cognite.client.data_classes.filters import Filter + +if TYPE_CHECKING: + from cognite.client import CogniteClient + from cognite.client.config import ClientConfig + + +class RecordsAPI(APIClient): + _RESOURCE_PATH = "/streams/{}/records" + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self.__alpha_headers = { + "cdf-version": "alpha", + } + + def ingest(self, stream: str, records: Sequence[RecordIngest]) -> None: + body = {"items": [record.dump(camel_case=True) for record in records]} + self._post(url_path=self._RESOURCE_PATH.format(stream), json=body, headers=self.__alpha_headers) + + def upsert(self, stream: str, records: Sequence[RecordIngest]) -> None: + body = {"items": [record.dump(camel_case=True) for record in records]} + self._post(url_path=self._RESOURCE_PATH.format(stream) + "/upsert", json=body, headers=self.__alpha_headers) + + def delete(self, stream: str, id: RecordId | Sequence[RecordId]) -> None: + items = id if isinstance(id, Sequence) else [id] + body = {"items": [item.dump(camel_case=True) for item in items]} + self._post(url_path=self._RESOURCE_PATH.format(stream) + "/delete", json=body, headers=self.__alpha_headers) + + def filter( + self, + stream: str, + last_updated_time: LastUpdatedRange, + *, + filter: Filter | None = None, + 
limit: int | None = DEFAULT_LIMIT_READ, + ) -> RecordList: + body: dict = { + "lastUpdatedTime": last_updated_time.dump(), + } + if filter is not None: + body["filter"] = filter.dump() + body["limit"] = limit + res = self._post( + url_path=self._RESOURCE_PATH.format(stream) + "/filter", json=body, headers=self.__alpha_headers + ) + return RecordList._load(res.json()["items"], cognite_client=self._cognite_client) + + def sync( + self, + stream: str, + *, + filter: Filter | None = None, + cursor: str | None = None, + initialize_cursor: str | None = None, + limit: int | None = DEFAULT_LIMIT_READ, + ) -> RecordListWithCursor: + body: dict = {} + if filter is not None: + body["filter"] = filter + if cursor is not None: + body["cursor"] = cursor + if initialize_cursor is not None: + body["initializeCursor"] = initialize_cursor + body["limit"] = limit + res = self._post(url_path=self._RESOURCE_PATH.format(stream) + "/sync", json=body, headers=self.__alpha_headers) + payload = res.json() + items = RecordList._load(payload["items"], cognite_client=self._cognite_client) + return RecordListWithCursor(list(items), cursor=payload.get("nextCursor"), has_next=payload.get("hasNext")) diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py new file mode 100644 index 0000000000..cb241172f7 --- /dev/null +++ b/cognite/client/data_classes/data_modeling/records.py @@ -0,0 +1,255 @@ +from __future__ import annotations + +from collections import defaultdict +from collections.abc import ItemsView, Iterator, KeysView, Mapping, MutableMapping, ValuesView +from dataclasses import dataclass +from datetime import datetime +from typing import ( + TYPE_CHECKING, + Any, + TypeVar, + cast, + overload, +) + +from typing_extensions import Self + +from cognite.client.data_classes._base import CogniteObject, CogniteResource, CogniteResourceList +from cognite.client.data_classes.data_modeling.ids import ContainerId, ContainerIdentifier 
+from cognite.client.data_classes.data_modeling.instances import ( + PropertyIdentifier, + PropertyValue, + PropertyValueWrite, + _PropertyValueSerializer, +) +from cognite.client.utils import datetime_to_ms +from cognite.client.utils._text import convert_all_keys_to_camel_case + +if TYPE_CHECKING: + from cognite.client import CogniteClient + + +@dataclass(frozen=True) +class RecordId: + space: str + external_id: str + + def dump(self, camel_case: bool = True) -> dict[str, str]: + return {"space": self.space, "externalId" if camel_case else "external_id": self.external_id} + + @classmethod + def load(cls, data: dict[str, str] | tuple[str, str] | Self) -> Self: + if isinstance(data, cls): + return data + elif isinstance(data, tuple) and len(data) == 2: + return cls(*data) + elif isinstance(data, dict): + if "externalId" in data: + return cls(space=data["space"], external_id=data["externalId"]) + if "external_id" in data: + return cls(space=data["space"], external_id=data["external_id"]) + raise KeyError(f"Cannot load {data} into {cls}, missing 'externalId' or 'external_id' key") + + def as_tuple(self) -> tuple[str, str]: + return self.space, self.external_id + + def __repr__(self) -> str: + return f"{self.__class__.__name__}(space={self.space!r}, external_id={self.external_id!r})" + + +@dataclass +class RecordData(CogniteObject): + """This represents the data values of a node or edge. + + Args: + source (ContainerId | ViewId): The container or view the node or edge property is in + properties (Mapping[str, PropertyValue]): The properties of the node or edge. 
+ """ + + source: ContainerId + properties: Mapping[str, PropertyValueWrite] + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + try: + source_type = resource["source"]["type"] + except KeyError as e: + raise ValueError("source must be a dict with a type key") from e + source: ContainerId + if source_type == "container": + source = ContainerId.load(resource["source"]) + else: + raise ValueError(f"source type must be container or view, but was {source_type}") + return cls( + source=source, + properties=resource["properties"], + ) + + def dump(self, camel_case: bool = True) -> dict: + properties = _PropertyValueSerializer.serialize_values(self.properties, camel_case) + output: dict[str, Any] = {"properties": properties} + if self.source: + if isinstance(self.source, ContainerId): + output["source"] = self.source.dump(camel_case) + else: + raise TypeError(f"source must be ContainerId, but was {type(self.source)}") + return output + + +@dataclass +class RecordIngest(CogniteObject): + id: RecordId + sources: list[RecordData] + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "space": self.id.space, + "externalId" if camel_case else "external_id": self.id.external_id, + "sources": [s.dump(camel_case) for s in self.sources], + } + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + id = RecordId(space=resource["space"], external_id=resource["externalId"]) + sources = [RecordData._load(source, cognite_client) for source in resource["sources"]] + return cls(id=id, sources=sources) + + +_T = TypeVar("_T") + + +class Properties(MutableMapping[ContainerIdentifier, MutableMapping[PropertyIdentifier, PropertyValue]]): + def __init__( + self, properties: MutableMapping[ContainerId, MutableMapping[PropertyIdentifier, PropertyValue]] + ) -> None: + self.data = properties + + @classmethod + def load( + cls, data: MutableMapping[str, 
MutableMapping[str, MutableMapping[PropertyIdentifier, PropertyValue]]] + ) -> Properties: + props: MutableMapping[ContainerId, MutableMapping[PropertyIdentifier, PropertyValue]] = {} + for space, container_properties in data.items(): + for container_id_str, properties in container_properties.items(): + container_id = ContainerId.load((space, container_id_str)) + props[container_id] = properties + return cls(props) + + def dump(self) -> dict[str, dict[str, dict[PropertyIdentifier, PropertyValue]]]: + props: dict[str, dict[str, dict[PropertyIdentifier, PropertyValue]]] = defaultdict(dict) + for container_id, properties in self.data.items(): + extid = container_id.external_id + props[container_id.space][extid] = cast(dict[PropertyIdentifier, PropertyValue], properties) + # Defaultdict is not yaml serializable + return dict(props) + + def items(self) -> ItemsView[ContainerId, MutableMapping[PropertyIdentifier, PropertyValue]]: + return self.data.items() + + def keys(self) -> KeysView[ContainerId]: + return self.data.keys() + + def values(self) -> ValuesView[MutableMapping[PropertyIdentifier, PropertyValue]]: + return self.data.values() + + def __iter__(self) -> Iterator[ContainerId]: + yield from self.keys() + + def __getitem__(self, view: ContainerIdentifier) -> MutableMapping[PropertyIdentifier, PropertyValue]: + view_id = ContainerId.load(view) + return self.data.get(view_id, {}) + + def __contains__(self, item: Any) -> bool: + view_id = ContainerId.load(item) + return view_id in self.data + + @overload + def get(self, source: ContainerIdentifier) -> MutableMapping[PropertyIdentifier, PropertyValue] | None: ... + + @overload + def get( + self, source: ContainerIdentifier, default: MutableMapping[PropertyIdentifier, PropertyValue] | _T + ) -> MutableMapping[PropertyIdentifier, PropertyValue] | _T: ... 
+ + def get( + self, + source: ContainerIdentifier, + default: MutableMapping[PropertyIdentifier, PropertyValue] | None | _T | None = None, + ) -> MutableMapping[PropertyIdentifier, PropertyValue] | None | _T: + source_id = ContainerId.load(source) + return self.data.get(source_id, default) + + def __len__(self) -> int: + return len(self.data) + + def __delitem__(self, source: ContainerIdentifier) -> None: + source_id = ContainerId.load(source) + del self.data[source_id] + + def __setitem__( + self, source: ContainerIdentifier, properties: MutableMapping[PropertyIdentifier, PropertyValue] + ) -> None: + source_id = ContainerId.load(source) + self.data[source_id] = properties + + +@dataclass +class Record(CogniteResource): + id: RecordId + created_time: int + last_updated_time: int + properties: Properties + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "space": self.id.space, + "external_id": self.id.external_id, + "created_time": self.created_time, + "last_updated_time": self.last_updated_time, + "properties": [s.dump(camel_case) for s in self.properties] if self.properties else {}, + } + if camel_case: + convert_all_keys_to_camel_case(out) + return out + + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + id = RecordId.load(resource) + properties = Properties.load(resource.get("properties", {})) + return cls( + id=id, + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + properties=properties, + ) + + +class RecordList(CogniteResourceList[Record]): + _RESOURCE = Record + + +class RecordListWithCursor(RecordList): + def __init__(self, resources: list[Record], cursor: str | None, has_next: bool | None = None) -> None: + super().__init__(resources) + self.cursor = cursor + self.has_next = has_next + + +@dataclass +class LastUpdatedRange(CogniteObject): + gt: int | datetime | None = None + gte: int | datetime | None = None + 
lt: int | datetime | None = None + lte: int | datetime | None = None + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = {} + if self.gt is not None: + out["gt"] = datetime_to_ms(self.gt) if isinstance(self.gt, datetime) else self.gt + if self.gte is not None: + out["gte"] = datetime_to_ms(self.gte) if isinstance(self.gte, datetime) else self.gte + if self.lt is not None: + out["lt"] = datetime_to_ms(self.lt) if isinstance(self.lt, datetime) else self.lt + if self.lte is not None: + out["lte"] = datetime_to_ms(self.lte) if isinstance(self.lte, datetime) else self.lte + return out diff --git a/tests/tests_integration/test_api/test_data_modeling/conftest.py b/tests/tests_integration/test_api/test_data_modeling/conftest.py index 74418911a1..3002030357 100644 --- a/tests/tests_integration/test_api/test_data_modeling/conftest.py +++ b/tests/tests_integration/test_api/test_data_modeling/conftest.py @@ -31,6 +31,7 @@ Node, NodeApplyList, ) +from cognite.client.data_classes.data_modeling.streams import Stream, StreamApply, StreamSettings, StreamTemplate RESOURCES = Path(__file__).parent / "resources" @@ -586,3 +587,16 @@ def _read_edges(views: ViewList) -> EdgeApplyList: ) edges.append(edge) return EdgeApplyList(edges) + + +@pytest.fixture +def persisted_stream(cognite_client: CogniteClient) -> Stream: + external_id = "python-sdk-test-stream-persistent" + stream = cognite_client.data_modeling.streams.retrieve(external_id=external_id) + if stream is None: + stream = cognite_client.data_modeling.streams.apply( + StreamApply( + external_id=external_id, settings=StreamSettings(template=StreamTemplate(name="MutableTestStream")) + ) + ) + return stream diff --git a/tests/tests_integration/test_api/test_data_modeling/test_records.py b/tests/tests_integration/test_api/test_data_modeling/test_records.py new file mode 100644 index 0000000000..9510e088aa --- /dev/null +++ b/tests/tests_integration/test_api/test_data_modeling/test_records.py @@ -0,0 
+1,132 @@ +from __future__ import annotations + +import string +from collections.abc import Callable +from random import random +from time import sleep +from typing import TypeVar + +from cognite.client import CogniteClient +from cognite.client.data_classes.data_modeling import Container +from cognite.client.data_classes.data_modeling.ids import ContainerId +from cognite.client.data_classes.data_modeling.records import ( + LastUpdatedRange, + RecordData, + RecordId, + RecordIngest, + RecordListWithCursor, +) +from cognite.client.data_classes.data_modeling.streams import Stream +from cognite.client.utils._text import random_string + +T = TypeVar("T") + + +def retry_assertion_errors(func: Callable[[], T], num_retries: int = 10) -> T: + for i in range(num_retries): + try: + return func() + except AssertionError as e: + if i < num_retries - 1: + sleep(i * random()) + else: + raise e + raise RuntimeError("Unexpected state reached") + + +class TestRecords: + def test_records_ingest_upsert_delete( + self, cognite_client: CogniteClient, persisted_stream: Stream, primitive_nullable_container: Container + ) -> None: + # Use the primitive container from integration fixtures + space = primitive_nullable_container.space + container = ContainerId(space=space, external_id=primitive_nullable_container.external_id) + + record_id = RecordId(space, f"rec-{random_string(10, string.ascii_lowercase)}") + ingest = RecordIngest( + id=record_id, + sources=[ + RecordData( + source=container, + properties={"text": "hello", "int32": 1}, + ) + ], + ) + + cognite_client.data_modeling.records.ingest(persisted_stream.external_id, [ingest]) + + # Upsert with a modified property + upsert = RecordIngest( + id=record_id, + sources=[ + RecordData( + source=container, + properties={"text": "hello2", "int32": 2}, + ) + ], + ) + cognite_client.data_modeling.records.upsert(persisted_stream.external_id, [upsert]) + + # Delete the record + 
cognite_client.data_modeling.records.delete(persisted_stream.external_id, record_id) + + def test_sync( + self, cognite_client: CogniteClient, persisted_stream: Stream, primitive_nullable_container: Container + ) -> None: + space = primitive_nullable_container.space + container = ContainerId(space=space, external_id=primitive_nullable_container.external_id) + + rec_external_id = f"rec-{random_string(10, string.ascii_lowercase)}" + ingest = RecordIngest( + id=RecordId(space=space, external_id=rec_external_id), + sources=[RecordData(source=container, properties={"text": "hello", "int32": 1})], + ) + + # Ingest + cognite_client.data_modeling.records.ingest(persisted_stream.external_id, [ingest]) + + def sync_and_assert_result_present() -> RecordListWithCursor: + synced = cognite_client.data_modeling.records.sync( + persisted_stream.external_id, initialize_cursor="1d-ago", limit=100 + ) + assert any(r.id.external_id == rec_external_id for r in synced) + return synced + + synced = retry_assertion_errors(sync_and_assert_result_present) + + # Expect presence of cursor fields (may be None depending on backend), but ensure attributes exist + assert hasattr(synced, "cursor") + + # Cleanup + cognite_client.data_modeling.records.delete( + persisted_stream.external_id, [RecordId(space=space, external_id=rec_external_id)] + ) + + def test_filter( + self, cognite_client: CogniteClient, persisted_stream: Stream, primitive_nullable_container: Container + ) -> None: + space = primitive_nullable_container.space + container = ContainerId(space=space, external_id=primitive_nullable_container.external_id) + + rec_external_id = f"rec-{random_string(10, string.ascii_lowercase)}" + ingest = RecordIngest( + id=RecordId(space=space, external_id=rec_external_id), + sources=[RecordData(source=container, properties={"text": "hello", "int32": 1})], + ) + + # Ingest + cognite_client.data_modeling.records.ingest(persisted_stream.external_id, [ingest]) + + # Filter + def 
filter_and_assert_result_present() -> None: + filtered = cognite_client.data_modeling.records.filter( + persisted_stream.external_id, last_updated_time=LastUpdatedRange(gt=0), limit=100 + ) + assert any(r.id.external_id == rec_external_id for r in filtered) + + retry_assertion_errors(filter_and_assert_result_present) + + # Cleanup + cognite_client.data_modeling.records.delete( + persisted_stream.external_id, [RecordId(space=space, external_id=rec_external_id)] + ) diff --git a/tests/tests_integration/test_api/test_data_modeling/test_streams.py b/tests/tests_integration/test_api/test_data_modeling/test_streams.py index ba72ed4a19..57ac110fd3 100644 --- a/tests/tests_integration/test_api/test_data_modeling/test_streams.py +++ b/tests/tests_integration/test_api/test_data_modeling/test_streams.py @@ -1,25 +1,10 @@ import string -import pytest - from cognite.client import CogniteClient from cognite.client.data_classes.data_modeling.streams import Stream, StreamApply, StreamSettings, StreamTemplate from cognite.client.utils._text import random_string -@pytest.fixture -def persisted_stream(cognite_client: CogniteClient) -> Stream: - external_id = "python-sdk-test-stream-persistent" - stream = cognite_client.data_modeling.streams.retrieve(external_id=external_id) - if stream is None: - stream = cognite_client.data_modeling.streams.apply( - StreamApply( - external_id=external_id, settings=StreamSettings(template=StreamTemplate(name="MutableTestStream")) - ) - ) - return stream - - class TestStreamsAPI: def test_list(self, cognite_client: CogniteClient, persisted_stream: Stream) -> None: streams = cognite_client.data_modeling.streams.list(limit=-1) From 886749363df107cc9a01c850321626f1568bf7f2 Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Tue, 12 Aug 2025 16:37:44 +0200 Subject: [PATCH 03/14] Raise FeaturePreviewWarning on all invocations of stream/record methods --- cognite/client/_api/data_modeling/records.py | 8 ++++++ cognite/client/_api/data_modeling/streams.py | 8 
++++++ cognite/client/utils/_experimental.py | 28 +++++++++++++++++++- 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 49a903b204..0179066eb1 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -13,12 +13,20 @@ RecordListWithCursor, ) from cognite.client.data_classes.filters import Filter +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations if TYPE_CHECKING: from cognite.client import CogniteClient from cognite.client.config import ClientConfig +@warn_on_all_method_invocations( + FeaturePreviewWarning( + api_maturity="alpha", + sdk_maturity="alpha", + feature_name="Records API", + ) +) class RecordsAPI(APIClient): _RESOURCE_PATH = "/streams/{}/records" diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py index 8b784b57c8..6a21e3001a 100644 --- a/cognite/client/_api/data_modeling/streams.py +++ b/cognite/client/_api/data_modeling/streams.py @@ -6,6 +6,7 @@ from cognite.client._api_client import APIClient from cognite.client._constants import DEFAULT_LIMIT_READ from cognite.client.data_classes.data_modeling.streams import Stream, StreamApply, StreamList +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import Identifier if TYPE_CHECKING: @@ -13,6 +14,13 @@ from cognite.client.config import ClientConfig +@warn_on_all_method_invocations( + FeaturePreviewWarning( + api_maturity="alpha", + sdk_maturity="alpha", + feature_name="Records API", + ) +) class StreamsAPI(APIClient): _RESOURCE_PATH = "/streams" diff --git a/cognite/client/utils/_experimental.py b/cognite/client/utils/_experimental.py index f7988abb05..42d778be65 100644 --- a/cognite/client/utils/_experimental.py +++ b/cognite/client/utils/_experimental.py 
@@ -1,7 +1,9 @@ from __future__ import annotations +import functools import warnings -from typing import Literal +from collections.abc import Callable +from typing import Any, Literal, TypeVar, cast class FeaturePreviewWarning(FutureWarning): @@ -38,3 +40,27 @@ def warn(self) -> None: def __reduce__(self) -> tuple: # This is needed to make the cognite client picklable as warings are stored on APIClass objects. return self.__class__, (self.api_version, self.sdk_version, self.feature_name) + + +T_Class = TypeVar("T_Class", bound=type) +T_Callable = TypeVar("T_Callable", bound=Callable) + + +def warn_on_all_method_invocations(warning: FeaturePreviewWarning) -> Callable[[T_Class], T_Class]: + def _with_warning(c: T_Callable) -> T_Callable: + @functools.wraps(c) + def warning_wrapper(*args: Any, **kwargs: Any) -> Any: + warning.warn() + return c(*args, **kwargs) + + return cast(T_Callable, warning_wrapper) + + def _warn_on_all_method_invocations(cls: T_Class) -> T_Class: + for name in dir(cls): + if not name.startswith("_"): + attr = getattr(cls, name) + if callable(attr): + setattr(cls, name, _with_warning(attr)) + return cls + + return _warn_on_all_method_invocations From e3086e35d29ce3ecd5894bbf4793f533ea512b98 Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Tue, 12 Aug 2025 16:48:57 +0200 Subject: [PATCH 04/14] Use new warn_on_all_method_invocations decorator across the board --- cognite/client/_api/agents/agents.py | 10 ++++------ cognite/client/_api/data_modeling/records.py | 6 +----- cognite/client/_api/data_modeling/streams.py | 6 +----- .../_api/hosted_extractors/destinations.py | 15 ++++----------- cognite/client/_api/hosted_extractors/jobs.py | 16 ++++------------ .../client/_api/hosted_extractors/mappings.py | 15 ++++----------- cognite/client/_api/hosted_extractors/sources.py | 15 ++++----------- cognite/client/_api/simulators/__init__.py | 9 ++++----- cognite/client/_api/simulators/integrations.py | 9 ++++----- cognite/client/_api/simulators/logs.py | 
10 ++++------ cognite/client/_api/simulators/models.py | 11 ++++------- .../client/_api/simulators/models_revisions.py | 11 ++++------- .../client/_api/simulators/routine_revisions.py | 12 ++++-------- cognite/client/_api/simulators/routines.py | 14 ++++---------- cognite/client/_api/simulators/runs.py | 14 ++++---------- 15 files changed, 54 insertions(+), 119 deletions(-) diff --git a/cognite/client/_api/agents/agents.py b/cognite/client/_api/agents/agents.py index f5e41162b1..69b6ef3d6a 100644 --- a/cognite/client/_api/agents/agents.py +++ b/cognite/client/_api/agents/agents.py @@ -5,7 +5,7 @@ from cognite.client._api_client import APIClient from cognite.client.data_classes.agents import Agent, AgentList, AgentUpsert -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence from cognite.client.utils.useful_types import SequenceNotStr @@ -14,12 +14,14 @@ from cognite.client.config import ClientConfig +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Agents") +) class AgentsAPI(APIClient): _RESOURCE_PATH = "/ai/agents" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) - self._warnings = FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Agents") self._api_subversion = "alpha" self._CREATE_LIMIT = 1 self._DELETE_LIMIT = 1 @@ -150,7 +152,6 @@ def upsert(self, agents: AgentUpsert | Sequence[AgentUpsert]) -> Agent | AgentLi """ - self._warnings.warn() return self._create_multiple( list_cls=AgentList, resource_cls=Agent, @@ -188,7 +189,6 @@ def retrieve( >>> res = client.agents.retrieve(external_ids=["my_agent_1", "my_agent_2"]) """ - self._warnings.warn() identifiers = 
IdentifierSequence.load(external_ids=external_ids) return self._retrieve_multiple( list_cls=AgentList, @@ -213,7 +213,6 @@ def delete(self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bo >>> client.agents.delete(external_ids="my_agent") """ - self._warnings.warn() self._delete_multiple( identifiers=IdentifierSequence.load(external_ids=external_ids), wrap_ids=True, @@ -235,6 +234,5 @@ def list(self) -> AgentList: # The API does not yet support limit or pagination >>> agent_list = client.agents.list() """ - self._warnings.warn() res = self._get(url_path=self._RESOURCE_PATH) return AgentList._load(res.json()["items"], cognite_client=self._cognite_client) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 0179066eb1..4eaac48b63 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -21,11 +21,7 @@ @warn_on_all_method_invocations( - FeaturePreviewWarning( - api_maturity="alpha", - sdk_maturity="alpha", - feature_name="Records API", - ) + FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Records API") ) class RecordsAPI(APIClient): _RESOURCE_PATH = "/streams/{}/records" diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py index 6a21e3001a..f78dec4baa 100644 --- a/cognite/client/_api/data_modeling/streams.py +++ b/cognite/client/_api/data_modeling/streams.py @@ -15,11 +15,7 @@ @warn_on_all_method_invocations( - FeaturePreviewWarning( - api_maturity="alpha", - sdk_maturity="alpha", - feature_name="Records API", - ) + FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Records API") ) class StreamsAPI(APIClient): _RESOURCE_PATH = "/streams" diff --git a/cognite/client/_api/hosted_extractors/destinations.py b/cognite/client/_api/hosted_extractors/destinations.py index 77ce16d0bb..e5c3988b3a 100644 --- 
a/cognite/client/_api/hosted_extractors/destinations.py +++ b/cognite/client/_api/hosted_extractors/destinations.py @@ -11,7 +11,7 @@ DestinationUpdate, DestinationWrite, ) -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence from cognite.client.utils.useful_types import SequenceNotStr @@ -19,14 +19,14 @@ from cognite.client import ClientConfig, CogniteClient +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="beta", sdk_maturity="alpha", feature_name="Hosted Extractors") +) class DestinationsAPI(APIClient): _RESOURCE_PATH = "/hostedextractors/destinations" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) - self._warning = FeaturePreviewWarning( - api_maturity="beta", sdk_maturity="alpha", feature_name="Hosted Extractors" - ) self._CREATE_LIMIT = 10 self._LIST_LIMIT = 100 self._RETRIEVE_LIMIT = 100 @@ -63,8 +63,6 @@ def __call__( Returns: Iterator[Destination] | Iterator[DestinationList]: yields Destination one by one if chunk_size is not specified, else DestinationList objects. 
""" - self._warning.warn() - return self._list_generator( list_cls=DestinationList, resource_cls=Destination, @@ -114,7 +112,6 @@ def retrieve( >>> res = client.hosted_extractors.destinations.retrieve(["myDestination", "myDestination2"], ignore_unknown_ids=True) """ - self._warning.warn() return self._retrieve_multiple( list_cls=DestinationList, resource_cls=Destination, @@ -141,7 +138,6 @@ def delete( >>> client = CogniteClient() >>> client.hosted_extractors.destinations.delete(["myDest", "MyDest2"]) """ - self._warning.warn() extra_body_fields: dict[str, Any] = {} if ignore_unknown_ids: extra_body_fields["ignoreUnknownIds"] = True @@ -181,7 +177,6 @@ def create(self, items: DestinationWrite | Sequence[DestinationWrite]) -> Destin >>> destination = DestinationWrite(external_id='my_dest', credentials=SessionWrite("my_nonce"), target_data_set_id=123) >>> res = client.hosted_extractors.destinations.create(destination) """ - self._warning.warn() return self._create_multiple( list_cls=DestinationList, resource_cls=Destination, @@ -228,7 +223,6 @@ def update( >>> destination = DestinationUpdate('my_dest').target_data_set_id.set(123) >>> res = client.hosted_extractors.destinations.update(destination) """ - self._warning.warn() return self._update_multiple( items=items, list_cls=DestinationList, @@ -268,7 +262,6 @@ def list( >>> for destination_list in client.hosted_extractors.destinations(chunk_size=25): ... 
destination_list # do something with the destinationss """ - self._warning.warn() return self._list( list_cls=DestinationList, resource_cls=Destination, diff --git a/cognite/client/_api/hosted_extractors/jobs.py b/cognite/client/_api/hosted_extractors/jobs.py index ff8ad8b053..bf5a9d47ea 100644 --- a/cognite/client/_api/hosted_extractors/jobs.py +++ b/cognite/client/_api/hosted_extractors/jobs.py @@ -15,7 +15,7 @@ JobUpdate, JobWrite, ) -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence from cognite.client.utils.useful_types import SequenceNotStr @@ -23,14 +23,14 @@ from cognite.client import ClientConfig, CogniteClient +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="beta", sdk_maturity="alpha", feature_name="Hosted Extractors") +) class JobsAPI(APIClient): _RESOURCE_PATH = "/hostedextractors/jobs" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) - self._warning = FeaturePreviewWarning( - api_maturity="beta", sdk_maturity="alpha", feature_name="Hosted Extractors" - ) self._CREATE_LIMIT = 10 self._LIST_LIMIT = 100 self._RETRIEVE_LIMIT = 100 @@ -67,7 +67,6 @@ def __call__( Returns: Iterator[Job] | Iterator[JobList]: yields Job one by one if chunk_size is not specified, else JobList objects. 
""" - self._warning.warn() return self._list_generator( list_cls=JobList, resource_cls=Job, @@ -116,7 +115,6 @@ def retrieve( >>> res = client.hosted_extractors.jobs.retrieve(["myJob", "myOtherJob"], ignore_unknown_ids=True) """ - self._warning.warn() return self._retrieve_multiple( list_cls=JobList, resource_cls=Job, @@ -143,7 +141,6 @@ def delete( >>> client = CogniteClient() >>> client.hosted_extractors.jobs.delete(["myMQTTJob", "MyEventHubJob"]) """ - self._warning.warn() extra_body_fields: dict[str, Any] = {} if ignore_unknown_ids: extra_body_fields["ignoreUnknownIds"] = True @@ -181,7 +178,6 @@ def create(self, items: JobWrite | Sequence[JobWrite]) -> Job | JobList: >>> job_write = EventHubSourceWrite('my_event_hub', 'http://myeventhub.com', "My EventHub", 'my_key', 'my_value') >>> job = client.hosted_extractors.jobs.create(job_write) """ - self._warning.warn() return self._create_multiple( list_cls=JobList, resource_cls=Job, @@ -228,7 +224,6 @@ def update( >>> job = EventHubSourceUpdate('my_event_hub').event_hub_name.set("My Updated EventHub") >>> updated_job = client.hosted_extractors.jobs.update(job) """ - self._warning.warn() return self._update_multiple( items=items, list_cls=JobList, @@ -268,7 +263,6 @@ def list( >>> for job_list in client.hosted_extractors.jobs(chunk_size=25): ... 
job_list # do something with the jobs """ - self._warning.warn() return self._list( list_cls=JobList, resource_cls=Job, @@ -303,7 +297,6 @@ def list_logs( >>> client = CogniteClient() >>> res = client.hosted_extractors.jobs.list_logs(job="myJob") """ - self._warning.warn() filter_: dict[str, Any] = {} if job: filter_["job"] = job @@ -348,7 +341,6 @@ def list_metrics( >>> client = CogniteClient() >>> res = client.hosted_extractors.jobs.list_metrics(job="myJob") """ - self._warning.warn() filter_: dict[str, Any] = {} if job: filter_["job"] = job diff --git a/cognite/client/_api/hosted_extractors/mappings.py b/cognite/client/_api/hosted_extractors/mappings.py index f7d2b02ba5..f1c1d93f78 100644 --- a/cognite/client/_api/hosted_extractors/mappings.py +++ b/cognite/client/_api/hosted_extractors/mappings.py @@ -11,7 +11,7 @@ MappingUpdate, MappingWrite, ) -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence from cognite.client.utils.useful_types import SequenceNotStr @@ -19,14 +19,14 @@ from cognite.client import ClientConfig, CogniteClient +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors") +) class MappingsAPI(APIClient): _RESOURCE_PATH = "/hostedextractors/mappings" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) - self._warning = FeaturePreviewWarning( - api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors" - ) self._CREATE_LIMIT = 10 self._LIST_LIMIT = 100 self._RETRIEVE_LIMIT = 100 @@ -63,8 +63,6 @@ def __call__( Returns: Iterator[Mapping] | Iterator[MappingList]: yields Mapping one by one if chunk_size is not specified, else MappingList objects. 
""" - self._warning.warn() - return self._list_generator( list_cls=MappingList, resource_cls=Mapping, @@ -114,7 +112,6 @@ def retrieve( >>> res = client.hosted_extractors.mappings.retrieve(["myMapping", "myMapping2"], ignore_unknown_ids=True) """ - self._warning.warn() return self._retrieve_multiple( list_cls=MappingList, resource_cls=Mapping, @@ -141,7 +138,6 @@ def delete( >>> client = CogniteClient() >>> client.hosted_extractors.mappings.delete(["myMapping", "MyMapping2"]) """ - self._warning.warn() extra_body_fields: dict[str, Any] = { "ignoreUnknownIds": ignore_unknown_ids, "force": force, @@ -180,7 +176,6 @@ def create(self, items: MappingWrite | Sequence[MappingWrite]) -> Mapping | Mapp >>> mapping = MappingWrite(external_id="my_mapping", mapping=CustomMapping("some expression"), published=True, input="json") >>> res = client.hosted_extractors.mappings.create(mapping) """ - self._warning.warn() return self._create_multiple( list_cls=MappingList, resource_cls=Mapping, @@ -216,7 +211,6 @@ def update( >>> mapping = MappingUpdate('my_mapping').published.set(False) >>> res = client.hosted_extractors.mappings.update(mapping) """ - self._warning.warn() return self._update_multiple( items=items, list_cls=MappingList, @@ -255,7 +249,6 @@ def list( >>> for mapping_list in client.hosted_extractors.mappings(chunk_size=25): ... 
mapping_list # do something with the mappings """ - self._warning.warn() return self._list( list_cls=MappingList, resource_cls=Mapping, diff --git a/cognite/client/_api/hosted_extractors/sources.py b/cognite/client/_api/hosted_extractors/sources.py index 6ef02474d5..3879a204e4 100644 --- a/cognite/client/_api/hosted_extractors/sources.py +++ b/cognite/client/_api/hosted_extractors/sources.py @@ -7,7 +7,7 @@ from cognite.client._constants import DEFAULT_LIMIT_READ from cognite.client.data_classes._base import CogniteResource, PropertySpec, T_CogniteResource from cognite.client.data_classes.hosted_extractors.sources import Source, SourceList, SourceUpdate, SourceWrite -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence from cognite.client.utils.useful_types import SequenceNotStr @@ -15,14 +15,14 @@ from cognite.client import ClientConfig, CogniteClient +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors") +) class SourcesAPI(APIClient): _RESOURCE_PATH = "/hostedextractors/sources" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) - self._warning = FeaturePreviewWarning( - api_maturity="alpha", sdk_maturity="alpha", feature_name="Hosted Extractors" - ) self._CREATE_LIMIT = 10 self._LIST_LIMIT = 100 self._RETRIEVE_LIMIT = 100 @@ -59,8 +59,6 @@ def __call__( Returns: Iterator[Source] | Iterator[SourceList]: yields Source one by one if chunk_size is not specified, else SourceList objects. 
""" - self._warning.warn() - return self._list_generator( list_cls=SourceList, resource_cls=Source, # type: ignore[type-abstract] @@ -109,7 +107,6 @@ def retrieve( >>> res = client.hosted_extractors.sources.retrieve(["myMQTTSource", "MyEventHubSource"], ignore_unknown_ids=True) """ - self._warning.warn() return self._retrieve_multiple( list_cls=SourceList, resource_cls=Source, # type: ignore[type-abstract] @@ -135,7 +132,6 @@ def delete( >>> client = CogniteClient() >>> client.hosted_extractors.sources.delete(["myMQTTSource", "MyEventHubSource"]) """ - self._warning.warn() extra_body_fields: dict[str, Any] = {} if ignore_unknown_ids: extra_body_fields["ignoreUnknownIds"] = True @@ -174,7 +170,6 @@ def create(self, items: SourceWrite | Sequence[SourceWrite]) -> Source | SourceL >>> source = EventHubSourceWrite('my_event_hub', 'http://myeventhub.com', "My EventHub", 'my_key', 'my_value') >>> res = client.hosted_extractors.sources.create(source) """ - self._warning.warn() return self._create_multiple( list_cls=SourceList, resource_cls=Source, # type: ignore[type-abstract] @@ -221,7 +216,6 @@ def update( >>> source = EventHubSourceUpdate('my_event_hub').event_hub_name.set("My Updated EventHub") >>> res = client.hosted_extractors.sources.update(source) """ - self._warning.warn() return self._update_multiple( items=items, # type: ignore[arg-type] list_cls=SourceList, @@ -274,7 +268,6 @@ def list( >>> for source_list in client.hosted_extractors.sources(chunk_size=25): ... 
source_list # do something with the sources """ - self._warning.warn() return self._list( list_cls=SourceList, resource_cls=Source, # type: ignore[type-abstract] diff --git a/cognite/client/_api/simulators/__init__.py b/cognite/client/_api/simulators/__init__.py index b638b17986..5969712411 100644 --- a/cognite/client/_api/simulators/__init__.py +++ b/cognite/client/_api/simulators/__init__.py @@ -11,13 +11,16 @@ from cognite.client._api_client import APIClient from cognite.client._constants import DEFAULT_LIMIT_READ from cognite.client.data_classes.simulators.simulators import Simulator, SimulatorList -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations if TYPE_CHECKING: from cognite.client import CogniteClient from cognite.client.config import ClientConfig +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators") +) class SimulatorsAPI(APIClient): _RESOURCE_PATH = "/simulators" @@ -28,9 +31,6 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client self.runs = SimulatorRunsAPI(config, api_version, cognite_client) self.routines = SimulatorRoutinesAPI(config, api_version, cognite_client) self.logs = SimulatorLogsAPI(config, api_version, cognite_client) - self._warning = FeaturePreviewWarning( - api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators" - ) def __iter__(self) -> Iterator[Simulator]: """Iterate over simulators @@ -86,5 +86,4 @@ def list(self, limit: int = DEFAULT_LIMIT_READ) -> SimulatorList: >>> res = client.simulators.list(limit=10) """ - self._warning.warn() return self._list(method="POST", limit=limit, resource_cls=Simulator, list_cls=SimulatorList) diff --git a/cognite/client/_api/simulators/integrations.py b/cognite/client/_api/simulators/integrations.py index 7210afec8f..d61aceacec 100644 
--- a/cognite/client/_api/simulators/integrations.py +++ b/cognite/client/_api/simulators/integrations.py @@ -10,7 +10,7 @@ SimulatorIntegration, SimulatorIntegrationList, ) -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence from cognite.client.utils.useful_types import SequenceNotStr @@ -19,15 +19,15 @@ from cognite.client.config import ClientConfig +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators") +) class SimulatorIntegrationsAPI(APIClient): _RESOURCE_PATH = "/simulators/integrations" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) self._DELETE_LIMIT = 1 - self._warning = FeaturePreviewWarning( - api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators" - ) def __iter__(self) -> Iterator[SimulatorIntegration]: """Iterate over simulator integrations @@ -118,7 +118,6 @@ def list( ... 
) """ integrations_filter = SimulatorIntegrationFilter(simulator_external_ids=simulator_external_ids, active=active) - self._warning.warn() return self._list( method="POST", limit=limit, diff --git a/cognite/client/_api/simulators/logs.py b/cognite/client/_api/simulators/logs.py index 9f93ecf1c7..a14d8a55b5 100644 --- a/cognite/client/_api/simulators/logs.py +++ b/cognite/client/_api/simulators/logs.py @@ -5,21 +5,21 @@ from cognite.client._api_client import APIClient from cognite.client.data_classes.simulators.logs import SimulatorLog, SimulatorLogList -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence if TYPE_CHECKING: from cognite.client import ClientConfig, CogniteClient +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators") +) class SimulatorLogsAPI(APIClient): _RESOURCE_PATH = "/simulators/logs" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) - self._warning = FeaturePreviewWarning( - api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators" - ) self._RETRIEVE_LIMIT = 1 @overload @@ -65,8 +65,6 @@ def retrieve(self, ids: int | Sequence[int]) -> SimulatorLogList | SimulatorLog >>> run = client.simulators.runs.retrieve(ids=2) >>> res = run.get_logs() """ - self._warning.warn() - return self._retrieve_multiple( list_cls=SimulatorLogList, resource_cls=SimulatorLog, diff --git a/cognite/client/_api/simulators/models.py b/cognite/client/_api/simulators/models.py index 7728ac4613..8f6998619e 100644 --- a/cognite/client/_api/simulators/models.py +++ b/cognite/client/_api/simulators/models.py @@ -13,7 +13,7 @@ SimulatorModelUpdate, SimulatorModelWrite, ) -from 
cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence from cognite.client.utils._validation import assert_type from cognite.client.utils.useful_types import SequenceNotStr @@ -22,15 +22,15 @@ from cognite.client import ClientConfig, CogniteClient +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators") +) class SimulatorModelsAPI(APIClient): _RESOURCE_PATH = "/simulators/models" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) self.revisions = SimulatorModelRevisionsAPI(config, api_version, cognite_client) - self._warning = FeaturePreviewWarning( - api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators" - ) self._RETRIEVE_LIMIT = 1 self._CREATE_LIMIT = 1 self._DELETE_LIMIT = 1 @@ -71,7 +71,6 @@ def list( """ model_filter = SimulatorModelsFilter(simulator_external_ids=simulator_external_ids) - self._warning.warn() return self._list( method="POST", limit=limit, @@ -134,8 +133,6 @@ def retrieve( ... external_ids=["model_external_id", "model_external_id2"] ... 
) """ - self._warning.warn() - return self._retrieve_multiple( list_cls=SimulatorModelList, resource_cls=SimulatorModel, diff --git a/cognite/client/_api/simulators/models_revisions.py b/cognite/client/_api/simulators/models_revisions.py index 1ad63255e8..0370046a0a 100644 --- a/cognite/client/_api/simulators/models_revisions.py +++ b/cognite/client/_api/simulators/models_revisions.py @@ -12,7 +12,7 @@ SimulatorModelRevisionList, SimulatorModelRevisionWrite, ) -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence from cognite.client.utils._validation import assert_type from cognite.client.utils.useful_types import SequenceNotStr @@ -21,14 +21,14 @@ from cognite.client import ClientConfig, CogniteClient +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators") +) class SimulatorModelRevisionsAPI(APIClient): _RESOURCE_PATH = "/simulators/models/revisions" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) - self._warning = FeaturePreviewWarning( - api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators" - ) self._CREATE_LIMIT = 1 self._RETRIEVE_LIMIT = 100 @@ -79,7 +79,6 @@ def list( created_time=created_time, last_updated_time=last_updated_time, ) - self._warning.warn() return self._list( method="POST", limit=limit, @@ -144,8 +143,6 @@ def retrieve( ... external_ids=["revision1", "revision2"] ... 
) """ - self._warning.warn() - return self._retrieve_multiple( list_cls=SimulatorModelRevisionList, resource_cls=SimulatorModelRevision, diff --git a/cognite/client/_api/simulators/routine_revisions.py b/cognite/client/_api/simulators/routine_revisions.py index ddf82b1710..095e7799de 100644 --- a/cognite/client/_api/simulators/routine_revisions.py +++ b/cognite/client/_api/simulators/routine_revisions.py @@ -12,7 +12,7 @@ SimulatorRoutineRevisionList, SimulatorRoutineRevisionWrite, ) -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence from cognite.client.utils._validation import assert_type from cognite.client.utils.useful_types import SequenceNotStr @@ -21,14 +21,14 @@ from cognite.client import ClientConfig, CogniteClient +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators") +) class SimulatorRoutineRevisionsAPI(APIClient): _RESOURCE_PATH = "/simulators/routines/revisions" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) - self._warning = FeaturePreviewWarning( - api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators" - ) self._LIST_LIMIT = 20 self._CREATE_LIMIT = 1 self._RETRIEVE_LIMIT = 20 @@ -105,7 +105,6 @@ def __call__( Returns: Iterator[SimulatorRoutineRevision] | Iterator[SimulatorRoutineRevisionList]: yields SimulatorRoutineRevision one by one if chunk is not specified, else SimulatorRoutineRevisionList objects. 
""" - self._warning.warn() filter = SimulatorRoutineRevisionsFilter( all_versions=all_versions, routine_external_ids=routine_external_ids, @@ -171,7 +170,6 @@ def retrieve( Get simulator routine revision by external id: >>> res = client.simulators.routines.revisions.retrieve(external_ids="routine_v1") """ - self._warning.warn() identifiers = IdentifierSequence.load(ids=ids, external_ids=external_ids) return self._retrieve_multiple( resource_cls=SimulatorRoutineRevision, @@ -301,7 +299,6 @@ def create( ... ] >>> res = client.simulators.routines.revisions.create(routine_revisions) """ - self._warning.warn() assert_type( items, "simulator_routine_revision", @@ -361,7 +358,6 @@ def list( ... ) """ - self._warning.warn() filter = SimulatorRoutineRevisionsFilter( all_versions=all_versions, routine_external_ids=routine_external_ids, diff --git a/cognite/client/_api/simulators/routines.py b/cognite/client/_api/simulators/routines.py index 42ea544f0b..82dac24718 100644 --- a/cognite/client/_api/simulators/routines.py +++ b/cognite/client/_api/simulators/routines.py @@ -17,7 +17,7 @@ SimulationRun, SimulationRunWrite, ) -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence from cognite.client.utils._validation import assert_type from cognite.client.utils.useful_types import SequenceNotStr @@ -26,15 +26,15 @@ from cognite.client import ClientConfig, CogniteClient +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators") +) class SimulatorRoutinesAPI(APIClient): _RESOURCE_PATH = "/simulators/routines" def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None: super().__init__(config, api_version, cognite_client) self.revisions = SimulatorRoutineRevisionsAPI(config, 
api_version, cognite_client) - self._warning = FeaturePreviewWarning( - api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators" - ) self._CREATE_LIMIT = 1 self._DELETE_LIMIT = 1 @@ -86,7 +86,6 @@ def __call__( Returns: Iterator[SimulatorRoutine] | Iterator[SimulatorRoutineList]: yields SimulatorRoutine one by one if chunk is not specified, else SimulatorRoutineList objects. """ - self._warning.warn() routines_filter = SimulatorRoutinesFilter( model_external_ids=model_external_ids, simulator_integration_external_ids=simulator_integration_external_ids, @@ -139,7 +138,6 @@ def create( ... ] >>> res = client.simulators.routines.create(routines) """ - self._warning.warn() assert_type(routine, "simulator_routines", [SimulatorRoutineWrite, Sequence]) return self._create_multiple( @@ -167,7 +165,6 @@ def delete( >>> client = CogniteClient() >>> client.simulators.routines.delete(ids=[1,2,3], external_ids="foo") """ - self._warning.warn() self._delete_multiple( identifiers=IdentifierSequence.load(ids=ids, external_ids=external_ids), wrap_ids=True, @@ -210,12 +207,10 @@ def list( ... ) """ - self._warning.warn() routines_filter = SimulatorRoutinesFilter( model_external_ids=model_external_ids, simulator_integration_external_ids=simulator_integration_external_ids, ) - self._warning.warn() return self._list( limit=limit, method="POST", @@ -261,7 +256,6 @@ def run( ... log_severity="Debug" ... 
) """ - self._warning.warn() run_object = SimulationRunWrite( routine_external_id=routine_external_id, inputs=list(inputs) if inputs is not None else None, diff --git a/cognite/client/_api/simulators/runs.py b/cognite/client/_api/simulators/runs.py index ea88f7bb49..4d9f04c54d 100644 --- a/cognite/client/_api/simulators/runs.py +++ b/cognite/client/_api/simulators/runs.py @@ -12,7 +12,7 @@ SimulatorRunDataList, SimulatorRunList, ) -from cognite.client.utils._experimental import FeaturePreviewWarning +from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import IdentifierSequence from cognite.client.utils._validation import assert_type from cognite.client.utils.useful_types import SequenceNotStr @@ -22,6 +22,9 @@ from cognite.client.config import ClientConfig +@warn_on_all_method_invocations( + FeaturePreviewWarning(api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators") +) class SimulatorRunsAPI(APIClient): _RESOURCE_PATH = "/simulators/runs" _RESOURCE_PATH_RUN = "/simulators/run" @@ -35,11 +38,6 @@ def __init__( super().__init__(config, api_version, cognite_client) self._CREATE_LIMIT = 1 self._RETRIEVE_LIMIT = 1 - self._warning = FeaturePreviewWarning( - api_maturity="General Availability", - sdk_maturity="alpha", - feature_name="Simulators", - ) def __iter__(self) -> Iterator[SimulationRun]: """Iterate over simulation runs @@ -187,7 +185,6 @@ def list( routine_revision_external_ids=routine_revision_external_ids, model_revision_external_ids=model_revision_external_ids, ) - self._warning.warn() return self._list( method="POST", limit=limit, @@ -223,7 +220,6 @@ def retrieve( >>> client = CogniteClient() >>> run = client.simulators.runs.retrieve(ids=2) """ - self._warning.warn() identifiers = IdentifierSequence.load(ids=ids) return self._retrieve_multiple( resource_cls=SimulationRun, @@ -295,8 +291,6 @@ def list_run_data( >>> run = 
client.simulators.runs.retrieve(ids=2) >>> res = run.get_data() """ - self._warning.warn() - req = self._post( url_path=f"{self._RESOURCE_PATH}/data/list", json={"items": [{"runId": run_id}]}, From a80483204808d1461a36749b95037c334318f5d7 Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Tue, 12 Aug 2025 16:50:40 +0200 Subject: [PATCH 05/14] Bump version and update changelog --- CHANGELOG.md | 5 +++++ cognite/client/_version.py | 2 +- pyproject.toml | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56fdd9769b..a767b3caaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. + +## [7.81.0] - 2025-08-14 +### Added +- [alpha] Alpha support for Streams & Records APIs. Note that both the SDK and API implementation may be changed at any time. + ## [7.80.1] - 2025-08-14 ### Fixed - Make CogniteAPIError.response_code non-nullable again, addressing a regression introduced in the previous version. 
diff --git a/cognite/client/_version.py b/cognite/client/_version.py index b9ef73f03e..2f265c125d 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,5 +1,5 @@ from __future__ import annotations -__version__ = "7.80.1" +__version__ = "7.81.0" __api_subversion__ = "20230101" diff --git a/pyproject.toml b/pyproject.toml index 7b932d474d..09e0ed22c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "cognite-sdk" -version = "7.80.1" +version = "7.81.0" description = "Cognite Python SDK" readme = "README.md" From a62553666aef90fe02e247f0354bab48442b0178 Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Tue, 12 Aug 2025 17:06:29 +0200 Subject: [PATCH 06/14] Fix CogniteClientMock --- cognite/client/testing.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 696cfd4e56..318f833030 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -17,8 +17,10 @@ from cognite.client._api.data_modeling.data_models import DataModelsAPI from cognite.client._api.data_modeling.graphql import DataModelingGraphQLAPI from cognite.client._api.data_modeling.instances import InstancesAPI +from cognite.client._api.data_modeling.records import RecordsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import SpaceStatisticsAPI, StatisticsAPI +from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api.data_sets import DataSetsAPI from cognite.client._api.datapoints import DatapointsAPI @@ -125,6 +127,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.data_modeling.graphql = MagicMock(spec_set=DataModelingGraphQLAPI) self.data_modeling.statistics = MagicMock(spec=StatisticsAPI) self.data_modeling.statistics.spaces = MagicMock(spec_set=SpaceStatisticsAPI) + self.data_modeling.streams = 
MagicMock(spec_set=StreamsAPI) + self.data_modeling.records = MagicMock(spec_set=RecordsAPI) self.data_sets = MagicMock(spec_set=DataSetsAPI) From 08ed751564a7556bf6f022723556ff6393c8075e Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Tue, 12 Aug 2025 17:14:21 +0200 Subject: [PATCH 07/14] Add retryable POST endpoints to _RETRYABLE_POST_ENDPOINT_REGEX_PATTERNS We should consider making this a blacklist instead of a whitelist, would be much easier to maintain --- cognite/client/_api_client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cognite/client/_api_client.py b/cognite/client/_api_client.py index cf37c2fbd7..7e5a77e9fd 100644 --- a/cognite/client/_api_client.py +++ b/cognite/client/_api_client.py @@ -91,6 +91,8 @@ class APIClient: "context/entitymatching/(byids|list|jobs)", "sessions/revoke", "models/.*", + "streams/[^/]/records", + "streams/[^/]/records/(list|delete|aggregate|filter|sync)", ".*/graphql", "units/.*", "annotations/(list|byids|reverselookup)", From 55ab1b57ecce81a0b7d7a9c4d591fd3cf31dab70 Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Tue, 12 Aug 2025 17:24:45 +0200 Subject: [PATCH 08/14] Fix dump and load methods --- .../data_classes/data_modeling/records.py | 4 +-- .../data_classes/data_modeling/streams.py | 31 ++++++++++++++++++- tests/utils.py | 5 +-- 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py index cb241172f7..5eb455c81b 100644 --- a/cognite/client/data_classes/data_modeling/records.py +++ b/cognite/client/data_classes/data_modeling/records.py @@ -206,10 +206,10 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: "external_id": self.id.external_id, "created_time": self.created_time, "last_updated_time": self.last_updated_time, - "properties": [s.dump(camel_case) for s in self.properties] if self.properties else {}, + "properties": self.properties.dump(), } if camel_case: - 
convert_all_keys_to_camel_case(out) + return convert_all_keys_to_camel_case(out) return out @classmethod diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py index 6b06bbb741..c0204b06ac 100644 --- a/cognite/client/data_classes/data_modeling/streams.py +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -12,6 +12,7 @@ WriteableCogniteResource, WriteableCogniteResourceList, ) +from cognite.client.utils import datetime_to_ms, ms_to_datetime from cognite.client.utils._text import convert_all_keys_to_camel_case_recursive if TYPE_CHECKING: @@ -37,11 +38,28 @@ class StreamTemplate(CogniteObject): name: TemplateName + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + return cls(name=resource["name"]) + @dataclass class StreamSettings(CogniteObject): template: StreamTemplate + @classmethod + def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: + template = StreamTemplate._load(resource["template"], cognite_client) + return cls(template=template) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + dumped = { + "template": self.template.dump(camel_case=camel_case), + } + if camel_case: + return convert_all_keys_to_camel_case_recursive(dumped) + return dumped + class StreamApply(WriteableCogniteResource): """A stream of records. This is the write version. 
@@ -112,11 +130,22 @@ def as_write(self) -> StreamApply: def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: return cls( external_id=resource["externalId"], - created_time=datetime.fromtimestamp(resource["createdTime"] / 1000.0), + created_time=ms_to_datetime(resource["createdTime"]), type=resource["type"], created_from_template=resource["createdFromTemplate"], ) + def dump(self, camel_case: bool = True) -> dict[str, Any]: + dumped = { + "external_id": self.external_id, + "created_time": datetime_to_ms(self.created_time), + "type": self.type, + "created_from_template": self.created_from_template, + } + if camel_case: + return convert_all_keys_to_camel_case_recursive(dumped) + return dumped + class StreamApplyList(CogniteResourceList[StreamApply]): _RESOURCE = StreamApply diff --git a/tests/utils.py b/tests/utils.py index 33621ea15a..93b52ea69f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -14,7 +14,7 @@ import typing from collections.abc import Mapping from contextlib import contextmanager -from datetime import timedelta, timezone +from datetime import datetime, timedelta, timezone from pathlib import Path from types import UnionType from typing import TYPE_CHECKING, Any, Literal, TypeVar, cast, get_args, get_origin, get_type_hints @@ -473,7 +473,8 @@ def create_instance(self, resource_cls: type[T_Object], skip_defaulted_args: boo keyword_arguments.pop("max_list_size", None) elif resource_cls is SimulatorRoutineStepArguments: keyword_arguments = {"data": {"reference_id": self._random_string(50), "arg2": self._random_string(50)}} - + elif resource_cls is datetime: + return datetime(year=2000, month=1, day=1) return resource_cls(*positional_arguments, **keyword_arguments) def create_value(self, type_: Any, var_name: str | None = None) -> Any: From 6ea6ae8bf793490d6d320bfd7dcdc3f53dfc7343 Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Tue, 12 Aug 2025 17:31:36 +0200 Subject: [PATCH 09/14] Skip records/streams 
integration tests in CI Only run them locally for now against the erlend-test project where records&streams is available --- .../test_api/test_data_modeling/test_records.py | 8 ++++++++ .../test_api/test_data_modeling/test_streams.py | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/tests/tests_integration/test_api/test_data_modeling/test_records.py b/tests/tests_integration/test_api/test_data_modeling/test_records.py index 9510e088aa..463c831195 100644 --- a/tests/tests_integration/test_api/test_data_modeling/test_records.py +++ b/tests/tests_integration/test_api/test_data_modeling/test_records.py @@ -1,11 +1,14 @@ from __future__ import annotations +import os import string from collections.abc import Callable from random import random from time import sleep from typing import TypeVar +import pytest + from cognite.client import CogniteClient from cognite.client.data_classes.data_modeling import Container from cognite.client.data_classes.data_modeling.ids import ContainerId @@ -21,6 +24,11 @@ T = TypeVar("T") +if os.environ["COGNITE_PROJECT"] != "erlend-test": + pytest.skip( + "Skipping all Records integration tests, only enabled in alpha for erlend-test project", allow_module_level=True + ) + def retry_assertion_errors(func: Callable[[], T], num_retries: int = 10) -> T: for i in range(num_retries): diff --git a/tests/tests_integration/test_api/test_data_modeling/test_streams.py b/tests/tests_integration/test_api/test_data_modeling/test_streams.py index 57ac110fd3..2971e1e52c 100644 --- a/tests/tests_integration/test_api/test_data_modeling/test_streams.py +++ b/tests/tests_integration/test_api/test_data_modeling/test_streams.py @@ -1,9 +1,17 @@ +import os import string +import pytest + from cognite.client import CogniteClient from cognite.client.data_classes.data_modeling.streams import Stream, StreamApply, StreamSettings, StreamTemplate from cognite.client.utils._text import random_string +if os.environ["COGNITE_PROJECT"] != "erlend-test": + pytest.skip( 
+ "Skipping all Records integration tests, only enabled in alpha for erlend-test project", allow_module_level=True + ) + class TestStreamsAPI: def test_list(self, cognite_client: CogniteClient, persisted_stream: Stream) -> None: From 7ebb47a5fab5e62357a193d203ac0963f227f87a Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Wed, 13 Aug 2025 10:36:30 +0200 Subject: [PATCH 10/14] Get rid of RecordData class Just reuse SourceData (previously NodeOrEdgeData) instead --- .../data_classes/data_modeling/instances.py | 9 +++- .../data_classes/data_modeling/records.py | 50 +++---------------- .../test_data_modeling/test_records.py | 10 ++-- 3 files changed, 18 insertions(+), 51 deletions(-) diff --git a/cognite/client/data_classes/data_modeling/instances.py b/cognite/client/data_classes/data_modeling/instances.py index 11e6ef20d2..554799cce7 100644 --- a/cognite/client/data_classes/data_modeling/instances.py +++ b/cognite/client/data_classes/data_modeling/instances.py @@ -103,8 +103,8 @@ @dataclass -class NodeOrEdgeData(CogniteObject): - """This represents the data values of a node or edge. +class SourceData(CogniteObject): + """This represents the property data for a given view/container. Args: source (ContainerId | ViewId): The container or view the node or edge property is in @@ -145,6 +145,11 @@ def dump(self, camel_case: bool = True) -> dict: return output +# Keep this for backwards compatibility, since we renamed NodeOrEdgeData to SourceData when we introduced records +# into the SDK. 
+NodeOrEdgeData = SourceData + + class InstanceCore(DataModelingResource, ABC): """A node or edge Args: diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py index 5eb455c81b..b6d9ee7f32 100644 --- a/cognite/client/data_classes/data_modeling/records.py +++ b/cognite/client/data_classes/data_modeling/records.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections import defaultdict -from collections.abc import ItemsView, Iterator, KeysView, Mapping, MutableMapping, ValuesView +from collections.abc import ItemsView, Iterator, KeysView, MutableMapping, ValuesView from dataclasses import dataclass from datetime import datetime from typing import ( @@ -19,8 +19,7 @@ from cognite.client.data_classes.data_modeling.instances import ( PropertyIdentifier, PropertyValue, - PropertyValueWrite, - _PropertyValueSerializer, + SourceData, ) from cognite.client.utils import datetime_to_ms from cognite.client.utils._text import convert_all_keys_to_camel_case @@ -57,49 +56,10 @@ def __repr__(self) -> str: return f"{self.__class__.__name__}(space={self.space!r}, external_id={self.external_id!r})" -@dataclass -class RecordData(CogniteObject): - """This represents the data values of a node or edge. - - Args: - source (ContainerId | ViewId): The container or view the node or edge property is in - properties (Mapping[str, PropertyValue]): The properties of the node or edge. 
- """ - - source: ContainerId - properties: Mapping[str, PropertyValueWrite] - - @classmethod - def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: - try: - source_type = resource["source"]["type"] - except KeyError as e: - raise ValueError("source must be a dict with a type key") from e - source: ContainerId - if source_type == "container": - source = ContainerId.load(resource["source"]) - else: - raise ValueError(f"source type must be container or view, but was {source_type}") - return cls( - source=source, - properties=resource["properties"], - ) - - def dump(self, camel_case: bool = True) -> dict: - properties = _PropertyValueSerializer.serialize_values(self.properties, camel_case) - output: dict[str, Any] = {"properties": properties} - if self.source: - if isinstance(self.source, ContainerId): - output["source"] = self.source.dump(camel_case) - else: - raise TypeError(f"source must be ContainerId, but was {type(self.source)}") - return output - - @dataclass class RecordIngest(CogniteObject): id: RecordId - sources: list[RecordData] + sources: list[SourceData] def dump(self, camel_case: bool = True) -> dict[str, Any]: return { @@ -111,13 +71,15 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: @classmethod def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = None) -> Self: id = RecordId(space=resource["space"], external_id=resource["externalId"]) - sources = [RecordData._load(source, cognite_client) for source in resource["sources"]] + sources = [SourceData._load(source, cognite_client) for source in resource["sources"]] return cls(id=id, sources=sources) _T = TypeVar("_T") +# TODO: Use the Properties class from cognite.client.data_classes.data_modeling.instances, and make that support +# both container ids and view ids. We need that for when we support containers for instances anyway. 
class Properties(MutableMapping[ContainerIdentifier, MutableMapping[PropertyIdentifier, PropertyValue]]): def __init__( self, properties: MutableMapping[ContainerId, MutableMapping[PropertyIdentifier, PropertyValue]] diff --git a/tests/tests_integration/test_api/test_data_modeling/test_records.py b/tests/tests_integration/test_api/test_data_modeling/test_records.py index 463c831195..b37add3071 100644 --- a/tests/tests_integration/test_api/test_data_modeling/test_records.py +++ b/tests/tests_integration/test_api/test_data_modeling/test_records.py @@ -12,9 +12,9 @@ from cognite.client import CogniteClient from cognite.client.data_classes.data_modeling import Container from cognite.client.data_classes.data_modeling.ids import ContainerId +from cognite.client.data_classes.data_modeling.instances import SourceData from cognite.client.data_classes.data_modeling.records import ( LastUpdatedRange, - RecordData, RecordId, RecordIngest, RecordListWithCursor, @@ -54,7 +54,7 @@ def test_records_ingest_upsert_delete( ingest = RecordIngest( id=record_id, sources=[ - RecordData( + SourceData( source=container, properties={"text": "hello", "int32": 1}, ) @@ -67,7 +67,7 @@ def test_records_ingest_upsert_delete( upsert = RecordIngest( id=record_id, sources=[ - RecordData( + SourceData( source=container, properties={"text": "hello2", "int32": 2}, ) @@ -87,7 +87,7 @@ def test_sync( rec_external_id = f"rec-{random_string(10, string.ascii_lowercase)}" ingest = RecordIngest( id=RecordId(space=space, external_id=rec_external_id), - sources=[RecordData(source=container, properties={"text": "hello", "int32": 1})], + sources=[SourceData(source=container, properties={"text": "hello", "int32": 1})], ) # Ingest @@ -119,7 +119,7 @@ def test_filter( rec_external_id = f"rec-{random_string(10, string.ascii_lowercase)}" ingest = RecordIngest( id=RecordId(space=space, external_id=rec_external_id), - sources=[RecordData(source=container, properties={"text": "hello", "int32": 1})], + 
sources=[SourceData(source=container, properties={"text": "hello", "int32": 1})], ) # Ingest From bf85d13eeb2a77f76f3297d5878dcaa0b48a4e2a Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Wed, 13 Aug 2025 13:13:36 +0200 Subject: [PATCH 11/14] Use instances.Properties class in records too --- .../client/data_classes/data_modeling/ids.py | 16 +++ .../data_classes/data_modeling/instances.py | 85 +++++++++------- .../data_classes/data_modeling/records.py | 97 +------------------ 3 files changed, 67 insertions(+), 131 deletions(-) diff --git a/cognite/client/data_classes/data_modeling/ids.py b/cognite/client/data_classes/data_modeling/ids.py index 5eda8c6252..3f6e3cfc7f 100644 --- a/cognite/client/data_classes/data_modeling/ids.py +++ b/cognite/client/data_classes/data_modeling/ids.py @@ -183,6 +183,8 @@ def version(self) -> str | None: ... ConstraintIdentifier = tuple[ContainerId, str] IndexIdentifier = tuple[ContainerId, str] ViewIdentifier = ViewId | tuple[str, str] | tuple[str, str, str] +SourceIdentifier = ContainerIdentifier | ViewIdentifier +SourceId = ContainerId | ViewId DataModelIdentifier = DataModelId | tuple[str, str] | tuple[str, str, str] NodeIdentifier = NodeId | tuple[str, str, str] EdgeIdentifier = EdgeId | tuple[str, str, str] @@ -198,6 +200,20 @@ def _load_space_identifier(ids: str | SequenceNotStr[str]) -> DataModelingIdenti ) +def _load_source_id(source_identifier: SourceIdentifier) -> SourceId: + match source_identifier: + case ContainerId() | ViewId(): + return source_identifier + case (str(space), str(view_external_id), str(version)): + return ViewId(space=space, external_id=view_external_id, version=version) + case (str(space), str(container_external_id)): + return ContainerId(space, container_external_id) + raise ValueError( + "Invalid source identifier format. Expected ContainerId or ViewId or tuple of " + "strings((space, externalId) for container or (space, externalId, version) for views)." 
+ ) + + def _load_identifier( ids: Id | Sequence[Id], id_type: Literal["container", "view", "data_model", "space", "node", "edge"] ) -> DataModelingIdentifierSequence: diff --git a/cognite/client/data_classes/data_modeling/instances.py b/cognite/client/data_classes/data_modeling/instances.py index 554799cce7..1aa84f02e7 100644 --- a/cognite/client/data_classes/data_modeling/instances.py +++ b/cognite/client/data_classes/data_modeling/instances.py @@ -59,8 +59,10 @@ ContainerId, EdgeId, NodeId, + SourceId, + SourceIdentifier, ViewId, - ViewIdentifier, + _load_source_id, ) from cognite.client.utils._auxiliary import exactly_one_is_not_none, find_duplicates, flatten_dict from cognite.client.utils._identifier import InstanceId @@ -224,82 +226,93 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> S _T = TypeVar("_T") -class Properties(MutableMapping[ViewIdentifier, MutableMapping[PropertyIdentifier, PropertyValue]]): - def __init__(self, properties: MutableMapping[ViewId, MutableMapping[PropertyIdentifier, PropertyValue]]) -> None: +class Properties(MutableMapping[SourceIdentifier, MutableMapping[PropertyIdentifier, PropertyValue]]): + def __init__(self, properties: MutableMapping[SourceId, MutableMapping[PropertyIdentifier, PropertyValue]]) -> None: self.data = properties @classmethod def load( cls, data: MutableMapping[Space, MutableMapping[str, MutableMapping[PropertyIdentifier, PropertyValue]]] ) -> Properties: - props: MutableMapping[ViewId, MutableMapping[PropertyIdentifier, PropertyValue]] = {} + props: MutableMapping[SourceId, MutableMapping[PropertyIdentifier, PropertyValue]] = {} for space, view_properties in data.items(): - for view_id_str, properties in view_properties.items(): - view_tuple = tuple(view_id_str.split("/", 1)) - if len(view_tuple) != 2: - warnings.warn( - f"Unknown type of view id: {view_id_str}, expected format /. 
Skipping...", - stacklevel=2, - ) - continue - view_id = ViewId.load((space, *view_tuple)) - props[view_id] = properties + for source_id_str, properties in view_properties.items(): + if "/" in source_id_str: + view_tuple = tuple(source_id_str.split("/", 1)) + if len(view_tuple) != 2: + warnings.warn( + f"Unknown type of view id: {source_id_str}, expected format /. Skipping...", + stacklevel=2, + ) + continue + source_id: SourceId = ViewId.load((space, *view_tuple)) + else: + source_id = ContainerId.load((space, source_id_str)) + props[source_id] = properties + return cls(props) def dump(self) -> dict[Space, dict[str, dict[PropertyIdentifier, PropertyValue]]]: props: dict[Space, dict[str, dict[PropertyIdentifier, PropertyValue]]] = defaultdict(dict) - for view_id, properties in self.data.items(): - view_id_str = f"{view_id.external_id}/{view_id.version}" - props[view_id.space][view_id_str] = cast(dict[PropertyIdentifier, PropertyValue], properties) + for source_id, properties in self.data.items(): + if isinstance(source_id, ViewId): + source_id_str = f"{source_id.external_id}/{source_id.version}" + elif isinstance(source_id, ContainerId): + source_id_str = source_id.external_id + else: + raise ValueError("SourceId must be either a ViewId or a ContainerId") + props[source_id.space][source_id_str] = cast(dict[PropertyIdentifier, PropertyValue], properties) # Defaultdict is not yaml serializable return dict(props) - def items(self) -> ItemsView[ViewId, MutableMapping[PropertyIdentifier, PropertyValue]]: + def items(self) -> ItemsView[SourceId, MutableMapping[PropertyIdentifier, PropertyValue]]: return self.data.items() - def keys(self) -> KeysView[ViewId]: + def keys(self) -> KeysView[SourceId]: return self.data.keys() def values(self) -> ValuesView[MutableMapping[PropertyIdentifier, PropertyValue]]: return self.data.values() - def __iter__(self) -> Iterator[ViewId]: + def __iter__(self) -> Iterator[SourceId]: yield from self.keys() - def __getitem__(self, view: 
ViewIdentifier) -> MutableMapping[PropertyIdentifier, PropertyValue]: - view_id = ViewId.load(view) - return self.data.get(view_id, {}) + def __getitem__(self, item: SourceIdentifier) -> MutableMapping[PropertyIdentifier, PropertyValue]: + source_id = _load_source_id(item) + return self.data.get(source_id, {}) def __contains__(self, item: Any) -> bool: - view_id = ViewId.load(item) - return view_id in self.data + source_id = _load_source_id(item) + return source_id in self.data @overload - def get(self, view: ViewIdentifier) -> MutableMapping[PropertyIdentifier, PropertyValue] | None: ... + def get(self, view: SourceIdentifier) -> MutableMapping[PropertyIdentifier, PropertyValue] | None: ... @overload def get( - self, view: ViewIdentifier, default: MutableMapping[PropertyIdentifier, PropertyValue] | _T + self, view: SourceIdentifier, default: MutableMapping[PropertyIdentifier, PropertyValue] | _T ) -> MutableMapping[PropertyIdentifier, PropertyValue] | _T: ... def get( self, - view: ViewIdentifier, + view: SourceIdentifier, default: MutableMapping[PropertyIdentifier, PropertyValue] | None | _T | None = None, ) -> MutableMapping[PropertyIdentifier, PropertyValue] | None | _T: - view_id = ViewId.load(view) - return self.data.get(view_id, default) + source_id = _load_source_id(view) + return self.data.get(source_id, default) def __len__(self) -> int: return len(self.data) - def __delitem__(self, view: ViewIdentifier) -> None: - view_id = ViewId.load(view) - del self.data[view_id] + def __delitem__(self, item: SourceIdentifier) -> None: + source_id = _load_source_id(item) + del self.data[source_id] - def __setitem__(self, view: ViewIdentifier, properties: MutableMapping[PropertyIdentifier, PropertyValue]) -> None: - view_id = ViewId.load(view) - self.data[view_id] = properties + def __setitem__( + self, item: SourceIdentifier, properties: MutableMapping[PropertyIdentifier, PropertyValue] + ) -> None: + source_id = _load_source_id(item) + self.data[source_id] = 
properties def _repr_html_(self) -> str: pd = local_import("pandas") diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py index b6d9ee7f32..b00ad56ef9 100644 --- a/cognite/client/data_classes/data_modeling/records.py +++ b/cognite/client/data_classes/data_modeling/records.py @@ -1,26 +1,13 @@ from __future__ import annotations -from collections import defaultdict -from collections.abc import ItemsView, Iterator, KeysView, MutableMapping, ValuesView from dataclasses import dataclass from datetime import datetime -from typing import ( - TYPE_CHECKING, - Any, - TypeVar, - cast, - overload, -) +from typing import TYPE_CHECKING, Any from typing_extensions import Self from cognite.client.data_classes._base import CogniteObject, CogniteResource, CogniteResourceList -from cognite.client.data_classes.data_modeling.ids import ContainerId, ContainerIdentifier -from cognite.client.data_classes.data_modeling.instances import ( - PropertyIdentifier, - PropertyValue, - SourceData, -) +from cognite.client.data_classes.data_modeling.instances import Properties, SourceData from cognite.client.utils import datetime_to_ms from cognite.client.utils._text import convert_all_keys_to_camel_case @@ -75,86 +62,6 @@ def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None = return cls(id=id, sources=sources) -_T = TypeVar("_T") - - -# TODO: Use the Properties class from cognite.client.data_classes.data_modeling.instances, and make that support -# both container ids and view ids. We need that for when we support containers for instances anyway. 
-class Properties(MutableMapping[ContainerIdentifier, MutableMapping[PropertyIdentifier, PropertyValue]]): - def __init__( - self, properties: MutableMapping[ContainerId, MutableMapping[PropertyIdentifier, PropertyValue]] - ) -> None: - self.data = properties - - @classmethod - def load( - cls, data: MutableMapping[str, MutableMapping[str, MutableMapping[PropertyIdentifier, PropertyValue]]] - ) -> Properties: - props: MutableMapping[ContainerId, MutableMapping[PropertyIdentifier, PropertyValue]] = {} - for space, container_properties in data.items(): - for container_id_str, properties in container_properties.items(): - container_id = ContainerId.load((space, container_id_str)) - props[container_id] = properties - return cls(props) - - def dump(self) -> dict[str, dict[str, dict[PropertyIdentifier, PropertyValue]]]: - props: dict[str, dict[str, dict[PropertyIdentifier, PropertyValue]]] = defaultdict(dict) - for container_id, properties in self.data.items(): - extid = container_id.external_id - props[container_id.space][extid] = cast(dict[PropertyIdentifier, PropertyValue], properties) - # Defaultdict is not yaml serializable - return dict(props) - - def items(self) -> ItemsView[ContainerId, MutableMapping[PropertyIdentifier, PropertyValue]]: - return self.data.items() - - def keys(self) -> KeysView[ContainerId]: - return self.data.keys() - - def values(self) -> ValuesView[MutableMapping[PropertyIdentifier, PropertyValue]]: - return self.data.values() - - def __iter__(self) -> Iterator[ContainerId]: - yield from self.keys() - - def __getitem__(self, view: ContainerIdentifier) -> MutableMapping[PropertyIdentifier, PropertyValue]: - view_id = ContainerId.load(view) - return self.data.get(view_id, {}) - - def __contains__(self, item: Any) -> bool: - view_id = ContainerId.load(item) - return view_id in self.data - - @overload - def get(self, source: ContainerIdentifier) -> MutableMapping[PropertyIdentifier, PropertyValue] | None: ... 
- - @overload - def get( - self, source: ContainerIdentifier, default: MutableMapping[PropertyIdentifier, PropertyValue] | _T - ) -> MutableMapping[PropertyIdentifier, PropertyValue] | _T: ... - - def get( - self, - source: ContainerIdentifier, - default: MutableMapping[PropertyIdentifier, PropertyValue] | None | _T | None = None, - ) -> MutableMapping[PropertyIdentifier, PropertyValue] | None | _T: - source_id = ContainerId.load(source) - return self.data.get(source_id, default) - - def __len__(self) -> int: - return len(self.data) - - def __delitem__(self, source: ContainerIdentifier) -> None: - source_id = ContainerId.load(source) - del self.data[source_id] - - def __setitem__( - self, source: ContainerIdentifier, properties: MutableMapping[PropertyIdentifier, PropertyValue] - ) -> None: - source_id = ContainerId.load(source) - self.data[source_id] = properties - - @dataclass class Record(CogniteResource): id: RecordId From 44708f7e31fd38cba7f388571b4f35cfe6a94b22 Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Fri, 15 Aug 2025 16:22:27 +0200 Subject: [PATCH 12/14] Fix filter dump in sync method --- cognite/client/_api/data_modeling/records.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 4eaac48b63..8e5dfcc24c 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -75,7 +75,7 @@ def sync( ) -> RecordListWithCursor: body: dict = {} if filter is not None: - body["filter"] = filter + body["filter"] = filter.dump() if cursor is not None: body["cursor"] = cursor if initialize_cursor is not None: From 8704f931f36e79e3b88ffcdb84266970649f1bef Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Mon, 18 Aug 2025 14:15:49 +0200 Subject: [PATCH 13/14] Make last_updated_time filter optional on records.filter --- cognite/client/_api/data_modeling/records.py | 10 +++++----- 1 file changed, 5 
insertions(+), 5 deletions(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index 8e5dfcc24c..b99a769ce2 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections.abc import Sequence -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from cognite.client._api_client import APIClient from cognite.client._constants import DEFAULT_LIMIT_READ @@ -48,14 +48,14 @@ def delete(self, stream: str, id: RecordId | Sequence[RecordId]) -> None: def filter( self, stream: str, - last_updated_time: LastUpdatedRange, *, + last_updated_time: LastUpdatedRange | None = None, filter: Filter | None = None, limit: int | None = DEFAULT_LIMIT_READ, ) -> RecordList: - body: dict = { - "lastUpdatedTime": last_updated_time.dump(), - } + body: dict[str, Any] = {} + if last_updated_time is not None: + body["lastUpdatedTime"] = last_updated_time.dump() if filter is not None: body["filter"] = filter.dump() body["limit"] = limit From 74cd61d93ed158086b8ec27e3246c1b6c7634cae Mon Sep 17 00:00:00 2001 From: erlendvollset Date: Mon, 18 Aug 2025 14:17:59 +0200 Subject: [PATCH 14/14] Rename Stream apply to create --- cognite/client/_api/data_modeling/records.py | 4 ++-- cognite/client/_api/data_modeling/streams.py | 22 +++++++++---------- .../data_classes/data_modeling/streams.py | 16 +++++++------- .../test_api/test_data_modeling/conftest.py | 6 ++--- .../test_data_modeling/test_streams.py | 6 ++--- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py index b99a769ce2..b0d63fa0ca 100644 --- a/cognite/client/_api/data_modeling/records.py +++ b/cognite/client/_api/data_modeling/records.py @@ -40,8 +40,8 @@ def upsert(self, stream: str, records: Sequence[RecordIngest]) -> None: body = {"items": 
[record.dump(camel_case=True) for record in records]} self._post(url_path=self._RESOURCE_PATH.format(stream) + "/upsert", json=body, headers=self.__alpha_headers) - def delete(self, stream: str, id: RecordId | Sequence[RecordId]) -> None: - items = id if isinstance(id, Sequence) else [id] + def delete(self, stream: str, ids: RecordId | Sequence[RecordId]) -> None: + items = ids if isinstance(ids, Sequence) else [ids] body = {"items": [item.dump(camel_case=True) for item in items]} self._post(url_path=self._RESOURCE_PATH.format(stream) + "/delete", json=body, headers=self.__alpha_headers) diff --git a/cognite/client/_api/data_modeling/streams.py b/cognite/client/_api/data_modeling/streams.py index f78dec4baa..4615c2f361 100644 --- a/cognite/client/_api/data_modeling/streams.py +++ b/cognite/client/_api/data_modeling/streams.py @@ -5,7 +5,7 @@ from cognite.client._api_client import APIClient from cognite.client._constants import DEFAULT_LIMIT_READ -from cognite.client.data_classes.data_modeling.streams import Stream, StreamApply, StreamList +from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations from cognite.client.utils._identifier import Identifier @@ -146,16 +146,16 @@ def list(self, limit: int | None = DEFAULT_LIMIT_READ) -> StreamList: ) @overload - def apply(self, streams: Sequence[StreamApply]) -> StreamList: ... + def create(self, streams: Sequence[StreamWrite]) -> StreamList: ... @overload - def apply(self, streams: StreamApply) -> Stream: ... + def create(self, streams: StreamWrite) -> Stream: ... - def apply(self, streams: StreamApply | Sequence[StreamApply]) -> Stream | StreamList: - """`Create or patch one or more streams. `_ + def create(self, streams: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList: + """`Create one or more streams. 
`_ Args: - streams (StreamApply | Sequence[StreamApply]): Stream | Sequence[Stream]): Stream or streams of streamsda to create or update. + streams (StreamWrite | Sequence[StreamWrite]): Stream | Sequence[Stream]): Stream or streams of streamsda to create or update. Returns: Stream | StreamList: Created stream(s) @@ -165,16 +165,16 @@ def apply(self, streams: StreamApply | Sequence[StreamApply]) -> Stream | Stream Create new streams: >>> from cognite.client import CogniteClient - >>> from cognite.client.data_classes.data_modeling.streams import StreamApply + >>> from cognite.client.data_classes.data_modeling.streams import StreamWrite >>> client = CogniteClient() - >>> streams = [StreamApply(stream="myStream", description="My first stream", name="My Stream"), - ... StreamApply(stream="myOtherStream", description="My second stream", name="My Other Stream")] - >>> res = client.data_modeling.streams.apply(streams) + >>> streams = [StreamWrite(stream="myStream", description="My first stream", name="My Stream"), + ... StreamWrite(stream="myOtherStream", description="My second stream", name="My Other Stream")] + >>> res = client.data_modeling.streams.create(streams) """ return self._create_multiple( list_cls=StreamList, resource_cls=Stream, items=streams, - input_resource_cls=StreamApply, + input_resource_cls=StreamWrite, headers=self.__alpha_headers, ) diff --git a/cognite/client/data_classes/data_modeling/streams.py b/cognite/client/data_classes/data_modeling/streams.py index c0204b06ac..f6ff31cf86 100644 --- a/cognite/client/data_classes/data_modeling/streams.py +++ b/cognite/client/data_classes/data_modeling/streams.py @@ -61,7 +61,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: return dumped -class StreamApply(WriteableCogniteResource): +class StreamWrite(WriteableCogniteResource): """A stream of records. This is the write version. 
Args: @@ -97,7 +97,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: return convert_all_keys_to_camel_case_recursive(dumped) return dumped - def as_write(self) -> StreamApply: + def as_write(self) -> StreamWrite: """Returns this SpaceApply instance.""" return self @@ -117,13 +117,13 @@ def __init__( self.type = type self.created_from_template = created_from_template - def as_apply(self) -> StreamApply: - return StreamApply( + def as_apply(self) -> StreamWrite: + return StreamWrite( external_id=self.external_id, settings=StreamSettings(template=StreamTemplate(name=self.created_from_template)), ) - def as_write(self) -> StreamApply: + def as_write(self) -> StreamWrite: return self.as_apply() @classmethod @@ -147,8 +147,8 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: return dumped -class StreamApplyList(CogniteResourceList[StreamApply]): - _RESOURCE = StreamApply +class StreamApplyList(CogniteResourceList[StreamWrite]): + _RESOURCE = StreamWrite def as_ids(self) -> list[str]: """ @@ -160,7 +160,7 @@ def as_ids(self) -> list[str]: return [item.external_id for item in self] -class StreamList(WriteableCogniteResourceList[StreamApply, Stream]): +class StreamList(WriteableCogniteResourceList[StreamWrite, Stream]): """A list of Stream objects.""" _RESOURCE = Stream diff --git a/tests/tests_integration/test_api/test_data_modeling/conftest.py b/tests/tests_integration/test_api/test_data_modeling/conftest.py index 3002030357..7b1f59fac6 100644 --- a/tests/tests_integration/test_api/test_data_modeling/conftest.py +++ b/tests/tests_integration/test_api/test_data_modeling/conftest.py @@ -31,7 +31,7 @@ Node, NodeApplyList, ) -from cognite.client.data_classes.data_modeling.streams import Stream, StreamApply, StreamSettings, StreamTemplate +from cognite.client.data_classes.data_modeling.streams import Stream, StreamSettings, StreamTemplate, StreamWrite RESOURCES = Path(__file__).parent / "resources" @@ -594,8 +594,8 @@ def 
persisted_stream(cognite_client: CogniteClient) -> Stream: external_id = "python-sdk-test-stream-persistent" stream = cognite_client.data_modeling.streams.retrieve(external_id=external_id) if stream is None: - stream = cognite_client.data_modeling.streams.apply( - StreamApply( + stream = cognite_client.data_modeling.streams.create( + StreamWrite( external_id=external_id, settings=StreamSettings(template=StreamTemplate(name="MutableTestStream")) ) ) diff --git a/tests/tests_integration/test_api/test_data_modeling/test_streams.py b/tests/tests_integration/test_api/test_data_modeling/test_streams.py index 2971e1e52c..43e45dab4b 100644 --- a/tests/tests_integration/test_api/test_data_modeling/test_streams.py +++ b/tests/tests_integration/test_api/test_data_modeling/test_streams.py @@ -4,7 +4,7 @@ import pytest from cognite.client import CogniteClient -from cognite.client.data_classes.data_modeling.streams import Stream, StreamApply, StreamSettings, StreamTemplate +from cognite.client.data_classes.data_modeling.streams import Stream, StreamSettings, StreamTemplate, StreamWrite from cognite.client.utils._text import random_string if os.environ["COGNITE_PROJECT"] != "erlend-test": @@ -25,8 +25,8 @@ def test_retrieve(self, cognite_client: CogniteClient, persisted_stream: Stream) def test_delete(self, cognite_client: CogniteClient, persisted_stream: Stream) -> None: external_id = f"python-sdk-test-stream-{random_string(10, string.ascii_lowercase)}" - cognite_client.data_modeling.streams.apply( - StreamApply( + cognite_client.data_modeling.streams.create( + StreamWrite( external_id=external_id, settings=StreamSettings(template=StreamTemplate(name="MutableTestStream")), )