Commit 5229cf4

Fix deadlock in results_queue.join() during training
Add a 10-second timeout to results_queue.join() to prevent indefinite hangs when lingering results aren't properly consumed. If a timeout occurs, drain any remaining items from the queue to allow training to continue. This fixes an issue where training could deadlock between steps if results from a previous step remained unprocessed in the queue.
1 parent c0365f0 commit 5229cf4

File tree

1 file changed: +15 −2 lines changed

src/art/unsloth/service.py

Lines changed: 15 additions & 2 deletions
@@ -88,8 +88,21 @@ async def train(
     ) -> AsyncIterator[dict[str, float]]:
         # Get the packed tensors from disk
         packed_tensors = packed_tensors_from_dir(**disk_packed_tensors)
-        # Wait for existing batches to finish
-        await self.results_queue.join()
+        # Wait for existing batches to finish, with timeout to prevent deadlock
+        try:
+            await asyncio.wait_for(self.results_queue.join(), timeout=10.0)
+        except asyncio.TimeoutError:
+            # Recover from deadlock by draining queue
+            drained = 0
+            while True:
+                try:
+                    self.results_queue.get_nowait()
+                    self.results_queue.task_done()
+                    drained += 1
+                except asyncio.QueueEmpty:
+                    break
+            if verbose and drained > 0:
+                print(f"Warning: Drained {drained} lingering result(s) from queue")
         # If we haven't already, start the training task
         if self._train_task is None:
             self._train_task = asyncio.create_task(

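For reference, a minimal, self-contained sketch of the same timeout-and-drain pattern is shown below, independent of the service class. The helper name wait_or_drain, the sample payload, and the 0.1-second timeout used in the demo are illustrative assumptions, not code from the repository.

import asyncio

# Hypothetical helper mirroring the diff above: wait for all queued results to be
# consumed, draining the queue if the wait times out, and report how many items
# were discarded.
async def wait_or_drain(results_queue: asyncio.Queue, timeout: float = 10.0) -> int:
    try:
        await asyncio.wait_for(results_queue.join(), timeout=timeout)
        return 0
    except asyncio.TimeoutError:
        drained = 0
        while True:
            try:
                results_queue.get_nowait()
                results_queue.task_done()  # decrements the counter that join() waits on
                drained += 1
            except asyncio.QueueEmpty:
                break
        return drained

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait({"loss": 0.1})  # a lingering, unconsumed result
    drained = await wait_or_drain(queue, timeout=0.1)
    print(f"Drained {drained} lingering result(s)")

asyncio.run(main())

Note that task_done() must be called once per drained item: it decrements the unfinished-task counter that join() blocks on, so the drain both empties the queue and releases any other waiters. The timeout is still what actually prevents the hang, since results that were fetched earlier but never marked done cannot be recovered by draining.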