Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,8 @@ public BatchScanner createBatchScanner(String tableName, Authorizations authoriz
int numQueryThreads) throws TableNotFoundException {
ensureOpen();
checkArgument(authorizations != null, "authorizations is null");
return new TabletServerBatchReader(this, requireNotOffline(getTableId(tableName), tableName),
tableName, authorizations, numQueryThreads);
return new TabletServerBatchReader(this, getTableId(tableName), tableName, authorizations,
numQueryThreads);
}

@Override
Expand Down Expand Up @@ -796,8 +796,7 @@ public Scanner createScanner(String tableName, Authorizations authorizations)
throws TableNotFoundException {
ensureOpen();
checkArgument(authorizations != null, "authorizations is null");
Scanner scanner =
new ScannerImpl(this, requireNotOffline(getTableId(tableName), tableName), authorizations);
Scanner scanner = new ScannerImpl(this, getTableId(tableName), authorizations);
Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties());
if (batchSize != null) {
scanner.setBatchSize(batchSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.clientImpl;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.util.Timer;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.base.Preconditions;

import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.cache.CaffeineStatsCounter;

public class OfflineTabletLocatorImpl extends TabletLocator {

private static final Logger LOG = LoggerFactory.getLogger(OfflineTabletLocatorImpl.class);

public static class OfflineTabletLocation extends TabletLocation {

public static final String SERVER = "offline_table_marker";

public OfflineTabletLocation(KeyExtent tablet_extent) {
super(tablet_extent, SERVER, SERVER);
}

}

private class OfflineTabletsCache implements RemovalListener<KeyExtent,KeyExtent> {

// This object uses a Caffeine cache to manage the duration of the extents
// cached in the TreeSet. The TreeSet is necessary for expedient operations.

private final ClientContext context;
private final int prefetch;
private final Cache<KeyExtent,KeyExtent> cache;
private final LinkedBlockingQueue<KeyExtent> evictions = new LinkedBlockingQueue<>();
private final TreeSet<KeyExtent> extents = new TreeSet<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Timer scanTimer = Timer.startNew();

private OfflineTabletsCache(ClientContext context) {
this.context = context;
Properties clientProperties = context.getProperties();
Duration cacheDuration = Duration.ofMillis(
ClientProperty.OFFLINE_LOCATOR_CACHE_DURATION.getTimeInMillis(clientProperties));
int maxCacheSize =
Integer.parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_SIZE.getValue(clientProperties));
prefetch = Integer
.parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_PREFETCH.getValue(clientProperties));
cache = Caffeine.newBuilder().expireAfterAccess(cacheDuration).initialCapacity(maxCacheSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache could have a weigher. The could be useful for the case where tablets splits can widely vary in size.

.maximumSize(maxCacheSize).removalListener(this).ticker(Ticker.systemTicker())
.recordStats(() -> new CaffeineStatsCounter(Metrics.globalRegistry,
OfflineTabletsCache.class.getSimpleName()))
.build();
}

@Override
public void onRemoval(KeyExtent key, KeyExtent value, RemovalCause cause) {
LOG.trace("Extent was evicted from cache: {}", key);
evictions.add(key);
try {
if (lock.writeLock().tryLock(50, TimeUnit.MILLISECONDS)) {
try {
processEvictions();
} finally {
lock.writeLock().unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting to acquire write lock", e);
}
}

private void processEvictions() {
Preconditions.checkArgument(lock.writeLock().isHeldByCurrentThread());
LOG.trace("Processing prior evictions");
Set<KeyExtent> copy = new HashSet<>();
evictions.drainTo(copy);
extents.removeAll(copy);
}

private KeyExtent findOrLoadExtent(KeyExtent searchKey) {
lock.readLock().lock();
try {
KeyExtent match = extents.ceiling(searchKey);
if (match != null && match.contains(searchKey)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this call to contains is working correctly. The way search key is created it goes from -inf to row, so the match may not contain it. Wrote the following test program and it prints false true.

        KeyExtent lookupExtent = KeyExtent.fromMetaRow(new Text("1;cat"));
        KeyExtent match = new KeyExtent(TableId.of("1"), new Text("d"),new Text("b") );
        System.out.println(match.contains(lookupExtent));
        System.out.println(match.contains(new Text("cat")));

// update access time in cache
@SuppressWarnings("unused")
var unused = cache.getIfPresent(match);
LOG.trace("Extent {} found in cache for start row {}", match, searchKey);
return match;
}
} finally {
lock.readLock().unlock();
}
lock.writeLock().lock();
// process prior evictions since we have the write lock
processEvictions();
// Load TabletMetadata
if (LOG.isDebugEnabled()) {
scanTimer.restart();
}
int added = 0;
try (TabletsMetadata tm =
context.getAmple().readTablets().forTable(tid).overlapping(searchKey.endRow(), true, null)
.fetch(ColumnType.PREV_ROW, ColumnType.LOCATION).build()) {
Iterator<TabletMetadata> iter = tm.iterator();
for (int i = 0; i < prefetch && iter.hasNext(); i++) {
TabletMetadata t = iter.next();
KeyExtent ke = t.getExtent();
if (t.getLocation() != null) {
if (context.getTableState(tid) == TableState.ONLINE) {
throw new IllegalStateException(
"Cannot continue scan with OfflineTabletLocator, table is now online");
}
throw new IllegalStateException(
"Extent " + ke + " has current or future location, but table is not online");
}
LOG.trace("Caching extent: {}", ke);
cache.put(ke, ke);
TabletLocatorImpl.removeOverlapping(extents, ke);
extents.add(ke);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be safest to remove any overlapping extents when adding new extents. If the table is offline for the entire duration of this caches lifetime that should not happen. However this cache could be long lived and the table could be brought online and taken offline during its lifetime, and it may not see that event.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented this change in 3ed4585

added++;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Took {}ms to scan and load {} metadata tablets for table {}",
scanTimer.elapsed(TimeUnit.MILLISECONDS), added, tid);
}
return extents.ceiling(searchKey);
} finally {
lock.writeLock().unlock();
}
}

private void invalidate(KeyExtent failedExtent) {
cache.invalidate(failedExtent);
}

private void invalidate(Collection<KeyExtent> keySet) {
cache.invalidateAll(keySet);
}

private void invalidateAll() {
cache.invalidateAll();
}

}

private final TableId tid;
private final OfflineTabletsCache extentCache;

public OfflineTabletLocatorImpl(ClientContext context, TableId tableId) {
tid = tableId;
if (context.getTableState(tid) != TableState.OFFLINE) {
throw new IllegalStateException("Table " + tableId + " is not offline");
}
extentCache = new OfflineTabletsCache(context);
}

@Override
public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow,
boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {

if (skipRow) {
row = new Text(row);
row.append(new byte[] {0}, 0, 1);
}

Text metadataRow = new Text(tid.canonical());
metadataRow.append(new byte[] {';'}, 0, 1);
metadataRow.append(row.getBytes(), 0, row.getLength());

LOG.trace("Locating offline tablet for row: {}", metadataRow);
KeyExtent searchKey = KeyExtent.fromMetaRow(metadataRow);
KeyExtent match = extentCache.findOrLoadExtent(searchKey);
if (match != null) {
if (match.prevEndRow() == null || match.prevEndRow().compareTo(row) < 0) {
LOG.trace("Found match for row: {}, extent = {}", row, match);
return new OfflineTabletLocation(match);
}
}
LOG.trace("Found no matching extent for row: {}", row);
return null;
}

@Override
public List<Range> binRanges(ClientContext context, List<Range> ranges,
Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {

List<TabletLocation> tabletLocations = new ArrayList<>(ranges.size());
List<Range> failures = new ArrayList<>();

l1: for (Range r : ranges) {
LOG.trace("Looking up locations for range: {}", r);
tabletLocations.clear();
Text startRow;

if (r.getStartKey() != null) {
startRow = r.getStartKey().getRow();
} else {
startRow = new Text();
}

TabletLocation tl = this.locateTablet(context, startRow, false, false);
if (tl == null) {
LOG.trace("NOT FOUND first tablet in range: {}", r);
failures.add(r);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have coverage of the entire table, should never fail to find anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of dd9f6c8, the cache no longer keeps all of the extents in memory

continue;
}
LOG.trace("Found first tablet in range: {}, extent: {}", r, tl.tablet_extent);
tabletLocations.add(tl);

while (tl.tablet_extent.endRow() != null
&& !r.afterEndKey(new Key(tl.tablet_extent.endRow()).followingKey(PartialKey.ROW))) {
KeyExtent priorExtent = tl.tablet_extent;
tl = locateTablet(context, tl.tablet_extent.endRow(), true, false);

if (tl == null) {
LOG.trace("NOT FOUND tablet following {} in range: {}", priorExtent, r);
failures.add(r);
continue l1;
}
LOG.trace("Found following tablet in range: {}, extent: {}", r, tl.tablet_extent);
tabletLocations.add(tl);
}

// Ensure the extents found are non overlapping and have no holes. When reading some extents
// from the cache and other from the metadata table in the loop above we may end up with
// non-contiguous extents. This can happen when a subset of exents are placed in the cache and
// then after that merges and splits happen.
if (TabletLocatorImpl.isContiguous(tabletLocations)) {
for (TabletLocation tl2 : tabletLocations) {
TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, r);
}
} else {
LOG.trace("Found non-contiguous tablet in range: {}", r);
failures.add(r);
}

}
return failures;
}

@Override
public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations,
Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
throw new UnsupportedOperationException();
}

@Override
public void invalidateCache(KeyExtent failedExtent) {
extentCache.invalidate(failedExtent);
}

@Override
public void invalidateCache(Collection<KeyExtent> keySet) {
extentCache.invalidate(keySet);
}

@Override
public void invalidateCache() {
extentCache.invalidateAll();
}

@Override
public void invalidateCache(ClientContext context, String server) {
invalidateCache();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
Expand Down Expand Up @@ -163,6 +164,15 @@ public synchronized int getBatchSize() {
@Override
public synchronized Iterator<Entry<Key,Value>> iterator() {
ensureOpen();
if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) {
try {
String tableName = context.getTableName(tableId);
context.requireNotOffline(tableId, tableName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this can do a ZK operation per creation of a scanner iterator, it could cause problems. Not sure what the impl of this method does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's using ZooCache, not hitting ZK directly.

} catch (TableNotFoundException e) {
throw new RuntimeException("Table not found", e);
}
}

ScannerIterator iter = new ScannerIterator(context, tableId, authorizations, range, size,
Duration.ofMillis(getTimeout(MILLISECONDS)), this, isolated, readaheadThreshold,
new Reporter());
Expand Down
Loading