Skip to content

Commit b32987a

Browse files
Abel Milashclaude
andcommitted
Async review pass: fix http/retry bugs, remove asyncio.gather, raise coverage to 98%
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 7233455 commit b32987a

26 files changed

Lines changed: 765 additions & 85 deletions
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT license.
33

4+
"""
5+
Async namespace for the PowerPlatform Dataverse SDK.
6+
7+
Import the async client via::
8+
9+
from PowerPlatform.Dataverse.aio.async_client import AsyncDataverseClient
10+
"""
11+
412
__all__ = []
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,9 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT license.
3+
4+
"""
5+
Async core infrastructure components for the Dataverse SDK.
6+
7+
This module contains the foundational async components including authentication,
8+
HTTP client, and error handling.
9+
"""

src/PowerPlatform/Dataverse/aio/core/_async_http.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,16 +112,19 @@ async def _request(self, method: str, url: str, **kwargs: Any) -> aiohttp.Client
112112
for attempt in range(self.max_attempts):
113113
try:
114114
t0 = time.monotonic()
115-
resp = await self._session.request(method, url, **kwargs)
116-
# Buffer the full body so the caller can read it freely after this call
117-
# returns, and so the connection is released back to the pool.
118-
await resp.read()
115+
async with self._session.request(method, url, **kwargs) as resp:
116+
await resp.read()
119117
elapsed_ms = (time.monotonic() - t0) * 1000
120118

121119
if self._logger is not None:
122120
# Only decode resp.text when body logging is enabled — avoids
123121
# unnecessary overhead for large payloads when max_body_bytes == 0.
124-
resp_body = await resp.text() if self._logger.body_logging_enabled else None
122+
resp_body = None
123+
if self._logger.body_logging_enabled:
124+
try:
125+
resp_body = await resp.text()
126+
except Exception:
127+
pass
125128
self._logger.log_response(
126129
method,
127130
url,
@@ -131,7 +134,7 @@ async def _request(self, method: str, url: str, **kwargs: Any) -> aiohttp.Client
131134
elapsed_ms=elapsed_ms,
132135
)
133136
return resp
134-
except aiohttp.ClientError as exc:
137+
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
135138
if self._logger is not None:
136139
self._logger.log_error(
137140
method,
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,9 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT license.
3+
4+
"""
5+
Async data access layer for the Dataverse SDK.
6+
7+
This module contains async OData protocol handling, CRUD operations, metadata management,
8+
SQL query functionality, and file upload capabilities.
9+
"""

src/PowerPlatform/Dataverse/aio/data/_async_batch.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
from __future__ import annotations
77

8-
import asyncio
98
import uuid
109
from typing import TYPE_CHECKING, Any, Dict, List, Union
1110

@@ -251,7 +250,7 @@ async def _resolve_record_delete(self, op: _RecordDelete) -> List[_RawRequest]:
251250
return []
252251
if op.use_bulk_delete:
253252
return [await self._od._build_delete_multiple(op.table, ids)]
254-
return list(await asyncio.gather(*[self._od._build_delete(op.table, rid) for rid in ids]))
253+
return [await self._od._build_delete(op.table, rid) for rid in ids]
255254

256255
async def _resolve_record_get(self, op: _RecordGet) -> List[_RawRequest]:
257256
return [
@@ -315,10 +314,10 @@ async def _resolve_table_add_columns(self, op: _TableAddColumns) -> List[_RawReq
315314
async def _resolve_table_remove_columns(self, op: _TableRemoveColumns) -> List[_RawRequest]:
316315
columns = [op.columns] if isinstance(op.columns, str) else list(op.columns)
317316
metadata_id = await self._require_entity_metadata(op.table)
318-
attr_metas = await asyncio.gather(*[
319-
self._od._get_attribute_metadata(metadata_id, col_name, extra_select="@odata.type,AttributeType")
317+
attr_metas = [
318+
await self._od._get_attribute_metadata(metadata_id, col_name, extra_select="@odata.type,AttributeType")
320319
for col_name in columns
321-
])
320+
]
322321
requests: List[_RawRequest] = []
323322
for col_name, attr_meta in zip(columns, attr_metas):
324323
if not attr_meta or not attr_meta.get("MetadataId"):

src/PowerPlatform/Dataverse/aio/data/_async_odata.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def _merge(h: Optional[Dict[str, str]]) -> Dict[str, str]:
141141
if r.status in request_context.expected:
142142
return r
143143

144-
response_headers = dict(r.headers) if r.headers else {}
144+
response_headers = getattr(r, "headers", {}) or {}
145145
raw_text = ""
146146
try:
147147
raw_text = await r.text()
@@ -368,9 +368,7 @@ async def _upsert_multiple(
368368
)
369369
logical_name = table_schema_name.lower()
370370
lowered_records = [self._lowercase_keys(r) for r in records]
371-
converted = await asyncio.gather(*[
372-
self._convert_labels_to_ints(table_schema_name, r) for r in lowered_records
373-
])
371+
converted = [await self._convert_labels_to_ints(table_schema_name, r) for r in lowered_records]
374372
targets: List[Dict[str, Any]] = []
375373
for alt_key, record_processed in zip(alternate_keys, converted):
376374
alt_key_lower = self._lowercase_keys(alt_key)
@@ -1477,10 +1475,10 @@ async def _delete_columns(
14771475
deleted: List[str] = []
14781476
needs_picklist_flush = False
14791477

1480-
attr_metas = await asyncio.gather(*[
1481-
self._get_attribute_metadata(metadata_id, col, extra_select="@odata.type,AttributeType")
1478+
attr_metas = [
1479+
await self._get_attribute_metadata(metadata_id, col, extra_select="@odata.type,AttributeType")
14821480
for col in names
1483-
])
1481+
]
14841482
for column_name, attr_meta in zip(names, attr_metas):
14851483
if not attr_meta:
14861484
raise MetadataError(
@@ -1538,7 +1536,7 @@ async def _build_create_multiple(
15381536
raise TypeError("All items for multi-create must be dicts")
15391537
logical_name = table.lower()
15401538
lowered = [self._lowercase_keys(r) for r in records]
1541-
converted = await asyncio.gather(*[self._convert_labels_to_ints(table, r) for r in lowered])
1539+
converted = [await self._convert_labels_to_ints(table, r) for r in lowered]
15421540
enriched = [
15431541
{**r, "@odata.type": f"Microsoft.Dynamics.CRM.{logical_name}"} if "@odata.type" not in r else r
15441542
for r in converted
@@ -1591,7 +1589,7 @@ async def _build_update_multiple_from_records(
15911589
"""
15921590
logical_name = table.lower()
15931591
lowered = [self._lowercase_keys(r) for r in records]
1594-
converted = await asyncio.gather(*[self._convert_labels_to_ints(table, r) for r in lowered])
1592+
converted = [await self._convert_labels_to_ints(table, r) for r in lowered]
15951593
enriched = [
15961594
{**r, "@odata.type": f"Microsoft.Dynamics.CRM.{logical_name}"} if "@odata.type" not in r else r
15971595
for r in converted
@@ -1661,7 +1659,7 @@ async def _build_upsert_multiple(
16611659
)
16621660
logical_name = table.lower()
16631661
lowered_records = [self._lowercase_keys(r) for r in records]
1664-
converted = await asyncio.gather(*[self._convert_labels_to_ints(table, r) for r in lowered_records])
1662+
converted = [await self._convert_labels_to_ints(table, r) for r in lowered_records]
16651663
targets: List[Dict[str, Any]] = []
16661664
for alt_key, record_processed in zip(alternate_keys, converted):
16671665
alt_key_lower = self._lowercase_keys(alt_key)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT license.
33

4+
"""
5+
Async data models and type definitions for the Dataverse SDK.
6+
7+
Provides async-specific models for Dataverse entities:
8+
9+
- :class:`~PowerPlatform.Dataverse.aio.models.async_query_builder.AsyncQueryBuilder`: Async fluent query builder.
10+
- :class:`~PowerPlatform.Dataverse.aio.models.async_fetchxml_query.AsyncFetchXmlQuery`: Async FetchXML query.
11+
"""
12+
413
__all__ = []

tests/unit/aio/core/test_async_http.py

Lines changed: 103 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,34 @@
99
from PowerPlatform.Dataverse.aio.core._async_http import _AsyncHttpClient
1010

1111

12-
def _make_session(status: int = 200) -> MagicMock:
13-
"""Return a mock aiohttp.ClientSession whose request() returns a buffered response."""
14-
session = MagicMock(spec=aiohttp.ClientSession)
15-
resp = AsyncMock()
12+
def _make_resp(status: int = 200) -> MagicMock:
13+
"""Return a mock aiohttp.ClientResponse."""
14+
resp = MagicMock()
1615
resp.status = status
1716
resp.headers = {}
1817
resp.read = AsyncMock(return_value=b"")
1918
resp.text = AsyncMock(return_value="")
20-
session.request = AsyncMock(return_value=resp)
19+
return resp
20+
21+
22+
def _make_cm(resp=None, exc=None) -> MagicMock:
23+
"""Return an async context manager mock.
24+
25+
If exc is given, __aenter__ raises it. Otherwise it returns resp.
26+
"""
27+
cm = MagicMock()
28+
if exc is not None:
29+
cm.__aenter__ = AsyncMock(side_effect=exc)
30+
else:
31+
cm.__aenter__ = AsyncMock(return_value=resp)
32+
cm.__aexit__ = AsyncMock(return_value=False)
33+
return cm
34+
35+
36+
def _make_session(status: int = 200) -> MagicMock:
37+
"""Return a mock aiohttp.ClientSession whose request() is an async context manager."""
38+
session = MagicMock(spec=aiohttp.ClientSession)
39+
session.request = MagicMock(return_value=_make_cm(_make_resp(status)))
2140
return session
2241

2342

@@ -99,13 +118,11 @@ class TestAsyncHttpClientRetry:
99118
async def test_retries_on_client_error_and_succeeds(self):
100119
"""Retries after a ClientError and returns response on second attempt."""
101120
session = MagicMock(spec=aiohttp.ClientSession)
102-
good_resp = AsyncMock()
103-
good_resp.status = 200
104-
good_resp.headers = {}
105-
good_resp.read = AsyncMock(return_value=b"")
106-
good_resp.text = AsyncMock(return_value="")
107-
108-
session.request = AsyncMock(side_effect=[aiohttp.ClientConnectionError("timeout"), good_resp])
121+
good_resp = _make_resp(200)
122+
session.request = MagicMock(side_effect=[
123+
_make_cm(exc=aiohttp.ClientConnectionError("timeout")),
124+
_make_cm(good_resp),
125+
])
109126
client = _AsyncHttpClient(retries=2, backoff=0, session=session)
110127
with patch("asyncio.sleep", new_callable=AsyncMock):
111128
result = await client._request("get", "https://example.com/data")
@@ -116,7 +133,9 @@ async def test_retries_on_client_error_and_succeeds(self):
116133
async def test_raises_after_all_retries_exhausted(self):
117134
"""Raises ClientError after all retry attempts fail."""
118135
session = MagicMock(spec=aiohttp.ClientSession)
119-
session.request = AsyncMock(side_effect=aiohttp.ClientConnectionError("timeout"))
136+
session.request = MagicMock(
137+
return_value=_make_cm(exc=aiohttp.ClientConnectionError("timeout"))
138+
)
120139
client = _AsyncHttpClient(retries=3, backoff=0, session=session)
121140
with patch("asyncio.sleep", new_callable=AsyncMock):
122141
with pytest.raises(aiohttp.ClientError):
@@ -125,19 +144,12 @@ async def test_raises_after_all_retries_exhausted(self):
125144
async def test_backoff_delay_between_retries(self):
126145
"""Sleeps with exponential backoff between retry attempts."""
127146
session = MagicMock(spec=aiohttp.ClientSession)
128-
good_resp = AsyncMock()
129-
good_resp.status = 200
130-
good_resp.headers = {}
131-
good_resp.read = AsyncMock(return_value=b"")
132-
good_resp.text = AsyncMock(return_value="")
133-
134-
session.request = AsyncMock(
135-
side_effect=[
136-
aiohttp.ClientConnectionError(),
137-
aiohttp.ClientConnectionError(),
138-
good_resp,
139-
]
140-
)
147+
good_resp = _make_resp(200)
148+
session.request = MagicMock(side_effect=[
149+
_make_cm(exc=aiohttp.ClientConnectionError()),
150+
_make_cm(exc=aiohttp.ClientConnectionError()),
151+
_make_cm(good_resp),
152+
])
141153
client = _AsyncHttpClient(retries=3, backoff=1.0, session=session)
142154
with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
143155
await client._request("get", "https://example.com/data")
@@ -151,6 +163,22 @@ async def test_no_retry_on_success(self):
151163
await client._request("get", "https://example.com/data")
152164
assert session.request.call_count == 1
153165

166+
async def test_retries_on_timeout_error(self):
167+
"""Retries on asyncio.TimeoutError (not a subclass of aiohttp.ClientError)."""
168+
import asyncio
169+
session = MagicMock(spec=aiohttp.ClientSession)
170+
good_resp = _make_resp(200)
171+
session.request = MagicMock(side_effect=[
172+
_make_cm(exc=asyncio.TimeoutError()),
173+
_make_cm(good_resp),
174+
])
175+
client = _AsyncHttpClient(retries=2, backoff=0, session=session)
176+
with patch("asyncio.sleep", new_callable=AsyncMock):
177+
result = await client._request("get", "https://example.com/data")
178+
179+
assert session.request.call_count == 2
180+
assert result is good_resp
181+
154182

155183
class TestAsyncHttpClientClose:
156184
"""Tests for _AsyncHttpClient.close()."""
@@ -194,15 +222,58 @@ async def test_response_logged_when_logger_set(self):
194222
async def test_error_logged_on_retry(self):
195223
"""Transport errors are logged before each retry."""
196224
session = MagicMock(spec=aiohttp.ClientSession)
197-
good_resp = AsyncMock()
198-
good_resp.status = 200
199-
good_resp.headers = {}
200-
good_resp.read = AsyncMock(return_value=b"")
201-
good_resp.text = AsyncMock(return_value="")
202-
session.request = AsyncMock(side_effect=[aiohttp.ClientConnectionError(), good_resp])
225+
good_resp = _make_resp(200)
226+
session.request = MagicMock(side_effect=[
227+
_make_cm(exc=aiohttp.ClientConnectionError()),
228+
_make_cm(good_resp),
229+
])
203230
mock_logger = MagicMock()
204231
mock_logger.body_logging_enabled = False
205232
client = _AsyncHttpClient(retries=2, backoff=0, session=session, logger=mock_logger)
206233
with patch("asyncio.sleep", new_callable=AsyncMock):
207234
await client._request("get", "https://example.com/data")
208235
mock_logger.log_error.assert_called_once()
236+
237+
async def test_request_body_logged_from_json_kwarg(self):
238+
"""json= kwarg body is extracted and passed to log_request."""
239+
session = _make_session()
240+
mock_logger = MagicMock()
241+
mock_logger.body_logging_enabled = False
242+
client = _AsyncHttpClient(retries=1, session=session, logger=mock_logger)
243+
await client._request("post", "https://example.com/data", json={"key": "value"})
244+
_, log_kwargs = mock_logger.log_request.call_args
245+
assert log_kwargs["body"] == {"key": "value"}
246+
247+
async def test_request_body_logged_from_data_kwarg(self):
248+
"""data= kwarg body is extracted when json= is absent."""
249+
session = _make_session()
250+
mock_logger = MagicMock()
251+
mock_logger.body_logging_enabled = False
252+
client = _AsyncHttpClient(retries=1, session=session, logger=mock_logger)
253+
await client._request("post", "https://example.com/data", data=b"raw bytes")
254+
_, log_kwargs = mock_logger.log_request.call_args
255+
assert log_kwargs["body"] == b"raw bytes"
256+
257+
async def test_response_body_decoded_when_body_logging_enabled(self):
258+
"""When body_logging_enabled=True, resp.text() is awaited and passed to log_response."""
259+
session = _make_session()
260+
session.request.return_value.__aenter__.return_value.text = AsyncMock(return_value='{"ok": true}')
261+
mock_logger = MagicMock()
262+
mock_logger.body_logging_enabled = True
263+
client = _AsyncHttpClient(retries=1, session=session, logger=mock_logger)
264+
await client._request("get", "https://example.com/data")
265+
_, log_kwargs = mock_logger.log_response.call_args
266+
assert log_kwargs["body"] == '{"ok": true}'
267+
268+
async def test_response_body_decode_error_is_swallowed(self):
269+
"""If resp.text() raises, body is None and log_response is still called."""
270+
session = _make_session()
271+
session.request.return_value.__aenter__.return_value.text = AsyncMock(
272+
side_effect=UnicodeDecodeError("utf-8", b"\xff", 0, 1, "invalid start byte")
273+
)
274+
mock_logger = MagicMock()
275+
mock_logger.body_logging_enabled = True
276+
client = _AsyncHttpClient(retries=1, session=session, logger=mock_logger)
277+
await client._request("get", "https://example.com/data")
278+
_, log_kwargs = mock_logger.log_response.call_args
279+
assert log_kwargs["body"] is None

tests/unit/aio/data/test_async_batch_internal.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
"""Unit tests for _AsyncBatchClient internals."""
55

6-
import json
76
from unittest.mock import AsyncMock, MagicMock, patch
87

98
import pytest

0 commit comments

Comments
 (0)