Conversation
```python
if not override and name in self.datasets:
    raise KeyError("Dataset %s already exists" % name)
self.scheduler.client_desires_keys(keys, f"published-{stringify(name)}")
self.datasets[name] = {"data": data, "keys": keys}
```
Fixed a bug where `publish_dataset(..., override=True)` would cause any keys from the original dataset to become immortal unless they were also present in the new dataset.
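The shape of this fix can be illustrated with a standalone sketch. Everything below (`FakeScheduler`, `PublishSketch`) is a simplified stand-in invented for illustration, not the real `PublishExtension`: the point is that on override, the old dataset's keys must be released before the new keys are desired.

```python
# Hypothetical, simplified sketch of the override bugfix; class and method
# names are stand-ins, not the real distributed implementation.

class FakeScheduler:
    """Stand-in that just records which keys are currently referenced."""

    def __init__(self):
        self.desired = set()

    def client_desires_keys(self, keys, client):
        self.desired.update(keys)

    def client_releases_keys(self, keys, client):
        self.desired.difference_update(keys)


class PublishSketch:
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.datasets = {}

    def put(self, name, data, keys, override=False):
        if not override and name in self.datasets:
            raise KeyError(f"Dataset {name} already exists")
        # The fix: drop references to the keys of the dataset being
        # overridden; otherwise keys present only in the old dataset
        # would stay pinned on the scheduler forever.
        old = self.datasets.pop(name, None)
        if old:
            self.scheduler.client_releases_keys(old["keys"], f"published-{name}")
        self.scheduler.client_desires_keys(keys, f"published-{name}")
        self.datasets[name] = {"data": data, "keys": keys}


s = FakeScheduler()
ext = PublishSketch(s)
ext.put("x", None, ["a", "b"])
ext.put("x", None, ["b", "c"], override=True)
assert s.desired == {"b", "c"}  # "a" was released, not left immortal
```

Without the `pop`/`client_releases_keys` step, the final set would still contain `"a"` even though no published dataset references it anymore.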
```python
out = self.datasets.pop(name, {"keys": []})
self.scheduler.client_releases_keys(out["keys"], f"published-{stringify(name)}")
```

```python
async def delete(self, names: tuple[Key, ...], uid: bytes) -> None:
    await self._sync_batched_send(uid)
```
Fixed a race condition where the user calls `get_dataset()` and immediately afterwards `unpublish_dataset()`, thinking that they are holding a reference to the futures locally, but the scheduler hasn't noted it by the time `unpublish_dataset` lands on the scheduler. This caused the client to hold a reference to forgotten keys. This is symmetrical to what #8577 fixed for `publish_dataset`.
```python
try:
    await self._flush_received[uid].wait()
finally:
    del self._flush_received[uid]
```
Fixed a memory leak where each call to `publish_dataset` would create an immortal `asyncio.Event` instance on the scheduler.
Note that there is still a potential memory leak left here, where the client flushes the batched comms but then disconnects before the RPC call can be executed.
I tried fixing this use case but gave up, as I ended up with code that was both severely over-engineered and fragile to race conditions. Namely, one must be thoughtful when testing `scheduler.clients`, because the register-client endpoint is neither an async RPC nor a batched comm, and I found myself in cases where the batched comms had arrived but the client hadn't been registered yet.
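The flush handshake and the `try`/`finally` cleanup can be sketched in isolation. `FlushSketch`, `mark_flushed`, and `wait_flushed` are invented names for illustration, not the real scheduler code: the client sends a marker over the batched stream, then makes an RPC that waits until that marker has arrived, and the `finally` clause ensures one `asyncio.Event` per call never outlives the call.

```python
import asyncio

# Hypothetical sketch of the batched-comms flush handshake; names assumed.


class FlushSketch:
    def __init__(self):
        self._flush_received: dict[bytes, asyncio.Event] = {}

    def mark_flushed(self, uid: bytes) -> None:
        # Called when the marker arrives over the batched stream.
        self._flush_received.setdefault(uid, asyncio.Event()).set()

    async def wait_flushed(self, uid: bytes) -> None:
        # Called from the RPC handler. The try/finally guarantees the
        # Event is removed once the waiter finishes, instead of piling
        # up one immortal Event per publish call.
        event = self._flush_received.setdefault(uid, asyncio.Event())
        try:
            await event.wait()
        finally:
            del self._flush_received[uid]


async def demo():
    f = FlushSketch()
    waiter = asyncio.create_task(f.wait_flushed(b"uid-1"))
    await asyncio.sleep(0)    # let the waiter start and block on the Event
    f.mark_flushed(b"uid-1")  # the batched-stream marker lands
    await waiter
    assert f._flush_received == {}  # no leaked Event


asyncio.run(demo())
```

Either side may arrive first: `setdefault` creates the `Event` for whichever of `mark_flushed`/`wait_flushed` runs earlier, so the handshake is order-independent.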
```python
warnings.warn(
    f"Client {client!r} desires key {k!r} but key is unknown."
)
```
This came up many times during this exercise.
```python
def publish_dataset(
    self, *args: Any, name: Key | None = None, override: bool = False, **kwargs
):
```
Aside: offering four different syntaxes to achieve the same thing is in clear violation of the Zen of Python:

> There should be one-- and preferably only one --obvious way to do it.

Removing some of these syntaxes, however, would be a breaking change and is beyond the scope of this PR.
```python
def publish_dataset(self, *args, **kwargs):

@overload
def publish_dataset(
    self, *args: Mapping[Key, Any], override: bool = False, **kwargs
```
Frustratingly, mypy complains if you write

```python
@overload
def publish_dataset(
    self, arg: Mapping[Key, Any], /, *, override: bool = False, **kwargs
): ...
```

which would be more correct.
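For context, the overload shape under discussion can be reproduced in a standalone module. This is a simplified sketch, not the real dask signatures: `Key` is narrowed to `str`, `self` is dropped, and the function body is a trivial stub. It shows the looser `*args: Mapping` form that both mypy and the runtime accept for the two call styles.

```python
from typing import Any, Mapping, overload

# Simplified stand-ins; the real dask Key type and Client method differ.
Key = str


@overload
def publish_dataset(*args: Mapping[Key, Any], override: bool = ...) -> None: ...
@overload
def publish_dataset(*args: Any, name: Key, override: bool = ...) -> None: ...
def publish_dataset(*args, name=None, override=False):
    # Implementation accepting both call styles.
    return None


# Both call styles match one of the overloads above:
publish_dataset({"x": 1, "y": 2})   # mapping form
publish_dataset(1, 2, name="pair")  # positional values plus a name
```

The stricter positional-only form (`arg: Mapping[Key, Any], /`) would pin the mapping call style down to exactly one argument, which is why the review comment calls it "more correct"; the sketch above uses the looser form that type-checks.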
```
kwargs : dict
    additional keyword arguments to _get_dataset
```
kwargs actually contain the situationally useful `asynchronous=False`, to be passed to a synchronous client.
However, their lack of documentation is endemic and fixing it is out of scope for this PR.
This change simply aligns the documentation of this method with that of all other client methods.
```python
x = delayed(inc)(1)
y = delayed(inc)(2)
await c.publish_dataset(x=1, y=2)
datasets = await c.scheduler.publish_list()
assert datasets == ("x", "y")
```
```python
await c.publish_dataset(x=x, y=y)
```
This was misleading: these datasets are not persisted, so they are no different from plain Python objects.
```python
@gen_cluster(client=True, worker_kwargs={"resources": {"A": 1}})
async def test_publish_submit_ordering(c, s, a, b):
```
Test from #8577, now folded into `test_publish_unpublish_wait_for_batched_comms`.
I found this test quite confusing, as it mentions annotations and resources, but the race condition has nothing to do with them.
```python
async def test_publish_submit_ordering(c, s, a, b):
    RESOURCES = {"A": 1}

@gen_cluster(client=True)
async def test_publish_unpublish_wait_for_batched_comms(c, s, a, b):
```
Commenting out

```python
await self._sync_batched_send(uid)
```

in either `PublishExtension.put` or `PublishExtension.delete` consistently trips this test.
**Unit Test Results**

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

31 files ±0 · 31 suites ±0 · 11h 10m 58s ⏱️ +27m 0s

For more details on these failures, see this check.

Results for commit 406af40. ± Comparison against base commit 4182308.

This pull request removes 5 and adds 12 tests. Note that renamed tests count towards both.
All test failures are unrelated.
This PR thoroughly reviews the `publish_dataset` API.

**Bugfixes**

- Fixed a bug where `publish_dataset(..., override=True)` would cause any keys from the original dataset to become immortal unless they were also present in the new dataset.
- Fixed a race condition where the user calls `get_dataset()` and immediately afterwards `unpublish_dataset()`, thinking that they are holding a reference to the futures locally, but the scheduler hasn't noted it by the time `unpublish_dataset` lands on the scheduler. This caused the client to hold a reference to forgotten keys. This is symmetrical to what #8577 (Fix race condition for published futures with annotations) fixed for `publish_dataset`.
- Fixed a memory leak where each call to `publish_dataset` would create an immortal `asyncio.Event` instance on the scheduler.

**New features**

- New syntax: `publish_dataset({name: value, ...})`. Note that this is the only way one can publish multiple datasets with non-string names at once.
- `get_dataset` and `unpublish_dataset` can now get/delete multiple datasets at once.

**Performance improvements**

- `publish_dataset` was previously performing 2 RPC calls per dataset, had a latency of 2x RTT, and was typically opening additional TCP/IP connections equal to the number of datasets beyond the first. Changed to a single RPC call and 1x RTT regardless of the number of datasets being published.

This, together with the new features, should offer a significant speedup for users that were previously publishing/getting/unpublishing many datasets at once (and, for all cases other than publishing string-named datasets, were forced to use a tight for loop of RPC calls). This is particularly significant when the client-scheduler comms suffer from high latency, e.g. in Coiled.

**Minor tweaks**

- The `override` flag was undocumented. Added documentation.
- Behaviour of `publish_dataset` in regards to keys that have not been persisted: added test coverage for the use case.
- `publish_dataset(x, y, name="foo")` was untested. Added test coverage.
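The latency argument above can be made concrete with a toy model that has nothing to do with dask itself: each RPC costs one round trip, so N sequential calls cost roughly N×RTT while one batched call costs roughly 1×RTT regardless of N. All names below (`rpc`, `publish_one_by_one`, `publish_batched`) are invented for this illustration.

```python
import asyncio
import time

# Toy model of why batching N publishes into a single RPC call helps:
# each RPC pays one simulated network round trip.

RTT = 0.02  # simulated client<->scheduler round trip, in seconds


async def rpc(payload):
    await asyncio.sleep(RTT)  # the network round trip
    return payload


async def publish_one_by_one(datasets):
    # Old behaviour (simplified): one RPC per dataset, ~N * RTT latency.
    for name, value in datasets.items():
        await rpc({name: value})


async def publish_batched(datasets):
    # New behaviour (simplified): one RPC for all datasets, ~1 * RTT latency.
    await rpc(datasets)


async def main():
    datasets = {f"ds-{i}": i for i in range(10)}

    t0 = time.perf_counter()
    await publish_one_by_one(datasets)
    sequential = time.perf_counter() - t0

    t0 = time.perf_counter()
    await publish_batched(datasets)
    batched = time.perf_counter() - t0

    assert batched < sequential / 2  # ~1 RTT vs ~10 RTT
    return sequential, batched


asyncio.run(main())
```

With 10 datasets the sequential loop pays at least 10 round trips while the batched call pays one, which is exactly the gap that grows painful on high-latency client-scheduler links.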