@testower
Collaborator

This change fixes a race condition where calling unsubscribe() while a feed update is in progress could result in cache writes occurring after cache cleanup, leading to stale data remaining in the cache.

Changes:

  • Add getCurrentUpdate() method to GbfsSubscription interface to track in-flight updates using CompletableFuture
  • Implement update tracking in GbfsV2Subscription and GbfsV3Subscription by wrapping update logic with CompletableFuture lifecycle management
  • Add unsubscribeAsync() methods to GbfsSubscriptionManager that wait for in-flight updates to complete before returning (with configurable timeout)
  • Update existing unsubscribe() to wrap unsubscribeAsync() for backwards compatibility (now deprecated in favor of async version)
  • Add comprehensive unit tests for race condition scenarios including:
    • Waiting for in-flight updates
    • Timeout handling for stuck updates
    • Immediate completion when no updates in progress
    • Double unsubscribe handling

Benefits:

  • Eliminates race condition without polling (event-driven waiting)
  • Minimal performance overhead (single CompletableFuture per update)
  • Backwards compatible (existing unsubscribe() still works)
  • Benefits all users of the library

Related to Lamassu issue: Race condition between FeedUpdater.stopSubscription and ongoing feed update callbacks.
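The event-driven wait described above can be sketched roughly as follows. This is a minimal illustration, not the library's code: `SketchSubscription`, `SketchManager`, `beginUpdate()` and `register()` are hypothetical names, and the sketch assumes Java 9 or later for `copy()` and `orTimeout()`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a subscription that exposes its in-flight update.
class SketchSubscription {
    // A completed future means "no update in progress".
    private volatile CompletableFuture<Void> currentUpdate =
        CompletableFuture.completedFuture(null);

    CompletableFuture<Void> getCurrentUpdate() {
        return currentUpdate;
    }

    // Marks an update as started; the caller completes the returned future when done.
    CompletableFuture<Void> beginUpdate() {
        CompletableFuture<Void> update = new CompletableFuture<>();
        currentUpdate = update;
        return update;
    }
}

// Hypothetical stand-in for the manager; only the unsubscribe path is sketched.
class SketchManager {
    private final Map<String, SketchSubscription> subscriptions = new ConcurrentHashMap<>();

    void register(String id, SketchSubscription subscription) {
        subscriptions.put(id, subscription);
    }

    // Removes the subscription, then completes once its in-flight update has
    // finished, or fails with TimeoutException after the given timeout.
    CompletableFuture<Void> unsubscribeAsync(String id, long timeout, TimeUnit unit) {
        SketchSubscription removed = subscriptions.remove(id);
        if (removed == null) {
            // Unknown or already-unsubscribed id: nothing to wait for.
            return CompletableFuture.completedFuture(null);
        }
        // copy() keeps the timeout from failing the subscription's own future.
        return removed.getCurrentUpdate().copy().orTimeout(timeout, unit);
    }
}
```

A caller that owns a cache would chain its cleanup onto the returned future, for example with `thenRun(...)`, so the cleanup only runs after the last write from the in-flight update.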

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>

Copilot AI left a comment

Pull Request Overview

This PR attempts to fix a race condition where calling unsubscribe() while a feed update is in progress could result in cache writes occurring after cache cleanup. The solution introduces asynchronous tracking of in-flight updates using CompletableFuture.

Key Changes:

  • Adds getCurrentUpdate() method to the GbfsSubscription interface that returns a CompletableFuture<Void> representing in-flight updates
  • Implements update tracking in both GbfsV2Subscription and GbfsV3Subscription by creating a future at the start of each update
  • Adds unsubscribeAsync() methods with configurable timeout to wait for in-flight updates

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| src/main/java/org/entur/gbfs/loader/GbfsSubscription.java | Adds getCurrentUpdate() method to interface |
| src/main/java/org/entur/gbfs/loader/v2/GbfsV2Subscription.java | Implements update tracking using volatile CompletableFuture field |
| src/main/java/org/entur/gbfs/loader/v3/GbfsV3Subscription.java | Implements update tracking using volatile CompletableFuture field |
| src/main/java/org/entur/gbfs/GbfsSubscriptionManager.java | Adds unsubscribeAsync() methods with timeout and deprecates synchronous unsubscribe() |
| src/test/java/org/entur/gbfs/GBFSSubscriptionTest.java | Adds tests for async unsubscribe behavior including timeout scenarios |

Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while unsubscribing", e);
} catch (ExecutionException e) {
throw new RuntimeException("Error while unsubscribing", e);

Copilot AI Nov 18, 2025

The error message "Error while unsubscribing" is not helpful for debugging. The ExecutionException wraps the actual exception from the update (or TimeoutException). Consider unwrapping and including more context, such as:

throw new RuntimeException("Error waiting for in-flight update to complete during unsubscribe", e.getCause() != null ? e.getCause() : e);

Or handle TimeoutException separately to provide a more specific message.

Suggested change
- throw new RuntimeException("Error while unsubscribing", e);
+ Throwable cause = e.getCause();
+ if (cause instanceof java.util.concurrent.TimeoutException) {
+     throw new RuntimeException("Timeout while waiting for in-flight update to complete during unsubscribe", cause);
+ } else {
+     throw new RuntimeException("Error waiting for in-flight update to complete during unsubscribe", cause != null ? cause : e);
+ }
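
Whether the TimeoutException arrives wrapped in an ExecutionException depends on how the timeout is applied, which the excerpt above does not show. The sketch below uses only JDK calls to contrast the two shapes; the class and method names are hypothetical, and `orTimeout()` assumes Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutShapes {
    // Timeout passed to get(): TimeoutException is thrown directly, unwrapped.
    static String viaGetTimeout() {
        CompletableFuture<Void> never = new CompletableFuture<>();
        try {
            never.get(50, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (TimeoutException e) {
            return "direct TimeoutException";
        } catch (ExecutionException e) {
            return "wrapped " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Timeout applied with orTimeout(): the future itself fails, so a plain
    // get() reports it as the cause of an ExecutionException.
    static String viaOrTimeout() {
        CompletableFuture<Void> never =
            new CompletableFuture<Void>().orTimeout(50, TimeUnit.MILLISECONDS);
        try {
            never.get();
            return "completed";
        } catch (ExecutionException e) {
            return "wrapped " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

If the manager uses the first shape, a separate `catch (TimeoutException e)` clause is the place for the more specific message; with the second shape, inspecting `getCause()` as suggested above applies.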

Comment on lines 111 to 113
// Create a future for this update to enable waiting during unsubscribe
CompletableFuture<Void> updateFuture = new CompletableFuture<>();
currentUpdate = updateFuture;

Copilot AI Nov 18, 2025

There's a race condition between when update() is scheduled on the thread pool (via GbfsSubscriptionManager.update()) and when the currentUpdate future is actually created. If unsubscribeAsync() is called after update() is scheduled but before this code executes, getCurrentUpdate() will return a completed future, causing unsubscribe to complete prematurely.

Consider creating and setting the currentUpdate future before the update work begins, or using a different synchronization mechanism that tracks pending updates at the subscription manager level.
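
One way to track pending updates at the manager level, as suggested here, is to register a future before the task is handed to the executor. The sketch below is an assumption-laden illustration: `PendingUpdateTracker`, `scheduleUpdate()` and `pendingUpdate()` are hypothetical names, and it assumes at most one scheduled update per identifier at a time.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical manager-side bookkeeping for scheduled and running updates.
class PendingUpdateTracker {
    private final Map<String, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();
    private final Executor pool;

    PendingUpdateTracker(Executor pool) {
        this.pool = pool;
    }

    void scheduleUpdate(String id, Runnable work) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Registered before execute(), so a queued-but-not-started task is visible.
        pending.put(id, future);
        try {
            pool.execute(() -> run(id, work, future));
        } catch (RuntimeException rejected) {
            // The task will never run; do not leave waiters hanging.
            pending.remove(id, future);
            future.completeExceptionally(rejected);
            throw rejected;
        }
    }

    private void run(String id, Runnable work, CompletableFuture<Void> future) {
        try {
            work.run();
            future.complete(null);
        } catch (Throwable t) {
            future.completeExceptionally(t);
        } finally {
            pending.remove(id, future);
        }
    }

    // Completed future when nothing is scheduled or running for this id.
    CompletableFuture<Void> pendingUpdate(String id) {
        return pending.getOrDefault(id, CompletableFuture.completedFuture(null));
    }
}
```

With this shape an unsubscribe call can wait on `pendingUpdate(id)` regardless of whether the worker thread has picked up the task yet.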

Comment on lines 109 to 110
CompletableFuture<Void> updateFuture = new CompletableFuture<>();
currentUpdate = updateFuture;

Copilot AI Nov 18, 2025

There's a race condition between when update() is scheduled on the thread pool (via GbfsSubscriptionManager.update()) and when the currentUpdate future is actually created. If unsubscribeAsync() is called after update() is scheduled but before this code executes, getCurrentUpdate() will return a completed future, causing unsubscribe to complete prematurely.

Consider creating and setting the currentUpdate future before the update work begins, or using a different synchronization mechanism that tracks pending updates at the subscription manager level.

Suggested change
- CompletableFuture<Void> updateFuture = new CompletableFuture<>();
- currentUpdate = updateFuture;
+ currentUpdate = new CompletableFuture<>();
+ CompletableFuture<Void> updateFuture = currentUpdate;

* Update the subscription by updating the loader and push a new delivery
* to the consumer if the update had changes
*/
public void update() {

Copilot AI Nov 18, 2025

The update() method is not thread-safe. If multiple threads call update() concurrently, they will race to set currentUpdate, potentially causing one update's future to be lost. This could lead to unsubscribe waiting for the wrong update or completing prematurely.

Consider adding synchronization to prevent concurrent updates, or documenting that update() must not be called concurrently for the same subscription instance.

Suggested change
- public void update() {
+ public synchronized void update() {
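
A small sketch of what the `synchronized` modifier buys here: two threads calling `update()` on the same instance run one after the other, so neither can overwrite `currentUpdate` while the other is mid-update. `SerializedUpdates` and its counters are hypothetical and exist only to make the serialization observable; the sleep is a placeholder for the real loading work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class SerializedUpdates {
    // volatile so readers see the latest future without taking the monitor.
    private volatile CompletableFuture<Void> currentUpdate =
        CompletableFuture.completedFuture(null);
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger maxActive = new AtomicInteger();

    public synchronized void update() {
        CompletableFuture<Void> updateFuture = new CompletableFuture<>();
        currentUpdate = updateFuture;
        // Record how many updates are inside this method at once.
        maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
        try {
            Thread.sleep(20); // placeholder for loading and delivering the feed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            active.decrementAndGet();
            updateFuture.complete(null);
        }
    }

    public CompletableFuture<Void> getCurrentUpdate() {
        return currentUpdate;
    }

    // Runs two concurrent update() calls and reports the peak overlap.
    static int maxConcurrentUpdates() {
        SerializedUpdates subscription = new SerializedUpdates();
        Thread a = new Thread(subscription::update);
        Thread b = new Thread(subscription::update);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscription.maxActive.get();
    }
}
```

The trade-off is that a slow update blocks any other caller of `update()` on that instance for its full duration.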

* Update the subscription by updating the loader and push a new delivery
* to the consumer if the update had changes
*/
public void update() {

Copilot AI Nov 18, 2025

The update() method is not thread-safe. If multiple threads call update() concurrently, they will race to set currentUpdate, potentially causing one update's future to be lost. This could lead to unsubscribe waiting for the wrong update or completing prematurely.

Consider adding synchronization to prevent concurrent updates, or documenting that update() must not be called concurrently for the same subscription instance.

Comment on lines +184 to +185
Thread.sleep(500);
Assertions.assertFalse(unsubscribeFuture.isDone());

Copilot AI Nov 18, 2025

Using Thread.sleep() for test synchronization is unreliable and can lead to flaky tests. The test might pass on fast machines but fail on slow/loaded CI systems, or vice versa.

Consider using a more deterministic approach, such as checking unsubscribeFuture.isDone() in a loop with a timeout, or using CompletableFuture.get() with a timeout and asserting it throws TimeoutException.

Suggested change
- Thread.sleep(500);
- Assertions.assertFalse(unsubscribeFuture.isDone());
+ Assertions.assertThrows(
+     TimeoutException.class,
+     () -> unsubscribeFuture.get(300, TimeUnit.MILLISECONDS)
+ );

);
manager.update(id);

Thread.sleep(500); // Let update start

Copilot AI Nov 18, 2025

Using Thread.sleep() for test synchronization is unreliable and can lead to flaky tests. On a heavily loaded system, the update might not start within 500ms, causing the test to pass incorrectly when it should verify timeout behavior.

Consider using a CountDownLatch to signal when the update has started (similar to the first test), ensuring the update is guaranteed to be in progress before testing the timeout.
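
The latch-based synchronization suggested here might look like the sketch below. It is independent of the project's test class: `LatchSynchronizedCheck` is a hypothetical name, and the lambda stands in for a feed update that blocks until the test releases it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchSynchronizedCheck {
    // Returns true if the simulated update was still running when inspected.
    static boolean updateWasInFlightWhenChecked() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> update = CompletableFuture.runAsync(() -> {
                started.countDown(); // signal: the update is now running
                try {
                    release.await(); // stay in flight until the test says otherwise
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, pool);

            // Wait for the signal instead of sleeping for a guessed duration.
            if (!started.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("update never started");
            }
            boolean inFlight = !update.isDone();

            release.countDown();
            update.join();
            return inFlight;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

The five-second bound on `await` only limits how long a broken test can hang; on a healthy run the latch opens as soon as the worker thread starts.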

Comment on lines 197 to 203
/**
* Stop a subscription on a GBFS feed (blocking version).
* This method blocks until the subscription's in-flight update completes.
*
* @param identifier An identifier returned by subscribe method.
* @param identifier An identifier returned by subscribe method
* @deprecated Use {@link #unsubscribeAsync(String)} for better async composition
*/

Copilot AI Nov 18, 2025

The documentation is misleading. The method doesn't always block until the update completes - it uses a 30-second timeout and will throw a RuntimeException (wrapping TimeoutException) if the update doesn't complete in time. The update will continue running in the background even after the timeout.

Consider updating the documentation to mention the timeout behavior and that the update may still be running when this method returns via exception.

Addresses Copilot review feedback from PR #248:

1. **Critical Fix: Race condition in future creation**
   - Future is now created and set on subscription BEFORE scheduling
   - Added setCurrentUpdate() method to GbfsSubscription interface
   - Manager calls setCurrentUpdate() before pool.execute()
   - This ensures getCurrentUpdate() sees the future even if task hasn't started

2. **Critical Fix: Thread-safety in update() method**
   - Added synchronized keyword to update() in both V2 and V3 subscriptions
   - Added synchronized to getCurrentUpdate() for visibility
   - Prevents concurrent updates from overwriting currentUpdate reference

3. **Implementation details**
   - update() now checks if future was pre-set by manager
   - Falls back to creating new future if called directly (backwards compatible)
   - Cleaner separation between manager orchestration and subscription execution

These fixes ensure that unsubscribe cannot miss in-flight updates due to
timing issues between task scheduling and execution.
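
The ordering described in this commit message can be sketched as follows. The classes are hypothetical stand-ins rather than the actual V2/V3 subscriptions or manager, and the `Runnable` passed to the constructor is a placeholder for the real loader work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical stand-in for a subscription with a manager-settable future.
class PresetFutureSubscription {
    private final Runnable work;
    private CompletableFuture<Void> currentUpdate = CompletableFuture.completedFuture(null);

    PresetFutureSubscription(Runnable work) {
        this.work = work;
    }

    synchronized void setCurrentUpdate(CompletableFuture<Void> future) {
        currentUpdate = future;
    }

    synchronized CompletableFuture<Void> getCurrentUpdate() {
        return currentUpdate;
    }

    synchronized void update() {
        // Use the future set by the manager, or create a new one if called directly.
        CompletableFuture<Void> updateFuture = currentUpdate;
        if (updateFuture.isDone()) {
            updateFuture = new CompletableFuture<>();
            currentUpdate = updateFuture;
        }
        try {
            work.run();
            updateFuture.complete(null);
        } catch (RuntimeException e) {
            updateFuture.completeExceptionally(e);
            throw e;
        }
    }
}

// Hypothetical manager-side helper: the future exists before the task is queued.
class PresetFutureManager {
    static void scheduleUpdate(PresetFutureSubscription subscription, Executor pool) {
        subscription.setCurrentUpdate(new CompletableFuture<>());
        pool.execute(subscription::update);
    }
}
```

Because `getCurrentUpdate()` shares a monitor with `update()` in this sketch, a caller blocks for as long as an update is running; reading a volatile field instead would avoid that wait.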
@sonarqubecloud

Quality Gate failed

Failed conditions
0.0% Coverage on New Code (required ≥ 80%)
11.4% Duplication on New Code (required ≤ 3%)
B Reliability Rating on New Code (required ≥ A)

Copilot AI left a comment

Pull Request Overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.


Comment on lines +122 to +123
public synchronized void update() {
// Use the future set by the manager, or create a new one if called directly

Copilot AI Nov 18, 2025

This method overrides GbfsSubscription.update; it is advisable to add an Override annotation.

Suggested change
- public synchronized void update() {
- // Use the future set by the manager, or create a new one if called directly
+ @Override
+ public synchronized void update() {
+ // Use the future set by the manager, or create a new one if called directly

Comment on lines +119 to +120
public synchronized void update() {
// Use the future set by the manager, or create a new one if called directly

Copilot AI Nov 18, 2025

This method overrides GbfsSubscription.update; it is advisable to add an Override annotation.

Suggested change
- public synchronized void update() {
- // Use the future set by the manager, or create a new one if called directly
+ @Override
+ public synchronized void update() {
+ // Use the future set by the manager, or create a new one if called directly
