Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/source/api-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ API Reference
:undoc-members:
:members:

.. automodule:: vws.async_vws
:undoc-members:
:members:

.. automodule:: vws.async_query
:undoc-members:
:members:

.. automodule:: vws.async_vumark_service
:undoc-members:
:members:

.. automodule:: vws.reports
:undoc-members:
:members:
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ optional-dependencies.dev = [
"pyright==1.1.408",
"pyroma==5.0.1",
"pytest==9.0.2",
"pytest-asyncio==1.3.0",
"pytest-cov==7.0.0",
"pyyaml==6.0.3",
"ruff==0.15.2",
Expand Down Expand Up @@ -359,6 +360,7 @@ ignore_path = [
# but Vulture does not enable this.
ignore_names = [
# Public API classes imported by users from vws.transports
"AsyncHTTPXTransport",
"HTTPXTransport",
# pytest configuration
"pytest_collect_file",
Expand Down
2 changes: 2 additions & 0 deletions spelling_private_dict.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ admin
api
args
ascii
async
asyncio
beartype
bool
boolean
Expand Down
6 changes: 6 additions & 0 deletions src/vws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
"""A library for Vuforia Web Services."""

from .async_query import AsyncCloudRecoService
from .async_vumark_service import AsyncVuMarkService
from .async_vws import AsyncVWS
from .query import CloudRecoService
from .vumark_service import VuMarkService
from .vws import VWS

__all__ = [
"VWS",
"AsyncCloudRecoService",
"AsyncVWS",
"AsyncVuMarkService",
"CloudRecoService",
"VuMarkService",
]
77 changes: 77 additions & 0 deletions src/vws/_async_vws_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Internal helper for making authenticated async requests to the
Vuforia Target API.
"""

from beartype import BeartypeConf, beartype
from vws_auth_tools import authorization_header, rfc_1123_date

from vws.response import Response
from vws.transports import AsyncTransport


@beartype(conf=BeartypeConf(is_pep484_tower=True))
async def async_target_api_request(
*,
content_type: str,
server_access_key: str,
server_secret_key: str,
method: str,
data: bytes,
request_path: str,
base_vws_url: str,
request_timeout_seconds: float | tuple[float, float],
extra_headers: dict[str, str],
transport: AsyncTransport,
) -> Response:
"""Make an async request to the Vuforia Target API.

Args:
content_type: The content type of the request.
server_access_key: A VWS server access key.
server_secret_key: A VWS server secret key.
method: The HTTP method which will be used in the
request.
data: The request body which will be used in the
request.
request_path: The path to the endpoint which will be
used in the request.
base_vws_url: The base URL for the VWS API.
request_timeout_seconds: The timeout for the request.
This can be a float to set both the connect and
read timeouts, or a (connect, read) tuple.
extra_headers: Additional headers to include in the
request.
transport: The async HTTP transport to use for the
request.

Returns:
The response to the request.
"""
date_string = rfc_1123_date()

signature_string = authorization_header(
access_key=server_access_key,
secret_key=server_secret_key,
method=method,
content=data,
content_type=content_type,
date=date_string,
request_path=request_path,
)

headers = {
"Authorization": signature_string,
"Date": date_string,
"Content-Type": content_type,
**extra_headers,
}

url = base_vws_url.rstrip("/") + request_path

return await transport(
method=method,
url=url,
headers=headers,
data=data,
request_timeout=request_timeout_seconds,
)
226 changes: 226 additions & 0 deletions src/vws/async_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
"""Async tools for interacting with the Vuforia Cloud Recognition
Web APIs.
"""

import datetime
import json
from http import HTTPMethod, HTTPStatus
from typing import Any, Self

from beartype import BeartypeConf, beartype
from urllib3.filepost import encode_multipart_formdata
from vws_auth_tools import authorization_header, rfc_1123_date

from vws._image_utils import ImageType as _ImageType
from vws._image_utils import get_image_data as _get_image_data
from vws.exceptions.cloud_reco_exceptions import (
AuthenticationFailureError,
BadImageError,
InactiveProjectError,
MaxNumResultsOutOfRangeError,
RequestTimeTooSkewedError,
)
from vws.exceptions.custom_exceptions import (
RequestEntityTooLargeError,
ServerError,
)
from vws.include_target_data import CloudRecoIncludeTargetData
from vws.reports import QueryResult, TargetData
from vws.transports import AsyncHTTPXTransport, AsyncTransport


@beartype(conf=BeartypeConf(is_pep484_tower=True))
class AsyncCloudRecoService:
"""An async interface to the Vuforia Cloud Recognition Web
APIs.
"""

def __init__(
self,
*,
client_access_key: str,
client_secret_key: str,
base_vwq_url: str = "https://cloudreco.vuforia.com",
request_timeout_seconds: float | tuple[float, float] = 30.0,
transport: AsyncTransport | None = None,
) -> None:
"""
Args:
client_access_key: A VWS client access key.
client_secret_key: A VWS client secret key.
base_vwq_url: The base URL for the VWQ API.
request_timeout_seconds: The timeout for each
HTTP request. This can be a float to set both
the connect and read timeouts, or a
(connect, read) tuple.
transport: The async HTTP transport to use for
requests. Defaults to
``AsyncHTTPXTransport()``.
"""
self._client_access_key = client_access_key
self._client_secret_key = client_secret_key
self._base_vwq_url = base_vwq_url
self._request_timeout_seconds = request_timeout_seconds
self._transport = transport or AsyncHTTPXTransport()

async def aclose(self) -> None:
"""Close the underlying transport if it supports closing."""
close = getattr(self._transport, "aclose", None)
if close is not None:
await close()

async def __aenter__(self) -> Self:
"""Enter the async context manager."""
return self

async def __aexit__(self, *_args: object) -> None:
"""Exit the async context manager and close the transport."""
await self.aclose()

async def query(
self,
*,
image: _ImageType,
max_num_results: int = 1,
include_target_data: CloudRecoIncludeTargetData = (
CloudRecoIncludeTargetData.TOP
),
) -> list[QueryResult]:
"""Use the Vuforia Web Query API to make an Image
Recognition Query.

See
https://developer.vuforia.com/library/web-api/vuforia-query-web-api
for parameter details.

Args:
image: The image to make a query against.
max_num_results: The maximum number of matching
targets to be returned.
include_target_data: Indicates if target_data
records shall be returned for the matched
targets. Accepted values are top (default
value, only return target_data for top ranked
match), none (return no target_data), all
(for all matched targets).

Raises:
~vws.exceptions.cloud_reco_exceptions.AuthenticationFailureError:
The client access key pair is not correct.
~vws.exceptions.cloud_reco_exceptions.MaxNumResultsOutOfRangeError:
``max_num_results`` is not within the range (1, 50).
~vws.exceptions.cloud_reco_exceptions.InactiveProjectError: The
project is inactive.
~vws.exceptions.cloud_reco_exceptions.RequestTimeTooSkewedError:
There is an error with the time sent to Vuforia.
~vws.exceptions.cloud_reco_exceptions.BadImageError: There is a
problem with the given image. For example, it must be a JPEG or
PNG file in the grayscale or RGB color space.
~vws.exceptions.custom_exceptions.RequestEntityTooLargeError: The
given image is too large.
~vws.exceptions.custom_exceptions.ServerError: There is an
error with Vuforia's servers.

Returns:
An ordered list of target details of matching
targets.
"""
image_content = _get_image_data(image=image)
body: dict[str, Any] = {
"image": (
"image.jpeg",
image_content,
"image/jpeg",
),
"max_num_results": (
None,
int(max_num_results),
"text/plain",
),
"include_target_data": (
None,
include_target_data.value,
"text/plain",
),
}
date = rfc_1123_date()
request_path = "/v1/query"
content, content_type_header = encode_multipart_formdata(fields=body)
method = HTTPMethod.POST

authorization_string = authorization_header(
access_key=self._client_access_key,
secret_key=self._client_secret_key,
method=method,
content=content,
# Note that this is not the actual Content-Type
# header value sent.
content_type="multipart/form-data",
date=date,
request_path=request_path,
)

headers = {
"Authorization": authorization_string,
"Date": date,
"Content-Type": content_type_header,
}

response = await self._transport(
method=method,
url=self._base_vwq_url.rstrip("/") + request_path,
headers=headers,
data=content,
request_timeout=self._request_timeout_seconds,
)

if response.status_code == HTTPStatus.REQUEST_ENTITY_TOO_LARGE:
raise RequestEntityTooLargeError(response=response)

if "Integer out of range" in response.text:
raise MaxNumResultsOutOfRangeError(
response=response,
)

if (
response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR
): # pragma: no cover
raise ServerError(response=response)

result_code = json.loads(s=response.text)["result_code"]
if result_code != "Success":
exception = {
"AuthenticationFailure": (AuthenticationFailureError),
"BadImage": BadImageError,
"InactiveProject": InactiveProjectError,
"RequestTimeTooSkewed": (RequestTimeTooSkewedError),
}[result_code]
raise exception(response=response)

result: list[QueryResult] = []
result_list = list(
json.loads(s=response.text)["results"],
)
for item in result_list:
target_data: TargetData | None = None
if "target_data" in item:
target_data_dict = item["target_data"]
metadata = target_data_dict["application_metadata"]
timestamp_string = target_data_dict["target_timestamp"]
target_timestamp = datetime.datetime.fromtimestamp(
timestamp=timestamp_string,
tz=datetime.UTC,
)
target_data = TargetData(
name=target_data_dict["name"],
application_metadata=metadata,
target_timestamp=target_timestamp,
)

query_result = QueryResult(
target_id=item["target_id"],
target_data=target_data,
)

result.append(query_result)
return result
Loading