Conversation

@Devake Devake commented Dec 3, 2025

Add parallelize class method to MaintenanceTasks::Task

Summary

This PR adds a parallelize class method to MaintenanceTasks::Task that enables parallel processing of batch items using threads. This provides a cleaner, more Rails-like API compared to including a concern.

Usage

class Maintenance::UpdateUsersTask < MaintenanceTasks::Task
  parallelize

  def collection
    User.where(status: 'pending').in_batches(of: 10)
  end

  def process_item(user)
    # Called in parallel (10 concurrent threads per batch)
    user.update!(status: 'processed')
  end
end

Changes

  • Added parallelized class attribute to track parallel processing mode
  • Added parallelize class method to enable parallel processing
  • Added parallelized? class and instance methods
  • Added process_item instance method placeholder
  • Modified process to route to parallel execution when enabled
  • Added ParallelExecutor for thread-safe parallel item processing
  • Added comprehensive test coverage

Notes

  • Cursor granularity: The cursor tracks batches, not individual items. If the task is interrupted mid-batch, the whole batch is reprocessed on resume, so process_item must be idempotent (see the sketch after these notes).
  • Thread safety: process_item must be thread-safe. Avoid shared mutable state.
  • Error handling: If any thread raises an exception, the entire batch fails and the first exception is propagated.
  • Progress tracking: Progress is tracked per batch, not per item.
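
For the idempotency note above, a minimal sketch reusing the status column from the usage example (not part of this PR):

def process_item(user)
  # Skip items already handled on a previous run, so reprocessing an
  # interrupted batch is a no-op for items that completed.
  return if user.status == 'processed'

  user.update!(status: 'processed')
end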

@Devake Devake self-assigned this Dec 3, 2025
Contributor

@adrianna-chang-shopify adrianna-chang-shopify left a comment

I think this feature could make sense, but I do think we'll need to make it a bit safer to use so that we don't risk creating a ton of threads or exhausting AR's connection pool. Left some thoughts inline!

exceptions = []
exception_mutex = Mutex.new

threads = items.map do |item|
Contributor

Batches can be of arbitrary size, e.g. 1000+ items. There are risks of performance degradation / system instability in generating an unbounded number of threads. Should we implement some sort of thread pool with a configurable size?
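
A rough sketch of what a bounded pool could look like; the Queue-based fan-out and the max_threads default are assumptions for illustration, not code from this PR:

class ParallelExecutor
  class << self
    # Process items with a fixed number of worker threads instead of
    # one thread per item.
    def execute(items, max_threads: 5)
      queue = Queue.new
      items.each { |item| queue << item }

      exceptions = []
      mutex = Mutex.new

      workers = [max_threads, items.size].min.times.map do
        Thread.new do
          # One connection per worker, so a batch never holds more
          # than max_threads connections at once.
          ActiveRecord::Base.connection_pool.with_connection do
            loop do
              item = begin
                queue.pop(true) # non-blocking; raises once drained
              rescue ThreadError
                break
              end

              begin
                yield item
              rescue => error
                mutex.synchronize { exceptions << { item: item, error: error } }
              end
            end
          end
        end
      end

      workers.each(&:join)
      exceptions
    end
  end
end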

Contributor

(We may also want to coordinate with Rails' connection pool size, which defaults to 5 connections)
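
For reference, the configured pool size is available at runtime, so a cap could be derived from it (requested_threads is a hypothetical value here):

# Never run more workers than the AR pool can serve (5 by default).
pool_size = ActiveRecord::Base.connection_pool.size
worker_count = [requested_threads, pool_size].min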

Contributor

Another idea is to make the thread count part of the API, i.e. parallelize(threads: 5). I don't think we should tie the thread count to the batch size, though.
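
One way that option could be wired up, assuming the class_attribute approach the PR already uses for parallelized (parallelize_thread_count is a made-up name):

class_attribute :parallelize_thread_count, default: 5

class << self
  def parallelize(threads: 5)
    self.parallelized = true
    self.parallelize_thread_count = threads
  end
end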

Contributor

+1, I don't want to allow people to spawn an unbounded number of threads if they just follow the conventions, which for in_batches default to 1000 elements per batch.

# implement an override for this method.
def process(_item)
  raise NoMethodError, "#{self.class.name} must implement `process`."
end

def process_item(_item)
Contributor

I'm not sure exactly what to name this, but I think we need an API that's more distinct from #process and indicates that it's for parallel processing within a batch. Maybe #process_for_batch?

items = batch.respond_to?(:to_a) ? batch.to_a : Array(batch)

# Execute items in parallel, storing errored item for context
ParallelExecutor.execute(items) do |item|
Contributor

I feel like we could return the exceptions array ([{ item: <item>, error: <error> }]) directly from .execute instead of raising the error. This would simplify things a lot, i.e.:

class ParallelExecutor
  class << self
    def execute(items, &block)
      ...

      threads = items.map do |item|
        Thread.new do
          ActiveRecord::Base.connection_pool.with_connection do
            block.call(item)
          rescue => error
            exception_mutex.synchronize do
              exceptions << { item: item, error: error }
            end
          end
        end
      end

      threads.each(&:join)

      exceptions
    end
    ...

And then here:

exceptions = ParallelExecutor.execute(items) do |item|
  process_item(item)
end

if exceptions.any?
  @errored_element = exceptions.first[:item]
  raise exceptions.first[:error]
end

@etiennebarrie
Member

I think it's a bad idea: we already have a unit of work, the job, and we don't have to handle it ourselves, the queue does. It's not something we should get into, IMO.
