Skip to content

Commit 2d83148

Browse files
Abel Milashclaude
andcommitted
Replace _SyncResponseWrapper with _HttpResponse in async HTTP layer
Adds _HttpResponse to _async_http.py — a materialized response that buffers the body bytes in _request, exposing sync .text and .json() so callers need no await. Eliminates _SyncResponseWrapper from _async_batch.py and removes all remaining await r.json() calls throughout the async layer (_async_odata.py, _async_relationships.py). Also removes aiohttp.ContentTypeError from except clauses now that json() is a plain json.loads call (ValueError covers it). All 2155 tests pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 1f82219 commit 2d83148

8 files changed

Lines changed: 135 additions & 169 deletions

File tree

src/PowerPlatform/Dataverse/aio/core/_async_http.py

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,45 @@
1212
from __future__ import annotations
1313

1414
import asyncio
15+
import json as _json
1516
import time
16-
from typing import TYPE_CHECKING, Any, Optional
17+
from typing import TYPE_CHECKING, Any, Dict, Optional
1718

1819
import aiohttp
1920

2021
if TYPE_CHECKING:
2122
from ...core._http_logger import _HttpLogger
2223

2324

25+
class _HttpResponse:
26+
"""Materialized HTTP response returned by :class:`_AsyncHttpClient._request`.
27+
28+
The body is fully buffered before this object is constructed, so all
29+
accessors are synchronous — no ``await`` required.
30+
31+
:param status: HTTP status code.
32+
:param headers: Response headers as a plain dict.
33+
:param body: Raw response body bytes.
34+
"""
35+
36+
__slots__ = ("status", "status_code", "headers", "_body")
37+
38+
def __init__(self, status: int, headers: Dict[str, str], body: bytes) -> None:
39+
self.status = status
40+
self.status_code = status # alias used by batch response parser
41+
self.headers = headers
42+
self._body = body
43+
44+
@property
45+
def text(self) -> str:
46+
"""Response body decoded as UTF-8 text."""
47+
return self._body.decode("utf-8", errors="replace") if self._body else ""
48+
49+
def json(self, content_type: Any = None) -> Any:
50+
"""Parse and return the response body as JSON."""
51+
return _json.loads(self._body) if self._body else {}
52+
53+
2454
class _AsyncHttpClient:
2555
"""
2656
Async HTTP client with configurable retry logic and timeout handling.
@@ -58,24 +88,24 @@ def __init__(
5888
self._session = session
5989
self._logger = logger
6090

61-
async def _request(self, method: str, url: str, **kwargs: Any) -> aiohttp.ClientResponse:
91+
async def _request(self, method: str, url: str, **kwargs: Any) -> _HttpResponse:
6292
"""
6393
Execute an HTTP request asynchronously with automatic retry logic and timeout management.
6494
6595
Applies default timeouts based on HTTP method (120s for POST/DELETE, 10s for others)
6696
and retries on network errors with exponential backoff.
6797
68-
The response body is fully buffered before returning, so the caller may freely
69-
call ``await resp.text()`` or ``await resp.json()`` without managing the connection lifecycle.
98+
The response body is fully buffered and returned as a :class:`_HttpResponse` whose
99+
accessors (``.text``, ``.json()``) are synchronous — no ``await`` required on the caller side.
70100
71101
:param method: HTTP method (GET, POST, PUT, DELETE, etc.).
72102
:type method: :class:`str`
73103
:param url: Target URL for the request.
74104
:type url: :class:`str`
75105
:param kwargs: Additional arguments passed to ``aiohttp.ClientSession.request()``,
76106
including headers, data, etc.
77-
:return: HTTP response object with body fully buffered.
78-
:rtype: :class:`aiohttp.ClientResponse`
107+
:return: Materialized HTTP response with body fully buffered.
108+
:rtype: :class:`_HttpResponse`
79109
:raises aiohttp.ClientError: If all retry attempts fail.
80110
:raises RuntimeError: If no session has been set.
81111
"""
@@ -113,27 +143,23 @@ async def _request(self, method: str, url: str, **kwargs: Any) -> aiohttp.Client
113143
try:
114144
t0 = time.monotonic()
115145
async with self._session.request(method, url, **kwargs) as resp:
116-
await resp.read()
146+
body = await resp.read()
147+
materialized = _HttpResponse(resp.status, dict(resp.headers), body)
117148
elapsed_ms = (time.monotonic() - t0) * 1000
118149

119150
if self._logger is not None:
120-
# Only decode resp.text when body logging is enabled — avoids
151+
# Only decode text when body logging is enabled — avoids
121152
# unnecessary overhead for large payloads when max_body_bytes == 0.
122-
resp_body = None
123-
if self._logger.body_logging_enabled:
124-
try:
125-
resp_body = await resp.text()
126-
except Exception:
127-
pass
153+
resp_body = materialized.text if self._logger.body_logging_enabled else None
128154
self._logger.log_response(
129155
method,
130156
url,
131-
status_code=resp.status,
132-
headers=dict(resp.headers),
157+
status_code=materialized.status,
158+
headers=materialized.headers,
133159
body=resp_body,
134160
elapsed_ms=elapsed_ms,
135161
)
136-
return resp
162+
return materialized
137163
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
138164
if self._logger is not None:
139165
self._logger.log_error(

src/PowerPlatform/Dataverse/aio/data/_async_batch.py

Lines changed: 3 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from __future__ import annotations
77

88
import uuid
9-
from typing import TYPE_CHECKING, Any, Dict, List, Union
9+
from typing import TYPE_CHECKING, Any, List, Union
1010

1111
from ...core.errors import MetadataError, ValidationError
1212
from ...core._error_codes import METADATA_TABLE_NOT_FOUND, METADATA_COLUMN_NOT_FOUND
@@ -43,32 +43,6 @@
4343
__all__ = []
4444

4545

46-
class _SyncResponseWrapper:
47-
"""Minimal requests-compatible wrapper around a materialized aiohttp response.
48-
49-
Used to feed the sync-only :meth:`_BatchBase._parse_batch_response` helper with
50-
pre-fetched body content, avoiding the need to ``await`` inside sync code.
51-
52-
:param status_code: HTTP status code (e.g. 200, 400).
53-
:type status_code: :class:`int`
54-
:param headers: Response headers as a plain dict.
55-
:type headers: :class:`dict`
56-
:param text: Full response body as a decoded string.
57-
:type text: :class:`str`
58-
:param json_payload: Pre-parsed JSON body (or empty dict if parsing failed).
59-
:type json_payload: ``Any``
60-
"""
61-
62-
def __init__(self, status_code: int, headers: Dict[str, str], text: str, json_payload: Any) -> None:
63-
self.status_code = status_code
64-
self.headers = headers
65-
self.text = text
66-
self._json = json_payload
67-
68-
def json(self) -> Any:
69-
return self._json
70-
71-
7246
# ---------------------------------------------------------------------------
7347
# Batch client: resolves intents → raw requests → multipart body → HTTP → result
7448
# ---------------------------------------------------------------------------
@@ -116,7 +90,7 @@ async def execute(
11690
batch_boundary = f"batch_{uuid.uuid4()}"
11791
body = self._build_batch_body(resolved, batch_boundary)
11892

119-
headers: Dict[str, str] = {
93+
headers: dict[str, str] = {
12094
"Content-Type": f'multipart/mixed; boundary="{batch_boundary}"',
12195
}
12296
if continue_on_error:
@@ -135,22 +109,7 @@ async def execute(
135109
expected=(200, 202, 207, 400),
136110
)
137111

138-
# Materialise the response body so the sync _parse_batch_response can
139-
# access it without awaiting. Body is already buffered (await resp.read()
140-
# in _AsyncHttpClient._request), so these calls are fast and non-blocking.
141-
text = await r.text()
142-
try:
143-
json_payload = await r.json(content_type=None)
144-
except Exception:
145-
json_payload = {}
146-
147-
wrapped = _SyncResponseWrapper(
148-
status_code=r.status,
149-
headers=dict(r.headers),
150-
text=text,
151-
json_payload=json_payload,
152-
)
153-
return self._parse_batch_response(wrapped)
112+
return self._parse_batch_response(r)
154113

155114
# ------------------------------------------------------------------
156115
# Intent resolution dispatcher

src/PowerPlatform/Dataverse/aio/data/_async_odata.py

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616

1717
from urllib.parse import quote as _url_quote
1818

19-
import aiohttp
20-
21-
from ..core._async_http import _AsyncHttpClient
19+
from ..core._async_http import _AsyncHttpClient, _HttpResponse
2220
from ._async_upload import _AsyncFileUploadMixin
2321
from ._async_relationships import _AsyncRelationshipOperationsMixin
2422
from ...core.errors import *
@@ -109,7 +107,7 @@ async def _headers(self) -> Dict[str, str]:
109107
"User-Agent": ua,
110108
}
111109

112-
async def _raw_request(self, method: str, url: str, **kwargs) -> aiohttp.ClientResponse:
110+
async def _raw_request(self, method: str, url: str, **kwargs) -> _HttpResponse:
113111
return await self._http._request(method, url, **kwargs)
114112

115113
async def _request(
@@ -119,7 +117,7 @@ async def _request(
119117
*,
120118
expected: tuple[int, ...] = _DEFAULT_EXPECTED_STATUSES,
121119
**kwargs,
122-
) -> aiohttp.ClientResponse:
120+
) -> _HttpResponse:
123121
# Acquire base headers once (async), then use a sync closure for _RequestContext.build.
124122
# _RequestContext.build is a sync classmethod defined in _ODataBase and shared by both
125123
# sync and async clients — keeping it sync avoids duplicating the header-injection logic.
@@ -145,11 +143,7 @@ def _merge(h: Optional[Dict[str, str]]) -> Dict[str, str]:
145143
return r
146144

147145
response_headers = getattr(r, "headers", {}) or {}
148-
raw_text = ""
149-
try:
150-
raw_text = await r.text()
151-
except Exception:
152-
pass
146+
raw_text = r.text
153147
body_excerpt = raw_text[:200]
154148
svc_code = None
155149
msg = f"HTTP {r.status}"
@@ -207,7 +201,7 @@ async def _execute_raw(
207201
req: _RawRequest,
208202
*,
209203
expected: tuple[int, ...] = _DEFAULT_EXPECTED_STATUSES,
210-
) -> aiohttp.ClientResponse:
204+
) -> _HttpResponse:
211205
"""Execute a ``_RawRequest`` and return the HTTP response.
212206
213207
Encodes the pre-serialised body (if present) as UTF-8 and merges any
@@ -278,8 +272,8 @@ async def _create_multiple(
278272
raise TypeError("All items for multi-create must be dicts")
279273
r = await self._execute_raw(await self._build_create_multiple(entity_set, table_schema_name, records))
280274
try:
281-
body = await r.json(content_type=None)
282-
except (ValueError, aiohttp.ContentTypeError):
275+
body = r.json()
276+
except ValueError:
283277
body = {}
284278
if not isinstance(body, dict):
285279
return []
@@ -468,8 +462,8 @@ async def _delete_multiple(
468462
)
469463
job_id = None
470464
try:
471-
body = await response.json(content_type=None)
472-
except (ValueError, aiohttp.ContentTypeError):
465+
body = response.json()
466+
except ValueError:
473467
body = {}
474468
if isinstance(body, dict):
475469
job_id = body.get("JobId")
@@ -556,7 +550,7 @@ async def _get(
556550
r = await self._execute_raw(
557551
await self._build_get(table_schema_name, key, select=select, expand=expand, include_annotations=include_annotations)
558552
)
559-
return await r.json(content_type=None)
553+
return r.json()
560554

561555
async def _get_multiple(
562556
self,
@@ -609,8 +603,8 @@ async def _do_request(url: str, *, params: Optional[Dict[str, Any]] = None) -> D
609603
headers = extra_headers if extra_headers else None
610604
r = await self._request("get", url, headers=headers, params=params)
611605
try:
612-
return await r.json(content_type=None)
613-
except (ValueError, aiohttp.ContentTypeError):
606+
return r.json()
607+
except ValueError:
614608
return {}
615609

616610
entity_set = await self._entity_set_from_schema_name(table_schema_name)
@@ -682,8 +676,8 @@ async def _query_sql(self, sql: str) -> list[dict[str, Any]]:
682676

683677
r = await self._execute_raw(await self._build_sql(sql))
684678
try:
685-
body = await r.json(content_type=None)
686-
except (ValueError, aiohttp.ContentTypeError):
679+
body = r.json()
680+
except ValueError:
687681
return []
688682

689683
# Collect first page
@@ -745,8 +739,8 @@ async def _query_sql(self, sql: str) -> list[dict[str, Any]]:
745739
)
746740
break
747741
try:
748-
page_body = await page_resp.json(content_type=None)
749-
except (ValueError, aiohttp.ContentTypeError) as exc:
742+
page_body = page_resp.json()
743+
except ValueError as exc:
750744
warnings.warn(
751745
f"SQL pagination stopped after {len(results)} rows — "
752746
f"the next-page response was not valid JSON: {exc}. "
@@ -790,9 +784,9 @@ async def _entity_set_from_schema_name(self, table_schema_name: str) -> str:
790784
}
791785
r = await self._request("get", url, params=params)
792786
try:
793-
body = await r.json(content_type=None)
787+
body = r.json()
794788
items = body.get("value", []) if isinstance(body, dict) else []
795-
except (ValueError, aiohttp.ContentTypeError):
789+
except ValueError:
796790
items = []
797791
if not items:
798792
plural_hint = (
@@ -838,7 +832,7 @@ async def _get_entity_by_table_schema_name(
838832
"$filter": f"LogicalName eq '{logical_escaped}'",
839833
}
840834
r = await self._request("get", url, params=params, headers=headers)
841-
items = (await r.json(content_type=None)).get("value", [])
835+
items = (r.json()).get("value", [])
842836
return items[0] if items else None
843837

844838
async def _create_entity(
@@ -903,8 +897,8 @@ async def _get_attribute_metadata(
903897
}
904898
r = await self._request("get", url, params=params)
905899
try:
906-
body = await r.json(content_type=None)
907-
except (ValueError, aiohttp.ContentTypeError):
900+
body = r.json()
901+
except ValueError:
908902
return None
909903
items = body.get("value") if isinstance(body, dict) else None
910904
if isinstance(items, list) and items:
@@ -955,7 +949,7 @@ async def _list_columns(
955949
if filter:
956950
params["$filter"] = filter
957951
r = await self._request("get", url, params=params)
958-
return (await r.json(content_type=None)).get("value", [])
952+
return (r.json()).get("value", [])
959953

960954
async def _wait_for_attribute_visibility(
961955
self,
@@ -991,7 +985,7 @@ async def _wait_for_attribute_visibility(
991985
f"after {total_wait} seconds (exhausted all retries)."
992986
) from last_error
993987

994-
async def _request_metadata_with_retry(self, method: str, url: str, **kwargs) -> aiohttp.ClientResponse:
988+
async def _request_metadata_with_retry(self, method: str, url: str, **kwargs) -> _HttpResponse:
995989
"""Fetch metadata with retries on transient errors."""
996990
max_attempts = 5
997991
backoff_seconds = 0.4
@@ -1038,7 +1032,7 @@ async def _bulk_fetch_picklists(self, table_schema_name: str) -> None:
10381032
f"?$select=LogicalName&$expand=OptionSet($select=Options)"
10391033
)
10401034
response = await self._request_metadata_with_retry("get", url)
1041-
body = await response.json(content_type=None)
1035+
body = response.json()
10421036
items = body.get("value", []) if isinstance(body, dict) else []
10431037

10441038
picklists: Dict[str, Dict[str, int]] = {}
@@ -1165,7 +1159,7 @@ async def _list_tables(
11651159
:raises HttpError: If the metadata request fails.
11661160
"""
11671161
r = await self._execute_raw(self._build_list_entities(filter=filter, select=select))
1168-
return (await r.json(content_type=None)).get("value", [])
1162+
return (r.json()).get("value", [])
11691163

11701164
async def _delete_table(self, table_schema_name: str) -> None:
11711165
"""Delete a table by schema name.
@@ -1264,7 +1258,7 @@ async def _get_alternate_keys(self, table_schema_name: str) -> List[Dict[str, An
12641258
logical_name = ent.get("LogicalName", table_schema_name.lower())
12651259
url = f"{self.api}/EntityDefinitions(LogicalName='{logical_name}')/Keys"
12661260
r = await self._request("get", url)
1267-
return (await r.json(content_type=None)).get("value", [])
1261+
return (r.json()).get("value", [])
12681262

12691263
async def _delete_alternate_key(self, table_schema_name: str, key_id: str) -> None:
12701264
"""Delete an alternate key by metadata ID.

0 commit comments

Comments
 (0)