Skip to content

Conversation

@evanh
Copy link
Member

@evanh evanh commented Jan 16, 2026

Have the postgres adapter only fetch and do upkeep on activations that are part of the partition that the consumer is assigned. The broker can still update tasks outside its partitions, in case a worker is connected to a broker that is then rebalanced. Change the consumer to pass the partitions to the store whenever partitions are assigned.

evanh added 16 commits January 13, 2026 16:39
This adds a postgres storage adapter for the taskbroker, as well as providing a way to choose
between the adapters in the configuration. This adapter will also work with AlloyDB.

In postgres, the keyword `offset` is reserved, so that column is called `kafka_offset` in the PG
tables and converted to `offset`.

The tests were updated to run with both the SQLite and Postgres adapter using the rstest crate. The
`create_test_store` function was updated to be the standard for all tests, and to allow choosing
between a SQLite and Postgres DB.

A `remove_db` function was added to the trait and the existing adapters, since the tests create a
unique PG database on every run that should be cleaned up.

The `create_test_store` function was updated to be the standard for all tests, and to allow choosing
between an SQLite and Postgres DB.
@evanh evanh requested a review from a team as a code owner January 16, 2026 20:43
@evanh evanh requested review from a team and removed request for a team January 16, 2026 20:43
Have the postgres adapter only fetch and do upkeep on activations that are part of the partition
that the consumer is assigned. The broker can still update tasks outside its partitions, in case a
worker is connected to a broker that is then rebalanced. Change the consumer to pass the partitions
to the store whenever partitions are assigned.

This was originally tested with PARTITION BY, but that requires manually keeping track of the
partition tables which isn't a desired behaviour.
@evanh evanh force-pushed the evanh/fix/use-partitions-in-postgres branch from 8689312 to 05125fe Compare January 16, 2026 21:56
state = match (state, event) {
(ConsumerState::Ready, Event::Assign(tpl)) => {
metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64);
// Note: This assumes we only process one topic per consumer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far that has always been true.

read_pool: PgPool,
write_pool: PgPool,
config: PostgresActivationStoreConfig,
partitions: RwLock<Vec<i32>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you use a tokio RwLock to avoid locking up one of the tokio threads?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because updating the partitions isn't an asynchronous operation. So I want this to block.

Base automatically changed from evanh/feat/use-postgresql-interface to main January 28, 2026 21:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants