Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
2d5bfdf
first step
slampy97 Oct 22, 2025
96c0ff1
also driver
slampy97 Oct 22, 2025
a4a292e
working coord client plus not working lock/ conenction problemsg
slampy97 Oct 22, 2025
d9de575
add wrappers, simple test is working, but problem to got structure back
slampy97 Oct 29, 2025
7701a05
several fixes plus wrappers
slampy97 Oct 29, 2025
82669fc
delete path from pr
slampy97 Oct 29, 2025
9fd7073
fix add gitmodules
slampy97 Oct 29, 2025
44a20a6
add alter node
slampy97 Oct 29, 2025
d749eb0
fix
slampy97 Oct 29, 2025
a5a36f6
fix config
slampy97 Oct 29, 2025
53ebed9
erase wrappers
slampy97 Oct 29, 2025
678598b
fix public wrappers
slampy97 Oct 31, 2025
1a6e166
fix last drivers coordination client inclusion
slampy97 Oct 31, 2025
aa8eaf9
fix tox -e style
slampy97 Oct 31, 2025
bd96246
tox -e black-format
slampy97 Oct 31, 2025
c3324cf
fix review remarks
slampy97 Nov 4, 2025
41323e6
fix review remarks
slampy97 Nov 4, 2025
65edbea
fix flake8 mistakes
slampy97 Nov 4, 2025
d2d25d4
fix review remarks plus styles checks
slampy97 Nov 5, 2025
9850b78
working async client, session_init + ping pong + lock release/acquire
slampy97 Nov 17, 2025
3f55d20
working async client, session_init + ping pong + lock release/acquire…
slampy97 Nov 17, 2025
89fe18d
some fix / redundant logic
slampy97 Nov 17, 2025
31e4fb6
fix lock logic
slampy97 Nov 17, 2025
194942f
refactor logic lock + reconnecor + stream - > lock should be resosibl…
slampy97 Nov 17, 2025
a81db7e
add describe -> start making crud lock object
slampy97 Nov 18, 2025
8defd59
crud lock object + acquire and release.
slampy97 Nov 18, 2025
53d547b
Merge branch 'coordination-service-impementation' into coordination-lock
slampy97 Nov 18, 2025
7d78387
add style checking
slampy97 Nov 18, 2025
4f66aad
fix linter
slampy97 Nov 18, 2025
45fc213
Add public wrappers for describe lock result
slampy97 Nov 18, 2025
e7a4357
rewrite stream with wrapper plus sync client
slampy97 Nov 25, 2025
c16e51e
first working version of sync and asunc client with lock api
slampy97 Nov 25, 2025
ae7f76c
refactor + reformat
slampy97 Nov 25, 2025
44a73fe
refactor + reformat
slampy97 Nov 25, 2025
82ea901
fixing format
slampy97 Nov 25, 2025
6f70b24
add NodeConfig as it was before
slampy97 Nov 25, 2025
134982a
simplify lock classes
slampy97 Nov 25, 2025
29d74b8
simplify lock classes
slampy97 Nov 25, 2025
4742860
fix common_utils.py + simplify stream.py
slampy97 Nov 26, 2025
202e4b5
simplify stream.py + correct lock.py
slampy97 Nov 26, 2025
b1a1849
fix lock delete method
slampy97 Nov 26, 2025
633f2d4
fix acquire method
slampy97 Nov 26, 2025
52f0874
get red of req_id logic and loop on every call
slampy97 Dec 10, 2025
35da8a6
test refactor, making ez for read
slampy97 Dec 10, 2025
1cafbff
test refactor, making ez for read
slampy97 Dec 10, 2025
68301b6
style refactor
slampy97 Dec 10, 2025
410c655
prep for rebase
slampy97 Dec 10, 2025
75b7b9a
Merge branch 'main' into coordination-lock
slampy97 Dec 10, 2025
ef5acef
apply req_id plus thin lock client
slampy97 Dec 11, 2025
3096a1a
simplify lock + reconnector
slampy97 Dec 11, 2025
edfc5f2
simplify lock + reconnector
slampy97 Dec 11, 2025
b8bcdb4
simplify lock + reconnector
slampy97 Dec 11, 2025
ee2c2fa
fix recconector
slampy97 Dec 11, 2025
80cd1fd
simplify reconnector.py
slampy97 Dec 11, 2025
5a4fb8c
simplify reconnector.py
slampy97 Dec 11, 2025
aaec556
simplify reconnector.py
slampy97 Dec 11, 2025
05b2c10
simplify reconnector.py
slampy97 Dec 11, 2025
bc389e0
fix style
slampy97 Dec 11, 2025
9e0ec86
fix style
slampy97 Dec 11, 2025
2dafcab
all tests passed - first time. Reconnector recconect session stream
slampy97 Dec 11, 2025
b2e4885
all tests passed - first time. Reconnector recconect session stream
slampy97 Dec 12, 2025
64491fa
all tests passed - first time. Reconnector recconect session stream
slampy97 Dec 12, 2025
fd9f4f1
all tsts passed and zero pending tasks, finally no errors
slampy97 Dec 12, 2025
f63464c
fix styles
slampy97 Dec 12, 2025
d29c9bc
fix timeout
slampy97 Dec 12, 2025
ced951f
fix test for cli
slampy97 Dec 12, 2025
df0b81e
last fixes
slampy97 Dec 12, 2025
f238d0b
last fixes + style fixing
slampy97 Dec 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 227 additions & 38 deletions tests/coordination/test_coordination_client.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,80 @@
import asyncio
import threading
import time

import pytest

import ydb
from ydb.aio.coordination import CoordinationClient as AioCoordinationClient
from ydb import StatusCode, logger

from ydb.coordination import (
NodeConfig,
ConsistencyMode,
RateLimiterCountersMode,
CoordinationClient,
CreateSemaphoreResult,
DescribeLockResult,
)


class TestCoordination:
def test_coordination_node_lifecycle(self, driver_sync: ydb.Driver):
client = CoordinationClient(driver_sync)
node_path = "/local/test_node_lifecycle"
@pytest.fixture
def sync_coordination_node(driver_sync):
client = CoordinationClient(driver_sync)
node_path = "/local/test_node"

try:
client.delete_node(node_path)
except ydb.SchemeError:
pass
try:
client.delete_node(node_path)
except ydb.SchemeError:
pass

with pytest.raises(ydb.SchemeError):
client.describe_node(node_path)
config = NodeConfig(
session_grace_period_millis=1000,
attach_consistency_mode=ConsistencyMode.STRICT,
read_consistency_mode=ConsistencyMode.STRICT,
rate_limiter_counters_mode=RateLimiterCountersMode.UNSET,
self_check_period_millis=0,
)
client.create_node(node_path, config)

yield client, node_path, config

try:
client.delete_node(node_path)
except ydb.SchemeError:
pass


@pytest.fixture
async def async_coordination_node(aio_connection):
client = AioCoordinationClient(aio_connection)
node_path = "/local/test_node"

try:
await client.delete_node(node_path)
except ydb.SchemeError:
pass

config = NodeConfig(
session_grace_period_millis=1000,
attach_consistency_mode=ConsistencyMode.STRICT,
read_consistency_mode=ConsistencyMode.STRICT,
rate_limiter_counters_mode=RateLimiterCountersMode.UNSET,
self_check_period_millis=0,
)
await client.create_node(node_path, config)

yield client, node_path, config

try:
await client.delete_node(node_path)
except ydb.SchemeError:
pass

initial_config = NodeConfig(
session_grace_period_millis=1000,
attach_consistency_mode=ConsistencyMode.STRICT,
read_consistency_mode=ConsistencyMode.STRICT,
rate_limiter_counters_mode=RateLimiterCountersMode.UNSET,
self_check_period_millis=0,
)
client.create_node(node_path, initial_config)

class TestCoordination:
def test_coordination_node_lifecycle(self, sync_coordination_node):
client, node_path, initial_config = sync_coordination_node

node_conf = client.describe_node(node_path)
assert node_conf == initial_config
Expand All @@ -53,26 +96,8 @@ def test_coordination_node_lifecycle(self, driver_sync: ydb.Driver):
with pytest.raises(ydb.SchemeError):
client.describe_node(node_path)

async def test_coordination_node_lifecycle_async(self, aio_connection):
client = AioCoordinationClient(aio_connection)
node_path = "/local/test_node_lifecycle"

try:
await client.delete_node(node_path)
except ydb.SchemeError:
pass

with pytest.raises(ydb.SchemeError):
await client.describe_node(node_path)

initial_config = NodeConfig(
session_grace_period_millis=1000,
attach_consistency_mode=ConsistencyMode.STRICT,
read_consistency_mode=ConsistencyMode.STRICT,
rate_limiter_counters_mode=RateLimiterCountersMode.UNSET,
self_check_period_millis=0,
)
await client.create_node(node_path, initial_config)
async def test_coordination_node_lifecycle_async(self, async_coordination_node):
client, node_path, initial_config = async_coordination_node

node_conf = await client.describe_node(node_path)
assert node_conf == initial_config
Expand All @@ -93,3 +118,167 @@ async def test_coordination_node_lifecycle_async(self, aio_connection):

with pytest.raises(ydb.SchemeError):
await client.describe_node(node_path)

async def test_coordination_lock_describe_full_async(self, async_coordination_node):
client, node_path, config = async_coordination_node

lock = client.lock("test_lock", node_path)

create = await lock.create(init_limit=1, init_data=b"hello")
assert create.status == StatusCode.SUCCESS

desc = await lock.describe()
assert desc.status == StatusCode.SUCCESS
assert desc.name == "test_lock"
assert desc.data == b"hello"
assert desc.count == 0
assert desc.ephemeral is False
assert list(desc.owners) == []
assert list(desc.waiters) == []

upd = await lock.update(new_data=b"world")
assert upd.status == StatusCode.SUCCESS

desc2 = await lock.describe()
assert desc2.status == StatusCode.SUCCESS
assert desc2.name == "test_lock"
assert desc2.data == b"world"
assert desc2.count == 0
assert desc2.ephemeral is False
assert list(desc2.owners) == []
assert list(desc2.waiters) == []

delete = await lock.delete()
assert delete.status == StatusCode.SUCCESS

desc_after = await lock.describe()
assert desc_after.status == StatusCode.NOT_FOUND

def test_coordination_lock_describe_full_sync(self, sync_coordination_node):
client, node_path, config = sync_coordination_node

lock = client.lock("test_lock", node_path)

create = lock.create(init_limit=1, init_data=b"hello")
assert create.status == StatusCode.SUCCESS

desc = lock.describe()
assert desc.status == StatusCode.SUCCESS
assert desc.name == "test_lock"
assert desc.data == b"hello"
assert desc.count == 0
assert desc.ephemeral is False
assert list(desc.owners) == []
assert list(desc.waiters) == []
upd = lock.update(new_data=b"world")
assert upd.status == StatusCode.SUCCESS

desc2 = lock.describe()
assert desc2.status == StatusCode.SUCCESS
assert desc2.name == "test_lock"
assert desc2.data == b"world"
assert desc2.count == 0
assert desc2.ephemeral is False
assert list(desc2.owners) == []
assert list(desc2.waiters) == []

delete = lock.delete()
assert delete.status == StatusCode.SUCCESS

desc_after = lock.describe()
assert desc_after.status == StatusCode.NOT_FOUND

async def test_coordination_lock_racing_async(self, async_coordination_node):
client, node_path, initial_config = async_coordination_node
timeout = 5

lock = client.lock("test_lock", node_path)
await lock.create(init_limit=1, init_data=b"init-data")

describe_resp: DescribeLockResult = await lock.describe()
assert describe_resp.status == StatusCode.SUCCESS

lock2_started = asyncio.Event()
lock2_acquired = asyncio.Event()
lock2_release = asyncio.Event()

async def second_lock_task():
lock2_started.set()
async with client.lock("test_lock", node_path):
lock2_acquired.set()
await lock2_release.wait()

async with client.lock("test_lock", node_path) as lock1:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

очень сложно это все читать. давай будем пользоваться методами acquire и release, а также не будем делать этот километровый дескрайб

resp: DescribeLockResult = await lock1.describe()
assert resp.status == StatusCode.SUCCESS

t2 = asyncio.create_task(second_lock_task())
await asyncio.wait_for(lock2_started.wait(), timeout=timeout)

await asyncio.sleep(0)

await asyncio.wait_for(lock2_acquired.wait(), timeout=timeout)

lock2_release.set()
await asyncio.wait_for(t2, timeout=timeout)

delete_resp = await lock.delete()
assert delete_resp.status == StatusCode.SUCCESS

describe_after_delete: DescribeLockResult = await lock.describe()
assert describe_after_delete.status == StatusCode.NOT_FOUND

def test_coordination_lock_racing_sync(self, sync_coordination_node):
client, node_path, initial_config = sync_coordination_node
small_timeout = 1

lock = client.lock("test_lock", node_path)

create_resp: CreateSemaphoreResult = lock.create(init_limit=1, init_data=b"init-data")
assert create_resp.status == StatusCode.SUCCESS

describe_resp: DescribeLockResult = lock.describe()
assert describe_resp.status == StatusCode.SUCCESS

lock2_ready = threading.Event()
lock2_acquired = threading.Event()

def second_lock_task():
try:
lock2_ready.set()
with client.lock("test_lock", node_path):
lock2_acquired.set()
logger.info("Second thread acquired lock")
except Exception as e:
logger.exception(f"{e} | second_lock_task failed")

t2 = threading.Thread(target=second_lock_task)

with client.lock("test_lock", node_path) as lock1:
resp = lock1.describe()
assert resp.status == StatusCode.SUCCESS
t2.start()
lock2_ready.wait(timeout=small_timeout)

lock2_acquired.wait(timeout=small_timeout)
t2.join(timeout=small_timeout)

delete_resp = lock.delete()
assert delete_resp.status == StatusCode.SUCCESS
time.sleep(small_timeout)
describe_after_delete: DescribeLockResult = lock.describe()
assert describe_after_delete.status == StatusCode.NOT_FOUND

async def test_coordination_reconnect_async(self, async_coordination_node):
client, node_path, config = async_coordination_node

lock = client.lock("test_lock", node_path)

res = await lock.create(init_limit=1, init_data=b"")
assert res.status == StatusCode.SUCCESS

await lock._reconnector._stream.close()

desc = await lock.describe()
assert desc.status == StatusCode.SUCCESS
assert desc.name == "test_lock"
4 changes: 2 additions & 2 deletions ydb/_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ class QueryService(object):

class CoordinationService(object):
Stub = ydb_coordination_v1_pb2_grpc.CoordinationServiceStub

Session = "Session"
CreateNode = "CreateNode"
AlterNode = "AlterNode"
DropNode = "DropNode"
DescribeNode = "DescribeNode"
SessionRequest = "SessionRequest"
Session = "Session"
5 changes: 3 additions & 2 deletions ydb/_grpc/grpcwrapper/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async def _start_sync_driver(self, driver: Driver, stub, method):
self._stream_call = stream_call
self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor)

async def receive(self, timeout: Optional[int] = None) -> Any:
async def receive(self, timeout: Optional[int] = None, is_coordination_calls=False) -> Any:
# todo handle grpc exceptions and convert it to internal exceptions
try:
if timeout is None:
Expand All @@ -235,7 +235,8 @@ async def get_response():
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
raise connection._rpc_error_handler(self._connection_state, e)

issues._process_response(grpc_message)
if not is_coordination_calls:
issues._process_response(grpc_message)

if self._connection_state != "has_received_messages":
self._connection_state = "has_received_messages"
Expand Down
Loading
Loading