-
Notifications
You must be signed in to change notification settings - Fork 4
Fix race condition in unsubscribe by waiting for in-flight updates #248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
This change fixes a race condition where calling unsubscribe() while a feed update is in progress could result in cache writes occurring after cache cleanup, leading to stale data remaining in the cache. Changes: - Add getCurrentUpdate() method to GbfsSubscription interface to track in-flight updates using CompletableFuture - Implement update tracking in GbfsV2Subscription and GbfsV3Subscription by wrapping update logic with CompletableFuture lifecycle management - Add unsubscribeAsync() methods to GbfsSubscriptionManager that wait for in-flight updates to complete before returning (with configurable timeout) - Update existing unsubscribe() to wrap unsubscribeAsync() for backwards compatibility (now deprecated in favor of async version) - Add comprehensive unit tests for race condition scenarios including: * Waiting for in-flight updates * Timeout handling for stuck updates * Immediate completion when no updates in progress * Double unsubscribe handling Benefits: - Eliminates race condition without polling (event-driven waiting) - Minimal performance overhead (single CompletableFuture per update) - Backwards compatible (existing unsubscribe() still works) - Benefits all users of the library Related to Lamassu issue: Race condition between FeedUpdater.stopSubscription and ongoing feed update callbacks. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR attempts to fix a race condition where calling unsubscribe() while a feed update is in progress could result in cache writes occurring after cache cleanup. The solution introduces asynchronous tracking of in-flight updates using CompletableFuture.
Key Changes:
- Adds
getCurrentUpdate()method to theGbfsSubscriptioninterface that returns aCompletableFuture<Void>representing in-flight updates - Implements update tracking in both
GbfsV2SubscriptionandGbfsV3Subscriptionby creating a future at the start of each update - Adds
unsubscribeAsync()methods with configurable timeout to wait for in-flight updates
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
src/main/java/org/entur/gbfs/loader/GbfsSubscription.java |
Adds getCurrentUpdate() method to interface |
src/main/java/org/entur/gbfs/loader/v2/GbfsV2Subscription.java |
Implements update tracking using volatile CompletableFuture field |
src/main/java/org/entur/gbfs/loader/v3/GbfsV3Subscription.java |
Implements update tracking using volatile CompletableFuture field |
src/main/java/org/entur/gbfs/GbfsSubscriptionManager.java |
Adds unsubscribeAsync() methods with timeout and deprecates synchronous unsubscribe() |
src/test/java/org/entur/gbfs/GBFSSubscriptionTest.java |
Adds tests for async unsubscribe behavior including timeout scenarios |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Thread.currentThread().interrupt(); | ||
| throw new RuntimeException("Interrupted while unsubscribing", e); | ||
| } catch (ExecutionException e) { | ||
| throw new RuntimeException("Error while unsubscribing", e); |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message "Error while unsubscribing" is not helpful for debugging. The ExecutionException wraps the actual exception from the update (or TimeoutException). Consider unwrapping and including more context, such as:
throw new RuntimeException("Error waiting for in-flight update to complete during unsubscribe", e.getCause() != null ? e.getCause() : e);Or handle TimeoutException separately to provide a more specific message.
| throw new RuntimeException("Error while unsubscribing", e); | |
| Throwable cause = e.getCause(); | |
| if (cause instanceof java.util.concurrent.TimeoutException) { | |
| throw new RuntimeException("Timeout while waiting for in-flight update to complete during unsubscribe", cause); | |
| } else { | |
| throw new RuntimeException("Error waiting for in-flight update to complete during unsubscribe", cause != null ? cause : e); | |
| } |
| // Create a future for this update to enable waiting during unsubscribe | ||
| CompletableFuture<Void> updateFuture = new CompletableFuture<>(); | ||
| currentUpdate = updateFuture; |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a race condition between when update() is scheduled on the thread pool (via GbfsSubscriptionManager.update()) and when the currentUpdate future is actually created. If unsubscribeAsync() is called after update() is scheduled but before this code executes, getCurrentUpdate() will return a completed future, causing unsubscribe to complete prematurely.
Consider creating and setting the currentUpdate future before the update work begins, or using a different synchronization mechanism that tracks pending updates at the subscription manager level.
| CompletableFuture<Void> updateFuture = new CompletableFuture<>(); | ||
| currentUpdate = updateFuture; |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a race condition between when update() is scheduled on the thread pool (via GbfsSubscriptionManager.update()) and when the currentUpdate future is actually created. If unsubscribeAsync() is called after update() is scheduled but before this code executes, getCurrentUpdate() will return a completed future, causing unsubscribe to complete prematurely.
Consider creating and setting the currentUpdate future before the update work begins, or using a different synchronization mechanism that tracks pending updates at the subscription manager level.
| CompletableFuture<Void> updateFuture = new CompletableFuture<>(); | |
| currentUpdate = updateFuture; | |
| currentUpdate = new CompletableFuture<>(); | |
| CompletableFuture<Void> updateFuture = currentUpdate; |
| * Update the subscription by updating the loader and push a new delivery | ||
| * to the consumer if the update had changes | ||
| */ | ||
| public void update() { |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The update() method is not thread-safe. If multiple threads call update() concurrently, they will race to set currentUpdate, potentially causing one update's future to be lost. This could lead to unsubscribe waiting for the wrong update or completing prematurely.
Consider adding synchronization to prevent concurrent updates, or documenting that update() must not be called concurrently for the same subscription instance.
| public void update() { | |
| public synchronized void update() { |
| * Update the subscription by updating the loader and push a new delivery | ||
| * to the consumer if the update had changes | ||
| */ | ||
| public void update() { |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The update() method is not thread-safe. If multiple threads call update() concurrently, they will race to set currentUpdate, potentially causing one update's future to be lost. This could lead to unsubscribe waiting for the wrong update or completing prematurely.
Consider adding synchronization to prevent concurrent updates, or documenting that update() must not be called concurrently for the same subscription instance.
| Thread.sleep(500); | ||
| Assertions.assertFalse(unsubscribeFuture.isDone()); |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using Thread.sleep() for test synchronization is unreliable and can lead to flaky tests. The test might pass on fast machines but fail on slow/loaded CI systems, or vice versa.
Consider using a more deterministic approach, such as checking unsubscribeFuture.isDone() in a loop with a timeout, or using CompletableFuture.get() with a timeout and asserting it throws TimeoutException.
| Thread.sleep(500); | |
| Assertions.assertFalse(unsubscribeFuture.isDone()); | |
| Assertions.assertThrows( | |
| TimeoutException.class, | |
| () -> unsubscribeFuture.get(300, TimeUnit.MILLISECONDS) | |
| ); |
| ); | ||
| manager.update(id); | ||
|
|
||
| Thread.sleep(500); // Let update start |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using Thread.sleep() for test synchronization is unreliable and can lead to flaky tests. On a heavily loaded system, the update might not start within 500ms, causing the test to pass incorrectly when it should verify timeout behavior.
Consider using a CountDownLatch to signal when the update has started (similar to the first test), ensuring the update is guaranteed to be in progress before testing the timeout.
| /** | ||
| * Stop a subscription on a GBFS feed (blocking version). | ||
| * This method blocks until the subscription's in-flight update completes. | ||
| * | ||
| * @param identifier An identifier returned by subscribe method. | ||
| * @param identifier An identifier returned by subscribe method | ||
| * @deprecated Use {@link #unsubscribeAsync(String)} for better async composition | ||
| */ |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation is misleading. The method doesn't always block until the update completes - it uses a 30-second timeout and will throw a RuntimeException (wrapping TimeoutException) if the update doesn't complete in time. The update will continue running in the background even after the timeout.
Consider updating the documentation to mention the timeout behavior and that the update may still be running when this method returns via exception.
Addresses Copilot review feedback from PR #248: 1. **Critical Fix: Race condition in future creation** - Future is now created and set on subscription BEFORE scheduling - Added setCurrentUpdate() method to GbfsSubscription interface - Manager calls setCurrentUpdate() before pool.execute() - This ensures getCurrentUpdate() sees the future even if task hasn't started 2. **Critical Fix: Thread-safety in update() method** - Added synchronized keyword to update() in both V2 and V3 subscriptions - Added synchronized to getCurrentUpdate() for visibility - Prevents concurrent updates from overwriting currentUpdate reference 3. **Implementation details** - update() now checks if future was pre-set by manager - Falls back to creating new future if called directly (backwards compatible) - Cleaner separation between manager orchestration and subscription execution These fixes ensure that unsubscribe cannot miss in-flight updates due to timing issues between task scheduling and execution.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public synchronized void update() { | ||
| // Use the future set by the manager, or create a new one if called directly |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method overrides GbfsSubscription.update; it is advisable to add an Override annotation.
| public synchronized void update() { | |
| // Use the future set by the manager, or create a new one if called directly | |
| @Override | |
| public synchronized void update() { | |
| // Use the future set by the manager, or create a new one if called directly |
| public synchronized void update() { | ||
| // Use the future set by the manager, or create a new one if called directly |
Copilot
AI
Nov 18, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method overrides GbfsSubscription.update; it is advisable to add an Override annotation.
| public synchronized void update() { | |
| // Use the future set by the manager, or create a new one if called directly | |
| @Override | |
| public synchronized void update() { | |
| // Use the future set by the manager, or create a new one if called directly |




This change fixes a race condition where calling unsubscribe() while a feed update is in progress could result in cache writes occurring after cache cleanup, leading to stale data remaining in the cache.
Changes:
Benefits:
Related to Lamassu issue: Race condition between FeedUpdater.stopSubscription and ongoing feed update callbacks.
🤖 Generated with Claude Code