Skip to content

Commit 82aafb6

Browse files
Abel Milashclaude
andcommitted
Add prefetch_pages to _get_multiple for overlapped page fetching
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 5b2f6e9 commit 82aafb6

10 files changed

Lines changed: 268 additions & 12 deletions

File tree

src/PowerPlatform/Dataverse/data/_odata.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,7 @@ def _get_multiple(
840840
page_size: Optional[int] = None,
841841
count: bool = False,
842842
include_annotations: Optional[str] = None,
843+
prefetch_pages: int = 0,
843844
) -> Iterable[List[Dict[str, Any]]]:
844845
"""Iterate records from an entity set, yielding one page (list of dicts) at a time.
845846
@@ -861,6 +862,11 @@ def _get_multiple(
861862
:type count: ``bool``
862863
:param include_annotations: OData annotation pattern for the ``Prefer: odata.include-annotations`` header (e.g. ``"*"`` or ``"OData.Community.Display.V1.FormattedValue"``), or ``None``.
863864
:type include_annotations: ``str`` | ``None``
865+
:param prefetch_pages: Number of pages to pre-fetch ahead of the caller. ``0`` (default) is
866+
fully sequential. ``1`` submits the next HTTP request immediately after receiving the
867+
current page, overlapping network I/O with the caller's processing time. Values above
868+
``1`` are clamped to ``1``.
869+
:type prefetch_pages: ``int``
864870
865871
:return: Iterator yielding pages (each page is a ``list`` of record dicts).
866872
:rtype: ``Iterable[list[dict[str, Any]]]``
@@ -905,21 +911,38 @@ def _do_request(url: str, *, params: Optional[Dict[str, Any]] = None) -> Dict[st
905911
if count:
906912
params["$count"] = "true"
907913

908-
data = _do_request(base_url, params=params)
909-
items = data.get("value") if isinstance(data, dict) else None
910-
if isinstance(items, list) and items:
911-
yield [x for x in items if isinstance(x, dict)]
912-
913-
next_link = None
914-
if isinstance(data, dict):
915-
next_link = data.get("@odata.nextLink") or data.get("odata.nextLink")
916-
917-
while next_link:
918-
data = _do_request(next_link)
914+
if prefetch_pages <= 0:
915+
data = _do_request(base_url, params=params)
919916
items = data.get("value") if isinstance(data, dict) else None
920917
if isinstance(items, list) and items:
921918
yield [x for x in items if isinstance(x, dict)]
922-
next_link = data.get("@odata.nextLink") or data.get("odata.nextLink") if isinstance(data, dict) else None
919+
920+
next_link = None
921+
if isinstance(data, dict):
922+
next_link = data.get("@odata.nextLink") or data.get("odata.nextLink")
923+
924+
while next_link:
925+
data = _do_request(next_link)
926+
items = data.get("value") if isinstance(data, dict) else None
927+
if isinstance(items, list) and items:
928+
yield [x for x in items if isinstance(x, dict)]
929+
next_link = data.get("@odata.nextLink") or data.get("odata.nextLink") if isinstance(data, dict) else None
930+
else:
931+
# Submit the next request before yielding the current page so
932+
# network I/O overlaps with the caller's processing time.
933+
ctx = copy_context()
934+
executor = ThreadPoolExecutor(max_workers=1)
935+
try:
936+
pending = executor.submit(ctx.run, _do_request, base_url, params=params)
937+
while pending is not None:
938+
data = pending.result()
939+
items = data.get("value") if isinstance(data, dict) else None
940+
next_link = (data.get("@odata.nextLink") or data.get("odata.nextLink")) if isinstance(data, dict) else None
941+
pending = executor.submit(ctx.run, _do_request, next_link) if next_link else None
942+
if isinstance(items, list) and items:
943+
yield [x for x in items if isinstance(x, dict)]
944+
finally:
945+
executor.shutdown(wait=False, cancel_futures=True)
923946

924947
# --------------------------- SQL Custom API -------------------------
925948
def _query_sql(self, sql: str) -> list[dict[str, Any]]:

src/PowerPlatform/Dataverse/operations/records.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ def get(
301301
page_size: Optional[int] = None,
302302
count: bool = False,
303303
include_annotations: Optional[str] = None,
304+
prefetch_pages: int = 0,
304305
) -> Iterable[List[Record]]:
305306
"""Fetch multiple records from a Dataverse table with pagination.
306307
@@ -338,6 +339,11 @@ def get(
338339
``Prefer: odata.include-annotations`` header (e.g. ``"*"`` or
339340
``"OData.Community.Display.V1.FormattedValue"``), or ``None``.
340341
:type include_annotations: :class:`str` or None
342+
:param prefetch_pages: When ``1``, the next page is fetched in a
343+
background thread while the caller processes the current page,
344+
overlapping network I/O with processing. ``0`` (default) is fully
345+
sequential. Values above ``1`` are clamped to ``1``.
346+
:type prefetch_pages: :class:`int`
341347
342348
:return: Generator yielding pages, where each page is a list of
343349
:class:`~PowerPlatform.Dataverse.models.record.Record` objects.
@@ -370,6 +376,7 @@ def get(
370376
page_size: Optional[int] = None,
371377
count: bool = False,
372378
include_annotations: Optional[str] = None,
379+
prefetch_pages: int = 0,
373380
) -> Union[Record, Iterable[List[Record]]]:
374381
"""Fetch a single record by ID, or fetch multiple records with pagination.
375382
@@ -485,6 +492,7 @@ def _paged() -> Iterable[List[Record]]:
485492
page_size=page_size,
486493
count=count,
487494
include_annotations=include_annotations,
495+
prefetch_pages=prefetch_pages,
488496
):
489497
yield [Record.from_api_response(table, row) for row in page]
490498

tests/perf/prefetch_pages.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT license.
3+
4+
"""Performance comparison: prefetch_pages=0 (sequential) vs prefetch_pages=1.
5+
6+
Usage:
7+
python tests/perf/prefetch_pages.py
8+
9+
Set PYTHONPATH=src before running. A browser login prompt will appear.
10+
11+
Environment variables (all optional):
12+
DATAVERSE_URL Org URL — if omitted, you will be prompted interactively
13+
PAGE_SIZE Records per page (default: 10)
14+
RUNS Repetitions per mode for averaging (default: 3)
15+
"""
16+
17+
import os
18+
import sys
19+
import time
20+
import statistics
21+
22+
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src"))
23+
24+
from azure.identity import InteractiveBrowserCredential
25+
from PowerPlatform.Dataverse.client import DataverseClient
26+
27+
_TABLE = "new_PrefetchPerfTest"  # scratch table: looked up/created by _setup, deleted by _cleanup
28+
_RECORD_COUNT = 50  # records inserted by _setup; 50 records / default PAGE_SIZE of 10 = 5 pages
29+
30+
31+
def _backoff(op, delays=(0, 2, 5, 10, 20)):
32+
last = None
33+
for d in delays:
34+
if d:
35+
time.sleep(d)
36+
try:
37+
return op()
38+
except Exception as ex: # noqa: BLE001
39+
last = ex
40+
raise last
41+
42+
43+
def _setup(client: DataverseClient) -> None:
44+
"""Prepare the scratch table used by the benchmark.

Looks up ``_TABLE`` via ``client.tables.get``; when the result is falsy the
table is created with a single ``new_Label`` string column. ``_RECORD_COUNT``
generated records are inserted through ``client.records.create``. Every
client call goes through ``_backoff``.

NOTE(review): indentation is lost in this view, so confirm whether the record
insert runs on every call or only after a fresh create.
"""
45+
print(f"\n-> client.tables.get('{_TABLE}')")
46+
info = _backoff(lambda: client.tables.get(_TABLE))
47+
# A truthy lookup result is taken to mean the table already exists.
if info:
48+
print(f"[OK] Table already exists: {info['table_schema_name']}")
49+
else:
50+
print(f"-> client.tables.create('{_TABLE}', ...)")
51+
_backoff(lambda: client.tables.create(_TABLE, {"new_Label": "string"}))
52+
print(f"[OK] Created table: {_TABLE}")
53+
54+
print(f"-> client.records.create('{_TABLE}', [{_RECORD_COUNT} records])")
55+
# Labels are zero-padded ("record-0000", "record-0001", ...).
records = [{"new_Label": f"record-{i:04d}"} for i in range(_RECORD_COUNT)]
56+
ids = _backoff(lambda: client.records.create(_TABLE, records))
57+
print(f"[OK] Inserted {len(ids)} records")
58+
59+
60+
def _cleanup(client: DataverseClient) -> None:
    """Delete the scratch table, reporting (not raising) any failure.

    The delete is retried through ``_backoff``; if it still fails, a warning is
    printed so the caller's ``finally`` block never masks an earlier error.
    """

    def _drop_table():
        return client.tables.delete(_TABLE)

    print(f"\n-> client.tables.delete('{_TABLE}')")
    try:
        _backoff(_drop_table)
        print(f"[OK] Deleted table: {_TABLE}")
    except Exception as err:  # noqa: BLE001
        print(f"[WARN] Cleanup failed: {err}")
67+
68+
69+
def _consume(client: DataverseClient, page_size: int, prefetch: int) -> tuple[int, int, float]:
    """Walk every page of ``_TABLE`` once and time the whole iteration.

    :param client: Client whose ``records.get`` yields pages.
    :param page_size: Forwarded as ``page_size``.
    :param prefetch: Forwarded as ``prefetch_pages``.
    :return: ``(total_records, page_count, elapsed_seconds)``.
    """
    record_total = 0
    page_total = 0
    started = time.perf_counter()
    page_iter = client.records.get(_TABLE, page_size=page_size, prefetch_pages=prefetch)
    for page_total, page in enumerate(page_iter, start=1):
        record_total += len(page)
        # simulate per-page processing (e.g. db write, transform)
        time.sleep(0.05)
    elapsed = time.perf_counter() - started
    return record_total, page_total, elapsed
79+
80+
81+
def _run(client: DataverseClient, page_size: int, runs: int) -> None:
    """Time sequential vs. prefetch iteration and print a summary table.

    One untimed sequential pass reports the page count. Then, for each of
    ``runs`` repetitions, both modes are timed with ``_consume``. Finally the
    mean/median/min/max per mode and the relative difference of the means are
    printed.
    """
    modes = ((0, "sequential"), (1, "prefetch=1"))

    # Dry run to confirm page count
    page_count = _consume(client, page_size, prefetch=0)[1]
    print(f"\nPage size : {page_size}")
    print(f"Pages found : {page_count}")
    print(f"Runs : {runs}\n")

    timings: dict[int, list[float]] = {mode: [] for mode, _ in modes}

    for run_no in range(1, runs + 1):
        for mode, label in modes:
            elapsed = _consume(client, page_size, prefetch=mode)[2]
            timings[mode].append(elapsed)
            print(f" Run {run_no} {label:12s} {elapsed:.3f}s")

    rule = "=" * 50
    print()
    print(rule)
    print(f"{'Mode':<14} {'Mean':>7} {'Median':>7} {'Min':>7} {'Max':>7}")
    print("-" * 50)
    for mode, label in modes:
        samples = timings[mode]
        mean_s = statistics.mean(samples)
        median_s = statistics.median(samples)
        fastest = min(samples)
        slowest = max(samples)
        print(f"{label:<14} {mean_s:>6.3f}s {median_s:>6.3f}s {fastest:>6.3f}s {slowest:>6.3f}s")
    print(rule)

    mean_seq = statistics.mean(timings[0])
    mean_pre = statistics.mean(timings[1])
    if mean_seq > 0:
        # Positive delta means the prefetch mean is smaller than the sequential mean.
        delta = (mean_seq - mean_pre) / mean_seq * 100
        if delta > 0:
            direction = "faster"
        else:
            direction = "slower"
        print(f"\nprefetch=1 is {abs(delta):.1f}% {direction} on average.")
112+
113+
114+
def _positive_int_env(name: str, default: int) -> int:
    """Return env var ``name`` as a positive int, or ``default`` when it is unset; exit(1) on bad input."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        print(f"{name} must be an integer, got {raw!r}; exiting.")
        sys.exit(1)
    if value < 1:
        print(f"{name} must be >= 1, got {value}; exiting.")
        sys.exit(1)
    return value


def main():
    """Entry point: resolve settings, connect, then run setup, benchmark and cleanup.

    The org URL comes from ``DATAVERSE_URL`` or an interactive prompt.
    ``PAGE_SIZE`` (default 10) and ``RUNS`` (default 3) must be positive
    integers. They are validated before authentication so a bad value fails
    immediately rather than after the table has been created and seeded
    (``RUNS=0`` would otherwise end in ``statistics.mean`` raising on an empty
    list).

    :raises SystemExit: When no URL is provided or ``PAGE_SIZE``/``RUNS`` is invalid.
    """
    print("=" * 60)
    print("prefetch_pages performance comparison")
    print("=" * 60)

    base_url = os.environ.get("DATAVERSE_URL", "").rstrip("/")
    if not base_url:
        base_url = input("Enter Dataverse org URL: ").strip().rstrip("/")
    if not base_url:
        print("No URL provided; exiting.")
        sys.exit(1)

    page_size = _positive_int_env("PAGE_SIZE", 10)
    runs = _positive_int_env("RUNS", 3)

    print("\n-> InteractiveBrowserCredential()")
    credential = InteractiveBrowserCredential()

    print(f"-> DataverseClient('{base_url}', ...)")
    with DataverseClient(base_url=base_url, credential=credential) as client:
        print(f"[OK] Connected to: {base_url}")
        try:
            _setup(client)
            _run(client, page_size, runs)
        finally:
            # Always attempt to drop the scratch table, even if setup or the run failed.
            _cleanup(client)
140+
141+
142+
if __name__ == "__main__":
143+
main()

tests/unit/data/test_odata_internal.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,81 @@ def test_empty_value_list_yields_nothing(self):
10861086
self.assertEqual(pages, [])
10871087

10881088

1089+
class TestGetMultiplePrefetch(unittest.TestCase):
    """Behavioral tests for _get_multiple with prefetch_pages=1."""

    # nextLink advertised by the first page in every multi-page scenario.
    _NEXT_LINK = "https://example.crm.dynamics.com/api/data/v9.2/accounts?$skiptoken=2"

    def setUp(self):
        self.od = _make_odata_client()
        self.od._entity_set_from_schema_name = MagicMock(return_value="accounts")

    def _first_page(self):
        """Response for page 1: one record plus a nextLink."""
        return _mock_response(
            json_data={"value": [{"accountid": "id-1"}], "@odata.nextLink": self._NEXT_LINK},
            text="...",
        )

    def _last_page(self):
        """Response for page 2: one record, no nextLink."""
        return _mock_response(
            json_data={"value": [{"accountid": "id-2"}]},
            text="...",
        )

    def _queue_two_pages(self):
        """Make ``_request`` return page 1 and then page 2."""
        self.od._request.side_effect = [self._first_page(), self._last_page()]

    def test_prefetch_returns_same_pages_as_sequential(self):
        """prefetch_pages=1 yields the same records in the same order as sequential mode."""
        self._queue_two_pages()
        fetched = list(self.od._get_multiple("account", prefetch_pages=1))
        self.assertEqual(len(fetched), 2)
        self.assertEqual(fetched[0][0]["accountid"], "id-1")
        self.assertEqual(fetched[1][0]["accountid"], "id-2")

    def test_prefetch_values_above_one_clamped_to_one(self):
        """prefetch_pages=2 (or any value > 1) behaves identically to prefetch_pages=1."""
        self._queue_two_pages()
        fetched = list(self.od._get_multiple("account", prefetch_pages=2))
        self.assertEqual(len(fetched), 2)
        self.assertEqual(fetched[0][0]["accountid"], "id-1")
        self.assertEqual(fetched[1][0]["accountid"], "id-2")
        # Both pages should have been fetched (max_workers=1 means sequential requests)
        self.assertEqual(self.od._request.call_count, 2)

    def test_prefetch_exception_propagates_to_caller(self):
        """An exception raised by the background request surfaces when the caller consumes the next page."""
        self.od._request.side_effect = [self._first_page(), RuntimeError("network failure")]
        page_gen = self.od._get_multiple("account", prefetch_pages=1)
        head = next(page_gen)
        self.assertEqual(head[0]["accountid"], "id-1")
        with self.assertRaises(RuntimeError):
            next(page_gen)

    def test_prefetch_early_close_does_not_hang(self):
        """Closing the generator early (break) does not deadlock or raise an error."""
        self._queue_two_pages()
        page_gen = self.od._get_multiple("account", prefetch_pages=1)
        head = next(page_gen)
        self.assertEqual(head[0]["accountid"], "id-1")
        page_gen.close()  # should not raise or hang

    def test_prefetch_single_page_no_nextlink(self):
        """prefetch_pages=1 with a single page (no nextLink) yields exactly one page."""
        payload = {"value": [{"accountid": "id-1"}]}
        self.od._request.return_value = _mock_response(json_data=payload, text=str(payload))
        fetched = list(self.od._get_multiple("account", prefetch_pages=1))
        self.assertEqual(len(fetched), 1)
        self.assertEqual(fetched[0][0]["accountid"], "id-1")
        self.od._request.assert_called_once()
1162+
1163+
10891164
class TestQuerySql(unittest.TestCase):
10901165
"""Unit tests for _ODataClient._query_sql."""
10911166

tests/unit/test_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ def test_get_multiple(self):
128128
page_size=None,
129129
count=False,
130130
include_annotations=None,
131+
prefetch_pages=0,
131132
)
132133
self.assertEqual(len(results), 1)
133134
self.assertEqual(len(results[0]), 2)

tests/unit/test_client_dataframe.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ def test_get_passes_all_parameters(self):
112112
page_size=25,
113113
count=False,
114114
include_annotations=None,
115+
prefetch_pages=0,
115116
)
116117

117118

tests/unit/test_client_deprecations.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ def test_get_multiple_warns(self):
132132
page_size=None,
133133
count=False,
134134
include_annotations=None,
135+
prefetch_pages=0,
135136
)
136137

137138
# ------------------------------------------------------------- query_sql

tests/unit/test_dataframe_operations.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def test_get_passes_all_params(self):
9696
page_size=25,
9797
count=False,
9898
include_annotations=None,
99+
prefetch_pages=0,
99100
)
100101

101102
def test_get_record_id_with_query_params_raises(self):

tests/unit/test_query_operations.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def test_builder_execute_flat_default(self):
8383
page_size=None,
8484
count=False,
8585
include_annotations=None,
86+
prefetch_pages=0,
8687
)
8788
self.assertEqual(len(records), 1)
8889
self.assertEqual(records[0]["name"], "Test")
@@ -134,6 +135,7 @@ def test_builder_execute_all_params(self):
134135
page_size=25,
135136
count=False,
136137
include_annotations=None,
138+
prefetch_pages=0,
137139
)
138140

139141
def test_builder_execute_with_where(self):

tests/unit/test_records_operations.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ def test_get_paginated_with_all_params(self):
245245
page_size=25,
246246
count=False,
247247
include_annotations=None,
248+
prefetch_pages=0,
248249
)
249250

250251
# ------------------------------------------------------------------ upsert

0 commit comments

Comments
 (0)