Skip to content

Commit c87bc60

Browse files
Abel Milashclaude
andcommitted
Async review pass: align with GA API, fix async patterns, remove deprecated surface
- Remove deprecated filter_eq/filter_ne/filter_in methods from async_records and async_query (not in GA API) - Remove stale tests for removed methods - Use asyncio.gather() for independent I/O: relationship sub-requests, batch delete/remove-columns, multi-record label conversion, delete_columns metadata lookups - Fix blocking file I/O in _async_upload: asyncio.to_thread() for reads, Path instead of os.path - Remove lazy imports in _async_relationships; move to module level - Fix test_async_batch: wrap deprecated records.get() call in pytest.warns(DeprecationWarning) - Remove unused Optional import from _async_batch - Remove redundant return None statements in _async_odata Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent c02b6e9 commit c87bc60

15 files changed

Lines changed: 254 additions & 1106 deletions

src/PowerPlatform/Dataverse/aio/async_client.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,10 @@ class AsyncDataverseClient:
7171
Example:
7272
**Recommended -- async context manager** (enables HTTP connection pooling)::
7373
74-
from azure.identity.aio import DefaultAzureCredential
74+
from azure.identity.aio import InteractiveBrowserCredential
7575
from PowerPlatform.Dataverse.aio.async_client import AsyncDataverseClient
7676
77-
credential = DefaultAzureCredential()
77+
credential = InteractiveBrowserCredential()
7878
7979
async with AsyncDataverseClient("https://org.crm.dynamics.com", credential) as client:
8080
record_id = await client.records.create("account", {"name": "Contoso Ltd"})
@@ -118,12 +118,17 @@ def _get_odata(self) -> _AsyncODataClient:
118118
Get or create the internal async OData client instance.
119119
120120
This method implements lazy initialization of the low-level async OData
121-
client, deferring construction until the first API call.
121+
client, deferring construction until the first API call. When used outside
122+
of an ``async with`` block, a :class:`aiohttp.ClientSession` is created
123+
lazily here so that standalone usage (without a context manager) works
124+
without requiring the caller to manage the session explicitly.
122125
123126
:return: The lazily-initialized low-level async client.
124127
:rtype: ~PowerPlatform.Dataverse.aio.data._async_odata._AsyncODataClient
125128
"""
126129
if self._odata is None:
130+
if self._session is None:
131+
self._session = aiohttp.ClientSession()
127132
self._odata = _AsyncODataClient(
128133
self.auth,
129134
self._base_url,
@@ -199,3 +204,32 @@ def _check_closed(self) -> None:
199204
"""Raise :class:`RuntimeError` if the client has been closed."""
200205
if self._closed:
201206
raise RuntimeError("AsyncDataverseClient is closed")
207+
208+
# ---------------- Cache utilities ----------------
209+
210+
async def flush_cache(self, kind: str) -> int:
211+
"""
212+
Flush cached client metadata or state.
213+
214+
:param kind: Cache kind to flush. Currently supported values:
215+
216+
- ``"picklist"``: Clears picklist label cache used for label-to-integer conversion
217+
218+
Future kinds (e.g. ``"entityset"``, ``"primaryid"``) may be added without
219+
breaking this signature.
220+
:type kind: :class:`str`
221+
222+
:return: Number of cache entries removed.
223+
:rtype: :class:`int`
224+
225+
Example:
226+
Clear the picklist cache::
227+
228+
removed = await client.flush_cache("picklist")
229+
print(f"Cleared {removed} cached picklist entries")
230+
"""
231+
async with self._scoped_odata() as od:
232+
return od._flush_cache(kind)
233+
234+
235+
__all__ = ["AsyncDataverseClient"]

src/PowerPlatform/Dataverse/aio/data/_async_batch.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55

66
from __future__ import annotations
77

8+
import asyncio
89
import uuid
9-
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
10+
from typing import TYPE_CHECKING, Any, Dict, List, Union
1011

1112
from ...core.errors import MetadataError, ValidationError
1213
from ...core._error_codes import METADATA_TABLE_NOT_FOUND, METADATA_COLUMN_NOT_FOUND
@@ -250,10 +251,7 @@ async def _resolve_record_delete(self, op: _RecordDelete) -> List[_RawRequest]:
250251
return []
251252
if op.use_bulk_delete:
252253
return [await self._od._build_delete_multiple(op.table, ids)]
253-
requests: List[_RawRequest] = []
254-
for rid in ids:
255-
requests.append(await self._od._build_delete(op.table, rid))
256-
return requests
254+
return list(await asyncio.gather(*[self._od._build_delete(op.table, rid) for rid in ids]))
257255

258256
async def _resolve_record_get(self, op: _RecordGet) -> List[_RawRequest]:
259257
return [
@@ -317,11 +315,12 @@ async def _resolve_table_add_columns(self, op: _TableAddColumns) -> List[_RawReq
317315
async def _resolve_table_remove_columns(self, op: _TableRemoveColumns) -> List[_RawRequest]:
318316
columns = [op.columns] if isinstance(op.columns, str) else list(op.columns)
319317
metadata_id = await self._require_entity_metadata(op.table)
318+
attr_metas = await asyncio.gather(*[
319+
self._od._get_attribute_metadata(metadata_id, col_name, extra_select="@odata.type,AttributeType")
320+
for col_name in columns
321+
])
320322
requests: List[_RawRequest] = []
321-
for col_name in columns:
322-
attr_meta = await self._od._get_attribute_metadata(
323-
metadata_id, col_name, extra_select="@odata.type,AttributeType"
324-
)
323+
for col_name, attr_meta in zip(columns, attr_metas):
325324
if not attr_meta or not attr_meta.get("MetadataId"):
326325
raise MetadataError(
327326
f"Column '{col_name}' not found on table '{op.table}'.",

src/PowerPlatform/Dataverse/aio/data/_async_odata.py

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -367,11 +367,13 @@ async def _upsert_multiple(
367367
f"alternate_keys and records must have the same length " f"({len(alternate_keys)} != {len(records)})"
368368
)
369369
logical_name = table_schema_name.lower()
370+
lowered_records = [self._lowercase_keys(r) for r in records]
371+
converted = await asyncio.gather(*[
372+
self._convert_labels_to_ints(table_schema_name, r) for r in lowered_records
373+
])
370374
targets: List[Dict[str, Any]] = []
371-
for alt_key, record in zip(alternate_keys, records):
375+
for alt_key, record_processed in zip(alternate_keys, converted):
372376
alt_key_lower = self._lowercase_keys(alt_key)
373-
record_processed = self._lowercase_keys(record)
374-
record_processed = await self._convert_labels_to_ints(table_schema_name, record_processed)
375377
conflicting = {
376378
k for k in set(alt_key_lower) & set(record_processed) if alt_key_lower[k] != record_processed[k]
377379
}
@@ -429,7 +431,7 @@ async def _update_by_ids(
429431
if isinstance(changes, dict):
430432
batch = [{pk_attr: rid, **changes} for rid in ids]
431433
await self._update_multiple(entity_set, table_schema_name, batch)
432-
return None
434+
return
433435
if not isinstance(changes, list):
434436
raise TypeError("changes must be dict or list[dict]")
435437
if len(changes) != len(ids):
@@ -440,7 +442,6 @@ async def _update_by_ids(
440442
raise TypeError("Each patch must be a dict")
441443
batch.append({pk_attr: rid, **patch})
442444
await self._update_multiple(entity_set, table_schema_name, batch)
443-
return None
444445

445446
async def _delete_multiple(
446447
self,
@@ -513,7 +514,6 @@ async def _update_multiple(
513514
if not isinstance(records, list) or not records or not all(isinstance(r, dict) for r in records):
514515
raise TypeError("records must be a non-empty list[dict]")
515516
await self._execute_raw(await self._build_update_multiple_from_records(entity_set, table_schema_name, records))
516-
return None
517517

518518
async def _delete(self, table_schema_name: str, key: str) -> None:
519519
"""Delete a record by GUID.
@@ -1477,10 +1477,11 @@ async def _delete_columns(
14771477
deleted: List[str] = []
14781478
needs_picklist_flush = False
14791479

1480-
for column_name in names:
1481-
attr_meta = await self._get_attribute_metadata(
1482-
metadata_id, column_name, extra_select="@odata.type,AttributeType"
1483-
)
1480+
attr_metas = await asyncio.gather(*[
1481+
self._get_attribute_metadata(metadata_id, col, extra_select="@odata.type,AttributeType")
1482+
for col in names
1483+
])
1484+
for column_name, attr_meta in zip(names, attr_metas):
14841485
if not attr_meta:
14851486
raise MetadataError(
14861487
f"Column '{column_name}' not found on table '{entity_schema}'.",
@@ -1536,13 +1537,12 @@ async def _build_create_multiple(
15361537
if not all(isinstance(r, dict) for r in records):
15371538
raise TypeError("All items for multi-create must be dicts")
15381539
logical_name = table.lower()
1539-
enriched = []
1540-
for r in records:
1541-
r = self._lowercase_keys(r)
1542-
r = await self._convert_labels_to_ints(table, r)
1543-
if "@odata.type" not in r:
1544-
r = {**r, "@odata.type": f"Microsoft.Dynamics.CRM.{logical_name}"}
1545-
enriched.append(r)
1540+
lowered = [self._lowercase_keys(r) for r in records]
1541+
converted = await asyncio.gather(*[self._convert_labels_to_ints(table, r) for r in lowered])
1542+
enriched = [
1543+
{**r, "@odata.type": f"Microsoft.Dynamics.CRM.{logical_name}"} if "@odata.type" not in r else r
1544+
for r in converted
1545+
]
15461546
return _RawRequest(
15471547
method="POST",
15481548
url=f"{self.api}/{entity_set}/Microsoft.Dynamics.CRM.CreateMultiple",
@@ -1590,13 +1590,12 @@ async def _build_update_multiple_from_records(
15901590
:meth:`_build_update_multiple` (which assembles from ids + changes).
15911591
"""
15921592
logical_name = table.lower()
1593-
enriched = []
1594-
for r in records:
1595-
r = self._lowercase_keys(r)
1596-
r = await self._convert_labels_to_ints(table, r)
1597-
if "@odata.type" not in r:
1598-
r = {**r, "@odata.type": f"Microsoft.Dynamics.CRM.{logical_name}"}
1599-
enriched.append(r)
1593+
lowered = [self._lowercase_keys(r) for r in records]
1594+
converted = await asyncio.gather(*[self._convert_labels_to_ints(table, r) for r in lowered])
1595+
enriched = [
1596+
{**r, "@odata.type": f"Microsoft.Dynamics.CRM.{logical_name}"} if "@odata.type" not in r else r
1597+
for r in converted
1598+
]
16001599
return _RawRequest(
16011600
method="POST",
16021601
url=f"{self.api}/{entity_set}/Microsoft.Dynamics.CRM.UpdateMultiple",
@@ -1661,11 +1660,11 @@ async def _build_upsert_multiple(
16611660
subcode="upsert_length_mismatch",
16621661
)
16631662
logical_name = table.lower()
1663+
lowered_records = [self._lowercase_keys(r) for r in records]
1664+
converted = await asyncio.gather(*[self._convert_labels_to_ints(table, r) for r in lowered_records])
16641665
targets: List[Dict[str, Any]] = []
1665-
for alt_key, record in zip(alternate_keys, records):
1666+
for alt_key, record_processed in zip(alternate_keys, converted):
16661667
alt_key_lower = self._lowercase_keys(alt_key)
1667-
record_processed = self._lowercase_keys(record)
1668-
record_processed = await self._convert_labels_to_ints(table, record_processed)
16691668
conflicting = {
16701669
k for k in set(alt_key_lower) & set(record_processed) if alt_key_lower[k] != record_processed[k]
16711670
}

src/PowerPlatform/Dataverse/aio/data/_async_relationships.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,13 @@
1111

1212
__all__ = []
1313

14+
import asyncio
1415
import re
1516
from typing import Any, Dict, List, Optional
1617

18+
from ...core.errors import MetadataError
19+
from ...core._error_codes import METADATA_TABLE_NOT_FOUND
20+
1721

1822
class _AsyncRelationshipOperationsMixin:
1923
"""
@@ -207,9 +211,6 @@ async def _list_table_relationships(
207211
:raises MetadataError: If the table is not found.
208212
:raises HttpError: If the Web API request fails.
209213
"""
210-
from ...core.errors import MetadataError
211-
from ...core._error_codes import METADATA_TABLE_NOT_FOUND
212-
213214
ent = await self._get_entity_by_table_schema_name(table_schema_name)
214215
if not ent or not ent.get("MetadataId"):
215216
raise MetadataError(
@@ -237,9 +238,12 @@ async def _list_table_relationships(
237238
many_to_one_url = f"{self.api}/EntityDefinitions({metadata_id})/ManyToOneRelationships"
238239
many_to_many_url = f"{self.api}/EntityDefinitions({metadata_id})/ManyToManyRelationships"
239240

240-
r1 = await self._request("get", one_to_many_url, headers=await self._headers(), params=one_to_many_params)
241-
r2 = await self._request("get", many_to_one_url, headers=await self._headers(), params=one_to_many_params)
242-
r3 = await self._request("get", many_to_many_url, headers=await self._headers(), params=many_to_many_params)
241+
headers = await self._headers()
242+
r1, r2, r3 = await asyncio.gather(
243+
self._request("get", one_to_many_url, headers=headers, params=one_to_many_params),
244+
self._request("get", many_to_one_url, headers=headers, params=one_to_many_params),
245+
self._request("get", many_to_many_url, headers=headers, params=many_to_many_params),
246+
)
243247

244248
return (
245249
(await r1.json()).get("value", []) + (await r2.json()).get("value", []) + (await r3.json()).get("value", [])

src/PowerPlatform/Dataverse/aio/data/_async_upload.py

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55

66
from __future__ import annotations
77

8+
import asyncio
9+
from pathlib import Path
810
from typing import Optional
11+
from urllib.parse import quote
912

1013

1114
class _AsyncFileUploadMixin:
@@ -40,8 +43,6 @@ async def _upload_file(
4043
if_none_match : :class:`bool`
4144
When True (default) only succeeds if column empty. When False overwrites (If-Match: *).
4245
"""
43-
import os
44-
4546
# Resolve entity set from table schema name
4647
entity_set = await self._entity_set_from_schema_name(table_schema_name)
4748

@@ -61,9 +62,10 @@ async def _upload_file(
6162
mode = (mode or "auto").lower()
6263

6364
if mode == "auto":
64-
if not os.path.isfile(path):
65+
p = Path(path)
66+
if not p.is_file():
6567
raise FileNotFoundError(f"File not found: {path}")
66-
size = os.path.getsize(path)
68+
size = p.stat().st_size
6769
mode = "small" if size < 128 * 1024 * 1024 else "chunk"
6870

6971
# Convert schema name to lowercase logical name for URL usage
@@ -87,19 +89,17 @@ async def _upload_file_small(
8789
if_none_match: bool = True,
8890
) -> None:
8991
"""Upload a file (<128MB) via single PATCH."""
90-
import os
91-
9292
if not record_id:
9393
raise ValueError("record_id required")
94-
if not os.path.isfile(path):
94+
p = Path(path)
95+
if not p.is_file():
9596
raise FileNotFoundError(f"File not found: {path}")
96-
size = os.path.getsize(path)
97+
size = p.stat().st_size
9798
limit = 128 * 1024 * 1024
9899
if size > limit:
99100
raise ValueError(f"File size {size} exceeds single-upload limit {limit}; use chunk mode.")
100-
with open(path, "rb") as fh:
101-
data = fh.read()
102-
fname = os.path.basename(path)
101+
data = await asyncio.to_thread(p.read_bytes)
102+
fname = p.name
103103
key = self._format_key(record_id)
104104
url = f"{self.api}/{entity_set}{key}/{file_name_attribute}"
105105
headers = {
@@ -112,7 +112,6 @@ async def _upload_file_small(
112112
headers["If-Match"] = "*"
113113
# Single PATCH upload; allow default success codes (includes 204)
114114
await self._request("patch", url, headers=headers, data=data)
115-
return None
116115

117116
async def _upload_file_chunk(
118117
self,
@@ -139,20 +138,19 @@ async def _upload_file_chunk(
139138
if_none_match : :class:`bool`
140139
When True sends ``If-None-Match: null`` to only succeed if the column is currently empty.
141140
Set False to always overwrite (uses ``If-Match: *``).
141+
142142
Returns
143143
-------
144144
None
145145
Returns nothing on success. Any failure raises an exception.
146146
"""
147-
import os, math
148-
from urllib.parse import quote
149-
150147
if not record_id:
151148
raise ValueError("record_id required")
152-
if not os.path.isfile(path):
149+
p = Path(path)
150+
if not p.is_file():
153151
raise FileNotFoundError(f"File not found: {path}")
154-
total_size = os.path.getsize(path)
155-
fname = os.path.basename(path)
152+
total_size = p.stat().st_size
153+
fname = p.name
156154
key = self._format_key(record_id)
157155
init_url = f"{self.api}/{entity_set}{key}/{file_name_attribute}?x-ms-file-name={quote(fname)}"
158156
headers = {
@@ -174,11 +172,11 @@ async def _upload_file_chunk(
174172
effective_size = recommended_size or (4 * 1024 * 1024)
175173
if effective_size <= 0:
176174
raise ValueError("effective chunk size must be positive")
177-
total_chunks = int(math.ceil(total_size / effective_size)) if total_size else 1
175+
total_chunks = (total_size + effective_size - 1) // effective_size if total_size else 1
178176
uploaded_bytes = 0
179-
with open(path, "rb") as fh:
180-
for idx in range(total_chunks):
181-
chunk = fh.read(effective_size)
177+
with p.open("rb") as fh:
178+
for _ in range(total_chunks):
179+
chunk = await asyncio.to_thread(fh.read, effective_size)
182180
if not chunk:
183181
break
184182
start = uploaded_bytes
@@ -192,4 +190,3 @@ async def _upload_file_chunk(
192190
# Each chunk returns 206 (partial) or 204 (final). Accept both.
193191
await self._request("patch", location, headers=c_headers, data=chunk, expected=(206, 204))
194192
uploaded_bytes += len(chunk)
195-
return None

0 commit comments

Comments
 (0)