Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib"]
name = "_inner"
name = "_natsrpy_rs"

[dependencies]
async-nats = "0.46"
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ build-backend = "maturin"
[tool.maturin]
bindings = "pyo3"
features = ["pyo3/extension-module"]
module-name = "natsrpy._inner"
module-name = "natsrpy._natsrpy_rs"
python-source = "python"

[tool.mypy]
Expand Down
2 changes: 1 addition & 1 deletion python/natsrpy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from natsrpy._inner import Message, Nats, Subscription
from natsrpy._natsrpy_rs import Message, Nats, Subscription

__all__ = [
"Message",
Expand Down
10 changes: 0 additions & 10 deletions python/natsrpy/_inner/message.pyi

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from datetime import timedelta
from typing import Any

from natsrpy._inner.js import JetStream
from natsrpy._inner.message import Message
from natsrpy._natsrpy_rs.js import JetStream
from natsrpy._natsrpy_rs.message import Message

class Subscription:
def __aiter__(self) -> Subscription: ...
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from natsrpy._inner.js.kv import KeyValue, KVConfig
from natsrpy._inner.js.stream import Stream, StreamConfig
from natsrpy._natsrpy_rs.js.kv import KeyValue, KVConfig
from natsrpy._natsrpy_rs.js.stream import Stream, StreamConfig

class JetStream:
async def publish(
Expand All @@ -20,4 +20,4 @@ class JetStream:
async def create_stream(self, config: StreamConfig) -> Stream: ...
async def update_stream(self, config: StreamConfig) -> Stream: ...
async def get_stream(self, name: str) -> Stream: ...
async def delete_stream(self, name: str) -> Stream: ...
async def delete_stream(self, name: str) -> bool: ...
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from natsrpy._inner.js.stream import Placement, Republish, Source, StorageType
from natsrpy._natsrpy_rs.js.stream import Placement, Republish, Source, StorageType

class KVConfig:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,78 @@ class StreamMessage:
payload: bytes
time: datetime

class StreamState:
messages: int
bytes: int
first_sequence: int
first_timestamp: int
last_sequence: int
last_timestamp: int
consumer_count: int
subjects_count: int
deleted_count: int | None
deleted: list[int] | None

class SourceInfo:
name: str
lag: int
active: timedelta | None
filter_subject: str | None
subject_transform_dest: str | None
subject_transforms: list[SubjectTransform]

class PeerInfo:
name: str
current: bool
active: timedelta
offline: bool
lag: int | None

class ClusterInfo:
name: str | None
raft_group: str | None
leader: str | None
leader_since: int | None
system_account: bool
traffic_account: str | None
replicas: list[PeerInfo]

class StreamInfo:
config: StreamConfig
created: float
state: StreamState
cluster: ClusterInfo | None
mirror: SourceInfo | None
sources: list[SourceInfo]

class Stream:
async def direct_get(self, sequence: int) -> StreamMessage:
"""
Get direct message from a stream.
Get direct message from the stream.

:param sequence: sequence number of the message to get.
:return: Message.
"""

async def get_info(self) -> StreamInfo:
"""
Get information about the stream.

:return: Stream info.
"""

async def purge(
self,
filter: str | None = None,
sequence: int | None = None,
keep: int | None = None,
) -> int:
"""
Purge current stream.

Please note, that this method will throw an error
in case of stream being configured without `allow_direct=True`.
:param filter: filter of subjects to purge, defaults to None
:param sequence: Message sequence to purge up to (inclusive), defaults to None
:param keep: Message count to keep starting from the end of the stream,
defaults to None
:return: number of messages purged
"""
26 changes: 26 additions & 0 deletions python/natsrpy/_natsrpy_rs/message.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Any

class Message:
"""
Simple NATS message.

Attributes:
subject: subject where message was published
reply: subject where reply should be sent, if any
payload: message payload
headers: dictionary of message headers,
every value can be a simple value or a list.
status: status is used for reply messages to indicate the status of the reply.
It is None for regular messages.
description: message description is used for reply messages to
provide additional information about the status.
length: a length of the message payload in bytes.
"""

subject: str
reply: str | None
payload: bytes
headers: dict[str, Any]
status: int | None
description: str | None
length: int
2 changes: 1 addition & 1 deletion python/natsrpy/js/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from natsrpy._inner.js import JetStream
from natsrpy._natsrpy_rs.js import JetStream
from natsrpy.js.kv import KeyValue, KVConfig
from natsrpy.js.stream import (
Compression,
Expand Down
2 changes: 1 addition & 1 deletion python/natsrpy/js/kv.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from natsrpy._inner.js.kv import KeyValue, KVConfig
from natsrpy._natsrpy_rs.js.kv import KeyValue, KVConfig

__all__ = [
"KVConfig",
Expand Down
19 changes: 18 additions & 1 deletion python/natsrpy/js/stream.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,50 @@
from natsrpy._inner.js.stream import (
from natsrpy._natsrpy_rs.js.stream import (
ClusterInfo,
Compression,
ConsumerLimits,
DiscardPolicy,
External,
PeerInfo,
PersistenceMode,
Placement,
Republish,
RetentionPolicy,
Source,
SourceInfo,
StorageType,
Stream,
StreamConfig,
StreamInfo,
StreamMessage,
StreamState,
SubjectTransform,
)

__all__ = [
"ClusterInfo",
"Compression",
"Compression",
"ConsumerLimits",
"ConsumerLimits",
"DiscardPolicy",
"DiscardPolicy",
"External",
"PeerInfo",
"PersistenceMode",
"PersistenceMode",
"Placement",
"Republish",
"RetentionPolicy",
"RetentionPolicy",
"Source",
"SourceInfo",
"StorageType",
"Stream",
"Stream",
"StreamConfig",
"StreamConfig",
"StreamInfo",
"StreamMessage",
"StreamState",
"SubjectTransform",
]
10 changes: 10 additions & 0 deletions src/exceptions/rust_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ pub enum NatsrpyError {
GetStreamError(#[from] async_nats::jetstream::context::GetStreamError),
#[error(transparent)]
StreamDirectGetError(#[from] async_nats::jetstream::stream::DirectGetError),
#[error(transparent)]
StreamInfoError(#[from] async_nats::jetstream::stream::InfoError),
#[error(transparent)]
StreamPurgeError(#[from] async_nats::jetstream::stream::PurgeError),
#[error(transparent)]
UnknownError(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
#[error(transparent)]
PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError),
#[error(transparent)]
PullConsumerError(#[from] async_nats::jetstream::stream::ConsumerError),
}

impl From<NatsrpyError> for pyo3::PyErr {
Expand Down
55 changes: 55 additions & 0 deletions src/js/consumers/_pull.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::sync::Arc;

use futures_util::StreamExt;
use pyo3::{Bound, PyAny, Python};
use tokio::sync::RwLock;

use crate::{exceptions::rust_err::NatsrpyResult, utils::natsrpy_future};

type NatsPullConsumer =
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>;

#[pyo3::pyclass(from_py_object)]
#[derive(Debug, Clone)]
pub struct PullConsumer {
consumer: Arc<RwLock<NatsPullConsumer>>,
}

#[pyo3::pyclass(from_py_object)]
#[derive(Debug, Clone)]
pub struct OrderedPullConsumer {
consumer: Arc<RwLock<NatsPullConsumer>>,
}

impl PullConsumer {
pub fn new(consumer: NatsPullConsumer) -> Self {
Self {
consumer: Arc::new(RwLock::new(consumer)),
}
}
}

#[pyo3::pyclass]
pub struct PullMessageIterator {
inner: Arc<RwLock<async_nats::jetstream::consumer::pull::Batch>>,
}

#[pyo3::pymethods]
impl PullConsumer {
pub fn messages<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
let consumer_lock = self.consumer.clone();
natsrpy_future(py, async move {
let mut messages = consumer_lock.read().await.messages().await.unwrap();
while let Some(message) = messages.next().await {
let msg = message?;
log::info!("{:#?}", msg.message.payload);
msg.ack().await?;
}

Ok(())
})
}
}

#[pyo3::pymethods]
impl PullMessageIterator {}
3 changes: 3 additions & 0 deletions src/js/consumers/_push.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#[pyo3::pyclass(from_py_object)]
#[derive(Debug, Clone)]
pub struct PushConsumer;
Loading
Loading