Add `parallelize` class method to `MaintenanceTasks::Task` #1337
base: main
Conversation
adrianna-chang-shopify left a comment
I think this feature could make sense, but I do think we'll need to make it a bit safer to use so that we don't risk creating a ton of threads or exhausting AR's connection pool. Left some thoughts inline!
```ruby
exceptions = []
exception_mutex = Mutex.new

threads = items.map do |item|
```
Batches can be of arbitrary size, e.g. 1000+ items. There are risks of performance degradation / system instability in generating an unbounded number of threads. Should we implement some sort of thread pool with a configurable size?
(We may also want to coordinate with Rails' connection pool size, which defaults to 5 connections)
Another idea is to make the thread count part of the API, i.e. `parallelize(threads: 5)`. I don't think we should tie thread count to the batch size though.
+1, I don't want to allow people to spawn an unbounded number of threads if they just follow the conventions, which for in_batches is 1000 elements per batch.
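To make the direction of this thread concrete, here is a minimal sketch of a bounded executor where the thread count is an explicit, capped parameter. The `BoundedParallelExecutor` name and the `max_threads:` option are hypothetical, not part of this PR, and the sketch assumes it runs inside a Rails app with Active Record loaded.

```ruby
class BoundedParallelExecutor
  class << self
    # Processes items on a fixed pool of worker threads instead of one
    # thread per item, so a 1000-item batch never spawns 1000 threads.
    def execute(items, max_threads: 5, &block)
      # Never ask for more connections than Active Record can hand out.
      max_threads = [max_threads, ActiveRecord::Base.connection_pool.size].min

      queue = Queue.new
      items.each { |item| queue << item }
      max_threads.times { queue << nil } # one stop sentinel per worker

      exceptions = []
      exception_mutex = Mutex.new

      workers = Array.new(max_threads) do
        Thread.new do
          while (item = queue.pop)
            ActiveRecord::Base.connection_pool.with_connection do
              block.call(item)
            rescue => error
              exception_mutex.synchronize { exceptions << { item: item, error: error } }
            end
          end
        end
      end

      workers.each(&:join)
      exceptions
    end
  end
end
```

Exposing the cap as `parallelize(threads: 5)` and clamping it to the Active Record pool size (which defaults to 5) would address both of the concerns raised above.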
```diff
   # implement an override for this method.
-  def process(_item)
-    raise NoMethodError, "#{self.class.name} must implement `process`."
+  def process_item(_item)
```
I'm not sure exactly what to name this, but I think we need an API that's more distinct from #process and that indicates this is for parallel processing within a batch. Maybe #process_for_batch?
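To make the naming question concrete, here is a rough contrast between the existing batch-level hook and the proposed per-item hook. The class names, the `Post` model, and the method bodies are placeholders; only `#process` exists in the released API.

```ruby
class MarkPostsProcessedTask < MaintenanceTasks::Task
  def collection
    Post.in_batches(of: 100)
  end

  # Existing API: called once per batch yielded by the collection.
  def process(batch)
    batch.update_all(processed: true)
  end
end

class MarkPostsProcessedInParallelTask < MaintenanceTasks::Task
  parallelize

  def collection
    Post.in_batches(of: 100)
  end

  # Proposed API (name under discussion, e.g. #process_for_batch):
  # called once per record in the batch, possibly from several threads.
  def process_item(post)
    post.update!(processed: true)
  end
end
```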
```ruby
items = batch.respond_to?(:to_a) ? batch.to_a : Array(batch)

# Execute items in parallel, storing errored item for context
ParallelExecutor.execute(items) do |item|
```
I feel like we could return the exceptions array (`[{ item: <item>, error: <error> }]`) directly from .execute instead of raising the error. This would simplify things a lot, i.e.

```ruby
class ParallelExecutor
  class << self
    def execute(items, &block)
      # ...
      threads = items.map do |item|
        Thread.new do
          ActiveRecord::Base.connection_pool.with_connection do
            block.call(item)
          rescue => error
            exception_mutex.synchronize do
              exceptions << { item: item, error: error }
            end
          end
        end
      end
      threads.each(&:join)
      exceptions
    end
    # ...
  end
end
```

And then here:

```ruby
exceptions = ParallelExecutor.execute(items) do |item|
  process_item(item)
end

if exceptions.any?
  @errored_element = exceptions.first[:item]
  raise exceptions.first[:error]
end
```
I think it's a bad idea: we already have a unit of work, the job, and we don't have to handle it, the queue does. It's not something we should get into, IMO.
Add `parallelize` class method to `MaintenanceTasks::Task`

Summary
This PR adds a `parallelize` class method to `MaintenanceTasks::Task` that enables parallel processing of batch items using threads. This provides a cleaner, more Rails-like API compared to including a concern.

Usage
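Based on the description above, enabling the feature presumably looks roughly like the following sketch; the task, model, and column names are placeholders, not taken from the PR.

```ruby
# app/tasks/maintenance/backfill_slugs_task.rb
module Maintenance
  class BackfillSlugsTask < MaintenanceTasks::Task
    parallelize

    def collection
      Article.in_batches(of: 500)
    end

    # Runs once per record in the batch, potentially on several threads.
    def process_item(article)
      article.update!(slug: article.title.parameterize)
    end
  end
end
```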
Changes
- `parallelized` class attribute to track parallel processing mode
- `parallelize` class method to enable parallel processing
- `parallelized?` class and instance methods
- `process_item` instance method placeholder
- `process` updated to route to parallel execution when enabled
- `ParallelExecutor` for thread-safe parallel item processing

Notes
- `process_item` should be idempotent.
- `process_item` must be thread-safe. Avoid shared mutable state.
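As a hypothetical illustration of the thread-safety note (not code from this PR): any state shared between items must be synchronized, because a batch's items may be processed concurrently.

```ruby
class UnsafeCountTask < MaintenanceTasks::Task
  parallelize

  def process_item(_record)
    # Unsafe: @count is shared by every thread processing the batch,
    # and the read-modify-write on it is not atomic.
    @count = (@count || 0) + 1
  end
end

class SafeCountTask < MaintenanceTasks::Task
  parallelize

  COUNT_MUTEX = Mutex.new

  def process_item(_record)
    # Safer: shared state is only mutated while holding the mutex.
    COUNT_MUTEX.synchronize { @count = (@count || 0) + 1 }
  end
end
```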