Skip to content

Conversation

@vanitabhagwat
Copy link
Collaborator

@vanitabhagwat vanitabhagwat commented Oct 17, 2025

This PR is to create a more generic solution for write rate limiting that can be utilized by all online stores.

@vanitabhagwat vanitabhagwat changed the title add centralized rate limiter feat: add centralized rate limiter Oct 17, 2025
@vanitabhagwat vanitabhagwat changed the title feat: add centralized rate limiter feat: Add centralized rate limiter Oct 17, 2025
Comment on lines 300 to 408
# Leaving one core for operating system and other background processes
num_processes = num_spark_driver_cores - 1

if table.num_rows < num_processes:
num_processes = table.num_rows

# Input table is split into smaller chunks and processed in parallel
chunks = self.split_table(num_processes, table)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you intentionally deleting the comments here? There are some comment deletions throughout.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't intentional. I ran the black and ruff formatting which probably did that. Added them back.

Comment on lines 34 to 47
backoff = 0.005 # initial minimal sleep
while not self.acquire():
# Compute estimated sleep until oldest timestamp exits window.
# We use the current index position as the next candidate slot.
now = time.time()
with self._lock:
oldest_ts = self.timestamps[self.index]
remaining = oldest_ts + self.period - now
if remaining <= 0:
continue
# Sleep the smaller of remaining and a capped value to re-check frequently.
time.sleep(min(remaining, 0.05))
# Optional exponential backoff (bounded) if still not free.
backoff = min(backoff * 2, 0.05)
Copy link
Collaborator

@piket piket Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like backoff is actually used anywhere, it's only recalculated. Is this meant to be part of the minimum sleep instead of the hardcoded 0.05?

entities_to_keep: Sequence[Entity],
partial: bool,
):
# Call update only if there is an online store
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you recomment this as well.

Copy link
Collaborator

@piket piket left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants