This adds a WORKER_TYPE setting. The default value is 'pulpcore'. When 'redis' is selected, the tasking system uses Redis to lock resources. Redis workers produce less load on the PostgreSQL database.
closes: pulp#7210
Generated By: Claude Code.
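For illustration, selecting the Redis-backed worker would presumably be a one-line settings change. The setting name and both values come from the description above; where the line lives in a given deployment is an assumption.

```python
# Pulp settings file (location assumed; use wherever your deployment defines settings).
# The default is "pulpcore"; "redis" switches resource locking to Redis.
WORKER_TYPE = "redis"
```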
Added Redis connection checks to the worker so that it shuts down if the connection is broken.
gerrod3 left a comment
There's still a lot I haven't deeply reviewed yet, but this was getting long and I had a big idea around dispatch that I want to discuss.
| current_app = AppStatus.objects.current() | ||
| if current_app: | ||
| _logger.info( | ||
| "TASK EXECUTION: Task %s being executed by %s (app_type=%s)", | ||
| task.pk, | ||
| current_app.name, | ||
| current_app.app_type, | ||
| ) | ||
| else: | ||
| _logger.info( | ||
| "TASK EXECUTION: Task %s being executed with no AppStatus.current()", task.pk | ||
| ) |
Is this needed? Can this be moved to log_task_start? The value should be set on the task object after set_running is called.
| finally: | ||
| # Safety net: if we crashed before reaching the lock release above, | ||
| # still try to release locks here (e.g., if crash during task execution) | ||
| if safe_release_task_locks(task): |
I'm pretty sure this is wrong. A finally block always runs, whether the try block completes normally, raises an exception, or returns early.
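To illustrate the point about finally, here is a minimal sketch. `release_locks` and `execute` are hypothetical stand-ins (not the PR's code); `release_locks` only records each call so the behaviour is visible.

```python
released = []


def release_locks(task_id):
    """Hypothetical stand-in for safe_release_task_locks: records each call."""
    released.append(task_id)


def execute(task_id, fail=False):
    try:
        if fail:
            raise RuntimeError("task crashed")
        release_locks(task_id)  # release on the normal path
    finally:
        # Not only a crash-time safety net: this also runs after a
        # successful release above, and after an early return.
        release_locks(task_id)
```

On the success path the release is attempted twice, so either the helper has to be idempotent or the release in the try block is redundant.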
| def execute_task(task): | ||
| """Redis-aware task execution that releases Redis locks for immediate tasks.""" | ||
| # This extra stack is needed to isolate the current_task ContextVar | ||
| contextvars.copy_context().run(_execute_task, task) |
Reading through this version and the base version, the only real difference is that this one calls safe_release_task_locks. Could this be a wrapper around the original, with a try/finally to release the Redis locks?
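A rough sketch of what such a wrapper could look like. Both helpers below are hypothetical stand-ins operating on a plain dict; the real execute_task and safe_release_task_locks take a Task and may have different signatures.

```python
def base_execute_task(task):
    """Hypothetical stand-in for the existing, non-Redis execute_task."""
    if task.get("fail"):
        raise RuntimeError("task failed")
    task["state"] = "completed"


def safe_release_task_locks(task):
    """Hypothetical stand-in; only marks the locks as released."""
    task["locks_released"] = True


def execute_task(task):
    """Redis-aware wrapper: delegate to the base version, then always release."""
    try:
        base_execute_task(task)
    finally:
        safe_release_task_locks(task)
```

With this shape the Redis variant carries no copy of the execution logic, and the lock release sits in exactly one place.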
| current_app = AppStatus.objects.current() | ||
| lock_owner = current_app.name if current_app else f"immediate-{task.pk}" |
Probably same question as before in _execute_task, but when is this ever None?
| except Exception: | ||
| # Exception during execute_task() | ||
| # Atomically release all locks as safety net | ||
| safe_release_task_locks(task, lock_owner) |
Not needed; execute_task should already handle releasing the locks.
| task = Task.objects.create(**task_payload) | ||
| if execute_now: | ||
| # Try to atomically acquire task lock and resource locks | ||
| # are_resources_available() now acquires ALL locks atomically | ||
| if are_resources_available(task): |
Crazy idea, not sure if you want to try it, but how about acquiring the Redis locks before the task hits the DB?
In the default worker scenario, dispatch acquires the lock on the task at creation, because app_lock is set to the current task dispatcher (usually the API worker) and the task worker's fetch_task can only select tasks where this field is null.
The new Redis worker selects from any waiting task, so there is a time window between the task object hitting the DB and line 470's are_resources_available that you have to account for inside dispatch. Instead, we can acquire the task lock first, then create the task, then try to acquire the locks on the resources the task needs; if that succeeds, execute, otherwise defer, and finally do a safe release of the task lock. This way dispatch shouldn't be fighting against task workers to get the task lock.
| task = Task.objects.create(**task_payload) | |
| if execute_now: | |
| # Try to atomically acquire task lock and resource locks | |
| # are_resources_available() now acquires ALL locks atomically | |
| if are_resources_available(task): | |
| # note that the pulp_id is set once the object is instantiated even if not saved to the DB yet! | |
| task = Task(**task_payload) | |
| # new function to just acquire the lock on the task | |
| acquire_lock(task.pulp_id) | |
| task.save() | |
| if execute_now: | |
| # Change this function to only get task's resources since we already hold the task lock | |
| if are_resources_available(resources): | |
| # now guaranteed to have task + resource locks |
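The ordering proposed in this comment can be sketched end to end with in-memory stand-ins. Everything here is hypothetical: `FakeLocks` imitates set-if-absent locking in a dict, `db` is a plain dict standing in for the tasks table, and "completed" is a placeholder for actually running the task.

```python
import uuid


class FakeLocks:
    """In-memory stand-in for Redis locks (hypothetical, not the PR's API)."""

    def __init__(self):
        self.held = {}

    def acquire(self, key, owner):
        # Succeeds only if the key is free, like a set-if-absent operation.
        if key in self.held:
            return False
        self.held[key] = owner
        return True

    def acquire_all(self, keys, owner):
        # All-or-nothing acquisition of several keys.
        if any(key in self.held for key in keys):
            return False
        for key in keys:
            self.held[key] = owner
        return True

    def release(self, keys):
        for key in keys:
            self.held.pop(key, None)


def dispatch(locks, db, resources, owner="api-worker"):
    task_id = str(uuid.uuid4())  # the id exists before anything is saved
    task_key = f"task:{task_id}"
    resource_keys = [f"resource:{name}" for name in resources]
    locks.acquire(task_key, owner)  # 1. task lock first (fresh id, so it is free)
    got_resources = False
    try:
        db[task_id] = "waiting"  # 2. only now can workers see the task
        got_resources = locks.acquire_all(resource_keys, owner)  # 3.
        if got_resources:
            db[task_id] = "completed"  # 4. placeholder for immediate execution
        # otherwise the task stays "waiting" and is deferred to a task worker
    finally:
        # 5. release the task lock, plus resource locks if we took them
        locks.release([task_key] + (resource_keys if got_resources else []))
    return task_id
```

Because the task lock is held before the row becomes visible, a worker that finds the waiting task cannot take it until dispatch has either executed it or given up.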