Skip to content

Commit 2601bd4

Browse files
Abel Milash and claude committed
Emit UserWarning when max_workers exceeds cap, revert cap to 3
Replace silent capping in _dispatch_chunks with an explicit UserWarning so callers are informed when their max_workers value is reduced. Revert _MAX_WORKERS to 3. Remove two internal implementation comments from _upsert_multiple. Update tests to assert the warning is emitted.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 704b91c commit 2601bd4

2 files changed

Lines changed: 34 additions & 27 deletions

File tree

src/PowerPlatform/Dataverse/data/_odata.py

Lines changed: 15 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,7 @@
5858
_DEFAULT_EXPECTED_STATUSES: tuple[int, ...] = (200, 201, 202, 204)
5959
_MULTIPLE_BATCH_SIZE = 1000
6060
# Concurrent chunk dispatch settings
61-
_MAX_WORKERS = 3 # maximum concurrent worker threads; values above this are silently capped
61+
_MAX_WORKERS = 3 # maximum concurrent worker threads; values above this are capped
6262
_CHUNK_RETRY_LIMIT = 3 # max retries per chunk on transient errors
6363
_CHUNK_RETRY_DEFAULT_WAIT = 60 # seconds to wait when Retry-After header is absent
6464
_CHUNK_RETRY_JITTER_MAX = 5 # seconds of random jitter added to Retry-After to desynchronise workers
@@ -67,8 +67,8 @@
6767
def _dispatch_chunks(fn: Callable, chunks: List, max_workers: int) -> List:
6868
"""Dispatch ``fn(chunk)`` for each chunk, sequentially or concurrently.
6969
70-
``max_workers`` is silently capped to ``_MAX_WORKERS`` (3) so callers
71-
that pass a larger value are not penalised with an error.
70+
If ``max_workers`` exceeds ``_MAX_WORKERS`` (3) a :class:`UserWarning` is
71+
issued and the value is capped.
7272
7373
When ``max_workers == 1`` or there is only one chunk, runs sequentially
7474
with no thread overhead. When ``max_workers > 1`` and there are multiple
@@ -85,10 +85,16 @@ def _dispatch_chunks(fn: Callable, chunks: List, max_workers: int) -> List:
8585
8686
:param fn: Callable that accepts a single chunk and returns a result.
8787
:param chunks: List of chunks to process.
88-
:param max_workers: Maximum number of concurrent worker threads (capped to ``_MAX_WORKERS``).
88+
:param max_workers: Maximum number of concurrent worker threads.
8989
:return: List of results in chunk submission order.
9090
"""
91-
max_workers = min(max_workers, _MAX_WORKERS)
91+
if max_workers > _MAX_WORKERS:
92+
warnings.warn(
93+
f"max_workers={max_workers} exceeds the maximum of {_MAX_WORKERS}; capping to {_MAX_WORKERS}.",
94+
UserWarning,
95+
stacklevel=2,
96+
)
97+
max_workers = _MAX_WORKERS
9298

9399
def _execute_with_retry(chunk):
94100
for attempt in range(_CHUNK_RETRY_LIMIT + 1):
@@ -585,18 +591,11 @@ def _upsert_multiple(
585591
When input exceeds ``_MULTIPLE_BATCH_SIZE`` records, the operation is
586592
split into multiple requests and is **not atomic** across batches.
587593
"""
588-
# Validation uses ValueError (not ValidationError) because this is a
589-
# caller-facing precondition check, not a service error. The batch path
590-
# (_build_upsert_multiple) raises ValidationError for the same conditions
591-
# because batch errors carry structured subcodes.
592594
if len(alternate_keys) != len(records):
593595
raise ValueError(
594596
f"alternate_keys and records must have the same length " f"({len(alternate_keys)} != {len(records)})"
595597
)
596598
logical_name = table_schema_name.lower()
597-
# Pre-process all targets before chunking so that validation (key
598-
# conflicts, label conversion) runs eagerly. This means all records
599-
# are held in memory at once, which is acceptable for typical workloads.
600599
targets: List[Dict[str, Any]] = []
601600
for alt_key, record in zip(alternate_keys, records):
602601
alt_key_lower = self._lowercase_keys(alt_key)
@@ -1386,15 +1385,15 @@ def _bulk_fetch_picklists(self, table_schema_name: str) -> None:
13861385
"""
13871386
table_key = self._normalize_cache_key(table_schema_name)
13881387
now = time.time()
1389-
# Fast path — lock-free read for the warm-cache case (common in sequential and
1388+
# Lock-free read for the warm-cache case (common in sequential and
13901389
# subsequent concurrent calls once the cache is populated).
13911390
table_entry = self._picklist_label_cache.get(table_key)
13921391
if isinstance(table_entry, dict) and (now - table_entry.get("ts", 0)) < self._picklist_cache_ttl_seconds:
13931392
return
13941393

1395-
# Slow path — serialise concurrent cold-start fetches so only one thread
1396-
# makes the metadata HTTP call. Re-check inside the lock (double-checked
1397-
# locking) in case another thread populated the cache while we waited.
1394+
# Serialise concurrent cold-start fetches so only one thread makes the
1395+
# metadata HTTP call. Re-check inside the lock (double-checked locking)
1396+
# in case another thread populated the cache while we waited.
13981397
with self._picklist_cache_lock:
13991398
now = time.time()
14001399
table_entry = self._picklist_label_cache.get(table_key)

tests/unit/data/test_multiple_chunking.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,8 @@ def fn(_):
714714
call_count[0] += 1
715715
return "result"
716716

717-
results = self._dispatch(fn, [["only"]], max_workers=4)
717+
with self.assertWarns(UserWarning):
718+
results = self._dispatch(fn, [["only"]], max_workers=4)
718719
self.assertEqual(results, ["result"])
719720
self.assertEqual(call_count[0], 1)
720721

@@ -809,7 +810,8 @@ def test_max_workers_above_cap_is_capped(self):
809810
"PowerPlatform.Dataverse.data._odata.ThreadPoolExecutor",
810811
wraps=ThreadPoolExecutor,
811812
) as mock_pool:
812-
results = self._dispatch(lambda c: c, chunks, max_workers=_MAX_WORKERS + 100)
813+
with self.assertWarns(UserWarning):
814+
results = self._dispatch(lambda c: c, chunks, max_workers=_MAX_WORKERS + 100)
813815

814816
mock_pool.assert_called_once_with(max_workers=_MAX_WORKERS)
815817
self.assertEqual(results, chunks)
@@ -1088,30 +1090,36 @@ def test_create_true_max_workers_accepted(self):
10881090

10891091

10901092
class TestDispatchChunksCap(unittest.TestCase):
1091-
"""_dispatch_chunks silently caps max_workers to _MAX_WORKERS."""
1093+
"""_dispatch_chunks caps max_workers to _MAX_WORKERS and emits a UserWarning."""
10921094

10931095
def setUp(self):
10941096
from PowerPlatform.Dataverse.data._odata import _dispatch_chunks, _MAX_WORKERS
10951097

10961098
self._dispatch = _dispatch_chunks
10971099
self._cap = _MAX_WORKERS
10981100

1099-
def test_above_cap_is_capped(self):
1100-
"""max_workers above _MAX_WORKERS is silently capped; no error raised."""
1101+
def test_above_cap_emits_warning(self):
1102+
"""max_workers above _MAX_WORKERS emits a UserWarning and still returns results."""
11011103
called = []
11021104

11031105
def fn(chunk):
11041106
called.append(chunk)
11051107
return chunk
11061108

1107-
# 2 chunks with max_workers above cap — should not raise
1108-
result = self._dispatch(fn, ["a", "b"], max_workers=self._cap + 10)
1109+
with self.assertWarns(UserWarning) as cm:
1110+
result = self._dispatch(fn, ["a", "b"], max_workers=self._cap + 10)
1111+
11091112
self.assertEqual(result, ["a", "b"])
11101113
self.assertEqual(called, ["a", "b"])
1111-
1112-
def test_exactly_at_cap_is_accepted(self):
1113-
"""max_workers == _MAX_WORKERS dispatches concurrently without capping."""
1114-
results = self._dispatch(lambda c: c, ["x", "y"], max_workers=self._cap)
1114+
self.assertIn(str(self._cap + 10), str(cm.warning))
1115+
self.assertIn(str(self._cap), str(cm.warning))
1116+
1117+
def test_exactly_at_cap_no_warning(self):
1118+
"""max_workers == _MAX_WORKERS dispatches without capping or warning."""
1119+
import warnings as _warnings
1120+
with _warnings.catch_warnings():
1121+
_warnings.simplefilter("error")
1122+
results = self._dispatch(lambda c: c, ["x", "y"], max_workers=self._cap)
11151123
self.assertEqual(results, ["x", "y"])
11161124

11171125
def test_max_workers_1_is_accepted(self):

0 commit comments

Comments
 (0)