Skip to content

Commit 2c577b2

Browse files
committed
fix: stop scheduling new futures after shutdown
1 parent fac5343 commit 2c577b2

File tree

1 file changed

+60
-17
lines changed

1 file changed

+60
-17
lines changed

laygo/transformers/strategies/process.py

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,19 +57,40 @@ def _ordered_generator(
5757
) -> Iterator[list[Out]]:
5858
"""Generate results in their original order."""
5959
futures = deque()
60+
61+
# Pre-submit initial batch of futures
6062
for _ in range(self.max_workers + 1):
6163
try:
6264
chunk = next(chunks_iter)
6365
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
6466
except StopIteration:
6567
break
68+
except RuntimeError as e:
69+
if "cannot schedule new futures after shutdown" in str(e):
70+
break
71+
raise
72+
6673
while futures:
67-
yield futures.popleft().result()
6874
try:
69-
chunk = next(chunks_iter)
70-
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
71-
except StopIteration:
72-
continue
75+
yield futures.popleft().result()
76+
77+
# Try to submit the next chunk
78+
try:
79+
chunk = next(chunks_iter)
80+
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
81+
except StopIteration:
82+
continue
83+
except RuntimeError as e:
84+
if "cannot schedule new futures after shutdown" in str(e):
85+
# Executor is shut down, stop submitting new work
86+
break
87+
raise
88+
except Exception:
89+
# Cancel remaining futures and re-raise
90+
for future in futures:
91+
future.cancel()
92+
futures.clear()
93+
raise
7394

7495
def _unordered_generator(
7596
self,
@@ -79,16 +100,38 @@ def _unordered_generator(
79100
context_handle: IContextHandle,
80101
) -> Iterator[list[Out]]:
81102
"""Generate results as they complete."""
82-
futures = {
83-
executor.submit(_worker_process_chunk, transformer, context_handle, chunk)
84-
for chunk in itertools.islice(chunks_iter, self.max_workers + 1)
85-
}
103+
futures = set()
104+
105+
# Pre-submit initial batch
106+
try:
107+
for chunk in itertools.islice(chunks_iter, self.max_workers + 1):
108+
futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
109+
except RuntimeError as e:
110+
if "cannot schedule new futures after shutdown" in str(e):
111+
# If we can't submit any futures, there's nothing to process
112+
return
113+
raise
114+
86115
while futures:
87-
done, futures = wait(futures, return_when=FIRST_COMPLETED)
88-
for future in done:
89-
yield future.result()
90-
try:
91-
chunk = next(chunks_iter)
92-
futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
93-
except StopIteration:
94-
continue
116+
try:
117+
done, futures = wait(futures, return_when=FIRST_COMPLETED)
118+
for future in done:
119+
yield future.result()
120+
121+
# Try to submit next chunk if available
122+
try:
123+
chunk = next(chunks_iter)
124+
futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
125+
except StopIteration:
126+
continue
127+
except RuntimeError as e:
128+
if "cannot schedule new futures after shutdown" in str(e):
129+
# Executor is shut down, stop submitting new work
130+
break
131+
raise
132+
except Exception:
133+
# Cancel remaining futures and re-raise
134+
for future in futures:
135+
future.cancel()
136+
futures.clear()
137+
raise

0 commit comments

Comments
 (0)