Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,28 @@ jobs:
- name: Setup environment & install dependencies
run: make setup

- name: Run unit tests
run: make test

- name: Install wick
run: sudo snap install wick --classic

- name: Run Integration tests
run: |
make install-nxt
nxt start -c tests/ &
timeout 30 bash -c 'until wick --url ws://localhost:8079/ws publish test; do sleep 1; done'
make integration

- name: Setup AAT
run: |
git clone https://github.com/xconnio/xconn-aat-setup.git
cd xconn-aat-setup
make up
sudo snap install wick --classic
timeout 30 bash -c 'until wick --url ws://localhost:8081/ws publish test; do sleep 1; done'

- name: Run tests
run: make test
- name: Run AAT tests
run: make aat

ruff:
runs-on: ubuntu-latest
Expand Down
15 changes: 14 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ lint:
./.venv/bin/ruff check .

test:
./.venv/bin/pytest -s -v
./.venv/bin/pytest -s -v tests/unit

run:
./.venv/bin/xconn example:app --directory examples/simple
Expand All @@ -37,3 +37,16 @@ build-docs:

clean-docs:
rm -rf site/

install-nxt:
@if ! command -v nxt >/dev/null 2>&1; then \
sudo snap install nxt-router --classic --edge; \
fi


integration:
make install-nxt
./.venv/bin/pytest -s -v tests/integration/

aat:
./.venv/bin/pytest -s -v tests/aat/
55 changes: 55 additions & 0 deletions tests/.nxt/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
version: '1'

config:
loglevel: debug
management: true

realms:
- name: realm1
roles:
- name: anonymous
permissions:
- uri: ""
match: prefix
allow_call: true
allow_register: true
allow_publish: true
allow_subscribe: true

transports:
- type: websocket
listener: tcp
address: localhost:8079
serializers:
- json
- cbor
- msgpack

authenticators:
cryptosign:
- authid: john
realm: realm1
role: anonymous
authorized_keys:
- 20e6ff0eb2552204fac19a15a61da586e437abd64a545bedce61a89b48184fcb

wampcra:
- authid: john
realm: realm1
role: anonymous
secret: hello

ticket:
- authid: john
realm: realm1
role: anonymous
ticket: hello

anonymous:
- authid: john
realm: realm1
role: anonymous

- authid: john
realm: io.xconn.mgmt
role: anonymous
Empty file added tests/aat/__init__.py
Empty file.
File renamed without changes.
File renamed without changes.
Empty file added tests/integration/__init__.py
Empty file.
70 changes: 70 additions & 0 deletions tests/integration/async_pubsub_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import os
import asyncio
import hashlib

import pytest
from wampproto import serializers

from xconn import AsyncClient
from xconn.types import Event
from xconn.async_client import connect_anonymous


async def test_pubsub():
client1 = await connect_anonymous("ws://localhost:8079/ws", "realm1")
client2 = await connect_anonymous("ws://localhost:8079/ws", "realm1")

args = ["client1", "client2"]

async def event_handler_with_args(event: Event):
assert event.args == args
assert event.kwargs is None

args_subscription = await client1.subscribe("io.xconn.pubsub.args", event_handler_with_args)
await client2.publish("io.xconn.pubsub.args", args, options={"acknowledge": True})
await args_subscription.unsubscribe()

kwargs = {"foo": "bar", "baz": {"k": "v"}}

async def event_handler_with_kwargs(event: Event):
assert event.args == []
assert event.kwargs == kwargs

subscription = await client1.subscribe("io.xconn.pubsub.kwargs", event_handler_with_kwargs)
await client2.publish("io.xconn.pubsub.kwargs", kwargs=kwargs, options={"acknowledge": True})

await subscription.unsubscribe()

await client2.leave()
await client1.leave()


@pytest.mark.parametrize("serializer", [serializers.CBORSerializer(), serializers.MsgPackSerializer()])
async def test_pubsub_with_various_data(serializer: serializers.Serializer):
async_client = AsyncClient(serializer=serializer)
client1 = await async_client.connect("ws://localhost:8079/ws", "realm1")
async_client2 = AsyncClient(serializer=serializer)
client2 = await async_client2.connect("ws://localhost:8079/ws", "realm1")

async def event_handler(inv: Event):
payload: bytes = inv.kwargs["payload"]
checksum: bytes = inv.kwargs["checksum"]

calculated_checksum = hashlib.sha256(payload).digest()
assert calculated_checksum == checksum, f"Checksum mismatch! got {calculated_checksum}, expected {checksum}"

await client1.subscribe("io.xconn.pubsub.event_handler", event_handler)

async def send_payload(size_bytes: int):
payload = os.urandom(size_bytes)
checksum = hashlib.sha256(payload).digest()

await client2.publish("io.xconn.pubsub.event_handler", kwargs={"payload": payload, "checksum": checksum})

# test call with different payload sizes
sizes = [1024 * n for n in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1023]]

await asyncio.gather(*(send_payload(size) for size in sizes))

await client1.leave()
await client2.leave()
76 changes: 76 additions & 0 deletions tests/integration/async_rpc_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os
import asyncio
import hashlib

import pytest
from wampproto import serializers

from xconn import AsyncClient
from xconn.types import Invocation
from xconn.async_client import connect_anonymous
from xconn.exception import ApplicationError


async def test_rpc():
client1 = await connect_anonymous("ws://localhost:8079/ws", "realm1")
client2 = await connect_anonymous("ws://localhost:8079/ws", "realm1")

args = ["client1", "client2"]

async def inv_handler_with_args(inv: Invocation):
assert inv.args == args
assert inv.kwargs is None

args_registration = await client1.register("io.xconn.rpc.args", inv_handler_with_args)
await client2.call("io.xconn.rpc.args", args)
await args_registration.unregister()

with pytest.raises(ApplicationError, match="wamp.error.no_such_procedure"):
await client2.call("io.xconn.rpc.args", args)

kwargs = {"foo": "bar", "baz": {"k": "v"}}

async def inv_handler_with_kwargs(inv: Invocation):
assert inv.args == []
assert inv.kwargs == kwargs

registration = await client1.register("io.xconn.rpc.kwargs", inv_handler_with_kwargs)
await client2.call("io.xconn.rpc.kwargs", kwargs=kwargs)

await registration.unregister()

await client2.leave()
await client1.leave()


@pytest.mark.parametrize(
"serializer", [serializers.CBORSerializer(), serializers.MsgPackSerializer()]
)
async def test_rpc_with_various_data(serializer: serializers.Serializer):
async_client = AsyncClient(serializer=serializer)
client1 = await async_client.connect("ws://localhost:8079/ws", "realm1")
async_client2 = AsyncClient(serializer=serializer)
client2 = await async_client2.connect("ws://localhost:8079/ws", "realm1")

async def inv_handler(inv: Invocation):
payload: bytes = inv.kwargs["payload"]
checksum: bytes = inv.kwargs["checksum"]

calculated_checksum = hashlib.sha256(payload).digest()
assert calculated_checksum == checksum, f"Checksum mismatch! got {calculated_checksum}, expected {checksum}"

await client1.register("io.xconn.rpc.inv_handler", inv_handler)

async def send_payload(size_bytes: int):
payload = os.urandom(size_bytes)
checksum = hashlib.sha256(payload).digest()

await client2.call("io.xconn.rpc.inv_handler", kwargs={"payload": payload, "checksum": checksum})

# test call with different payload sizes
sizes = [1024 * n for n in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1023]]

await asyncio.gather(*(send_payload(size) for size in sizes))

await client1.leave()
await client2.leave()
73 changes: 73 additions & 0 deletions tests/integration/pubsub_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import concurrent
import os
import hashlib
from concurrent.futures import ThreadPoolExecutor

import pytest
from wampproto import serializers

from xconn import Client
from xconn.types import Event
from xconn.client import connect_anonymous


def test_pubsub():
client1 = connect_anonymous("ws://localhost:8079/ws", "realm1")
client2 = connect_anonymous("ws://localhost:8079/ws", "realm1")

args = ["client1", "client2"]

def event_handler_with_args(event: Event):
assert event.args == args
assert event.kwargs is None

args_subscription = client1.subscribe("io.xconn.pubsub.args", event_handler_with_args)
client2.publish("io.xconn.pubsub.args", args, options={"acknowledge": True})
args_subscription.unsubscribe()

kwargs = {"foo": "bar", "baz": {"k": "v"}}

def event_handler_with_kwargs(event: Event):
assert event.args == []
assert event.kwargs == kwargs

registration = client1.subscribe("io.xconn.pubsub.kwargs", event_handler_with_kwargs)
client2.publish("io.xconn.pubsub.kwargs", kwargs=kwargs, options={"acknowledge": True})

registration.unsubscribe()

client2.leave()
client1.leave()


@pytest.mark.parametrize("serializer", [serializers.CBORSerializer(), serializers.MsgPackSerializer()])
def test_pubsub_with_various_data(serializer: serializers.Serializer):
client1 = Client(serializer=serializer).connect("ws://localhost:8079/ws", "realm1")
client2 = Client(serializer=serializer).connect("ws://localhost:8079/ws", "realm1")

def event_handler(inv: Event):
payload: bytes = inv.kwargs["payload"]
checksum: bytes = inv.kwargs["checksum"]

calculated_checksum = hashlib.sha256(payload).digest()
assert calculated_checksum == checksum, f"Checksum mismatch! got {calculated_checksum}, expected {checksum}"

client1.subscribe("io.xconn.pubsub.event_handler", event_handler)

def send_payload(size_bytes: int):
payload = os.urandom(size_bytes)
checksum = hashlib.sha256(payload).digest()

client2.publish("io.xconn.pubsub.event_handler", kwargs={"payload": payload, "checksum": checksum})

# test call with different payload sizes
sizes = [1024 * n for n in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1023]]

with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(send_payload, size) for size in sizes]

for future in concurrent.futures.as_completed(futures):
future.result()

client1.leave()
client2.leave()
Loading