Skip to content

Commit 6d09d03

Browse files
authored
Fix DrainAsync Reason at Logging and Prevent Queue Listening During Partition Release (#1208)
1 parent 463c052 commit 6d09d03

File tree

1 file changed

+15
-7
lines changed

1 file changed

+15
-7
lines changed

src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -426,8 +426,13 @@ public async Task<ReadTableReponse> ReadAndWriteTableAsync(bool isShuttingDown,
426426

427427
// Ensure worker is listening to the control queue iff either:
428428
// 1) worker just claimed the lease,
429-
// 2) worker was already the owner in the partitions table and is not actively draining the queue. Note that during draining, we renew the lease but do not want to listen to new messages. Otherwise, we'll never finish draining our in-memory messages.
430-
bool isRenewingToDrainQueue = renewedLease & response.IsDrainingPartition;
429+
// 2) worker was already the owner in the partitions table and is not actively draining the queue.
430+
// Note that during draining, we renew the lease but do not want to listen to new messages.
431+
// Otherwise, we'll never finish draining our in-memory messages.
432+
// When draining completes, and the worker may decide to release the lease. In that moment,
433+
// IsDrainingPartition can still be true but renewedLease can be false — without checking
434+
// !releasedLease, the worker could incorrectly resume listening just before releasing the lease.
435+
bool isRenewingToDrainQueue = renewedLease && response.IsDrainingPartition && !releasedLease;
431436
if (claimedLease || !isRenewingToDrainQueue)
432437
{
433438
// Notify the orchestration session manager that we acquired a lease for one of the partitions.
@@ -505,7 +510,8 @@ void RenewOrReleaseMyLease(
505510
partition,
506511
ref releasedLease,
507512
ref renewedLease,
508-
ref drainedLease);
513+
ref drainedLease,
514+
CloseReason.LeaseLost);
509515
}
510516
}
511517

@@ -583,7 +589,8 @@ void TryDrainAndReleaseAllPartitions(
583589
partition,
584590
ref releasedLease,
585591
ref renewedLease,
586-
ref drainedLease);
592+
ref drainedLease,
593+
CloseReason.Shutdown);
587594

588595
if (releasedLease)
589596
{
@@ -661,7 +668,7 @@ await this.partitionTable.ReplaceEntityAsync(
661668
partition,
662669
etag,
663670
forceShutdownToken);
664-
671+
665672
this.settings.Logger.LeaseStealingSucceeded(
666673
this.storageAccountName,
667674
this.settings.TaskHubName,
@@ -815,7 +822,8 @@ void CheckDrainTask(
815822
TablePartitionLease partition,
816823
ref bool releasedLease,
817824
ref bool renewedLease,
818-
ref bool drainedLease)
825+
ref bool drainedLease,
826+
CloseReason reason)
819827
{
820828
// Check if drain process has started.
821829
if (this.backgroundDrainTasks.TryGetValue(partition.RowKey!, out Task? drainTask))
@@ -844,7 +852,7 @@ void CheckDrainTask(
844852
}
845853
else// If drain task hasn't been started yet, start it and keep renewing the lease to prevent it from expiring.
846854
{
847-
this.DrainPartition(partition, CloseReason.Shutdown);
855+
this.DrainPartition(partition, reason);
848856
this.RenewLease(partition);
849857
renewedLease = true;
850858
drainedLease = true;

0 commit comments

Comments
 (0)