5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,11 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.


## [7.81.0] - 2025-08-14
### Added
- [alpha] Alpha support for the Streams & Records APIs. Note that both the SDK and the API implementation may change at any time.

## [7.80.1] - 2025-08-14
### Fixed
- Make CogniteAPIError.response_code non-nullable again, addressing a regression introduced in the previous version.
10 changes: 4 additions & 6 deletions cognite/client/_api/agents/agents.py
@@ -5,7 +5,7 @@

from cognite.client._api_client import APIClient
from cognite.client.data_classes.agents import Agent, AgentList, AgentUpsert
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations
from cognite.client.utils._identifier import IdentifierSequence
from cognite.client.utils.useful_types import SequenceNotStr

@@ -14,12 +14,14 @@
from cognite.client.config import ClientConfig


@warn_on_all_method_invocations(
FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Agents")
)
class AgentsAPI(APIClient):
_RESOURCE_PATH = "/ai/agents"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self._warnings = FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Agents")
self._api_subversion = "alpha"
self._CREATE_LIMIT = 1
self._DELETE_LIMIT = 1
@@ -150,7 +152,6 @@ def upsert(self, agents: AgentUpsert | Sequence[AgentUpsert]) -> Agent | AgentList


"""
self._warnings.warn()
return self._create_multiple(
list_cls=AgentList,
resource_cls=Agent,
@@ -188,7 +189,6 @@ def retrieve(

>>> res = client.agents.retrieve(external_ids=["my_agent_1", "my_agent_2"])
"""
self._warnings.warn()
identifiers = IdentifierSequence.load(external_ids=external_ids)
return self._retrieve_multiple(
list_cls=AgentList,
@@ -213,7 +213,6 @@ def delete(self, external_ids: str | SequenceNotStr[str], ignore_unknown_ids: bool
>>> client.agents.delete(external_ids="my_agent")

"""
self._warnings.warn()
self._delete_multiple(
identifiers=IdentifierSequence.load(external_ids=external_ids),
wrap_ids=True,
@@ -235,6 +234,5 @@ def list(self) -> AgentList: # The API does not yet support limit or pagination
>>> agent_list = client.agents.list()

"""
self._warnings.warn()
res = self._get(url_path=self._RESOURCE_PATH)
return AgentList._load(res.json()["items"], cognite_client=self._cognite_client)
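
Aside: `warn_on_all_method_invocations` itself is not shown in this diff, but from its usage above it is evidently a class decorator that emits the given FeaturePreviewWarning on every public method call, replacing the per-method `self._warnings.warn()` lines removed here. A minimal sketch of one plausible implementation, assuming only that FeaturePreviewWarning exposes `.warn()` (which the removed calls confirm); the helper `_wrap` is illustrative, not the SDK's actual code:

import functools

def warn_on_all_method_invocations(warning):
    """Class decorator: emit `warning` each time a public method is invoked."""
    def decorate(cls):
        for name, attr in list(vars(cls).items()):
            # Wrap plain public functions only; dunders and private helpers stay untouched.
            if callable(attr) and not name.startswith("_"):
                setattr(cls, name, _wrap(attr, warning))
        return cls
    return decorate

def _wrap(method, warning):
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        warning.warn()  # FeaturePreviewWarning.warn(), per the removed per-method calls
        return method(*args, **kwargs)
    return wrapper
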
4 changes: 4 additions & 0 deletions cognite/client/_api/data_modeling/__init__.py
@@ -6,8 +6,10 @@
from cognite.client._api.data_modeling.data_models import DataModelsAPI
from cognite.client._api.data_modeling.graphql import DataModelingGraphQLAPI
from cognite.client._api.data_modeling.instances import InstancesAPI
from cognite.client._api.data_modeling.records import RecordsAPI
from cognite.client._api.data_modeling.spaces import SpacesAPI
from cognite.client._api.data_modeling.statistics import StatisticsAPI
from cognite.client._api.data_modeling.streams import StreamsAPI
from cognite.client._api.data_modeling.views import ViewsAPI
from cognite.client._api_client import APIClient

@@ -26,3 +28,5 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
self.instances = InstancesAPI(config, api_version, cognite_client)
self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client)
self.statistics = StatisticsAPI(config, api_version, cognite_client)
self.streams = StreamsAPI(config, api_version, cognite_client)
self.records = RecordsAPI(config, api_version, cognite_client)
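
With this wiring in place, the new alpha surfaces hang directly off the client. A minimal usage sketch (client configuration elided):

from cognite.client import CogniteClient

client = CogniteClient()
streams_api = client.data_modeling.streams  # StreamsAPI registered above
records_api = client.data_modeling.records  # RecordsAPI registered above
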
87 changes: 87 additions & 0 deletions cognite/client/_api/data_modeling/records.py
@@ -0,0 +1,87 @@
from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.data_classes.data_modeling.records import (
LastUpdatedRange,
RecordId,
RecordIngest,
RecordList,
RecordListWithCursor,
)
from cognite.client.data_classes.filters import Filter
from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations

if TYPE_CHECKING:
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig


@warn_on_all_method_invocations(
FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Records API")
)
class RecordsAPI(APIClient):
_RESOURCE_PATH = "/streams/{}/records"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self.__alpha_headers = {
"cdf-version": "alpha",
}

def ingest(self, stream: str, records: Sequence[RecordIngest]) -> None:
body = {"items": [record.dump(camel_case=True) for record in records]}
self._post(url_path=self._RESOURCE_PATH.format(stream), json=body, headers=self.__alpha_headers)

def upsert(self, stream: str, records: Sequence[RecordIngest]) -> None:
body = {"items": [record.dump(camel_case=True) for record in records]}
self._post(url_path=self._RESOURCE_PATH.format(stream) + "/upsert", json=body, headers=self.__alpha_headers)

def delete(self, stream: str, ids: RecordId | Sequence[RecordId]) -> None:
items = ids if isinstance(ids, Sequence) else [ids]
body = {"items": [item.dump(camel_case=True) for item in items]}
self._post(url_path=self._RESOURCE_PATH.format(stream) + "/delete", json=body, headers=self.__alpha_headers)

def filter(
self,
stream: str,
*,
last_updated_time: LastUpdatedRange | None = None,
filter: Filter | None = None,
limit: int | None = DEFAULT_LIMIT_READ,
) -> RecordList:
body: dict[str, Any] = {}
if last_updated_time is not None:
body["lastUpdatedTime"] = last_updated_time.dump()
if filter is not None:
body["filter"] = filter.dump()
body["limit"] = limit
res = self._post(
url_path=self._RESOURCE_PATH.format(stream) + "/filter", json=body, headers=self.__alpha_headers
)
return RecordList._load(res.json()["items"], cognite_client=self._cognite_client)

def sync(
self,
stream: str,
*,
filter: Filter | None = None,
cursor: str | None = None,
initialize_cursor: str | None = None,
limit: int | None = DEFAULT_LIMIT_READ,
) -> RecordListWithCursor:
        body: dict[str, Any] = {}
if filter is not None:
body["filter"] = filter.dump()
if cursor is not None:
body["cursor"] = cursor
if initialize_cursor is not None:
body["initializeCursor"] = initialize_cursor
body["limit"] = limit
res = self._post(url_path=self._RESOURCE_PATH.format(stream) + "/sync", json=body, headers=self.__alpha_headers)
payload = res.json()
items = RecordList._load(payload["items"], cognite_client=self._cognite_client)
return RecordListWithCursor(list(items), cursor=payload.get("nextCursor"), has_next=payload.get("hasNext"))
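
The records methods land without docstrings in this revision, so a hedged usage sketch may help. It assumes a stream named "my-stream" already exists, and the filter shown is purely illustrative; `RecordListWithCursor` carries `cursor` and `has_next`, per its constructor call above:

from cognite.client import CogniteClient
from cognite.client.data_classes import filters

client = CogniteClient()

# One-shot query: fetch up to 100 records matching a filter.
records = client.data_modeling.records.filter(
    "my-stream",
    filter=filters.Equals(["space"], "my-space"),  # illustrative property filter
    limit=100,
)

# Incremental consumption: follow the sync cursor until no more changes remain.
page = client.data_modeling.records.sync("my-stream", limit=100)
while page.has_next:
    page = client.data_modeling.records.sync("my-stream", cursor=page.cursor, limit=100)
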
180 changes: 180 additions & 0 deletions cognite/client/_api/data_modeling/streams.py
@@ -0,0 +1,180 @@
from __future__ import annotations

from collections.abc import Iterator, Sequence
from typing import TYPE_CHECKING, overload

from cognite.client._api_client import APIClient
from cognite.client._constants import DEFAULT_LIMIT_READ
from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite
from cognite.client.utils._experimental import FeaturePreviewWarning, warn_on_all_method_invocations
from cognite.client.utils._identifier import Identifier

if TYPE_CHECKING:
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig


@warn_on_all_method_invocations(
FeaturePreviewWarning(api_maturity="alpha", sdk_maturity="alpha", feature_name="Records API")
)
class StreamsAPI(APIClient):
_RESOURCE_PATH = "/streams"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: CogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self._CREATE_LIMIT = 1
self.__alpha_headers = {
"cdf-version": "alpha",
}

@overload
def __call__(
self,
chunk_size: None = None,
limit: int | None = None,
) -> Iterator[Stream]: ...

@overload
def __call__(
self,
chunk_size: int,
limit: int | None = None,
) -> Iterator[StreamList]: ...

def __call__(
self,
chunk_size: int | None = None,
limit: int | None = None,
) -> Iterator[Stream] | Iterator[StreamList]:
"""Iterate over streams

Fetches streams as they are iterated over, so you keep a limited number of streams in memory.

Args:
            chunk_size (int | None): Number of streams to return in each chunk. Defaults to yielding one stream at a time.
limit (int | None): Maximum number of streams to return. Defaults to returning all items.

Returns:
Iterator[Stream] | Iterator[StreamList]: yields Stream one by one if chunk_size is not specified, else StreamList objects.
"""
return self._list_generator(
list_cls=StreamList,
resource_cls=Stream,
method="GET",
chunk_size=chunk_size,
limit=limit,
headers=self.__alpha_headers,
)

def __iter__(self) -> Iterator[Stream]:
"""Iterate over streams

Fetches streams as they are iterated over, so you keep a limited number of streams in memory.

Returns:
Iterator[Stream]: yields Streams one by one.
"""
return self()

def retrieve(self, external_id: str) -> Stream | None:
"""`Retrieve a stream. <https://developer.cognite.com/api#tag/Streams/operation/byStreamIdsStreams>`_

Args:
            external_id (str): The external ID of the stream to retrieve.

Returns:
Stream | None: Requested stream or None if it does not exist.

Examples:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
            >>> res = client.data_modeling.streams.retrieve(external_id="myStream")

"""
identifier = Identifier.load(external_id=external_id)
return self._retrieve(identifier=identifier, cls=Stream, headers=self.__alpha_headers)

def delete(self, external_id: str) -> None:
"""`Delete one or more streams <https://developer.cognite.com/api#tag/Streams/operation/deleteStreamsV3>`_

Args:
            external_id (str): External ID of the stream to delete.

        Examples:

            Delete a stream by external id:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
            >>> client.data_modeling.streams.delete(external_id="myStream")
"""
self._delete(url_path=f"{self._RESOURCE_PATH}/{external_id}", headers=self.__alpha_headers)

def list(self, limit: int | None = DEFAULT_LIMIT_READ) -> StreamList:
"""`List streams <https://developer.cognite.com/api#tag/Streams/operation/listStreamsV3>`_

Args:
            limit (int | None): Maximum number of streams to return. Defaults to 25. Set to -1, float("inf") or None to return all items.

Returns:
StreamList: List of requested streams

Examples:

            List up to five streams:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> stream_list = client.data_modeling.streams.list(limit=5)

Iterate over streams:

>>> for stream in client.data_modeling.streams:
... stream # do something with the stream

Iterate over chunks of streams to reduce memory load:

>>> for stream_list in client.data_modeling.streams(chunk_size=2500):
... stream_list # do something with the streams
"""
return self._list(
list_cls=StreamList, resource_cls=Stream, method="GET", limit=limit, headers=self.__alpha_headers
)

@overload
def create(self, streams: Sequence[StreamWrite]) -> StreamList: ...

@overload
def create(self, streams: StreamWrite) -> Stream: ...

def create(self, streams: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList:
"""`Create one or more streams. <https://developer.cognite.com/api#tag/Streams/operation/ApplyStreams>`_

Args:
            streams (StreamWrite | Sequence[StreamWrite]): Stream or streams to create.

Returns:
Stream | StreamList: Created stream(s)

Examples:

Create new streams:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.data_modeling.streams import StreamWrite
>>> client = CogniteClient()
>>> streams = [StreamWrite(stream="myStream", description="My first stream", name="My Stream"),
... StreamWrite(stream="myOtherStream", description="My second stream", name="My Other Stream")]
>>> res = client.data_modeling.streams.create(streams)
"""
return self._create_multiple(
list_cls=StreamList,
resource_cls=Stream,
items=streams,
input_resource_cls=StreamWrite,
headers=self.__alpha_headers,
)
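
To round off the streams surface, a small lifecycle sketch; the StreamWrite arguments mirror the docstring example above and are illustrative:

from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling.streams import StreamWrite

client = CogniteClient()

# Create a stream, verify it is retrievable, then clean it up again.
created = client.data_modeling.streams.create(
    StreamWrite(stream="myStream", description="Demo stream", name="My Stream")
)
assert client.data_modeling.streams.retrieve(external_id="myStream") is not None
client.data_modeling.streams.delete(external_id="myStream")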