File tree Expand file tree Collapse file tree
tests/unit/task_scheduler/interactive Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1+ import queue
2+ import unittest
3+ from threading import Lock
4+ from concurrent .futures import Future
5+
6+ from executorlib .task_scheduler .interactive .blockallocation import _drain_dead_worker
7+ from executorlib .task_scheduler .interactive .shared import task_done
8+ from executorlib .standalone .interactive .communication import ExecutorlibSocketError
9+
10+
11+ class TestDrainDeadWorker (unittest .TestCase ):
12+ def test_fail_tasks_when_no_workers_remain (self ):
13+ future_queue = queue .Queue ()
14+ alive_workers = [1 ]
15+ alive_workers_lock = Lock ()
16+ future = Future ()
17+
18+ # Add a task and then the shutdown sentinel
19+ future_queue .put ({"fn" : lambda : 42 , "future" : future })
20+ future_queue .put ({"shutdown" : True })
21+
22+ _drain_dead_worker (
23+ future_queue = future_queue ,
24+ alive_workers = alive_workers ,
25+ alive_workers_lock = alive_workers_lock ,
26+ )
27+
28+ # Worker count should be decremented
29+ self .assertEqual (alive_workers [0 ], 0 )
30+
31+ # Task should fail with ExecutorlibSocketError
32+ with self .assertRaises (ExecutorlibSocketError ):
33+ future .result ()
You can’t perform that action at this time.
0 commit comments