-
Notifications
You must be signed in to change notification settings - Fork 4
Fix race condition in unsubscribe by waiting for in-flight updates #248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,28 @@ | ||
| package org.entur.gbfs.loader; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| public interface GbfsSubscription { | ||
| void init(); | ||
|
|
||
| boolean getSetupComplete(); | ||
|
|
||
| void update(); | ||
|
|
||
| /** | ||
| * Prepares the subscription for an update by setting the CompletableFuture that will track it. | ||
| * This must be called by the manager before scheduling the update to avoid race conditions | ||
| * where unsubscribe is called before the async update task starts. | ||
| * | ||
| * @param future The future that will complete when the update finishes | ||
| */ | ||
| void setCurrentUpdate(CompletableFuture<Void> future); | ||
|
|
||
| /** | ||
| * Get the CompletableFuture for the currently executing update, if any. | ||
| * Returns a completed future if no update is in progress. | ||
| * | ||
| * @return CompletableFuture that completes when the current update finishes | ||
| */ | ||
| CompletableFuture<Void> getCurrentUpdate(); | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |||||||||||
| import java.util.Arrays; | ||||||||||||
| import java.util.HashMap; | ||||||||||||
| import java.util.Map; | ||||||||||||
| import java.util.concurrent.CompletableFuture; | ||||||||||||
| import java.util.function.Consumer; | ||||||||||||
| import org.entur.gbfs.GbfsSubscriptionOptions; | ||||||||||||
| import org.entur.gbfs.SubscriptionUpdateInterceptor; | ||||||||||||
|
|
@@ -58,6 +59,19 @@ public class GbfsV2Subscription implements GbfsSubscription { | |||||||||||
| private final SubscriptionUpdateInterceptor updateInterceptor; | ||||||||||||
| private GbfsV2Loader loader; | ||||||||||||
|
|
||||||||||||
| // Track current update as a Future to enable waiting for in-flight updates during unsubscribe | ||||||||||||
| private volatile CompletableFuture<Void> currentUpdate = null; | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * Sets the future that will track the next update. This must be called before | ||||||||||||
| * the update is scheduled to avoid race conditions where unsubscribe is called | ||||||||||||
| * before the async update task starts. | ||||||||||||
| */ | ||||||||||||
| @Override | ||||||||||||
| public synchronized void setCurrentUpdate(CompletableFuture<Void> future) { | ||||||||||||
| this.currentUpdate = future; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| public GbfsV2Subscription( | ||||||||||||
| GbfsSubscriptionOptions subscriptionOptions, | ||||||||||||
| Consumer<GbfsV2Delivery> consumer | ||||||||||||
|
|
@@ -101,9 +115,18 @@ public boolean getSetupComplete() { | |||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * Update the subscription by updating the loader and push a new delivery | ||||||||||||
| * to the consumer if the update had changes | ||||||||||||
| * to the consumer if the update had changes. | ||||||||||||
| * | ||||||||||||
| * This method is thread-safe and ensures only one update runs at a time. | ||||||||||||
| */ | ||||||||||||
| public void update() { | ||||||||||||
| public synchronized void update() { | ||||||||||||
| // Use the future set by the manager, or create a new one if called directly | ||||||||||||
|
Comment on lines
+122
to
+123
|
||||||||||||
| public synchronized void update() { | |
| // Use the future set by the manager, or create a new one if called directly | |
| @Override | |
| public synchronized void update() { | |
| // Use the future set by the manager, or create a new one if called directly |
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |||||||||||
| import java.util.Arrays; | ||||||||||||
| import java.util.HashMap; | ||||||||||||
| import java.util.Map; | ||||||||||||
| import java.util.concurrent.CompletableFuture; | ||||||||||||
| import java.util.function.Consumer; | ||||||||||||
| import org.entur.gbfs.GbfsSubscriptionOptions; | ||||||||||||
| import org.entur.gbfs.SubscriptionUpdateInterceptor; | ||||||||||||
|
|
@@ -56,6 +57,19 @@ public class GbfsV3Subscription implements GbfsSubscription { | |||||||||||
| private final SubscriptionUpdateInterceptor updateInterceptor; | ||||||||||||
| private GbfsV3Loader loader; | ||||||||||||
|
|
||||||||||||
| // Track current update as a Future to enable waiting for in-flight updates during unsubscribe | ||||||||||||
| private volatile CompletableFuture<Void> currentUpdate = null; | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * Sets the future that will track the next update. This must be called before | ||||||||||||
| * the update is scheduled to avoid race conditions where unsubscribe is called | ||||||||||||
| * before the async update task starts. | ||||||||||||
| */ | ||||||||||||
| @Override | ||||||||||||
| public synchronized void setCurrentUpdate(CompletableFuture<Void> future) { | ||||||||||||
| this.currentUpdate = future; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| public GbfsV3Subscription( | ||||||||||||
| GbfsSubscriptionOptions subscriptionOptions, | ||||||||||||
| Consumer<GbfsV3Delivery> consumer | ||||||||||||
|
|
@@ -98,9 +112,18 @@ public boolean getSetupComplete() { | |||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * Update the subscription by updating the loader and push a new delivery | ||||||||||||
| * to the consumer if the update had changes | ||||||||||||
| * to the consumer if the update had changes. | ||||||||||||
| * | ||||||||||||
| * This method is thread-safe and ensures only one update runs at a time. | ||||||||||||
| */ | ||||||||||||
| public void update() { | ||||||||||||
| public synchronized void update() { | ||||||||||||
| // Use the future set by the manager, or create a new one if called directly | ||||||||||||
|
Comment on lines
+119
to
+120
|
||||||||||||
| public synchronized void update() { | |
| // Use the future set by the manager, or create a new one if called directly | |
| @Override | |
| public synchronized void update() { | |
| // Use the future set by the manager, or create a new one if called directly |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message "Error while unsubscribing" is not helpful for debugging. The ExecutionException wraps the actual exception from the update (or TimeoutException). Consider unwrapping and including more context, such as:
Or handle TimeoutException separately to provide a more specific message.