Skip to content

Commit c9bdc91

Browse files
Abel Milashclaude
andcommitted
Improve concurrency test coverage, fix docstrings, revert _MAX_WORKERS to 3
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 62c0ab6 commit c9bdc91

3 files changed

Lines changed: 141 additions & 77 deletions

File tree

src/PowerPlatform/Dataverse/data/_odata.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
_DEFAULT_EXPECTED_STATUSES: tuple[int, ...] = (200, 201, 202, 204)
5959
_MULTIPLE_BATCH_SIZE = 1000
6060
# Concurrent chunk dispatch settings
61-
_MAX_WORKERS = 3 # maximum concurrent worker threads; values above this are silently clamped
61+
_MAX_WORKERS = 3 # maximum concurrent worker threads; values above this are silently capped
6262
_CHUNK_RETRY_LIMIT = 3 # max retries per chunk on transient errors
6363
_CHUNK_RETRY_DEFAULT_WAIT = 60 # seconds to wait when Retry-After header is absent
6464
_CHUNK_RETRY_JITTER_MAX = 5 # seconds of random jitter added to Retry-After to desynchronise workers
@@ -67,7 +67,7 @@
6767
def _dispatch_chunks(fn: Callable, chunks: List, max_workers: int) -> List:
6868
"""Dispatch ``fn(chunk)`` for each chunk, sequentially or concurrently.
6969
70-
``max_workers`` is silently clamped to ``_MAX_WORKERS`` (3) so callers
70+
``max_workers`` is silently capped to ``_MAX_WORKERS`` (3) so callers
7171
that pass a larger value are not penalised with an error.
7272
7373
When ``max_workers == 1`` or there is only one chunk, runs sequentially
@@ -85,7 +85,7 @@ def _dispatch_chunks(fn: Callable, chunks: List, max_workers: int) -> List:
8585
8686
:param fn: Callable that accepts a single chunk and returns a result.
8787
:param chunks: List of chunks to process.
88-
:param max_workers: Maximum number of concurrent worker threads (clamped to ``_MAX_WORKERS``).
88+
:param max_workers: Maximum number of concurrent worker threads (capped to ``_MAX_WORKERS``).
8989
:return: List of results in chunk submission order.
9090
"""
9191
max_workers = min(max_workers, _MAX_WORKERS)
@@ -570,6 +570,9 @@ def _upsert_multiple(
570570
:param records: List of record payload dictionaries, one per record.
571571
Must be the same length as ``alternate_keys``.
572572
:type records: ``list[dict[str, Any]]``
573+
:param max_workers: Maximum number of concurrent worker threads for chunk dispatch.
574+
Values above ``_MAX_WORKERS`` are silently capped.
575+
:type max_workers: ``int``
573576
574577
:return: ``None``
575578
:rtype: ``None``
@@ -741,14 +744,18 @@ def _update_multiple(
741744
"""Bulk update existing records via the collection-bound ``UpdateMultiple`` action.
742745
743746
Large record lists are automatically split into chunks of up to
744-
``_MULTIPLE_BATCH_SIZE`` records and dispatched sequentially.
747+
``_MULTIPLE_BATCH_SIZE`` records and dispatched sequentially or concurrently
748+
depending on ``max_workers``.
745749
746750
:param entity_set: Resolved entity set (plural) name.
747751
:type entity_set: ``str``
748752
:param table_schema_name: Schema name of the table, e.g. "new_MyTestTable".
749753
:type table_schema_name: ``str``
750754
:param records: List of patch dictionaries. Each must include the true primary key attribute (e.g. ``accountid``) and one or more fields to update.
751755
:type records: ``list[dict[str, Any]]``
756+
:param max_workers: Maximum number of concurrent worker threads for chunk dispatch.
757+
Values above ``_MAX_WORKERS`` are silently capped.
758+
:type max_workers: ``int``
752759
:return: ``None``
753760
:rtype: ``None``
754761

src/PowerPlatform/Dataverse/operations/records.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def create(
100100
print(f"Created {len(guids)} accounts")
101101
"""
102102
if not isinstance(max_workers, int) or max_workers < 1:
103-
raise ValueError("max_workers must be a positive integer (1–3; values above 3 are clamped to 3)")
103+
raise ValueError("max_workers must be a positive integer (1–3; values above 3 are capped at 3)")
104104
with self._client._scoped_odata() as od:
105105
entity_set = od._entity_set_from_schema_name(table)
106106
if isinstance(data, dict):
@@ -171,7 +171,7 @@ def update(
171171
)
172172
"""
173173
if not isinstance(max_workers, int) or max_workers < 1:
174-
raise ValueError("max_workers must be a positive integer (1–3; values above 3 are clamped to 3)")
174+
raise ValueError("max_workers must be a positive integer (1–3; values above 3 are capped at 3)")
175175
with self._client._scoped_odata() as od:
176176
if isinstance(ids, str):
177177
if not isinstance(changes, dict):
@@ -566,7 +566,7 @@ def upsert(self, table: str, items: List[Union[UpsertItem, Dict[str, Any]]], *,
566566
``{"accountnumber": "ACC-001", "address1_postalcode": "98052"}``.
567567
"""
568568
if not isinstance(max_workers, int) or max_workers < 1:
569-
raise ValueError("max_workers must be a positive integer (1–3; values above 3 are clamped to 3)")
569+
raise ValueError("max_workers must be a positive integer (1–3; values above 3 are capped at 3)")
570570
if not isinstance(items, list) or not items:
571571
raise TypeError("items must be a non-empty list of UpsertItem or dicts")
572572
normalized: List[UpsertItem] = []

0 commit comments

Comments
 (0)