
Overhaul publish_dataset extension #9217

Open

crusaderky wants to merge 5 commits into dask:main from crusaderky:publish_dataset

Conversation

@crusaderky (Collaborator) commented Apr 1, 2026

This PR thoroughly reviews the publish_dataset API:

Bugfixes

  • Fixed bug where publish_dataset(..., override=True) would cause any keys from the original dataset to become immortal unless they were also present in the new dataset
  • Fixed race condition where the user calls get_dataset() and immediately afterwards unpublish_dataset(), believing that they are holding a local reference to the futures, while the scheduler hasn't yet recorded that reference by the time unpublish_dataset lands on the scheduler. This left the client holding references to forgotten keys. This is symmetrical to what Fix race condition for published futures with annotations #8577 fixed for publish_dataset.
  • Fixed memory leak, introduced by Fix race condition for published futures with annotations #8577, where each call to publish_dataset would create an immortal asyncio.Event instance on the scheduler.
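The second bugfix hinges on message ordering between two channels. Here's a minimal pure-Python sketch (a toy model, not the actual extension code: a set and a list stand in for scheduler-side reference tracking and the client's batched-comms buffer, and all names are illustrative):

```python
# Toy model of the get_dataset()/unpublish_dataset() race. The client's
# "I now reference these keys" message travels over a buffered batched
# channel, while unpublish_dataset is a direct RPC that can overtake it.
scheduler_refs = {"published-x"}  # parties that desire the dataset's keys
batched_buffer = []               # client -> scheduler batched comms

def client_desires_keys(who):
    batched_buffer.append(who)    # queued, not delivered immediately

def flush():
    scheduler_refs.update(batched_buffer)  # batched messages finally land
    batched_buffer.clear()

def unpublish():
    scheduler_refs.discard("published-x")  # direct RPC, lands right away

client_desires_keys("client-1")   # get_dataset(): reference still in flight
unpublish()                       # the RPC overtakes the batched message...
no_refs = not scheduler_refs      # ...so briefly nobody desires the keys and
                                  # the scheduler may forget them
flush()                           # the client's reference arrives too late
```

The fix is to make the scheduler-side delete handler wait for the client's batched comms to be flushed before releasing any keys, so the client's reference always lands first.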

New features

  • Added syntax publish_dataset({name: value, ...}). Note that this is the only way one can publish multiple datasets with non-string names at once.
  • get_dataset and unpublish_dataset can now get/delete multiple datasets at once.
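A hedged sketch of the new calling conventions, using a plain dict as a stand-in for the scheduler-side dataset store (publish/get/unpublish below are toy functions mimicking the shape of the API, not the real client methods):

```python
datasets = {}

def publish(mapping):
    # publish_dataset({name: value, ...}): names need not be strings
    datasets.update(mapping)

def get(*names):
    # get_dataset can now fetch several datasets in one call
    return tuple(datasets[n] for n in names)

def unpublish(*names):
    # unpublish_dataset can now delete several datasets in one call
    for n in names:
        del datasets[n]

publish({"a": 1, ("b", 0): 2})  # a tuple name is only expressible this way
result = get("a", ("b", 0))
unpublish("a", ("b", 0))
```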

Performance improvements

  • publish_dataset previously performed 2 RPC calls per dataset, had a latency of 2x RTT, and typically opened one additional TCP connection per dataset beyond the first. It now performs a single RPC call with 1x RTT latency, regardless of the number of datasets being published.

This, together with the new features, should offer a significant speedup for users who were previously publishing/getting/unpublishing many datasets at once (and, for all cases other than publishing string-named datasets, were forced into a tight for loop of RPC calls). This is particularly significant when the client-scheduler comm suffers from high latency, e.g. on Coiled.
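A back-of-envelope sketch of the latency claim above (assumption: datasets were previously published sequentially in a tight loop, with no pipelining; function names are illustrative):

```python
def old_publish_latency(n_datasets: int, rtt: float) -> float:
    # 2 RPC calls per dataset, each paying one round trip
    return 2 * n_datasets * rtt

def new_publish_latency(n_datasets: int, rtt: float) -> float:
    # a single RPC call, regardless of how many datasets are published
    return rtt

# e.g. 20 datasets over a 100 ms client-scheduler link
old = old_publish_latency(20, 0.1)  # 4.0 s
new = new_publish_latency(20, 0.1)  # 0.1 s
```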

Minor tweaks

  • The override flag was undocumented. Added documentation.
  • Support for non-string names was undocumented. Added documentation.
  • Clarified the documentation of publish_dataset regarding keys that have not been persisted. Added test coverage for the use case.
  • The syntax for publishing a tuple of collections under a single name, publish_dataset(x, y, name="foo"), was untested. Added test coverage.
  • Cleaned up the test suite
  • Achieved 100% test coverage
  • Added type annotations

@crusaderky crusaderky requested a review from fjetter as a code owner April 1, 2026 11:44
Comment on lines -48 to -51
if not override and name in self.datasets:
raise KeyError("Dataset %s already exists" % name)
self.scheduler.client_desires_keys(keys, f"published-{stringify(name)}")
self.datasets[name] = {"data": data, "keys": keys}
@crusaderky (Collaborator, Author) commented Apr 1, 2026

Fixed bug where publish_dataset(..., override=True) would cause any keys from the original dataset to become immortal unless they were also present in the new dataset

out = self.datasets.pop(name, {"keys": []})
self.scheduler.client_releases_keys(out["keys"], f"published-{stringify(name)}")
async def delete(self, names: tuple[Key, ...], uid: bytes) -> None:
await self._sync_batched_send(uid)
@crusaderky (Collaborator, Author) commented:

Fixed race condition where the user calls get_dataset() and immediately afterwards unpublish_dataset(), believing that they are holding a local reference to the futures, while the scheduler hasn't yet recorded that reference by the time unpublish_dataset lands on the scheduler. This left the client holding references to forgotten keys. This is symmetrical to what #8577 fixed for publish_dataset.

try:
await self._flush_received[uid].wait()
finally:
del self._flush_received[uid]
@crusaderky (Collaborator, Author) commented:

Fixed memory leak where each call to publish_dataset would create an immortal asyncio.Event instance on the scheduler.

Note that there is still a potential memory leak left here, where the client flushes the batched comms, but then disconnects before the RPC call can be executed.
I tried fixing this use case but gave up, as I ended up with code that was both severely over-engineered and fragile to race conditions. Namely, one must be careful when inspecting scheduler.clients, because the register-client endpoint is neither an async RPC nor a batched comm, and I ran into cases where the batched comms had arrived but the client hadn't been registered yet.
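The fixed pattern can be sketched in pure asyncio (FlushTracker and its method names are illustrative, not the actual extension code): each waiter creates its Event on demand and removes it again as soon as the wait completes, so nothing accumulates on the scheduler.

```python
import asyncio
from collections import defaultdict

class FlushTracker:
    def __init__(self) -> None:
        # One Event per in-flight uid, created lazily on first access
        self._flush_received: defaultdict[bytes, asyncio.Event] = defaultdict(
            asyncio.Event
        )

    async def wait_for_flush(self, uid: bytes) -> None:
        try:
            await self._flush_received[uid].wait()
        finally:
            # Drop the entry so the Event doesn't live forever
            del self._flush_received[uid]

    def mark_flushed(self, uid: bytes) -> None:
        self._flush_received[uid].set()

async def main() -> int:
    tracker = FlushTracker()
    waiter = asyncio.create_task(tracker.wait_for_flush(b"uid-1"))
    await asyncio.sleep(0)          # let the waiter register its Event
    tracker.mark_flushed(b"uid-1")
    await waiter
    return len(tracker._flush_received)

leftover = asyncio.run(main())      # no Event instances survive the wait
```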

Comment on lines +5663 to +5665
warnings.warn(
f"Client {client!r} desires key {k!r} but key is unknown."
)
@crusaderky (Collaborator, Author) commented:

This came up many times during this exercise.


def publish_dataset(
self, *args: Any, name: Key | None = None, override: bool = False, **kwargs
):
@crusaderky (Collaborator, Author) commented Apr 1, 2026

Aside: offering four different syntaxes to achieve the same thing is in clear violation of the Zen of Python:

There should be one-- and preferably only one --obvious way to do it.

Removing some of these syntaxes however would be a breaking change and is beyond the scope of this PR.
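To make the four overlapping syntaxes concrete, here is a toy stand-in dispatcher (an assumption-laden sketch mimicking only the argument handling of Client.publish_dataset, not its scheduler interaction):

```python
from collections.abc import Mapping
from typing import Any

def publish_dataset(*args: Any, name: Any = None, **kwargs: Any) -> dict:
    """Toy dispatcher covering the four overlapping call styles."""
    out: dict = {}
    if args and name is None:
        # publish_dataset({name: value, ...}) - one or more mappings
        for mapping in args:
            assert isinstance(mapping, Mapping)
            out.update(mapping)
    elif args:
        # publish_dataset(coll, name=...) or publish_dataset(c1, c2, name=...)
        out[name] = args[0] if len(args) == 1 else tuple(args)
    # publish_dataset(name=collection) via keywords
    out.update(kwargs)
    return out

r1 = publish_dataset(x=1)             # keyword style
r2 = publish_dataset(1, name="x")     # positional + name=
r3 = publish_dataset(1, 2, name="x")  # tuple of collections, one name
r4 = publish_dataset({("a", 0): 1})   # mapping style (non-string names)
```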

def publish_dataset(self, *args, **kwargs):
@overload
def publish_dataset(
self, *args: Mapping[Key, Any], override: bool = False, **kwargs
@crusaderky (Collaborator, Author) commented Apr 1, 2026

Frustratingly, mypy complains if you write

    @overload
    def publish_dataset(
        self, arg: Mapping[Key, Any], /, *, override: bool = False, **kwargs
    ): ...

which would be more correct.

Comment on lines -3011 to -3012
kwargs : dict
additional keyword arguments to _get_dataset
@crusaderky (Collaborator, Author) commented Apr 1, 2026

kwargs actually contain the situationally useful asynchronous=False to be passed to a synchronous client.
However their lack of documentation is endemic and is out of scope for this PR.
This change simply aligns the documentation of this method to all other client methods.

Comment on lines -119 to -122
x = delayed(inc)(1)
y = delayed(inc)(2)
await c.publish_dataset(x=1, y=2)
datasets = await c.scheduler.publish_list()
assert datasets == ("x", "y")

await c.publish_dataset(x=x, y=y)
@crusaderky (Collaborator, Author) commented Apr 1, 2026

This was misleading: these datasets are not persisted, so they are no different from plain Python objects.



@gen_cluster(client=True, worker_kwargs={"resources": {"A": 1}})
async def test_publish_submit_ordering(c, s, a, b):
@crusaderky (Collaborator, Author) commented Apr 1, 2026

Test from #8577. Now folded into test_publish_unpublish_wait_for_batched_comms.
I found this test quite confusing, as it mentions annotations and resources, but the race condition has nothing to do with either.

async def test_publish_submit_ordering(c, s, a, b):
RESOURCES = {"A": 1}
@gen_cluster(client=True)
async def test_publish_unpublish_wait_for_batched_comms(c, s, a, b):
@crusaderky (Collaborator, Author) commented:

Commenting out

await self._sync_batched_send(uid)

either in PublishExtension.put or in PublishExtension.delete consistently trips this test.

@github-actions bot (Contributor) commented Apr 1, 2026

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

31 files ±0  31 suites ±0  11h 10m 58s ⏱️ +27m 0s
4 123 tests +7  4 015 ✅ +7  104 💤 −2  4 ❌ +2
59 783 runs +1 543  57 305 ✅ +1 506  2 474 💤 +36  4 ❌ +1

For more details on these failures, see this check.

Results for commit 406af40. ± Comparison against base commit 4182308.

This pull request removes 5 and adds 12 tests. Note that renamed tests count towards both.
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp:..[ ‑ 1]:---nanny]
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp:..[ ‑ 1]:---no-nanny]
distributed.tests.test_publish ‑ test_datasets_republish
distributed.tests.test_publish ‑ test_publish_non_string_key
distributed.tests.test_publish ‑ test_publish_submit_ordering
distributed.tests.test_publish ‑ test_publish_bad_syntax
distributed.tests.test_publish ‑ test_publish_dataset_override
distributed.tests.test_publish ‑ test_publish_dataset_override_partial
distributed.tests.test_publish ‑ test_publish_dataset_override_releases_old_keys
distributed.tests.test_publish ‑ test_publish_multiple_collections_one_name
distributed.tests.test_publish ‑ test_publish_multiple_names
distributed.tests.test_publish ‑ test_publish_non_string_names[8]
distributed.tests.test_publish ‑ test_publish_non_string_names[9.0]
distributed.tests.test_publish ‑ test_publish_non_string_names[name0]
distributed.tests.test_publish ‑ test_publish_unpersisted
…

@crusaderky (Collaborator, Author) commented:

All test failures are unrelated.
