Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,55 @@ public class DataFileValue {
private final long size;
private final long numEntries;
private long time = -1;
private boolean shared;

public DataFileValue(long size, long numEntries, long time, boolean shared) {
this.size = size;
this.numEntries = numEntries;
this.time = time;
this.shared = shared;
}

public DataFileValue(long size, long numEntries, boolean shared) {
this.size = size;
this.numEntries = numEntries;
this.time = -1;
this.shared = shared;
}

public DataFileValue(long size, long numEntries, long time) {
this.size = size;
this.numEntries = numEntries;
this.time = time;
this.shared = false;
}

public DataFileValue(long size, long numEntries) {
this.size = size;
this.numEntries = numEntries;
this.time = -1;
this.shared = false;
}

public DataFileValue(String encodedDFV) {
String[] ba = encodedDFV.split(",");

size = Long.parseLong(ba[0]);
numEntries = Long.parseLong(ba[1]);

if (ba.length == 3) {
time = Long.parseLong(ba[2]);
} else {
time = -1;
time = -1;
shared = false;

if (ba.length >= 3) {
// Field 3 could be either time (old format) or shared (new format)
// Try to parse as time first (for backward compatibility)
try {
time = Long.parseLong(ba[2]);
} catch (NumberFormatException e) {
shared = Boolean.parseBoolean(ba[2]);
}
}
if (ba.length == 4) {
time = Long.parseLong(ba[3]);
}
}

Expand All @@ -74,15 +100,19 @@ public long getTime() {
return time;
}

public boolean isShared() {
return shared;
}

public byte[] encode() {
return encodeAsString().getBytes(UTF_8);
}

public String encodeAsString() {
if (time >= 0) {
return ("" + size + "," + numEntries + "," + time);
return ("" + size + "," + numEntries + "," + shared + "," + time);
}
return ("" + size + "," + numEntries);
return ("" + size + "," + numEntries + "," + shared);
}

public Value encodeAsValue() {
Expand All @@ -93,7 +123,8 @@ public Value encodeAsValue() {
public boolean equals(Object o) {
if (o instanceof DataFileValue odfv) {

return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time;
return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time
&& shared == odfv.shared;
}

return false;
Expand All @@ -116,6 +147,10 @@ public void setTime(long time) {
this.time = time;
}

public void setShared(boolean shared) {
this.shared = shared;
}

/**
* @return true if {@link #wrapFileIterator} would wrap a given iterator, false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,48 @@ public static void deleteTable(TableId tableId, boolean insertDeletes, ServerCon
return new Pair<>(result, sizes);
}

private static void markSourceFilesAsShared(TableId srcTableId, TableId tableId,
AccumuloClient client, BatchWriter bw) throws MutationsRejectedException {

log.info("Marking source table {} files as shared after clone to {}", srcTableId, tableId);

try (TabletsMetadata srcTM = createCloneScanner(null, srcTableId, client)) {
for (TabletMetadata srcTablet : srcTM) {

if (srcTablet.getFiles().isEmpty()) {
continue;
}

Mutation m = new Mutation(srcTablet.getExtent().toMetaRow());
boolean hasChanges = false;

// Update each file's DataFileValue to mark as shared
for (var entry : srcTablet.getFilesMap().entrySet()) {
StoredTabletFile file = entry.getKey();
DataFileValue dfv = entry.getValue();

if (!dfv.isShared()) {
DataFileValue sharedDfv = new DataFileValue(dfv.getSize(), dfv.getNumEntries(),
dfv.isTimeSet() ? dfv.getTime() : -1, true);

m.put(DataFileColumnFamily.NAME, file.getMetadataText(), sharedDfv.encodeAsValue());
hasChanges = true;

log.debug("Marking file {} as shared in source tablet {}", file.getFileName(),
srcTablet.getExtent());
}
}

if (hasChanges) {
bw.addMutation(m);
}
}
}

bw.flush();
log.info("Finished marking source table {} files as shared", srcTableId);
}

private static Mutation createCloneMutation(TableId srcTableId, TableId tableId,
Iterable<Entry<Key,Value>> tablet) {

Expand All @@ -191,7 +233,13 @@ private static Mutation createCloneMutation(TableId srcTableId, TableId tableId,
if (!cf.startsWith("../") && !cf.contains(":")) {
cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
}
m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue());

DataFileValue ogVal = new DataFileValue(entry.getValue().get());
DataFileValue newSharedVal = new DataFileValue(ogVal.getSize(), ogVal.getNumEntries(),
ogVal.isTimeSet() ? ogVal.getTime() : -1, true);

// FIXED: Use newSharedVal instead of original value
m.put(entry.getKey().getColumnFamily(), new Text(cf), newSharedVal.encodeAsValue());
} else if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) {
m.put(LastLocationColumnFamily.NAME, entry.getKey().getColumnQualifier(), entry.getValue());
} else if (entry.getKey().getColumnFamily().equals(LastLocationColumnFamily.NAME)) {
Expand Down Expand Up @@ -380,6 +428,8 @@ public static void cloneTable(ServerContext context, TableId srcTableId, TableId
}
}

markSourceFilesAsShared(srcTableId, tableId, context, bw);

// delete the clone markers and create directory entries
Scanner mscanner =
context.createScanner(SystemTables.METADATA.tableName(), Authorizations.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -67,6 +69,10 @@ public CommitCompaction(CompactionCommitData commitData, String newDatafile) {
this.newDatafile = newDatafile;
}

private record CompactionFileResult(TabletMetadata tabletMetadata,
ArrayList<StoredTabletFile> filesToDeleteViaGc) {
}

@Override
public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
var ecid = ExternalCompactionId.of(commitData.ecid);
Expand All @@ -79,25 +85,25 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
// process died and now its running again. In this case commit should do nothing, but its
// important to still carry on with the rest of the steps after commit. This code ignores a that
// fact that a commit may not have happened in the current call and continues for this reason.
TabletMetadata tabletMetadata = commitCompaction(env.getContext(), ecid, newFile);
CompactionFileResult fileResult = commitCompaction(env.getContext(), ecid, newFile);

String loc = null;
if (tabletMetadata != null && tabletMetadata.getLocation() != null) {
loc = tabletMetadata.getLocation().getHostPortSession();
if (fileResult != null && fileResult.tabletMetadata.getLocation() != null) {
loc = fileResult.tabletMetadata.getLocation().getHostPortSession();
}

// This will causes the tablet to be reexamined to see if it needs any more compactions.
var extent = KeyExtent.fromThrift(commitData.textent);
env.getEventPublisher().event(extent, "Compaction completed %s", extent);

return new PutGcCandidates(commitData, loc);
return new PutGcCandidates(commitData, loc, fileResult.filesToDeleteViaGc());
}

KeyExtent getExtent() {
return KeyExtent.fromThrift(commitData.textent);
}

private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid,
private CompactionFileResult commitCompaction(ServerContext ctx, ExternalCompactionId ecid,
Optional<ReferencedTabletFile> newDatafile) {

var tablet =
Expand All @@ -107,6 +113,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
.logInterval(Duration.ofMinutes(3)).createRetry();

ArrayList<StoredTabletFile> filesToDeleteViaGc = new ArrayList<>();

while (canCommitCompaction(ecid, tablet)) {
CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);

Expand All @@ -126,7 +134,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
}

// make the needed updates to the tablet
updateTabletForCompaction(commitData.stats, ecid, tablet, newDatafile, ecm, tabletMutator);
filesToDeleteViaGc = updateTabletForCompaction(ctx, commitData.stats, ecid, tablet,
newDatafile, ecm, tabletMutator);

tabletMutator.submit(
tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid),
Expand Down Expand Up @@ -156,13 +165,16 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
}
}

return tablet;
return new CompactionFileResult(tablet, filesToDeleteViaGc);
}

private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid,
TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, CompactionMetadata ecm,
private ArrayList<StoredTabletFile> updateTabletForCompaction(ServerContext ctx,
TCompactionStats stats, ExternalCompactionId ecid, TabletMetadata tablet,
Optional<ReferencedTabletFile> newDatafile, CompactionMetadata ecm,
Ample.ConditionalTabletMutator tabletMutator) {

ArrayList<StoredTabletFile> filesToDeleteViaGc = new ArrayList<>();

if (ecm.getKind() == CompactionKind.USER) {
if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) {
// all files selected for the user compactions are finished, so the tablet is finish and
Expand Down Expand Up @@ -211,13 +223,51 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio
// scan
ecm.getJobFiles().forEach(tabletMutator::putScan);
}
ecm.getJobFiles().forEach(tabletMutator::deleteFile);
for (StoredTabletFile file : ecm.getJobFiles()) {
// Check if file is shared
DataFileValue fileMetadata = tablet.getFilesMap().get(file);
boolean isShared = fileMetadata != null && fileMetadata.isShared();

if (isShared) {
// File is shared - must use GC delete markers
LOG.debug("File {} is shared, will create GC delete marker", file.getFileName());
filesToDeleteViaGc.add(file);
} else {
// File is not shared - try to delete directly
try {
Path filePath = new Path(file.getMetadataPath());
boolean deleted = ctx.getVolumeManager().deleteRecursively(filePath);

if (deleted) {
LOG.debug("Successfully deleted non-shared compaction input file: {}",
file.getFileName());
} else {
LOG.warn("Failed to delete non-shared file {}, will create GC delete marker",
file.getFileName());
filesToDeleteViaGc.add(file);
}
} catch (IOException e) {
LOG.warn("Error deleting non-shared file {}, will create GC delete marker: {}",
file.getFileName(), e.getMessage());
filesToDeleteViaGc.add(file);
}
}

// Always delete from tablet metadata
tabletMutator.deleteFile(file);
}

tabletMutator.deleteExternalCompaction(ecid);

if (newDatafile.isPresent()) {
tabletMutator.putFile(newDatafile.orElseThrow(),
new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()));
// NEW: Mark new compaction output files as not shared
DataFileValue newFileValue = new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(),
System.currentTimeMillis(), false // New file created by this tablet alone - not shared
);
tabletMutator.putFile(newDatafile.orElseThrow(), newFileValue);
}

return filesToDeleteViaGc;
}

public static boolean canCommitCompaction(ExternalCompactionId ecid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,52 @@
*/
package org.apache.accumulo.manager.compaction.coordinator.commit;

import java.util.ArrayList;
import java.util.List;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.gc.ReferenceFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.manager.tableOps.AbstractFateOperation;
import org.apache.accumulo.manager.tableOps.FateEnv;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PutGcCandidates extends AbstractFateOperation {
private static final long serialVersionUID = 1L;
private final CompactionCommitData commitData;
private final String refreshLocation;
private final ArrayList<String> filesToDeleteViaGc;
private static final Logger LOG = LoggerFactory.getLogger(PutGcCandidates.class);

public PutGcCandidates(CompactionCommitData commitData, String refreshLocation) {
public PutGcCandidates(CompactionCommitData commitData, String refreshLocation,
ArrayList<StoredTabletFile> filesToDeleteViaGc) {
this.commitData = commitData;
this.refreshLocation = refreshLocation;
this.filesToDeleteViaGc = new ArrayList<>();
for (StoredTabletFile file : filesToDeleteViaGc) {
this.filesToDeleteViaGc.add(file.getMetadataPath());
}
}

@Override
public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
if (filesToDeleteViaGc != null && !filesToDeleteViaGc.isEmpty()) {
var extent = KeyExtent.fromThrift(commitData.textent);
List<ReferenceFile> deleteMarkers = new ArrayList<>();

// add the GC candidates
env.getContext().getAmple().putGcCandidates(commitData.getTableId(), commitData.getJobFiles());
for (String filePath : filesToDeleteViaGc) {
StoredTabletFile file = new StoredTabletFile(filePath);
deleteMarkers.add(ReferenceFile.forFile(extent.tableId(), file));
LOG.debug("Creating GC delete marker for file: {}", file.getFileName());
}

env.getContext().getAmple().putGcFileAndDirCandidates(extent.tableId(), deleteMarkers);
LOG.debug("Created {} GC delete markers for compaction", deleteMarkers.size());
}

if (refreshLocation == null) {
env.recordCompactionCompletion(ExternalCompactionId.of(commitData.ecid));
Expand Down
Loading
Loading