Skip to content

Commit cd6e70f

Browse files
Abel Milashclaude
andcommitted
Add AsyncFetchXmlQuery, AsyncQueryBuilder, and wire builder()/fetchxml() on AsyncQueryOperations
- New aio/models/async_fetchxml_query.py: async paging with cookie parsing, max-pages guard, URL-length guard - New aio/models/async_query_builder.py: async execute()/execute_pages() over QueryBuilder fluent interface - aio/operations/async_query.py: add builder() and fetchxml() factory methods with full validation - tests/unit/aio/test_async_query.py: 14 new tests covering builder, fetchxml, execute, execute_pages, and all error paths Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 33d4a08 commit cd6e70f

5 files changed

Lines changed: 558 additions & 5 deletions

File tree

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT license.
3+
4+
__all__ = []
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT license.
3+
4+
"""AsyncFetchXmlQuery — inert async query object returned by AsyncQueryOperations.fetchxml()."""
5+
6+
from __future__ import annotations
7+
8+
import warnings
9+
import xml.etree.ElementTree as _ET
10+
from typing import AsyncIterator, List, TYPE_CHECKING
11+
from urllib.parse import unquote as _url_unquote, quote as _url_quote
12+
13+
from ...core.errors import ValidationError
14+
from ...models.fetchxml_query import _MAX_URL_LENGTH, _MAX_PAGES, _PREFER_HEADER
15+
from ...models.record import QueryResult, Record
16+
17+
if TYPE_CHECKING:
18+
from ..async_client import AsyncDataverseClient
19+
20+
21+
__all__ = ["AsyncFetchXmlQuery"]
22+
23+
24+
class AsyncFetchXmlQuery:
25+
"""Inert async FetchXML query object. No HTTP request is made until
26+
:meth:`execute` or :meth:`execute_pages` is called.
27+
28+
Obtained via ``client.query.fetchxml(xml)``.
29+
30+
:param xml: Stripped, well-formed FetchXML string.
31+
:param entity_name: Entity schema name from the ``<entity>`` element.
32+
:param client: Parent :class:`~PowerPlatform.Dataverse.aio.async_client.AsyncDataverseClient`.
33+
"""
34+
35+
def __init__(self, xml: str, entity_name: str, client: "AsyncDataverseClient") -> None:
36+
self._xml = xml
37+
self._entity_name = entity_name
38+
self._client = client
39+
40+
async def execute(self) -> QueryResult:
41+
"""Execute the FetchXML query and return all results as a :class:`QueryResult`.
42+
43+
Awaitable — fetches all pages and holds every record in memory before
44+
returning. Use :meth:`execute_pages` when the result set may be large.
45+
46+
:return: All matching records across all pages.
47+
:rtype: :class:`~PowerPlatform.Dataverse.models.record.QueryResult`
48+
49+
Example::
50+
51+
rows = await client.query.fetchxml(xml).execute()
52+
df = rows.to_dataframe()
53+
"""
54+
all_records: List[Record] = []
55+
async for page in self.execute_pages():
56+
all_records.extend(page.records)
57+
return QueryResult(all_records)
58+
59+
async def execute_pages(self) -> AsyncIterator[QueryResult]:
60+
"""Lazily yield one :class:`QueryResult` per HTTP page.
61+
62+
Each iteration fires one HTTP request and yields one page. One-shot —
63+
do not iterate more than once.
64+
65+
:return: Async iterator of per-page :class:`QueryResult` objects.
66+
:rtype: AsyncIterator[:class:`~PowerPlatform.Dataverse.models.record.QueryResult`]
67+
68+
Example::
69+
70+
async for page in client.query.fetchxml(xml).execute_pages():
71+
process(page.to_dataframe())
72+
"""
73+
current_xml = self._xml
74+
page_num = 1
75+
page_count = 0
76+
77+
async with self._client._scoped_odata() as od:
78+
entity_set = await od._entity_set_from_schema_name(self._entity_name)
79+
base_url = f"{od.api}/{entity_set}"
80+
81+
while True:
82+
page_count += 1
83+
if page_count > _MAX_PAGES:
84+
raise ValidationError(
85+
f"FetchXML paging exceeded {_MAX_PAGES} pages. "
86+
"This may indicate a runaway query or a bug in paging cookie propagation."
87+
)
88+
89+
encoded_len = len(base_url) + len("?fetchXml=") + len(_url_quote(current_xml, safe=""))
90+
if encoded_len > _MAX_URL_LENGTH:
91+
raise ValidationError(
92+
f"FetchXML request URL exceeds {_MAX_URL_LENGTH} characters after encoding. "
93+
"Simplify the query or reduce attributes/conditions."
94+
)
95+
96+
r = await od._request(
97+
"get",
98+
base_url,
99+
headers={"Prefer": _PREFER_HEADER},
100+
params={"fetchXml": current_xml},
101+
)
102+
try:
103+
data = await r.json(content_type=None)
104+
except Exception:
105+
data = {}
106+
107+
items = data.get("value") if isinstance(data, dict) else None
108+
page_records: List[Record] = []
109+
if isinstance(items, list):
110+
for item in items:
111+
if isinstance(item, dict):
112+
page_records.append(Record.from_api_response(self._entity_name, item))
113+
114+
yield QueryResult(page_records)
115+
116+
more_raw = data.get("@Microsoft.Dynamics.CRM.morerecords", False) if isinstance(data, dict) else False
117+
more = more_raw is True or (isinstance(more_raw, str) and more_raw.lower() == "true")
118+
if not more:
119+
break
120+
121+
raw_cookie = (
122+
data.get("@Microsoft.Dynamics.CRM.fetchxmlpagingcookie", "") if isinstance(data, dict) else ""
123+
)
124+
125+
_cookie_parse_error = False
126+
if raw_cookie:
127+
try:
128+
cookie_el = _ET.fromstring(raw_cookie)
129+
inner_encoded = cookie_el.get("pagingcookie", "")
130+
if inner_encoded:
131+
cookie = _url_unquote(_url_unquote(inner_encoded))
132+
page_num = int(cookie_el.get("pagenumber", str(page_num + 1)))
133+
fetch_el = _ET.fromstring(current_xml)
134+
fetch_el.set("paging-cookie", cookie)
135+
fetch_el.set("page", str(page_num))
136+
current_xml = _ET.tostring(fetch_el, encoding="unicode")
137+
continue
138+
except (_ET.ParseError, ValueError) as exc:
139+
warnings.warn(
140+
f"FetchXML paging cookie could not be parsed ({exc}); "
141+
"falling back to simple paging.",
142+
UserWarning,
143+
stacklevel=2,
144+
)
145+
_cookie_parse_error = True
146+
147+
if not _cookie_parse_error:
148+
warnings.warn(
149+
"Dataverse did not return a paging cookie; falling back to simple paging "
150+
"(page-number increment only). Simple paging is capped at 50,000 records "
151+
"and degrades in performance at high page numbers. Consider reordering on "
152+
"a root-entity column to enable cookie-based paging.",
153+
UserWarning,
154+
stacklevel=2,
155+
)
156+
page_num += 1
157+
fetch_el = _ET.fromstring(current_xml)
158+
fetch_el.set("page", str(page_num))
159+
current_xml = _ET.tostring(fetch_el, encoding="unicode")
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT license.
3+
4+
"""AsyncQueryBuilder — async execution layer over the shared QueryBuilder."""
5+
6+
from __future__ import annotations
7+
8+
from typing import AsyncIterator, List
9+
10+
from ...models.query_builder import QueryBuilder
11+
from ...models.record import QueryResult, Record
12+
13+
14+
__all__ = ["AsyncQueryBuilder"]
15+
16+
17+
class AsyncQueryBuilder(QueryBuilder):
18+
"""Async-capable QueryBuilder.
19+
20+
Identical fluent interface to :class:`~PowerPlatform.Dataverse.models.query_builder.QueryBuilder`
21+
— all chaining methods (``select``, ``where``, ``order_by``, ``top``, ``page_size``,
22+
``count``, ``expand``, ``include_annotations``, ``include_formatted_values``) are
23+
inherited unchanged. Only the execution methods are overridden as coroutines.
24+
25+
Obtained via ``client.query.builder(table)`` on an async client.
26+
27+
Example::
28+
29+
from PowerPlatform.Dataverse.models.filters import col
30+
31+
result = await (client.query.builder("account")
32+
.select("name", "revenue")
33+
.where(col("statecode") == 0)
34+
.order_by("revenue", descending=True)
35+
.top(100)
36+
.execute())
37+
for record in result:
38+
print(record["name"])
39+
"""
40+
41+
async def execute(self) -> QueryResult:
42+
"""Execute the query and return all results as a :class:`QueryResult`.
43+
44+
Awaitable — fetches all pages and holds every record in memory before
45+
returning. Use :meth:`execute_pages` for lazy per-page streaming.
46+
47+
At least one of ``select()``, ``where()``, ``top()``, or
48+
``page_size()`` must be called first to prevent accidental full-table
49+
scans.
50+
51+
:return: All matching records across all pages.
52+
:rtype: :class:`~PowerPlatform.Dataverse.models.record.QueryResult`
53+
:raises ValueError: If no scope constraint has been set.
54+
:raises RuntimeError: If the builder was not created via
55+
``client.query.builder()``.
56+
57+
Example::
58+
59+
result = await (client.query.builder("account")
60+
.select("name")
61+
.where(col("statecode") == 0)
62+
.execute())
63+
for record in result:
64+
print(record["name"])
65+
"""
66+
if self._query_ops is None:
67+
raise RuntimeError(
68+
"Cannot execute: query was not created via client.query.builder(). "
69+
"Use build() and pass parameters to client.records.list() instead."
70+
)
71+
if not self._select and not self._filter_parts and self._top is None and self._page_size is None:
72+
raise ValueError(
73+
"At least one of select(), where(), top(), or page_size() must be called before "
74+
"execute() to prevent accidental full-table scans."
75+
)
76+
params = self.build()
77+
client = self._query_ops._client
78+
all_records: List[Record] = []
79+
async with client._scoped_odata() as od:
80+
async for page in od._get_multiple(
81+
params["table"],
82+
select=params.get("select"),
83+
filter=params.get("filter"),
84+
orderby=params.get("orderby"),
85+
top=params.get("top"),
86+
expand=params.get("expand"),
87+
page_size=params.get("page_size"),
88+
count=params.get("count", False),
89+
include_annotations=params.get("include_annotations"),
90+
):
91+
all_records.extend(Record.from_api_response(params["table"], row) for row in page)
92+
return QueryResult(all_records)
93+
94+
async def execute_pages(self) -> AsyncIterator[QueryResult]:
95+
"""Lazily yield one :class:`QueryResult` per HTTP page.
96+
97+
Each iteration triggers one network request. One-shot — do not
98+
iterate more than once.
99+
100+
At least one of ``select()``, ``where()``, ``top()``, or
101+
``page_size()`` must be called first to prevent accidental full-table
102+
scans.
103+
104+
:return: Async iterator of per-page
105+
:class:`~PowerPlatform.Dataverse.models.record.QueryResult` objects.
106+
:rtype: AsyncIterator[:class:`~PowerPlatform.Dataverse.models.record.QueryResult`]
107+
:raises ValueError: If no scope constraint has been set.
108+
:raises RuntimeError: If the builder was not created via
109+
``client.query.builder()``.
110+
111+
Example::
112+
113+
async for page in (client.query.builder("account")
114+
.select("name")
115+
.execute_pages()):
116+
process(page.to_dataframe())
117+
"""
118+
if self._query_ops is None:
119+
raise RuntimeError(
120+
"Cannot execute: query was not created via client.query.builder(). "
121+
"Use build() and pass parameters to client.records.list() instead."
122+
)
123+
if not self._select and not self._filter_parts and self._top is None and self._page_size is None:
124+
raise ValueError(
125+
"At least one of select(), where(), top(), or page_size() must be called before "
126+
"execute_pages() to prevent accidental full-table scans."
127+
)
128+
params = self.build()
129+
client = self._query_ops._client
130+
async with client._scoped_odata() as od:
131+
async for page in od._get_multiple(
132+
params["table"],
133+
select=params.get("select"),
134+
filter=params.get("filter"),
135+
orderby=params.get("orderby"),
136+
top=params.get("top"),
137+
expand=params.get("expand"),
138+
page_size=params.get("page_size"),
139+
count=params.get("count", False),
140+
include_annotations=params.get("include_annotations"),
141+
):
142+
yield QueryResult([Record.from_api_response(params["table"], row) for row in page])

0 commit comments

Comments
 (0)