Skip to content
96 changes: 96 additions & 0 deletions cognite/client/_api/streams/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from __future__ import annotations

from collections.abc import MutableSequence, Sequence
from typing import TYPE_CHECKING, Any

from cognite.client._api.streams.records import StreamsRecordsAPI
from cognite.client._api_client import APIClient
from cognite.client.data_classes.streams.stream import (
Stream,
StreamDeleteItem,
StreamList,
StreamWrite,
)
from cognite.client.utils._url import interpolate_and_url_encode

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient
from cognite.client.config import ClientConfig


def _dump_write_item(obj: StreamWrite | dict[str, Any]) -> dict[str, Any]:
if isinstance(obj, dict):
return obj
return obj.dump()


def _dump_delete_item(obj: StreamDeleteItem | dict[str, Any]) -> dict[str, Any]:
if isinstance(obj, dict):
return obj
return obj.dump()


class StreamsAPI(APIClient):
    """ILA Streams API (``/streams``) and nested :class:`StreamsRecordsAPI` (``/streams/{id}/records``)."""

    _RESOURCE_PATH = "/streams"

    def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None:
        super().__init__(config, api_version, cognite_client)
        # Record-level operations (/streams/{id}/records/...) are exposed as a sub-API.
        self.records = StreamsRecordsAPI(config, api_version, cognite_client)

    @staticmethod
    def _check_exactly_one(items: Sequence[Any], operation: str) -> None:
        """Raise ``ValueError`` unless *items* has exactly one element.

        The ILA create/delete endpoints accept exactly one stream per request; this
        shared check keeps the error message identical for both operations.
        """
        if len(items) != 1:
            raise ValueError(f"ILA {operation} stream accepts exactly one item; see API documentation.")

    async def create(self, items: Sequence[StreamWrite | dict[str, Any]]) -> StreamList:
        """`Create streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_.

        The API accepts **exactly one** stream per request. Pass a single-element sequence.
        Stream creation is rate-limited; avoid issuing many create calls in a tight loop.

        Args:
            items (Sequence[StreamWrite | dict[str, Any]]): Single-element sequence holding the stream to create.

        Returns:
            StreamList: The created stream(s) as returned by the API.

        Raises:
            ValueError: If *items* does not contain exactly one element.
        """
        self._check_exactly_one(items, "create")
        res = await self._post(
            self._RESOURCE_PATH,
            json={"items": [_dump_write_item(i) for i in items]},
            semaphore=self._get_semaphore("write"),
        )
        return StreamList._load(res.json()["items"])

    async def list(self) -> StreamList:
        """`List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_ in the project.

        There is no paging limit parameter: the endpoint returns all streams in the project
        (projects are expected to have few streams).

        Returns:
            StreamList: All streams in the project.
        """
        res = await self._get(url_path=self._RESOURCE_PATH, semaphore=self._get_semaphore("read"))
        return StreamList._load(res.json()["items"])

    async def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream:
        """`Retrieve a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_.

        Args:
            stream_external_id (str): Stream external id.
            include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing
                statistics can be expensive; the list endpoint does not offer this flag for that reason.

        Returns:
            Stream: The stream metadata (and optionally statistics).
        """
        # Plain concatenation: the resource path contains no placeholders, so an
        # f-string with escaped braces ("{{}}") is unnecessary.
        path = interpolate_and_url_encode(self._RESOURCE_PATH + "/{}", stream_external_id)
        params: dict[str, Any] | None = None
        if include_statistics is not None:
            # Query parameter is serialized as the literal strings "true"/"false".
            params = {"includeStatistics": "true" if include_statistics else "false"}
        res = await self._get(url_path=path, params=params, semaphore=self._get_semaphore("read"))
        return Stream._load(res.json())

    async def delete(self, items: MutableSequence[StreamDeleteItem | dict[str, Any]]) -> None:
        """`Delete streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_ (POST).

        The API accepts **exactly one** stream per request. Deletion is soft-delete and retains
        capacity for an extended period; prefer deleting only when necessary.

        Args:
            items (MutableSequence[StreamDeleteItem | dict[str, Any]]): Single-element sequence identifying the stream.

        Raises:
            ValueError: If *items* does not contain exactly one element.
        """
        self._check_exactly_one(items, "delete")
        await self._post(
            f"{self._RESOURCE_PATH}/delete",
            json={"items": [_dump_delete_item(i) for i in items]},
            semaphore=self._get_semaphore("write"),
        )
106 changes: 106 additions & 0 deletions cognite/client/_api/streams/records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from __future__ import annotations

from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any

from cognite.client._api_client import APIClient
from cognite.client.data_classes.streams.stream_record import (
RecordsAggregateResponse,
RecordsDeleteResponse,
RecordsFilterResponse,
RecordsIngestResponse,
RecordsSyncResponse,
)
from cognite.client.utils._url import interpolate_and_url_encode

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient
from cognite.client.config import ClientConfig


class StreamsRecordsAPI(APIClient):
    """ILA record operations under ``/streams/{streamId}/records/...``."""

    _RESOURCE_PATH = "/streams"

    def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None:
        super().__init__(config, api_version, cognite_client)

    def _records_base(self, stream_external_id: str) -> str:
        """Return the URL-encoded base path ``/streams/{id}/records`` for *stream_external_id*."""
        # Build from the class-level resource path (instead of a duplicated literal)
        # for consistency with StreamsAPI, which uses _RESOURCE_PATH everywhere.
        return interpolate_and_url_encode(self._RESOURCE_PATH + "/{}/records", stream_external_id)

    async def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
        """`Ingest records <https://api-docs.cognite.com/20230101/tag/Records/operation/ingestRecords>`_ into a stream.

        Args:
            stream_external_id (str): Stream external id.
            body (dict[str, Any]): Raw request body, passed through unchanged.

        Returns:
            RecordsIngestResponse: Parsed API response.
        """
        res = await self._post(
            self._records_base(stream_external_id), json=body, semaphore=self._get_semaphore("write")
        )
        return RecordsIngestResponse._load(res.json())

    async def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
        """`Upsert records <https://api-docs.cognite.com/20230101/tag/Records/operation/upsertRecords>`_ in a mutable stream."""
        res = await self._post(
            self._records_base(stream_external_id) + "/upsert", json=body, semaphore=self._get_semaphore("write")
        )
        return RecordsIngestResponse._load(res.json())

    async def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse:
        """`Delete records <https://api-docs.cognite.com/20230101/tag/Records/operation/deleteRecords>`_ from a mutable stream."""
        res = await self._post(
            self._records_base(stream_external_id) + "/delete", json=body, semaphore=self._get_semaphore("write")
        )
        return RecordsDeleteResponse._load(res.json())

    async def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse:
        """`Filter records <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_."""
        res = await self._post(
            self._records_base(stream_external_id) + "/filter", json=body, semaphore=self._get_semaphore("read")
        )
        return RecordsFilterResponse._load(res.json())

    async def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse:
        """`Aggregate over records <https://api-docs.cognite.com/20230101/tag/Records/operation/aggregateRecords>`_."""
        res = await self._post(
            self._records_base(stream_external_id) + "/aggregate", json=body, semaphore=self._get_semaphore("read")
        )
        return RecordsAggregateResponse._load(res.json())

    async def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse:
        """`Sync records <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_ (cursor-based read)."""
        res = await self._post(
            self._records_base(stream_external_id) + "/sync", json=body, semaphore=self._get_semaphore("read")
        )
        return RecordsSyncResponse._load(res.json())

    async def ingest_items(
        self,
        stream_external_id: str,
        items: Sequence[Mapping[str, Any]],
    ) -> RecordsIngestResponse:
        """Ingest records using the ``items`` array shape (1-1000 records per request).

        Each element must match the API ``recordItems`` object (``space``, ``externalId``, ``sources``, ...).
        This is a thin wrapper around :meth:`ingest` that builds ``{"items": [...]}``.

        Raises:
            ValueError: If *items* is empty.
        """
        # NOTE(review): only the lower bound is validated client-side; batches over
        # 1000 items are rejected by the server — confirm whether a local check is wanted.
        if not items:
            raise ValueError("ingest_items requires at least one record (API allows 1-1000 items per call).")
        return await self.ingest(stream_external_id, {"items": [dict(i) for i in items]})

    async def upsert_items(
        self,
        stream_external_id: str,
        items: Sequence[Mapping[str, Any]],
    ) -> RecordsIngestResponse:
        """Upsert records using the ``items`` array (mutable streams). Same shape as :meth:`ingest_items`.

        Raises:
            ValueError: If *items* is empty.
        """
        if not items:
            raise ValueError("upsert_items requires at least one record (API allows 1-1000 items per call).")
        return await self.upsert(stream_external_id, {"items": [dict(i) for i in items]})

    async def delete_items(
        self,
        stream_external_id: str,
        items: Sequence[Mapping[str, Any]],
    ) -> RecordsDeleteResponse:
        """Delete records by identifier (``space`` + ``externalId`` per item). Wrapper for :meth:`delete`.

        Raises:
            ValueError: If *items* is empty.
        """
        if not items:
            raise ValueError("delete_items requires at least one record identifier.")
        return await self.delete(stream_external_id, {"items": [dict(i) for i in items]})
2 changes: 2 additions & 0 deletions cognite/client/_cognite_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from cognite.client._api.relationships import RelationshipsAPI
from cognite.client._api.sequences import SequencesAPI
from cognite.client._api.simulators import SimulatorsAPI
from cognite.client._api.streams import StreamsAPI
from cognite.client._api.three_d import ThreeDAPI
from cognite.client._api.time_series import TimeSeriesAPI
from cognite.client._api.transformations import TransformationsAPI
Expand Down Expand Up @@ -92,6 +93,7 @@ def __init__(self, config: ClientConfig | None = None) -> None:
self.workflows = WorkflowAPI(self._config, self._API_VERSION, self)
self.units = UnitAPI(self._config, self._API_VERSION, self)
self.simulators = SimulatorsAPI(self._config, self._API_VERSION, self)
self.streams = StreamsAPI(self._config, self._API_VERSION, self)
# APIs just using base_url:
self._api_client = APIClient(self._config, api_version=None, cognite_client=self)

Expand Down
70 changes: 70 additions & 0 deletions cognite/client/_sync_api/streams/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
===============================================================================
f780e4edf88535a6df4726a0958d12a7
This file is auto-generated from the Async API modules, - do not edit manually!
===============================================================================
"""

from __future__ import annotations

from collections.abc import MutableSequence, Sequence
from typing import Any

from cognite.client import AsyncCogniteClient
from cognite.client._sync_api.streams.records import SyncStreamsRecordsAPI
from cognite.client._sync_api_client import SyncAPIClient
from cognite.client.data_classes.streams.stream import Stream, StreamDeleteItem, StreamList, StreamWrite
from cognite.client.utils._async_helpers import run_sync


class SyncStreamsAPI(SyncAPIClient):
    """Auto-generated, do not modify manually.

    Synchronous facade over the async ``StreamsAPI``: every method delegates to the
    corresponding coroutine on the wrapped async client via ``run_sync``.
    """

    def __init__(self, async_client: AsyncCogniteClient) -> None:
        # Name-mangled attribute keeps the async client private to this class.
        self.__async_client = async_client
        self.records = SyncStreamsRecordsAPI(async_client)

    def create(self, items: Sequence[StreamWrite | dict[str, Any]]) -> StreamList:
        """
        `Create streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_.

        The API accepts **exactly one** stream per request. Pass a single-element sequence.
        Stream creation is rate-limited; avoid issuing many create calls in a tight loop.
        """
        return run_sync(self.__async_client.streams.create(items=items))

    def list(self) -> StreamList:
        """
        `List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_ in the project.

        There is no paging limit parameter: the endpoint returns all streams in the project
        (projects are expected to have few streams).
        """
        return run_sync(self.__async_client.streams.list())

    def retrieve(self, stream_external_id: str, include_statistics: bool | None = None) -> Stream:
        """
        `Retrieve a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_.

        Args:
            stream_external_id (str): Stream external id.
            include_statistics (bool | None): When ``True``, the response may include **statistics**. Computing
                statistics can be expensive; the list endpoint does not offer this flag for that reason.

        Returns:
            Stream: The stream metadata (and optionally statistics).
        """
        return run_sync(
            self.__async_client.streams.retrieve(
                stream_external_id=stream_external_id, include_statistics=include_statistics
            )
        )

    def delete(self, items: MutableSequence[StreamDeleteItem | dict[str, Any]]) -> None:
        """
        `Delete streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_ (POST).

        The API accepts **exactly one** stream per request. Deletion is soft-delete and retains
        capacity for an extended period; prefer deleting only when necessary.
        """
        # The async delete returns None; run_sync passes that through.
        return run_sync(self.__async_client.streams.delete(items=items))
95 changes: 95 additions & 0 deletions cognite/client/_sync_api/streams/records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
===============================================================================
a903430692f15dfd71a9a3334ad18209
This file is auto-generated from the Async API modules, - do not edit manually!
===============================================================================
"""

from __future__ import annotations

from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any

from cognite.client import AsyncCogniteClient
from cognite.client._sync_api_client import SyncAPIClient
from cognite.client.data_classes.streams.stream_record import (
RecordsAggregateResponse,
RecordsDeleteResponse,
RecordsFilterResponse,
RecordsIngestResponse,
RecordsSyncResponse,
)
from cognite.client.utils._async_helpers import run_sync

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient


class SyncStreamsRecordsAPI(SyncAPIClient):
    """Auto-generated, do not modify manually.

    Synchronous facade over the async ``StreamsRecordsAPI``: every method delegates
    to the corresponding coroutine on the wrapped async client via ``run_sync``.
    """

    def __init__(self, async_client: AsyncCogniteClient) -> None:
        # Name-mangled attribute keeps the async client private to this class.
        self.__async_client = async_client

    def ingest(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
        """
        `Ingest records <https://api-docs.cognite.com/20230101/tag/Records/operation/ingestRecords>`_ into a stream.
        """
        return run_sync(self.__async_client.streams.records.ingest(stream_external_id=stream_external_id, body=body))

    def upsert(self, stream_external_id: str, body: dict[str, Any]) -> RecordsIngestResponse:
        """
        `Upsert records <https://api-docs.cognite.com/20230101/tag/Records/operation/upsertRecords>`_ in a mutable stream.
        """
        return run_sync(self.__async_client.streams.records.upsert(stream_external_id=stream_external_id, body=body))

    def delete(self, stream_external_id: str, body: dict[str, Any]) -> RecordsDeleteResponse:
        """
        `Delete records <https://api-docs.cognite.com/20230101/tag/Records/operation/deleteRecords>`_ from a mutable stream.
        """
        return run_sync(self.__async_client.streams.records.delete(stream_external_id=stream_external_id, body=body))

    def filter(self, stream_external_id: str, body: dict[str, Any]) -> RecordsFilterResponse:
        """
        `Filter records <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_.
        """
        return run_sync(self.__async_client.streams.records.filter(stream_external_id=stream_external_id, body=body))

    def aggregate(self, stream_external_id: str, body: dict[str, Any]) -> RecordsAggregateResponse:
        """
        `Aggregate over records <https://api-docs.cognite.com/20230101/tag/Records/operation/aggregateRecords>`_.
        """
        return run_sync(self.__async_client.streams.records.aggregate(stream_external_id=stream_external_id, body=body))

    def sync(self, stream_external_id: str, body: dict[str, Any]) -> RecordsSyncResponse:
        """
        `Sync records <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_ (cursor-based read).
        """
        return run_sync(self.__async_client.streams.records.sync(stream_external_id=stream_external_id, body=body))

    def ingest_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsIngestResponse:
        """
        Ingest records using the ``items`` array shape (1-1000 records per request).

        Each element must match the API ``recordItems`` object (``space``, ``externalId``, ``sources``, ...).
        This is a thin wrapper around :meth:`ingest` that builds ``{"items": [...]}``.
        """
        return run_sync(
            self.__async_client.streams.records.ingest_items(stream_external_id=stream_external_id, items=items)
        )

    def upsert_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsIngestResponse:
        """
        Upsert records using the ``items`` array (mutable streams). Same shape as :meth:`ingest_items`.
        """
        return run_sync(
            self.__async_client.streams.records.upsert_items(stream_external_id=stream_external_id, items=items)
        )

    def delete_items(self, stream_external_id: str, items: Sequence[Mapping[str, Any]]) -> RecordsDeleteResponse:
        """
        Delete records by identifier (``space`` + ``externalId`` per item). Wrapper for :meth:`delete`.
        """
        return run_sync(
            self.__async_client.streams.records.delete_items(stream_external_id=stream_external_id, items=items)
        )
Loading
Loading