Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 84 additions & 98 deletions tests/mock_vws/test_respx_mock_usage.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
"""Tests for ``MockVWS`` intercepting ``httpx`` via asynchronous ``vws``
"""Tests for ``MockVWS`` intercepting ``httpx`` via synchronous ``vws``
clients.
"""

import asyncio
import io
import uuid

import httpx
import pytest
from vws import AsyncCloudRecoService, AsyncVuMarkService, AsyncVWS
from vws import VWS, CloudRecoService, VuMarkService
from vws.exceptions.vws_exceptions import UnknownTargetError
from vws.reports import TargetStatuses
from vws.transports import HTTPXTransport
from vws.vumark_accept import VuMarkAccept

from mock_vws import MockVWS
Expand All @@ -19,157 +19,143 @@
from mock_vws.target import VuMarkTarget


class TestAsyncVWS:
"""Asynchronous ``vws-python`` client usage through the mock."""
class TestVWS:
"""Synchronous ``vws-python`` client usage through the mock via
``httpx``.
"""

@staticmethod
def test_response_delay_causes_httpx_timeout() -> None:
"""``httpx`` timeouts are surfaced through ``AsyncVWS``."""
"""``httpx`` timeouts are surfaced through ``VWS``."""
database = CloudDatabase()
calls: list[float] = []

async def run_test() -> None:
"""Trigger a timed request through the client."""
async with AsyncVWS(
server_access_key=database.server_access_key,
server_secret_key=database.server_secret_key,
request_timeout_seconds=0.1,
) as client:
await client.get_database_summary_report()

with MockVWS(
response_delay_seconds=5.0,
sleep_fn=calls.append,
processing_time_seconds=0,
) as mock:
mock.add_cloud_database(cloud_database=database)
client = VWS(
server_access_key=database.server_access_key,
server_secret_key=database.server_secret_key,
request_timeout_seconds=0.1,
transport=HTTPXTransport(),
)
with pytest.raises(expected_exception=httpx.ReadTimeout):
asyncio.run(run_test())
client.get_database_summary_report()

assert calls == [0.1]

@staticmethod
def test_custom_base_vws_url_with_path_prefix() -> None:
"""``AsyncVWS`` works with a custom VWS base URL path prefix."""
"""``VWS`` works with a custom VWS base URL path prefix."""
database = CloudDatabase()
base_vws_url = "https://vuforia.vws.example.com/prefix"

async def run_test() -> str:
"""Return the database name via the custom base URL."""
async with AsyncVWS(
with MockVWS(base_vws_url=base_vws_url) as mock:
mock.add_cloud_database(cloud_database=database)
client = VWS(
server_access_key=database.server_access_key,
server_secret_key=database.server_secret_key,
base_vws_url=base_vws_url,
) as client:
report = await client.get_database_summary_report()
return report.name

with MockVWS(base_vws_url=base_vws_url) as mock:
mock.add_cloud_database(cloud_database=database)
database_name = asyncio.run(run_test())
transport=HTTPXTransport(),
)
report = client.get_database_summary_report()
database_name = report.name

assert database_name == database.database_name

@staticmethod
def test_add_get_and_delete_target(
image_file_success_state_low_rating: io.BytesIO,
) -> None:
"""A target life cycle works through ``AsyncVWS``."""
"""A target life cycle works through ``VWS``."""
database = CloudDatabase()
target_name = "async-target"

async def run_test() -> None:
"""Exercise the target life cycle."""
async with AsyncVWS(
with MockVWS(processing_time_seconds=0) as mock:
mock.add_cloud_database(cloud_database=database)
client = VWS(
server_access_key=database.server_access_key,
server_secret_key=database.server_secret_key,
) as client:
target_id = await client.add_target(
name=target_name,
width=1,
image=image_file_success_state_low_rating,
application_metadata=None,
active_flag=True,
)
await client.wait_for_target_processed(target_id=target_id)
target_record = await client.get_target_record(
target_id=target_id,
)
assert target_record.status == TargetStatuses.SUCCESS
assert target_record.target_record.name == target_name

await client.delete_target(target_id=target_id)

with pytest.raises(expected_exception=UnknownTargetError):
await client.get_target_record(target_id=target_id)
transport=HTTPXTransport(),
)
target_id = client.add_target(
name=target_name,
width=1,
image=image_file_success_state_low_rating,
application_metadata=None,
active_flag=True,
)
client.wait_for_target_processed(target_id=target_id)
target_record = client.get_target_record(target_id=target_id)
assert target_record.status == TargetStatuses.SUCCESS
assert target_record.target_record.name == target_name

with MockVWS(processing_time_seconds=0) as mock:
mock.add_cloud_database(cloud_database=database)
asyncio.run(run_test())
client.delete_target(target_id=target_id)

with pytest.raises(expected_exception=UnknownTargetError):
client.get_target_record(target_id=target_id)

class TestAsyncCloudRecoService:
"""Asynchronous cloud query usage through the mock."""

class TestCloudRecoService:
"""Synchronous cloud query usage through the mock via ``httpx``."""

@staticmethod
def test_query_returns_match(high_quality_image: io.BytesIO) -> None:
"""``AsyncCloudRecoService`` returns a match via the mock."""
"""``CloudRecoService`` returns a match via the mock."""
database = CloudDatabase()

async def run_test() -> None:
"""Add a target and query it using the clients."""
async with (
AsyncVWS(
server_access_key=database.server_access_key,
server_secret_key=database.server_secret_key,
) as vws_client,
AsyncCloudRecoService(
client_access_key=database.client_access_key,
client_secret_key=database.client_secret_key,
) as query_client,
):
target_id = await vws_client.add_target(
name="query-target",
width=1,
image=high_quality_image,
application_metadata=None,
active_flag=True,
)
await vws_client.wait_for_target_processed(target_id=target_id)
results = await query_client.query(image=high_quality_image)
assert [result.target_id for result in results] == [target_id]

with MockVWS(
processing_time_seconds=0,
query_match_checker=ExactMatcher(),
) as mock:
mock.add_cloud_database(cloud_database=database)
asyncio.run(run_test())


class TestAsyncVuMarkService:
"""Asynchronous VuMark generation usage through the mock."""
vws_client = VWS(
server_access_key=database.server_access_key,
server_secret_key=database.server_secret_key,
transport=HTTPXTransport(),
)
query_client = CloudRecoService(
client_access_key=database.client_access_key,
client_secret_key=database.client_secret_key,
transport=HTTPXTransport(),
)
target_id = vws_client.add_target(
name="query-target",
width=1,
image=high_quality_image,
application_metadata=None,
active_flag=True,
)
vws_client.wait_for_target_processed(target_id=target_id)
results = query_client.query(image=high_quality_image)
assert [result.target_id for result in results] == [target_id]


class TestVuMarkService:
"""Synchronous VuMark generation usage through the mock via
``httpx``.
"""

@staticmethod
def test_generate_vumark_instance_returns_png_bytes() -> None:
"""``AsyncVuMarkService`` returns VuMark image bytes."""
"""``VuMarkService`` returns VuMark image bytes."""
vumark_target = VuMarkTarget(name="test-target")
database = VuMarkDatabase(vumark_targets={vumark_target})

async def run_test() -> bytes:
"""Generate a VuMark instance image and return its bytes."""
async with AsyncVuMarkService(
server_access_key=database.server_access_key,
server_secret_key=database.server_secret_key,
) as client:
return await client.generate_vumark_instance(
target_id=vumark_target.target_id,
instance_id=uuid.uuid4().hex,
accept=VuMarkAccept.PNG,
)

with MockVWS() as mock:
mock.add_vumark_database(vumark_database=database)
response_content = asyncio.run(run_test())
client = VuMarkService(
server_access_key=database.server_access_key,
server_secret_key=database.server_secret_key,
transport=HTTPXTransport(),
)
response_content = client.generate_vumark_instance(
target_id=vumark_target.target_id,
instance_id=uuid.uuid4().hex,
accept=VuMarkAccept.PNG,
)

assert response_content.startswith(b"\x89PNG")