Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicates;
Expand Down Expand Up @@ -496,7 +496,7 @@ public void initializeComplete() {
future.complete(newledger);
// May need to trigger offloading
if (config.isTriggerOffloadOnTopicLoad()) {
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
newledger.maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,19 @@ public Logger getLogger() {
protected final CallbackMutex trimmerMutex = new CallbackMutex();

protected final CallbackMutex offloadMutex = new CallbackMutex();
public static final CompletableFuture<Position> NULL_OFFLOAD_PROMISE = CompletableFuture
// Automatic offload has no caller-visible future. Coalesce concurrent automatic triggers into at most one
// running offload and one follow-up run.
private final AtomicBoolean automaticOffloadInProgress = new AtomicBoolean(false);
private final AtomicBoolean automaticOffloadRerunRequested = new AtomicBoolean(false);
// Identity sentinel for automatic offload requests. The completed Position value is not used.
public static final CompletableFuture<Position> AUTOMATIC_OFFLOAD_TRIGGER = CompletableFuture
.completedFuture(PositionFactory.LATEST);

private enum OffloadRequestSource {
AUTOMATIC,
EXPLICIT
}

@VisibleForTesting
@Getter
protected volatile LedgerHandle currentLedger;
Expand Down Expand Up @@ -1968,7 +1979,7 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) {

trimConsumedLedgersInBackground();

maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER);

createLedgerAfterClosed();
}
Expand Down Expand Up @@ -2804,22 +2815,66 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
}

public void maybeOffloadInBackground(CompletableFuture<Position> promise) {
if (getOffloadPoliciesIfAppendable().isEmpty()) {
if (promise == AUTOMATIC_OFFLOAD_TRIGGER) {
if (!automaticOffloadInProgress.compareAndSet(false, true)) {
automaticOffloadRerunRequested.set(true);
return;
}
CompletableFuture<Position> automaticOffloadCompletion = new CompletableFuture<>();
automaticOffloadCompletion.whenComplete((res, ex) -> finishAutomaticOffload(ex));
maybeOffloadInBackground(automaticOffloadCompletion, OffloadRequestSource.AUTOMATIC);
return;
}

maybeOffloadInBackground(promise, OffloadRequestSource.EXPLICIT);
}

private void maybeOffloadInBackground(CompletableFuture<Position> promise, OffloadRequestSource source) {
Optional<Pair<Long, Long>> offloadThresholds = getOffloadThresholds();
if (offloadThresholds.isEmpty()) {
// Explicit callers keep the previous no-completion behavior. The internal automatic completion must be
// finished so automaticOffloadInProgress can be cleared.
if (source == OffloadRequestSource.AUTOMATIC) {
promise.complete(PositionFactory.LATEST);
}
return;
}

final OffloadPolicies policies = config.getLedgerOffloader().getOffloadPolicies();
Pair<Long, Long> thresholds = offloadThresholds.get();
executor.execute(() -> maybeOffload(thresholds.getLeft(), thresholds.getRight(), promise,
source));
}

private Optional<Pair<Long, Long>> getOffloadThresholds() {
Optional<OffloadPolicies> optionalOffloadPolicies = getOffloadPoliciesIfAppendable();
if (optionalOffloadPolicies.isEmpty()) {
return Optional.empty();
}

final OffloadPolicies policies = optionalOffloadPolicies.get();
final long offloadThresholdInBytes =
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
final long offloadThresholdInSeconds =
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
executor.execute(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise));
return Optional.of(Pair.of(offloadThresholdInBytes, offloadThresholdInSeconds));
}

return Optional.empty();
}

private void finishAutomaticOffload(Throwable exception) {
if (exception != null) {
log.warn().exception(exception).log("Failed to automatically offload ledgers");
}
automaticOffloadInProgress.set(false);
if (automaticOffloadRerunRequested.getAndSet(false)) {
maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER);
}
}

private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
CompletableFuture<Position> finalPromise) {
CompletableFuture<Position> finalPromise, OffloadRequestSource source) {
if (getOffloadPoliciesIfAppendable().isEmpty()) {
String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
finalPromise.completeExceptionally(new IllegalArgumentException(msg));
Expand All @@ -2834,7 +2889,7 @@ private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInS
}

if (!offloadMutex.tryLock()) {
scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise),
scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise, source),
100, TimeUnit.MILLISECONDS);
return;
}
Expand Down Expand Up @@ -2926,12 +2981,11 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {

private Optional<OffloadPolicies> getOffloadPoliciesIfAppendable() {
LedgerOffloader ledgerOffloader = config.getLedgerOffloader();
if (ledgerOffloader == null
|| !ledgerOffloader.isAppendable()
|| ledgerOffloader.getOffloadPolicies() == null) {
if (ledgerOffloader == null || !ledgerOffloader.isAppendable()) {
return Optional.empty();
}
return Optional.ofNullable(ledgerOffloader.getOffloadPolicies());
OffloadPolicies offloadPolicies = ledgerOffloader.getOffloadPolicies();
return Optional.ofNullable(offloadPolicies);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,128 @@ public CompletableFuture<Void> offload(ReadHandle ledger,
}
}

@Test
public void automaticOffloadTriggersAreCoalescedWhileOffloadInProgress() throws Exception {
CompletableFuture<Void> slowOffload = new CompletableFuture<>();
CountDownLatch offloadRunning = new CountDownLatch(1);
AtomicInteger offloadPolicyCalls = new AtomicInteger();
MockLedgerOffloader offloader = new MockLedgerOffloader() {
@Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uuid,
Map<String, String> extraMetadata) {
offloadRunning.countDown();
return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata));
}

@Override
public OffloadPoliciesImpl getOffloadPolicies() {
offloadPolicyCalls.incrementAndGet();
return super.getOffloadPolicies();
}
};

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config);

for (int i = 0; i < 25; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}
assertTrue(offloadRunning.await(5, TimeUnit.SECONDS));

int callsBeforeRepeatedTriggers = offloadPolicyCalls.get();
for (int i = 0; i < 20; i++) {
ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER);
}

Thread.sleep(300);
assertTrue(offloadPolicyCalls.get() < callsBeforeRepeatedTriggers + 5,
"Repeated automatic triggers should not create independent retry loops");

slowOffload.complete(null);

assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1)));
}

@Test
public void automaticOffloadRunsAgainForCoalescedTrigger() throws Exception {
CompletableFuture<Void> slowOffload = new CompletableFuture<>();
CountDownLatch offloadRunning = new CountDownLatch(1);
MockLedgerOffloader offloader = new MockLedgerOffloader() {
@Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uuid,
Map<String, String> extraMetadata) {
offloadRunning.countDown();
return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata));
}
};

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config);

for (int i = 0; i < 11; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}
assertTrue(offloadRunning.await(5, TimeUnit.SECONDS));

// The next ledger closes after the first automatic scan, so it depends on the coalesced rerun.
for (int i = 11; i < 21; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}
assertEquals(offloader.offloadedLedgers().size(), 0);

slowOffload.complete(null);

assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1)));
}

@Test
public void automaticOffloadWithoutThresholdDoesNotBlockLaterTriggers() throws Exception {
MockLedgerOffloader offloader = new MockLedgerOffloader();
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(-1L);
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config);

for (int i = 0; i < 25; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}
ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER);
assertEquals(offloader.offloadedLedgers().size(), 0);

// A disabled automatic trigger must complete internally so a later enabled trigger can run.
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L);
ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER);

assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1)));
}

@DataProvider(name = "offloadAsSoonAsClosed")
public Object[][] offloadAsSoonAsClosedProvider() {
return new Object[][]{
Expand Down