Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 69 additions & 3 deletions src/main/java/org/entur/gbfs/GbfsSubscriptionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.entur.gbfs.loader.GbfsSubscription;
import org.entur.gbfs.loader.v2.GbfsV2Delivery;
Expand All @@ -42,6 +45,9 @@ public class GbfsSubscriptionManager {

private ForkJoinPool customThreadPool;

// Default timeout for unsubscribe wait
private static final long DEFAULT_UNSUBSCRIBE_TIMEOUT_MS = 30_000; // 30 seconds

public GbfsSubscriptionManager() {}

public GbfsSubscriptionManager(ForkJoinPool customThreadPool) {
Expand Down Expand Up @@ -138,19 +144,79 @@ public void update(String identifier) {
* @param subscription Subscription which should be updated
*/
private void update(GbfsSubscription subscription) {
// Create the future BEFORE scheduling to avoid race condition where
// unsubscribe is called before the async task starts executing.
// This ensures getCurrentUpdate() will see it even if task hasn't started yet.
CompletableFuture<Void> updateFuture = new CompletableFuture<>();
subscription.setCurrentUpdate(updateFuture);

Optional
.ofNullable(customThreadPool)
.orElse(ForkJoinPool.commonPool())
.execute(subscription::update);
}

/**
* Stop a subscription on a GBFS feed
* Stop a subscription on a GBFS feed asynchronously.
* Returns a CompletableFuture that completes when any in-flight update finishes.
* Uses the default timeout of 30 seconds.
*
* @param identifier An identifier returned by subscribe method.
* @param identifier An identifier returned by subscribe method
* @return CompletableFuture that completes when it's safe to clean up caches
*/
public CompletableFuture<Void> unsubscribeAsync(String identifier) {
return unsubscribeAsync(
identifier,
DEFAULT_UNSUBSCRIBE_TIMEOUT_MS,
TimeUnit.MILLISECONDS
);
}

/**
* Stop a subscription on a GBFS feed asynchronously with a custom timeout.
* Returns a CompletableFuture that completes when any in-flight update finishes.
* The future will complete exceptionally with TimeoutException if the timeout expires.
*
* @param identifier An identifier returned by subscribe method
* @param timeout Maximum time to wait for in-flight updates
* @param unit Time unit for the timeout
* @return CompletableFuture that completes when it's safe to clean up caches
*/
public CompletableFuture<Void> unsubscribeAsync(
String identifier,
long timeout,
TimeUnit unit
) {
// Remove from subscriptions map first to prevent new updates
GbfsSubscription subscription = subscriptions.remove(identifier);

if (subscription == null) {
// Already unsubscribed - return completed future
return CompletableFuture.completedFuture(null);
}

// Get the subscription's current update future and apply timeout
// This is the key insight: no polling needed, just wait on the subscription's future!
return subscription.getCurrentUpdate().orTimeout(timeout, unit);
}

/**
* Stop a subscription on a GBFS feed (blocking version).
* This method blocks until the subscription's in-flight update completes.
*
* @param identifier An identifier returned by subscribe method
* @deprecated Use {@link #unsubscribeAsync(String)} for better async composition
*/
@Deprecated
public void unsubscribe(String identifier) {
subscriptions.remove(identifier);
try {
unsubscribeAsync(identifier).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while unsubscribing", e);
} catch (ExecutionException e) {
throw new RuntimeException("Error while unsubscribing", e);
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message "Error while unsubscribing" is not helpful for debugging. The ExecutionException wraps the actual exception from the update (or TimeoutException). Consider unwrapping and including more context, such as:

throw new RuntimeException("Error waiting for in-flight update to complete during unsubscribe", e.getCause() != null ? e.getCause() : e);

Or handle TimeoutException separately to provide a more specific message.

Suggested change
throw new RuntimeException("Error while unsubscribing", e);
Throwable cause = e.getCause();
if (cause instanceof java.util.concurrent.TimeoutException) {
throw new RuntimeException("Timeout while waiting for in-flight update to complete during unsubscribe", cause);
} else {
throw new RuntimeException("Error waiting for in-flight update to complete during unsubscribe", cause != null ? cause : e);
}

Copilot uses AI. Check for mistakes.
}
}

private String subscribe(GbfsSubscription subscription) {
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/org/entur/gbfs/loader/GbfsSubscription.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
package org.entur.gbfs.loader;

import java.util.concurrent.CompletableFuture;

public interface GbfsSubscription {
void init();

boolean getSetupComplete();

void update();

/**
* Prepares the subscription for an update by setting the CompletableFuture that will track it.
* This must be called by the manager before scheduling the update to avoid race conditions
* where unsubscribe is called before the async update task starts.
*
* @param future The future that will complete when the update finishes
*/
void setCurrentUpdate(CompletableFuture<Void> future);

/**
* Get the CompletableFuture for the currently executing update, if any.
* Returns a completed future if no update is in progress.
*
* @return CompletableFuture that completes when the current update finishes
*/
CompletableFuture<Void> getCurrentUpdate();
}
49 changes: 47 additions & 2 deletions src/main/java/org/entur/gbfs/loader/v2/GbfsV2Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.entur.gbfs.GbfsSubscriptionOptions;
import org.entur.gbfs.SubscriptionUpdateInterceptor;
Expand Down Expand Up @@ -58,6 +59,19 @@ public class GbfsV2Subscription implements GbfsSubscription {
private final SubscriptionUpdateInterceptor updateInterceptor;
private GbfsV2Loader loader;

// Track current update as a Future to enable waiting for in-flight updates during unsubscribe
private volatile CompletableFuture<Void> currentUpdate = null;

/**
* Sets the future that will track the next update. This must be called before
* the update is scheduled to avoid race conditions where unsubscribe is called
* before the async update task starts.
*/
@Override
public synchronized void setCurrentUpdate(CompletableFuture<Void> future) {
this.currentUpdate = future;
}

public GbfsV2Subscription(
GbfsSubscriptionOptions subscriptionOptions,
Consumer<GbfsV2Delivery> consumer
Expand Down Expand Up @@ -101,9 +115,18 @@ public boolean getSetupComplete() {

/**
* Update the subscription by updating the loader and push a new delivery
* to the consumer if the update had changes
* to the consumer if the update had changes.
*
* This method is thread-safe and ensures only one update runs at a time.
*/
public void update() {
public synchronized void update() {
// Use the future set by the manager, or create a new one if called directly
Comment on lines +122 to +123
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method overrides GbfsSubscription.update; it is advisable to add an Override annotation.

Suggested change
public synchronized void update() {
// Use the future set by the manager, or create a new one if called directly
@Override
public synchronized void update() {
// Use the future set by the manager, or create a new one if called directly

Copilot uses AI. Check for mistakes.
CompletableFuture<Void> updateFuture = currentUpdate;
if (updateFuture == null) {
updateFuture = new CompletableFuture<>();
currentUpdate = updateFuture;
}

if (updateInterceptor != null) {
updateInterceptor.beforeUpdate();
}
Expand All @@ -130,13 +153,19 @@ public void update() {
);
consumer.accept(delivery);
}
// Complete the future on success
updateFuture.complete(null);
} catch (RuntimeException e) {
// Complete exceptionally on error
updateFuture.completeExceptionally(e);
LOG.error("Exception occurred during update", e);
throw e;
} finally {
if (updateInterceptor != null) {
updateInterceptor.afterUpdate();
}
// Clear the reference when done
currentUpdate = null;
}
}

Expand All @@ -154,4 +183,20 @@ private ValidationResult validateFeeds() {
GbfsValidator validator = GbfsValidatorFactory.getGbfsJsonValidator();
return validator.validate(feeds);
}

/**
* Get the CompletableFuture for the currently executing update, if any.
* Returns a completed future if no update is in progress.
*
* This method is synchronized to ensure visibility of updates scheduled
* by the manager but not yet started on the ForkJoinPool.
*
* @return CompletableFuture that completes when the current update finishes
*/
@Override
public synchronized CompletableFuture<Void> getCurrentUpdate() {
CompletableFuture<Void> current = currentUpdate;
// Return current update future, or a completed future if none in progress
return current != null ? current : CompletableFuture.completedFuture(null);
}
}
49 changes: 47 additions & 2 deletions src/main/java/org/entur/gbfs/loader/v3/GbfsV3Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.entur.gbfs.GbfsSubscriptionOptions;
import org.entur.gbfs.SubscriptionUpdateInterceptor;
Expand Down Expand Up @@ -56,6 +57,19 @@ public class GbfsV3Subscription implements GbfsSubscription {
private final SubscriptionUpdateInterceptor updateInterceptor;
private GbfsV3Loader loader;

// Track current update as a Future to enable waiting for in-flight updates during unsubscribe
private volatile CompletableFuture<Void> currentUpdate = null;

/**
* Sets the future that will track the next update. This must be called before
* the update is scheduled to avoid race conditions where unsubscribe is called
* before the async update task starts.
*/
@Override
public synchronized void setCurrentUpdate(CompletableFuture<Void> future) {
this.currentUpdate = future;
}

public GbfsV3Subscription(
GbfsSubscriptionOptions subscriptionOptions,
Consumer<GbfsV3Delivery> consumer
Expand Down Expand Up @@ -98,9 +112,18 @@ public boolean getSetupComplete() {

/**
* Update the subscription by updating the loader and push a new delivery
* to the consumer if the update had changes
* to the consumer if the update had changes.
*
* This method is thread-safe and ensures only one update runs at a time.
*/
public void update() {
public synchronized void update() {
// Use the future set by the manager, or create a new one if called directly
Comment on lines +119 to +120
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method overrides GbfsSubscription.update; it is advisable to add an Override annotation.

Suggested change
public synchronized void update() {
// Use the future set by the manager, or create a new one if called directly
@Override
public synchronized void update() {
// Use the future set by the manager, or create a new one if called directly

Copilot uses AI. Check for mistakes.
CompletableFuture<Void> updateFuture = currentUpdate;
if (updateFuture == null) {
updateFuture = new CompletableFuture<>();
currentUpdate = updateFuture;
}

if (updateInterceptor != null) {
updateInterceptor.beforeUpdate();
}
Expand All @@ -125,13 +148,19 @@ public void update() {
);
consumer.accept(delivery);
}
// Complete the future on success
updateFuture.complete(null);
} catch (RuntimeException e) {
// Complete exceptionally on error
updateFuture.completeExceptionally(e);
LOG.error("Exception occurred during update", e);
throw e;
} finally {
if (updateInterceptor != null) {
updateInterceptor.afterUpdate();
}
// Clear the reference when done
currentUpdate = null;
}
}

Expand All @@ -149,4 +178,20 @@ private ValidationResult validateFeeds() {
GbfsValidator validator = GbfsValidatorFactory.getGbfsJsonValidator();
return validator.validate(feeds);
}

/**
* Get the CompletableFuture for the currently executing update, if any.
* Returns a completed future if no update is in progress.
*
* This method is synchronized to ensure visibility of updates scheduled
* by the manager but not yet started on the ForkJoinPool.
*
* @return CompletableFuture that completes when the current update finishes
*/
@Override
public synchronized CompletableFuture<Void> getCurrentUpdate() {
CompletableFuture<Void> current = currentUpdate;
// Return current update future, or a completed future if none in progress
return current != null ? current : CompletableFuture.completedFuture(null);
}
}
Loading
Loading