Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.sql.DataSource;
Expand All @@ -51,6 +53,7 @@
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;

/**
Expand Down Expand Up @@ -86,6 +89,20 @@ public class JDBCQuotaStore implements QuotaStore {
/** Max number of attempts we do to insert/update page stats in race-free mode */
int maxLoops = 100;

/** Max attempts in {@link #executeWithRetry(TransactionCallback)} before propagating the abort. */
int maxTransactionAttempts = 10;

/** Initial backoff between transaction retries, in milliseconds; doubles each retry, with full jitter. */
long initialTransactionBackoffMs = 10L;

private static final long MAX_TRANSACTION_BACKOFF_MS = 500L;

/** Oracle ORA-08176: consistent read failure; rollback data not available. */
private static final int ORA_08176 = 8176;

/** Oracle ORA-08177: can't serialize access for this transaction. */
private static final int ORA_08177 = 8177;

/** The executor used for asynch requests */
ExecutorService executor;

Expand Down Expand Up @@ -159,10 +176,10 @@ public void initialize() {
throw new IllegalStateException(
"Please provide both the sql dialect and the data " + "source before calling inizialize");
}
tt.executeWithoutResult(status -> {

// setup the tables if necessary
dialect.initializeTables(schema, jt);
// DDL must run outside the wrapping transaction: Oracle auto-commits it and a SERIALIZABLE
// read across the just-created indexes would abort with ORA-08176 on the first SELECT.
dialect.initializeTables(schema, jt);
executeWithRetry(status -> {

// get the existing table names
List<String> existingLayers = jt.query(dialect.getAllLayersQuery(schema), (rs, rowNum) -> rs.getString(1));
Expand Down Expand Up @@ -196,7 +213,7 @@ public void createLayer(String layerName) throws InterruptedException {
}

private void createLayerInternal(final String layerName) {
tt.executeWithoutResult(status -> {
executeWithRetry(status -> {
Set<TileSet> layerTileSets;
if (!GLOBAL_QUOTA_NAME.equals(layerName)) {
layerTileSets = calculator.getTileSetsFor(layerName);
Expand Down Expand Up @@ -276,14 +293,14 @@ private Quota nonNullQuota(Quota optionalQuota) {

@Override
public void deleteLayer(final String layerName) {
tt.executeWithoutResult(status -> {
executeWithRetry(status -> {
deleteLayerInternal(layerName);
});
}

@Override
public void deleteGridSubset(final String layerName, final String gridSetId) {
tt.executeWithoutResult(status -> {
executeWithRetry(status -> {
// get the disk quota used by the layer gridset
Quota quota = getUsedQuotaByLayerGridset(layerName, gridSetId);
// we will subtracting the current disk quota value
Expand All @@ -305,7 +322,7 @@ public void deleteGridSubset(final String layerName, final String gridSetId) {

public void deleteLayerInternal(final String layerName) {
getUsedQuotaByLayerName(layerName);
tt.executeWithoutResult(status -> {
executeWithRetry(status -> {
// update the global quota
Quota quota = getUsedQuotaByLayerName(layerName);
quota.setBytes(quota.getBytes().negate());
Expand All @@ -324,7 +341,7 @@ public void deleteLayerInternal(final String layerName) {

@Override
public void renameLayer(final String oldLayerName, final String newLayerName) throws InterruptedException {
tt.executeWithoutResult(status -> {
executeWithRetry(status -> {
String sql = dialect.getRenameLayerStatement(schema, "oldName", "newName");
Map<String, Object> params = new HashMap<>();
params.put("oldName", oldLayerName);
Expand Down Expand Up @@ -429,7 +446,7 @@ public TilePageCalculator getTilePageCalculator() {
public void addToQuotaAndTileCounts(
final TileSet tileSet, final Quota quotaDiff, final Collection<PageStatsPayload> tileCountDiffs)
throws InterruptedException {
tt.executeWithoutResult(status -> {
executeWithRetry(status -> {
getOrCreateTileSet(tileSet);
updateQuotas(tileSet, quotaDiff);

Expand Down Expand Up @@ -609,7 +626,7 @@ private PageStats getPageStats(String pageStatsKey) {
@Override
@SuppressWarnings("unchecked")
public Future<List<PageStats>> addHitsAndSetAccesTime(final Collection<PageStatsPayload> statsUpdates) {
return executor.submit(() -> (List<PageStats>) tt.execute(new QuotaStoreCallback(statsUpdates)));
return executor.submit(() -> (List<PageStats>) executeWithRetry(new QuotaStoreCallback(statsUpdates)));
}

@Override
Expand Down Expand Up @@ -651,7 +668,7 @@ private TilePage getSinglePage(Set<String> layerNames, boolean leastFrequentlyUs

@Override
public PageStats setTruncated(final TilePage page) throws InterruptedException {
return (PageStats) tt.execute((TransactionCallback<Object>) status -> {
return (PageStats) executeWithRetry((TransactionCallback<Object>) status -> {
if (log.isLoggable(Level.FINE)) {
log.info("Truncating page " + page);
}
Expand Down Expand Up @@ -693,6 +710,88 @@ public void close() throws Exception {
jt = null;
}

/**
* Runs {@code action} in a SERIALIZABLE transaction, retrying on concurrency aborts with bounded exponential
* backoff. If the call is already nested inside an active transaction the retry loop is skipped: Spring's
* {@code PROPAGATION_REQUIRED} would reuse the same stale snapshot, so only the outermost call can recover.
*/
private <T> T executeWithRetry(TransactionCallback<T> action) {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
return tt.execute(action);
}
long backoff = initialTransactionBackoffMs;
for (int attempt = 1; ; attempt++) {
try {
return tt.execute(action);
} catch (DataAccessException e) {
if (!isTransactionAbort(e)) {
throw e;
}
if (attempt >= maxTransactionAttempts) {
log.log(
Level.WARNING,
"DiskQuota transaction failed after " + attempt + " attempts: " + e.getMessage(),
e);
throw e;
}
long sleep = backoff + ThreadLocalRandom.current().nextLong(backoff);
try {
Thread.sleep(sleep);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw e;
}
if (log.isLoggable(Level.FINE)) {
log.fine("DiskQuota transaction conflict on attempt "
+ attempt
+ "/"
+ maxTransactionAttempts
+ ", retrying after "
+ sleep
+ "ms: "
+ e.getMessage());
}
backoff = Math.min(backoff * 2, MAX_TRANSACTION_BACKOFF_MS);
}
}
}

/** Void variant of {@link #executeWithRetry(TransactionCallback)}. */
private void executeWithRetry(Consumer<TransactionStatus> action) {
executeWithRetry((TransactionCallback<Void>) status -> {
action.accept(status);
return null;
});
}

/**
* Walks the cause chain looking for a retryable concurrency abort. Spring's translator alone is not enough:
* SQLSTATE class {@code 40} catches HSQL's bare {@link ConcurrencyFailureException}, and Oracle vendor codes 8176
* and 8177 are needed because Spring leaves 8176 uncategorized and routes 8177 to a deprecated sibling of
* {@link PessimisticLockingFailureException}.
*/
private static boolean isTransactionAbort(Throwable t) {
for (Throwable cause = t; cause != null; cause = cause.getCause()) {
if (cause instanceof PessimisticLockingFailureException) {
return true;
}
if (cause instanceof SQLException sqlException) {
String sqlState = sqlException.getSQLState();
if (sqlState != null && sqlState.startsWith("40")) {
return true;
}
if (isRetryableOracleCode(sqlException.getErrorCode())) {
return true;
}
}
}
return false;
}

private static boolean isRetryableOracleCode(int errorCode) {
return errorCode == ORA_08176 || errorCode == ORA_08177;
}

/**
* Maps a BigDecimal column into a Quota object
*
Expand Down Expand Up @@ -752,7 +851,7 @@ public TilePage mapRow(ResultSet rs, int rowNum) throws SQLException {

@Override
public void deleteParameters(final String layerName, final String parametersId) {
tt.executeWithoutResult(status -> {
executeWithRetry(status -> {
// first gather the disk quota used by the gridset, and update the global
// quota
Quota quota = getUsedQuotaByParametersId(parametersId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package org.geowebcache.diskquota.jdbc;

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

/**
Expand Down Expand Up @@ -60,7 +63,8 @@ BYTES NUMBER(%d) DEFAULT 0 NOT NULL
"""
CREATE TABLE ${schema}TILEPAGE (
KEY VARCHAR(%d) PRIMARY KEY,
TILESET_ID VARCHAR(%d) REFERENCES ${schema}TILESET(KEY) ON DELETE CASCADE,
TILESET_ID VARCHAR(%d) REFERENCES ${schema}TILESET(KEY) ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED,
PAGE_Z SMALLINT,
PAGE_X INTEGER,
PAGE_Y INTEGER,
Expand All @@ -84,37 +88,46 @@ protected void addEmtpyTableReference(StringBuilder sb) {
}

/**
* No-op: Oracle does not support {@code ON UPDATE CASCADE} on foreign keys, so there is nothing portable to
* migrate. Companion to {@link #getRenameLayerStatement(String, String, String)}, which preserves the legacy
* LAYER_NAME-only behavior on this dialect.
* Oracle does not support {@code ON UPDATE CASCADE}, so the FK is migrated to {@code DEFERRABLE INITIALLY DEFERRED}
* instead. Deferring the check to commit time also drops the per-INSERT snapshot read on TILESET that triggers
* ORA-08176 under SERIALIZABLE.
*/
@Override
public void migrateForeignKeys(String schema, SimpleJdbcTemplate template) {
// intentional no-op
protected boolean tilepageFkIsMigrated(ResultSet rs) throws SQLException {
return rs.getShort("DEFERRABILITY") == DatabaseMetaData.importedKeyInitiallyDeferred;
}

@Override
protected String tilepageFkAddSql(String prefixedTilepageName, String prefix) {
return """
ALTER TABLE %s ADD FOREIGN KEY (TILESET_ID)
REFERENCES %sTILESET(KEY)
ON DELETE CASCADE
DEFERRABLE INITIALLY DEFERRED
"""
.formatted(prefixedTilepageName, prefix);
}

/**
* Oracle does not support {@code ON UPDATE CASCADE} on foreign keys, so the {@code TILEPAGE.TILESET_ID -> TILESET
* .KEY} FK declared above only cascades on delete. As a result this dialect cannot safely rewrite {@code TILESET
* .KEY} during a rename without first dealing with the dangling {@code TILEPAGE} rows.
*
* <p>For now Oracle keeps the legacy behavior of only updating {@code LAYER_NAME}; lookups by id against the
* renamed layer will continue to miss the row and cause {@code getOrCreateTileSet} to insert duplicates. Fixing
* this on Oracle (e.g. via {@code DEFERRABLE INITIALLY DEFERRED} constraints, or by disabling the FK around the
* rename) is tracked separately.
* PL/SQL anonymous block that rewrites TILESET.KEY and TILEPAGE.TILESET_ID together; the deferred FK is checked
* once at commit with both updates in place. Oracle has no {@code ON UPDATE CASCADE}, hence the manual rewrite, and
* no SQL-standard {@code SUBSTRING ... FROM POSITION(...)}, hence {@code SUBSTR}/{@code INSTR}.
*/
@Override
public String getRenameLayerStatement(String schema, String oldLayerName, String newLayerName) {
StringBuilder sb = new StringBuilder("UPDATE ");
if (schema != null) {
sb.append(schema).append(".");
}
sb.append("TILESET SET LAYER_NAME = :")
.append(newLayerName)
.append(" WHERE LAYER_NAME = :")
.append(oldLayerName);

return sb.toString();
String prefix = schema == null ? "" : schema + ".";
return """
BEGIN
UPDATE %sTILESET
SET KEY = :%s || SUBSTR(KEY, INSTR(KEY, '#')),
LAYER_NAME = :%s
WHERE LAYER_NAME = :%s;
UPDATE %sTILEPAGE
SET TILESET_ID = :%s || SUBSTR(TILESET_ID, INSTR(TILESET_ID, '#'))
WHERE TILESET_ID LIKE :%s || '#%%';
END;
"""
.formatted(prefix, newLayerName, newLayerName, oldLayerName, prefix, newLayerName, oldLayerName);
}

@Override
Expand Down
Loading
Loading