-
-
Notifications
You must be signed in to change notification settings - Fork 3
feat(postgres): Change the postgres adapter to be partition aware #534
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This adds a postgres storage adapter for the taskbroker, as well as providing a way to choose between the adapters in the configuration. This adapter will also work with AlloyDB. In postgres, the keyword `offset` is reserved, so that column is called `kafka_offset` in the PG tables and converted to `offset`. The tests were updated to run with both the SQLite and Postgres adapter using the rstest crate. The `create_test_store` function was updated to be the standard for all tests, and to allow choosing between a SQLite and Postgres DB. A `remove_db` function was added to the trait and the existing adapters, since the tests create a unique PG database on every run that should be cleaned up. The `create_test_store` function was updated to be the standard for all tests, and to allow choosing between an SQLite and Postgres DB.
Have the postgres adapter only fetch and do upkeep on activations that are part of the partition that the consumer is assigned. The broker can still update tasks outside its partitions, in case a worker is connected to a broker that is then rebalanced. Change the consumer to pass the partitions to the store whenever partitions are assigned. This was originally tested with PARTITION BY, but that requires manually keeping track of the partition tables which isn't a desired behaviour.
8689312 to
05125fe
Compare
| state = match (state, event) { | ||
| (ConsumerState::Ready, Event::Assign(tpl)) => { | ||
| metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64); | ||
| // Note: This assumes we only process one topic per consumer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far that has always been true.
| read_pool: PgPool, | ||
| write_pool: PgPool, | ||
| config: PostgresActivationStoreConfig, | ||
| partitions: RwLock<Vec<i32>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should you use a tokio RwLock to avoid locking up one of the tokio threads?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No because updating the partitions isn't an asynchronous operation. So I want this to block.
Have the postgres adapter only fetch and do upkeep on activations that are part of the partition that the consumer is assigned. The broker can still update tasks outside its partitions, in case a worker is connected to a broker that is then rebalanced. Change the consumer to pass the partitions to the store whenever partitions are assigned.