Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cognite/client/_api/data_modeling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cognite.client._api.data_modeling.instances import InstancesAPI
from cognite.client._api.data_modeling.spaces import SpacesAPI
from cognite.client._api.data_modeling.statistics import StatisticsAPI
from cognite.client._api.data_modeling.streams import StreamsAPI
from cognite.client._api.data_modeling.views import ViewsAPI
from cognite.client._api_client import APIClient

Expand All @@ -27,6 +28,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
self.instances = InstancesAPI(config, api_version, cognite_client)
self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client)
self.statistics = StatisticsAPI(config, api_version, cognite_client)
self.streams = StreamsAPI(config, api_version, cognite_client)

def _get_semaphore(
self, operation: Literal["read", "write", "delete", "search", "read_schema", "write_schema"]
Expand Down
160 changes: 160 additions & 0 deletions cognite/client/_api/data_modeling/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Literal

from cognite.client._api_client import APIClient
from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite
from cognite.client.exceptions import CogniteAPIError
from cognite.client.utils._experimental import FeaturePreviewWarning

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient
from cognite.client.config import ClientConfig


class StreamsAPI(APIClient):
_RESOURCE_PATH = "/streams"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self._CREATE_LIMIT = 1
self._warning = FeaturePreviewWarning(
api_maturity="General Availability", sdk_maturity="alpha", feature_name="Streams API"
)

def _get_semaphore(self, operation: Literal["read", "write", "delete"]) -> asyncio.BoundedSemaphore:
from cognite.client import global_config

assert operation not in ("read_schema", "write_schema"), "Streams API should not use schema semaphores"
return global_config.concurrency_settings.data_modeling._semaphore_factory(
operation, project=self._cognite_client.config.project
)

async def retrieve(self, external_id: str, include_statistics: bool = False) -> Stream | None:
"""`Retrieve a stream by external ID. <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_

Args:
external_id (str): Stream external ID
include_statistics (bool): If set to True, usage statistics will be returned together with stream settings. Defaults to False.

Returns:
Stream | None: Requested stream or None if it does not exist.

Examples:

Retrieve a single stream by external id:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> res = client.data_modeling.streams.retrieve(external_id="my_stream")

Retrieve a stream with statistics:

>>> res = client.data_modeling.streams.retrieve(
... external_id="my_stream", include_statistics=True
... )

"""
self._warning.warn()
params = {"includeStatistics": "true"} if include_statistics else None
try:
result = await self._get(
url_path=f"{self._RESOURCE_PATH}/{external_id}",
params=params,
semaphore=self._get_semaphore("read"),
)
return Stream._load(result.json())
except CogniteAPIError as e:
if e.code == 404:
return None
raise

async def delete(self, external_id: str) -> None:
"""`Delete a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_

Args:
external_id (str): External ID of stream.

Examples:

Delete a stream by external id:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> client.data_modeling.streams.delete(external_id="my_stream")
Comment on lines +84 to +87
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The code example for delete demonstrates synchronous client usage, but this is an asynchronous method. The example should be updated to use AsyncCogniteClient and await.

Suggested change
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> client.data_modeling.streams.delete(external_id="my_stream")
>>> from cognite.client import AsyncCogniteClient
>>> async_client = AsyncCogniteClient()
>>> await async_client.data_modeling.streams.delete(external_id="my_stream")

"""
self._warning.warn()
await self._post(
url_path=f"{self._RESOURCE_PATH}/delete",
json={"items": [{"externalId": external_id}]},
semaphore=self._get_semaphore("write"),
)

async def list(self) -> StreamList:
"""`List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_

Returns all streams available in the project.

Returns:
StreamList: List of all streams in the project

Examples:

List all streams:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> stream_list = client.data_modeling.streams.list()
Comment on lines +108 to +111
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The code example for list demonstrates synchronous client usage, but this is an asynchronous method. The example should be updated to use AsyncCogniteClient and await.

Suggested change
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> stream_list = client.data_modeling.streams.list()
>>> from cognite.client import AsyncCogniteClient
>>> async_client = AsyncCogniteClient()
>>> stream_list = await async_client.data_modeling.streams.list()


Iterate over the returned list:

>>> for stream in stream_list:
... stream # do something with the stream
"""
self._warning.warn()
return await self._list(
list_cls=StreamList,
resource_cls=Stream,
method="GET",
limit=None,
override_semaphore=self._get_semaphore("read"),
)

async def create(self, stream: StreamWrite) -> Stream:
"""`Create a stream. <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_

Args:
stream (StreamWrite): Stream to create.

Returns:
Stream: Created stream

Examples:

Create a new stream:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> from cognite.client.data_classes.data_modeling import (
... StreamWrite,
... StreamWriteSettings,
... )
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> stream = StreamWrite(
... external_id="my_stream",
... settings=StreamWriteSettings(template_name="ImmutableTestStream"),
... )
>>> res = client.data_modeling.streams.create(stream)
Comment on lines +140 to +151
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The code example for create demonstrates synchronous client usage, but this is an asynchronous method. The example should be updated to use AsyncCogniteClient and await.

Suggested change
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> from cognite.client.data_classes.data_modeling import StreamWrite, StreamWriteSettings
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> stream = StreamWrite(
... external_id="my_stream",
... settings=StreamWriteSettings(template_name="ImmutableTestStream")
... )
>>> res = client.data_modeling.streams.create(stream)
>>> from cognite.client import AsyncCogniteClient
>>> from cognite.client.data_classes.data_modeling import StreamWrite, StreamWriteSettings
>>> async_client = AsyncCogniteClient()
>>> stream = StreamWrite(
... external_id="my_stream",
... settings=StreamWriteSettings(template_name="ImmutableTestStream")
... )
>>> res = await async_client.data_modeling.streams.create(stream)

"""
self._warning.warn()
return await self._create_multiple(
list_cls=StreamList,
resource_cls=Stream,
items=stream,
input_resource_cls=StreamWrite,
override_semaphore=self._get_semaphore("write"),
)
5 changes: 3 additions & 2 deletions cognite/client/_sync_api/data_modeling/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
===============================================================================
c76b2b9351d2a5eee6a710fa9893bfa4
584030bc5e2a4b8168f54c101f7f521d
This file is auto-generated from the Async API modules, - do not edit manually!
===============================================================================
"""
Expand All @@ -9,13 +9,13 @@

from typing import TYPE_CHECKING

from cognite.client import AsyncCogniteClient
from cognite.client._sync_api.data_modeling.containers import SyncContainersAPI
from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI
from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI
from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI
from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI
from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI
from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI
from cognite.client._sync_api.data_modeling.views import SyncViewsAPI
from cognite.client._sync_api_client import SyncAPIClient

Expand All @@ -35,3 +35,4 @@ def __init__(self, async_client: AsyncCogniteClient) -> None:
self.instances = SyncInstancesAPI(async_client)
self.graphql = SyncDataModelingGraphQLAPI(async_client)
self.statistics = SyncStatisticsAPI(async_client)
self.streams = SyncStreamsAPI(async_client)
129 changes: 129 additions & 0 deletions cognite/client/_sync_api/data_modeling/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""
===============================================================================
d5cc9b88dcf564328fbb78370c32cab7
This file is auto-generated from the Async API modules, - do not edit manually!
===============================================================================
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from cognite.client import AsyncCogniteClient
from cognite.client._sync_api_client import SyncAPIClient
from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite
from cognite.client.utils._async_helpers import run_sync

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient


class SyncStreamsAPI(SyncAPIClient):
"""Auto-generated, do not modify manually."""

def __init__(self, async_client: AsyncCogniteClient) -> None:
self.__async_client = async_client

def retrieve(self, external_id: str, include_statistics: bool = False) -> Stream | None:
"""
`Retrieve a stream by external ID. <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_

Args:
external_id (str): Stream external ID
include_statistics (bool): If set to True, usage statistics will be returned together with stream settings. Defaults to False.

Returns:
Stream | None: Requested stream or None if it does not exist.

Examples:

Retrieve a single stream by external id:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> res = client.data_modeling.streams.retrieve(external_id="my_stream")

Retrieve a stream with statistics:

>>> res = client.data_modeling.streams.retrieve(
... external_id="my_stream", include_statistics=True
... )
"""
return run_sync(
self.__async_client.data_modeling.streams.retrieve(
external_id=external_id, include_statistics=include_statistics
)
)

def delete(self, external_id: str) -> None:
"""
`Delete a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_

Args:
external_id (str): External ID of stream.

Examples:

Delete a stream by external id:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> client.data_modeling.streams.delete(external_id="my_stream")
"""
return run_sync(self.__async_client.data_modeling.streams.delete(external_id=external_id))

def list(self) -> StreamList:
"""
`List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_

Returns all streams available in the project.

Returns:
StreamList: List of all streams in the project

Examples:

List all streams:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> stream_list = client.data_modeling.streams.list()

Iterate over the returned list:

>>> for stream in stream_list:
... stream # do something with the stream
"""
return run_sync(self.__async_client.data_modeling.streams.list())

def create(self, stream: StreamWrite) -> Stream:
"""
`Create a stream. <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_

Args:
stream (StreamWrite): Stream to create.

Returns:
Stream: Created stream

Examples:

Create a new stream:

>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> from cognite.client.data_classes.data_modeling import (
... StreamWrite,
... StreamWriteSettings,
... )
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> stream = StreamWrite(
... external_id="my_stream",
... settings=StreamWriteSettings(template_name="ImmutableTestStream"),
... )
>>> res = client.data_modeling.streams.create(stream)
"""
return run_sync(self.__async_client.data_modeling.streams.create(stream=stream))
20 changes: 20 additions & 0 deletions cognite/client/data_classes/data_modeling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@
UnionAll,
)
from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList
from cognite.client.data_classes.data_modeling.streams import (
Stream,
StreamLifecycleSettings,
StreamLimit,
StreamLimitSettings,
StreamList,
StreamSettings,
StreamWrite,
StreamWriteList,
StreamWriteSettings,
)
from cognite.client.data_classes.data_modeling.sync import SubscriptionContext
from cognite.client.data_classes.data_modeling.views import (
ConnectionDefinition,
Expand Down Expand Up @@ -233,6 +244,15 @@
"SpaceApply",
"SpaceApplyList",
"SpaceList",
"Stream",
"StreamLifecycleSettings",
"StreamLimit",
"StreamLimitSettings",
"StreamList",
"StreamSettings",
"StreamWrite",
"StreamWriteList",
"StreamWriteSettings",
"SubscriptionContext",
"Text",
"TimeSeriesReference",
Expand Down
Loading
Loading