Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,30 @@ public void addCompactionResults(Collection<HStoreFile> newCompactedfiles,
// Let a background thread close the actual reader on these compacted files and also
// ensure to evict the blocks from block cache so that they are no longer in
// cache
newCompactedfiles.forEach(HStoreFile::markCompactedAway);
compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
Iterables.concat(compactedfiles, newCompactedfiles));
List<HStoreFile> filesToClose = new ArrayList<>(newCompactedfiles);
try {
HStoreFile.increaseStoreFilesRefeCount(newCompactedfiles);
newCompactedfiles.forEach(hStoreFile -> {
StoreFileReader reader = hStoreFile.getReader();
try {
if (reader == null) {
hStoreFile.initReader();
} else {
filesToClose.remove(hStoreFile);
}
} catch (IOException e) {
LOG.warn("Couldn't initialize reader for " + hStoreFile, e);
throw new RuntimeException(e);
} finally {
hStoreFile.markCompactedAway();
}
});
compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
Iterables.concat(compactedfiles, newCompactedfiles));
} finally {
HStoreFile.decreaseStoreFilesRefeCount(newCompactedfiles);
filesToClose.forEach(HStoreFile::closeStoreFile);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,18 @@ public synchronized void closeStoreFile(boolean evictOnClose) throws IOException
}
}

/**
 * Best-effort close of this store file's underlying reader.
 * <p>
 * Unlike the {@code closeStoreFile(boolean)} overload, this variant never
 * propagates an {@link IOException}; a failure to close is logged and
 * swallowed so callers on cleanup paths are not interrupted. Whether cached
 * blocks are evicted on close follows the cache configuration, defaulting to
 * eviction when no cache configuration is present.
 */
public synchronized void closeStoreFile() {
  if (this.initialReader == null) {
    // Nothing to close; reader was never initialized or already released.
    return;
  }
  boolean evictOnClose = true;
  if (cacheConf != null) {
    evictOnClose = cacheConf.shouldEvictOnClose();
  }
  try {
    this.initialReader.close(evictOnClose);
    this.initialReader = null;
  } catch (IOException e) {
    LOG.warn("failed to close reader", e);
  }
}

/**
* Delete this file
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
Expand Down Expand Up @@ -1775,6 +1776,103 @@ public void testReclaimChunkWhenScaning() throws IOException {
}
}


/**
 * Regression test for a race between closing a store file's reader and
 * registering that file as a compaction result: a closer thread calls
 * {@code closeStoreFile(true)} on a file while another thread concurrently
 * passes the same file to
 * {@code DefaultStoreFileManager#addCompactionResults}. Neither thread may
 * fail, and afterwards the compacted file must be gone from the live file
 * set with the remaining count stable.
 *
 * NOTE(review): this test starts its own mini cluster per invocation; if the
 * enclosing test class already manages a shared HBaseTestingUtility, reusing
 * it would make this test considerably cheaper — confirm against class setup.
 */
@Test
public void testReaderIsClosedOnlyAfterCompactionComplete() throws Exception {
HBaseTestingUtility util = new HBaseTestingUtility();
util.startMiniCluster(1);

try {
// NOTE(review): 'conf' appears unused below — candidate for removal.
Configuration conf = util.getConfiguration();

byte[] FAMILY = Bytes.toBytes("f");
byte[] Q = Bytes.toBytes("q");
TableName TABLE = TableName.valueOf("race_test");

// Single-family table, one version per cell.
TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE)
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY)
.setMaxVersions(1)
.build())
.build();

util.getAdmin().createTable(td);

HRegion region =
util.getMiniHBaseCluster().getRegions(TABLE).get(0);

HStore store = region.getStore(FAMILY);

// Each put+flush produces one store file, giving us four distinct files.
for (int i = 0; i < 4; i++) {
Put p = new Put(Bytes.toBytes("row-" + i));
p.addColumn(FAMILY, Q, Bytes.toBytes(i));
region.put(p);
region.flush(true); // force store file
}

// sanity check
assertEquals(4, store.getStorefilesCount());

DefaultStoreFileManager sfm =
(DefaultStoreFileManager) store.getStoreEngine().getStoreFileManager();

// The file both threads will fight over.
HStoreFile victim =
sfm.getStoreFiles().iterator().next();

// Captures the first throwable from either thread; 'start' releases both
// threads at once to maximize overlap, 'done' waits for both to finish.
AtomicReference<Throwable> failure = new AtomicReference<>();
CountDownLatch start = new CountDownLatch(1);
CountDownLatch done = new CountDownLatch(2);

Thread remover = new Thread(() -> {
try {
start.await();
victim.closeStoreFile(true); // async-style close
} catch (Throwable t) {
failure.set(t);
} finally {
done.countDown();
}
});

// Registers 'victim' as compacted with no replacement files, i.e. the
// file is compacted away entirely.
Thread adder = new Thread(() -> {
try {
start.await();
sfm.addCompactionResults(
Collections.singletonList(victim),
new ArrayList<>());
} catch (Throwable t) {
// NOTE(review): a failure here may overwrite one set by 'remover';
// only the last throwable is reported.
failure.set(t);
} finally {
done.countDown();
}
});

remover.start();
adder.start();
start.countDown();

assertTrue(done.await(60, TimeUnit.SECONDS));

if (failure.get() != null) {
throw new AssertionError(
"Race caused failure (this is the bug you fixed)",
failure.get());
}

Collection<HStoreFile> finalFiles = sfm.getStoreFiles();

assertFalse("Old file must be gone",
finalFiles.contains(victim));

// Started with 4 files; one was compacted away with no output files.
assertEquals("File count stable after compaction",
3, finalFiles.size());

} finally {
util.shutdownMiniCluster();
}
}


/**
* If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the
versionedList. And the first InMemoryFlushRunnable will use the changed versionedList to remove
Expand Down