@@ -37,6 +37,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
3737 private final ExecutorService workerPool ;
3838
3939 private final TaskHubSidecarServiceBlockingStub sidecarClient ;
40+ private final boolean isExecutorServiceManaged ;
4041 private volatile boolean isNormalShutdown = false ;
4142
4243 DurableTaskGrpcWorker (DurableTaskGrpcWorkerBuilder builder ) {
@@ -67,6 +68,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
6768 this .dataConverter = builder .dataConverter != null ? builder .dataConverter : new JacksonDataConverter ();
6869 this .maximumTimerInterval = builder .maximumTimerInterval != null ? builder .maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL ;
6970 this .workerPool = builder .executorService != null ? builder .executorService : Executors .newCachedThreadPool ();
71+ this .isExecutorServiceManaged = builder .executorService == null ;
7072 }
7173
7274 /**
@@ -81,10 +83,10 @@ public void start() {
8183 }
8284
8385 /**
84- * Closes the internally managed gRPC channel, if one exists.
86+ * Closes the internally managed gRPC channel and executor service , if one exists.
8587 * <p>
86- * This method is a no-op if this client object was created using a builder with a gRPC channel object explicitly
87- * configured .
88+ * Only the internally managed GRPC Channel and Executor services are closed. If any of them are supplied,
89+ * it is the responsibility of the supplier to take care of them .
8890 */
8991 public void close () {
9092 this .isNormalShutdown = true ;
@@ -116,7 +118,7 @@ public void startAndBlock() {
116118 logger );
117119
118120 // TODO: How do we interrupt manually?
119- while (! this . workerPool . isShutdown () ) {
121+ while (true ) {
120122 try {
121123 GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest .newBuilder ().build ();
122124 Iterator <WorkItem > workItemStream = this .sidecarClient .getWorkItems (getWorkItemsRequest );
@@ -242,17 +244,19 @@ private void closeSideCarChannel() {
242244 }
243245
244246 private void shutDownWorkerPool () {
245- if (!this .isNormalShutdown ) {
246- logger .log (Level .WARNING , "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted" );
247- }
247+ if (this .isExecutorServiceManaged ) {
248+ if (!this .isNormalShutdown ) {
249+ logger .log (Level .WARNING , "ExecutorService shutdown initiated unexpectedly. No new tasks will be accepted" );
250+ }
248251
249- this .workerPool .shutdown ();
250- try {
251- if (!this .workerPool .awaitTermination (60 , TimeUnit .SECONDS )) {
252- this .workerPool .shutdownNow ();
252+ this .workerPool .shutdown ();
253+ try {
254+ if (!this .workerPool .awaitTermination (60 , TimeUnit .SECONDS )) {
255+ this .workerPool .shutdownNow ();
256+ }
257+ } catch (InterruptedException ex ) {
258+ Thread .currentThread ().interrupt ();
253259 }
254- } catch (InterruptedException ex ) {
255- Thread .currentThread ().interrupt ();
256260 }
257261 }
258262
0 commit comments