Commit 62c0ab6

author
Abel Milash
committed
Add max_workers concurrency support to bulk create/update/upsert
1 parent d488eef commit 62c0ab6

6 files changed

Lines changed: 302 additions & 106 deletions

.claude/skills/dataverse-sdk-use/SKILL.md

Lines changed: 6 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ Use the PowerPlatform Dataverse Client Python SDK to interact with Microsoft Dat
2525
- `client.batch` -- batch multiple operations into a single HTTP request
2626

2727
### Bulk Operations
28-
The SDK supports Dataverse's native bulk operations: Pass lists to `create()`, `update()`, or `upsert()` for automatic bulk processing; for `delete()`, set `use_bulk_delete=True`. Lists exceeding 1,000 records are automatically split into 1,000-record chunks — no manual pre-splitting needed. By default chunks are dispatched sequentially; pass `max_workers=N` (recommended: 3–4) to dispatch chunks concurrently via threads. Operations across chunks are **not atomic**: a failure mid-way may leave earlier chunks applied. Callers that require atomicity should limit their input to ≤ 1,000 records.
28+
The SDK supports Dataverse's native bulk operations: pass lists to `create()`, `update()`, or `upsert()` for automatic bulk processing; for `delete()`, pass a list and set `use_bulk_delete=True` to use the bulk operation. Lists exceeding 1,000 records are automatically split into 1,000-record chunks, so no manual pre-splitting is needed. By default chunks are dispatched sequentially; pass `max_workers=N` (max: 3, default: 1) to dispatch chunks concurrently via threads. Operations across chunks are **not atomic**: a failure mid-way may leave earlier chunks applied. Callers that require atomicity should limit their input to ≤ 1,000 records.
2929

3030
### Paging
3131
- Control page size with `page_size` parameter
@@ -465,12 +465,11 @@ except ValidationError as e:
465465
### Performance Optimization
466466

467467
1. **Use bulk operations** - Pass lists to create/update/delete for automatic optimization
468-
2. **Use `max_workers`** - Pass `max_workers=3` (or 4) to dispatch 1,000-record chunks concurrently; safe for large datasets where throughput matters more than strict sequential ordering. Dataverse throttles concurrent requests server-side, so values above 4 rarely help and may trigger 429 rate-limiting
469-
3. **Specify select fields** - Limit returned columns to reduce payload size
470-
4. **Control page size** - Use `top` and `page_size` parameters appropriately
471-
5. **Reuse client instances** - Don't create new clients for each operation
472-
6. **Use production credentials** - ClientSecretCredential or CertificateCredential for unattended operations
473-
7. **Error handling** - Implement retry logic for transient errors (`e.is_transient`)
468+
2. **Specify select fields** - Limit returned columns to reduce payload size
469+
3. **Control page size** - Use `top` and `page_size` parameters appropriately
470+
4. **Reuse client instances** - Don't create new clients for each operation
471+
5. **Use production credentials** - ClientSecretCredential or CertificateCredential for unattended operations
472+
6. **Error handling** - Implement retry logic for transient errors (`e.is_transient`)
474473
7. **Always include customization prefix** for custom tables/columns
475474
8. **Use lowercase for column names, match `$metadata` for navigation properties** - Column names in `$select`/`$filter`/record payloads use lowercase LogicalNames. Navigation properties in `$expand` and `@odata.bind` keys are case-sensitive and must match the entity's `$metadata` (PascalCase for custom lookups like `new_CustomerId`, lowercase for system lookups like `parentaccountid`)
476475
9. **Test in non-production environments** first

README.md

Lines changed: 2 additions & 3 deletions
@@ -185,15 +185,14 @@ client.records.update("account", ids, {"industry": "Technology"})
185185
# Bulk delete
186186
client.records.delete("account", ids, use_bulk_delete=True)
187187

188-
# Concurrent chunk dispatch — max_workers sends chunks in parallel via threads
189-
# Recommended for large datasets when latency matters more than strict ordering
188+
# Concurrent chunk dispatch — max_workers sends chunks in parallel via threads (max: 3, default: 1)
190189
ids = client.records.create("account", payloads, max_workers=3)
191190
client.records.update("account", ids, {"industry": "Technology"}, max_workers=3)
192191
```
193192

194193
> **Large batches**: Lists exceeding 1,000 records are automatically split into 1,000-record
195194
> chunks — no manual pre-splitting needed. By default chunks are dispatched sequentially.
196-
> Pass `max_workers=N` (recommended: 3–4) to send chunks concurrently via threads — useful
195+
> Pass `max_workers=N` (max: 3, default: 1) to send chunks concurrently via threads — useful
197196
> when throughput matters more than strict sequential ordering. Note that chunked operations
198197
> are **not atomic**: a failure mid-way may leave earlier chunks applied. Callers that
199198
> require atomicity should limit their input to ≤ 1,000 records.
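
The chunking described in the note above can be illustrated with a minimal sketch. It mirrors the slicing expression used in `_odata.py` in this commit, but `split_into_chunks` and `BATCH_SIZE` are hypothetical names chosen for illustration; they are not part of the SDK's public API.

```python
from typing import Any, Dict, List

BATCH_SIZE = 1000  # assumed to match the 1,000-record chunk size described above


def split_into_chunks(
    records: List[Dict[str, Any]], size: int = BATCH_SIZE
) -> List[List[Dict[str, Any]]]:
    """Split records into consecutive chunks of at most `size` items each."""
    return [records[i : i + size] for i in range(0, len(records), size)]


# 2,500 payloads become two full chunks and one partial chunk.
payloads = [{"name": f"Account {i}"} for i in range(2500)]
chunks = split_into_chunks(payloads)
chunk_sizes = [len(chunk) for chunk in chunks]
```

Each chunk is sent as its own request, which is why an input of more than 1,000 records cannot be atomic: a failure in the third chunk leaves the first two already committed.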

src/PowerPlatform/Dataverse/claude_skill/dataverse-sdk-use/SKILL.md

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ Use the PowerPlatform Dataverse Client Python SDK to interact with Microsoft Dat
2525
- `client.batch` -- batch multiple operations into a single HTTP request
2626

2727
### Bulk Operations
28-
The SDK supports Dataverse's native bulk operations: Pass lists to `create()`, `update()`, or `upsert()` for automatic bulk processing; for `delete()`, set `use_bulk_delete=True`. Lists exceeding 1,000 records are automatically split into 1,000-record chunks — no manual pre-splitting needed. By default chunks are dispatched sequentially; pass `max_workers=N` (recommended: 3–4) to dispatch chunks concurrently via threads. Operations across chunks are **not atomic**: a failure mid-way may leave earlier chunks applied. Callers that require atomicity should limit their input to ≤ 1,000 records.
28+
The SDK supports Dataverse's native bulk operations: pass lists to `create()`, `update()`, or `upsert()` for automatic bulk processing; for `delete()`, pass a list and set `use_bulk_delete=True` to use the bulk operation. Lists exceeding 1,000 records are automatically split into 1,000-record chunks, so no manual pre-splitting is needed. By default chunks are dispatched sequentially; pass `max_workers=N` (max: 3, default: 1) to dispatch chunks concurrently via threads. Operations across chunks are **not atomic**: a failure mid-way may leave earlier chunks applied. Callers that require atomicity should limit their input to ≤ 1,000 records.
2929

3030
### Paging
3131
- Control page size with `page_size` parameter

src/PowerPlatform/Dataverse/data/_odata.py

Lines changed: 36 additions & 28 deletions
@@ -57,47 +57,56 @@
5757
_CALL_SCOPE_CORRELATION_ID: ContextVar[Optional[str]] = ContextVar("_CALL_SCOPE_CORRELATION_ID", default=None)
5858
_DEFAULT_EXPECTED_STATUSES: tuple[int, ...] = (200, 201, 202, 204)
5959
_MULTIPLE_BATCH_SIZE = 1000
60-
# Concurrent chunk dispatch: 429 retry settings
61-
_CHUNK_RETRY_LIMIT = 3 # max retries per chunk on 429
60+
# Concurrent chunk dispatch settings
61+
_MAX_WORKERS = 3 # maximum concurrent worker threads; values above this are silently clamped
62+
_CHUNK_RETRY_LIMIT = 3 # max retries per chunk on transient errors
6263
_CHUNK_RETRY_DEFAULT_WAIT = 60 # seconds to wait when Retry-After header is absent
6364
_CHUNK_RETRY_JITTER_MAX = 5 # seconds of random jitter added to Retry-After to desynchronise workers
6465

6566

6667
def _dispatch_chunks(fn: Callable, chunks: List, max_workers: int) -> List:
6768
"""Dispatch ``fn(chunk)`` for each chunk, sequentially or concurrently.
6869
70+
``max_workers`` is silently clamped to ``_MAX_WORKERS`` (3) so callers
71+
that pass a larger value are not penalised with an error.
72+
6973
When ``max_workers == 1`` or there is only one chunk, runs sequentially
7074
with no thread overhead. When ``max_workers > 1`` and there are multiple
71-
chunks, submits all chunks to a :class:`~concurrent.futures.ThreadPoolExecutor`
72-
and collects results in submission order (preserving chunk ordering).
75+
chunks, submits all chunks to a :class:`~concurrent.futures.ThreadPoolExecutor`.
76+
Results are collected by iterating the futures list in submission order —
77+
``futures[i].result()`` blocks until chunk *i* finishes, so the returned
78+
list is always in chunk-submission order regardless of thread completion order.
7379
74-
On HTTP 429 (rate limit) each worker retries up to ``_CHUNK_RETRY_LIMIT``
75-
times, sleeping for the ``Retry-After`` duration (falling back to
76-
``_CHUNK_RETRY_DEFAULT_WAIT`` seconds) plus a random jitter of up to
77-
``_CHUNK_RETRY_JITTER_MAX`` seconds to desynchronise concurrent retries.
80+
On transient HTTP errors (429, 502, 503, 504) each worker retries up to
81+
``_CHUNK_RETRY_LIMIT`` times, sleeping for the ``Retry-After`` duration
82+
(falling back to ``_CHUNK_RETRY_DEFAULT_WAIT`` seconds) plus a random jitter
83+
of up to ``_CHUNK_RETRY_JITTER_MAX`` seconds to desynchronise concurrent
84+
retries. The sequential path applies the same retry logic.
7885
7986
:param fn: Callable that accepts a single chunk and returns a result.
8087
:param chunks: List of chunks to process.
81-
:param max_workers: Maximum number of concurrent worker threads.
88+
:param max_workers: Maximum number of concurrent worker threads (clamped to ``_MAX_WORKERS``).
8289
:return: List of results in chunk submission order.
8390
"""
84-
if max_workers == 1 or len(chunks) <= 1:
85-
return [fn(chunk) for chunk in chunks]
91+
max_workers = min(max_workers, _MAX_WORKERS)
8692

87-
def _with_backoff(chunk):
93+
def _execute_with_retry(chunk):
8894
for attempt in range(_CHUNK_RETRY_LIMIT + 1):
8995
try:
9096
return fn(chunk)
9197
except HttpError as exc:
92-
if exc.status_code == 429 and attempt < _CHUNK_RETRY_LIMIT:
93-
wait = (exc.details.get("retry_after") or _CHUNK_RETRY_DEFAULT_WAIT)
98+
if exc.is_transient and attempt < _CHUNK_RETRY_LIMIT:
99+
wait = float(exc.details.get("retry_after") or _CHUNK_RETRY_DEFAULT_WAIT)
94100
wait += random.uniform(0, _CHUNK_RETRY_JITTER_MAX)
95101
time.sleep(wait)
96102
else:
97103
raise
98104

105+
if max_workers == 1 or len(chunks) <= 1:
106+
return [_execute_with_retry(chunk) for chunk in chunks]
107+
99108
with ThreadPoolExecutor(max_workers=max_workers) as pool:
100-
futures = [pool.submit(_with_backoff, chunk) for chunk in chunks]
109+
futures = [pool.submit(_execute_with_retry, chunk) for chunk in chunks]
101110
return [f.result() for f in futures]
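
The dispatch pattern in the hunk above (clamp the worker count, retry transient failures, collect results in submission order) can be tried in isolation with the sketch below. It is a simplified stand-in rather than the SDK code: `TransientError`, `dispatch_chunks`, `flaky_sum`, and the short default waits are assumptions made so the example runs quickly without a Dataverse connection.

```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional

MAX_WORKERS = 3  # assumed cap, mirroring _MAX_WORKERS in the diff
RETRY_LIMIT = 3  # assumed retry budget, mirroring _CHUNK_RETRY_LIMIT


class TransientError(Exception):
    """Hypothetical stand-in for an HttpError whose is_transient flag is set."""

    def __init__(self, retry_after: Optional[float] = None):
        super().__init__("transient failure")
        self.retry_after = retry_after


def dispatch_chunks(
    fn: Callable,
    chunks: List,
    max_workers: int,
    default_wait: float = 0.01,
    jitter_max: float = 0.01,
) -> List:
    """Run fn(chunk) for every chunk and return results in submission order."""
    max_workers = min(max_workers, MAX_WORKERS)  # clamp instead of raising

    def run_with_retry(chunk):
        for attempt in range(RETRY_LIMIT + 1):
            try:
                return fn(chunk)
            except TransientError as exc:
                if attempt >= RETRY_LIMIT:
                    raise  # retry budget exhausted
                wait = float(exc.retry_after or default_wait)
                # Jitter keeps concurrent workers from retrying in lockstep.
                time.sleep(wait + random.uniform(0, jitter_max))

    if max_workers == 1 or len(chunks) <= 1:
        return [run_with_retry(chunk) for chunk in chunks]

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(run_with_retry, chunk) for chunk in chunks]
        # Iterating futures in list order preserves chunk order in the output.
        return [future.result() for future in futures]


_seen = set()
_lock = threading.Lock()


def flaky_sum(chunk):
    """Fail once per distinct chunk, then succeed, to exercise the retry path."""
    key = tuple(chunk)
    with _lock:
        first_time = key not in _seen
        _seen.add(key)
    if first_time:
        raise TransientError(retry_after=0.01)
    return sum(chunk)


results = dispatch_chunks(flaky_sum, [[1, 2], [3, 4], [5, 6]], max_workers=10)
```

Because `future.result()` is called on the futures in the order they were submitted, the output order does not depend on which thread finishes first. Moving the sequential branch below the retry helper, as this commit does, means single-worker callers get the same retry behaviour as concurrent ones.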
102111

103112

@@ -248,7 +257,7 @@ def __init__(
248257
self._logical_primaryid_cache: dict[str, str] = {}
249258
self._picklist_label_cache: dict[str, dict] = {}
250259
self._picklist_cache_ttl_seconds = 3600 # 1 hour TTL
251-
self._picklist_cache_lock = threading.Lock() # serialises cold-start fetches under concurrent workers
260+
self._picklist_cache_lock = threading.Lock() # prevents concurrent threads from making duplicate picklist metadata fetches on cold start
252261

253262
@contextmanager
254263
def _call_scope(self):
@@ -431,23 +440,21 @@ def _create_multiple(
431440
``1`` (default) dispatches sequentially.
432441
:type max_workers: ``int``
433442
434-
:return: List of created record GUIDs in chunk-submission order
435-
(may be empty if response lacks IDs).
443+
:return: List of created record GUIDs (may be empty if response lacks IDs).
436444
:rtype: ``list[str]``
437445
438446
.. note::
439447
Logical type stamping: if any payload omits ``@odata.type`` the client injects ``Microsoft.Dynamics.CRM.<table_logical_name>``. If all payloads already include ``@odata.type`` no modification occurs.
440448
441449
.. warning::
442-
When input exceeds ``_MULTIPLE_BATCH_SIZE`` records, the operation is
443-
split into multiple requests and is **not atomic**. If a later batch
444-
fails, earlier batches are already committed. Callers that require
445-
atomicity should limit input to ``<= _MULTIPLE_BATCH_SIZE`` records.
450+
When input exceeds ``_MULTIPLE_BATCH_SIZE`` records, the operation is split into multiple requests
451+
and is **not atomic**. If a later batch fails, earlier batches are already committed. Callers
452+
that require atomicity should limit input to ``<= _MULTIPLE_BATCH_SIZE`` records.
446453
"""
447454
if not all(isinstance(r, dict) for r in records):
448455
raise TypeError("All items for multi-create must be dicts")
449456

450-
def _send(chunk: List[Dict[str, Any]]) -> List[str]:
457+
def _execute_chunk(chunk: List[Dict[str, Any]]) -> List[str]:
451458
r = self._execute_raw(self._build_create_multiple(entity_set, table_schema_name, chunk))
452459
try:
453460
body = r.json() if r.text else {}
@@ -463,6 +470,7 @@ def _send(chunk: List[Dict[str, Any]]) -> List[str]:
463470
out: List[str] = []
464471
for item in value:
465472
if isinstance(item, dict):
473+
# Heuristic: look for a property ending with 'id'
466474
for k, v in item.items():
467475
if isinstance(k, str) and k.lower().endswith("id") and isinstance(v, str) and len(v) >= 32:
468476
out.append(v)
@@ -471,7 +479,7 @@ def _send(chunk: List[Dict[str, Any]]) -> List[str]:
471479
return []
472480

473481
chunks = [records[i : i + _MULTIPLE_BATCH_SIZE] for i in range(0, len(records), _MULTIPLE_BATCH_SIZE)]
474-
results = _dispatch_chunks(_send, chunks, max_workers)
482+
results = _dispatch_chunks(_execute_chunk, chunks, max_workers)
475483
return [guid for batch_ids in results for guid in batch_ids]
476484

477485
def _build_alternate_key_str(self, alternate_key: Dict[str, Any]) -> str:
@@ -596,11 +604,11 @@ def _upsert_multiple(
596604

597605
url = f"{self.api}/{entity_set}/Microsoft.Dynamics.CRM.UpsertMultiple"
598606

599-
def _send(chunk):
607+
def _execute_chunk(chunk):
600608
self._request("post", url, json={"Targets": chunk}, expected=(200, 201, 204))
601609

602610
chunks = [targets[i : i + _MULTIPLE_BATCH_SIZE] for i in range(0, len(targets), _MULTIPLE_BATCH_SIZE)]
603-
_dispatch_chunks(_send, chunks, max_workers)
611+
_dispatch_chunks(_execute_chunk, chunks, max_workers)
604612

605613
# --- Derived helpers for high-level client ergonomics ---
606614
def _primary_id_attr(self, table_schema_name: str) -> str:
@@ -757,11 +765,11 @@ def _update_multiple(
757765
if not isinstance(records, list) or not records or not all(isinstance(r, dict) for r in records):
758766
raise TypeError("records must be a non-empty list[dict]")
759767

760-
def _send(chunk):
768+
def _execute_chunk(chunk):
761769
self._execute_raw(self._build_update_multiple_from_records(entity_set, table_schema_name, chunk))
762770

763771
chunks = [records[i : i + _MULTIPLE_BATCH_SIZE] for i in range(0, len(records), _MULTIPLE_BATCH_SIZE)]
764-
_dispatch_chunks(_send, chunks, max_workers)
772+
_dispatch_chunks(_execute_chunk, chunks, max_workers)
765773
return None
766774

767775
def _delete(self, table_schema_name: str, key: str) -> None:

src/PowerPlatform/Dataverse/operations/records.py

Lines changed: 3 additions & 3 deletions
@@ -100,7 +100,7 @@ def create(
100100
print(f"Created {len(guids)} accounts")
101101
"""
102102
if not isinstance(max_workers, int) or max_workers < 1:
103-
raise ValueError("max_workers must be a positive integer")
103+
raise ValueError("max_workers must be a positive integer (1–3; values above 3 are clamped to 3)")
104104
with self._client._scoped_odata() as od:
105105
entity_set = od._entity_set_from_schema_name(table)
106106
if isinstance(data, dict):
@@ -171,7 +171,7 @@ def update(
171171
)
172172
"""
173173
if not isinstance(max_workers, int) or max_workers < 1:
174-
raise ValueError("max_workers must be a positive integer")
174+
raise ValueError("max_workers must be a positive integer (1–3; values above 3 are clamped to 3)")
175175
with self._client._scoped_odata() as od:
176176
if isinstance(ids, str):
177177
if not isinstance(changes, dict):
@@ -566,7 +566,7 @@ def upsert(self, table: str, items: List[Union[UpsertItem, Dict[str, Any]]], *,
566566
``{"accountnumber": "ACC-001", "address1_postalcode": "98052"}``.
567567
"""
568568
if not isinstance(max_workers, int) or max_workers < 1:
569-
raise ValueError("max_workers must be a positive integer")
569+
raise ValueError("max_workers must be a positive integer (1–3; values above 3 are clamped to 3)")
570570
if not isinstance(items, list) or not items:
571571
raise TypeError("items must be a non-empty list of UpsertItem or dicts")
572572
normalized: List[UpsertItem] = []
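
Taken together, the `records.py` check and the clamp in `_dispatch_chunks` give `max_workers` two distinct behaviours: values below 1 or of the wrong type are rejected, while values above 3 are accepted and reduced. The sketch below combines both steps in one hypothetical helper, `resolve_max_workers`, purely to make that contract explicit; in the commit itself the two steps live in separate modules.

```python
MAX_WORKERS = 3  # assumed cap, mirroring _MAX_WORKERS in _odata.py


def resolve_max_workers(max_workers: int) -> int:
    """Reject invalid values, then clamp valid ones to the cap."""
    if not isinstance(max_workers, int) or max_workers < 1:
        raise ValueError("max_workers must be a positive integer")
    return min(max_workers, MAX_WORKERS)


effective_default = resolve_max_workers(1)  # sequential dispatch
effective_large = resolve_max_workers(8)    # accepted, clamped to the cap
```

Only the lower bound raises, so a caller passing `max_workers=8` sees no error and runs with three workers.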
