-
Notifications
You must be signed in to change notification settings - Fork 479
Enable clients to scan offline tables using ScanServers #6156
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 2.1
Are you sure you want to change the base?
Changes from all commits
ed54842
dd9f6c8
3ed4585
8f3ad2d
305297e
63b9d17
5603230
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,323 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.accumulo.core.clientImpl; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.HashSet; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Properties; | ||
| import java.util.Set; | ||
| import java.util.TreeSet; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
|
||
| import org.apache.accumulo.core.client.AccumuloException; | ||
| import org.apache.accumulo.core.client.AccumuloSecurityException; | ||
| import org.apache.accumulo.core.client.TableNotFoundException; | ||
| import org.apache.accumulo.core.conf.ClientProperty; | ||
| import org.apache.accumulo.core.data.Key; | ||
| import org.apache.accumulo.core.data.Mutation; | ||
| import org.apache.accumulo.core.data.PartialKey; | ||
| import org.apache.accumulo.core.data.Range; | ||
| import org.apache.accumulo.core.data.TableId; | ||
| import org.apache.accumulo.core.dataImpl.KeyExtent; | ||
| import org.apache.accumulo.core.manager.state.tables.TableState; | ||
| import org.apache.accumulo.core.metadata.schema.TabletMetadata; | ||
| import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; | ||
| import org.apache.accumulo.core.metadata.schema.TabletsMetadata; | ||
| import org.apache.accumulo.core.util.Timer; | ||
| import org.apache.hadoop.io.Text; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import com.github.benmanes.caffeine.cache.Cache; | ||
| import com.github.benmanes.caffeine.cache.Caffeine; | ||
| import com.github.benmanes.caffeine.cache.RemovalCause; | ||
| import com.github.benmanes.caffeine.cache.RemovalListener; | ||
| import com.github.benmanes.caffeine.cache.Ticker; | ||
| import com.google.common.base.Preconditions; | ||
|
|
||
| import io.micrometer.core.instrument.Metrics; | ||
| import io.micrometer.core.instrument.binder.cache.CaffeineStatsCounter; | ||
|
|
||
| public class OfflineTabletLocatorImpl extends TabletLocator { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(OfflineTabletLocatorImpl.class); | ||
|
|
||
  /**
   * A {@link TabletLocation} for tablets of an offline table. Offline tablets have no hosting
   * tablet server, so a sentinel string is used for both the location and the session.
   */
  public static class OfflineTabletLocation extends TabletLocation {

    /** Sentinel "server" value used in place of a real tablet server location. */
    public static final String SERVER = "offline_table_marker";

    public OfflineTabletLocation(KeyExtent tablet_extent) {
      // the sentinel serves as both the location and the session argument
      super(tablet_extent, SERVER, SERVER);
    }

  }
|
|
||
| private class OfflineTabletsCache implements RemovalListener<KeyExtent,KeyExtent> { | ||
|
|
||
| // This object uses a Caffeine cache to manage the duration of the extents | ||
| // cached in the TreeSet. The TreeSet is necessary for expedient operations. | ||
|
|
||
| private final ClientContext context; | ||
| private final int prefetch; | ||
| private final Cache<KeyExtent,KeyExtent> cache; | ||
| private final LinkedBlockingQueue<KeyExtent> evictions = new LinkedBlockingQueue<>(); | ||
| private final TreeSet<KeyExtent> extents = new TreeSet<>(); | ||
| private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); | ||
| private final Timer scanTimer = Timer.startNew(); | ||
|
|
||
| private OfflineTabletsCache(ClientContext context) { | ||
| this.context = context; | ||
| Properties clientProperties = context.getProperties(); | ||
| Duration cacheDuration = Duration.ofMillis( | ||
| ClientProperty.OFFLINE_LOCATOR_CACHE_DURATION.getTimeInMillis(clientProperties)); | ||
| int maxCacheSize = | ||
| Integer.parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_SIZE.getValue(clientProperties)); | ||
| prefetch = Integer | ||
| .parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_PREFETCH.getValue(clientProperties)); | ||
| cache = Caffeine.newBuilder().expireAfterAccess(cacheDuration).initialCapacity(maxCacheSize) | ||
| .maximumSize(maxCacheSize).removalListener(this).ticker(Ticker.systemTicker()) | ||
| .recordStats(() -> new CaffeineStatsCounter(Metrics.globalRegistry, | ||
| OfflineTabletsCache.class.getSimpleName())) | ||
| .build(); | ||
| } | ||
|
|
||
| @Override | ||
| public void onRemoval(KeyExtent key, KeyExtent value, RemovalCause cause) { | ||
| LOG.trace("Extent was evicted from cache: {}", key); | ||
| evictions.add(key); | ||
| try { | ||
| if (lock.writeLock().tryLock(50, TimeUnit.MILLISECONDS)) { | ||
| try { | ||
| processEvictions(); | ||
| } finally { | ||
| lock.writeLock().unlock(); | ||
| } | ||
| } | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| throw new RuntimeException("Interrupted while waiting to acquire write lock", e); | ||
| } | ||
| } | ||
|
|
||
| private void processEvictions() { | ||
| Preconditions.checkArgument(lock.writeLock().isHeldByCurrentThread()); | ||
| LOG.trace("Processing prior evictions"); | ||
| Set<KeyExtent> copy = new HashSet<>(); | ||
| evictions.drainTo(copy); | ||
| extents.removeAll(copy); | ||
| } | ||
|
|
||
| private KeyExtent findOrLoadExtent(KeyExtent searchKey) { | ||
| lock.readLock().lock(); | ||
| try { | ||
| KeyExtent match = extents.ceiling(searchKey); | ||
| if (match != null && match.contains(searchKey)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure this call to contains is working correctly. The way search key is created it goes from -inf to row, so the match may not contain it. Wrote the following test program and it prints false true. KeyExtent lookupExtent = KeyExtent.fromMetaRow(new Text("1;cat"));
KeyExtent match = new KeyExtent(TableId.of("1"), new Text("d"),new Text("b") );
System.out.println(match.contains(lookupExtent));
System.out.println(match.contains(new Text("cat"))); |
||
| // update access time in cache | ||
| @SuppressWarnings("unused") | ||
| var unused = cache.getIfPresent(match); | ||
| LOG.trace("Extent {} found in cache for start row {}", match, searchKey); | ||
| return match; | ||
| } | ||
| } finally { | ||
| lock.readLock().unlock(); | ||
| } | ||
| lock.writeLock().lock(); | ||
| // process prior evictions since we have the write lock | ||
| processEvictions(); | ||
| // Load TabletMetadata | ||
| if (LOG.isDebugEnabled()) { | ||
| scanTimer.restart(); | ||
| } | ||
| int added = 0; | ||
| try (TabletsMetadata tm = | ||
| context.getAmple().readTablets().forTable(tid).overlapping(searchKey.endRow(), true, null) | ||
| .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION).build()) { | ||
| Iterator<TabletMetadata> iter = tm.iterator(); | ||
| for (int i = 0; i < prefetch && iter.hasNext(); i++) { | ||
| TabletMetadata t = iter.next(); | ||
| KeyExtent ke = t.getExtent(); | ||
| if (t.getLocation() != null) { | ||
| if (context.getTableState(tid) == TableState.ONLINE) { | ||
| throw new IllegalStateException( | ||
| "Cannot continue scan with OfflineTabletLocator, table is now online"); | ||
| } | ||
| throw new IllegalStateException( | ||
| "Extent " + ke + " has current or future location, but table is not online"); | ||
| } | ||
| LOG.trace("Caching extent: {}", ke); | ||
| cache.put(ke, ke); | ||
| TabletLocatorImpl.removeOverlapping(extents, ke); | ||
| extents.add(ke); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be safest to remove any overlapping extents when adding new extents. If the table is offline for the entire duration of this caches lifetime that should not happen. However this cache could be long lived and the table could be brought online and taken offline during its lifetime, and it may not see that event.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implemented this change in 3ed4585 |
||
| added++; | ||
| } | ||
| if (LOG.isDebugEnabled()) { | ||
| LOG.debug("Took {}ms to scan and load {} metadata tablets for table {}", | ||
| scanTimer.elapsed(TimeUnit.MILLISECONDS), added, tid); | ||
| } | ||
| return extents.ceiling(searchKey); | ||
| } finally { | ||
| lock.writeLock().unlock(); | ||
| } | ||
| } | ||
|
|
||
| private void invalidate(KeyExtent failedExtent) { | ||
| cache.invalidate(failedExtent); | ||
| } | ||
|
|
||
| private void invalidate(Collection<KeyExtent> keySet) { | ||
| cache.invalidateAll(keySet); | ||
| } | ||
|
|
||
| private void invalidateAll() { | ||
| cache.invalidateAll(); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| private final TableId tid; | ||
| private final OfflineTabletsCache extentCache; | ||
|
|
||
  /**
   * Creates a locator for the tablets of an offline table.
   *
   * @param context client context used to read tablet metadata
   * @param tableId id of the table to locate tablets for
   * @throws IllegalStateException if the table is not currently offline
   */
  public OfflineTabletLocatorImpl(ClientContext context, TableId tableId) {
    tid = tableId;
    if (context.getTableState(tid) != TableState.OFFLINE) {
      throw new IllegalStateException("Table " + tableId + " is not offline");
    }
    extentCache = new OfflineTabletsCache(context);
  }
|
|
||
  /**
   * Returns the location of the tablet containing {@code row}, or the tablet after it when
   * {@code skipRow} is true. Matches are always {@link OfflineTabletLocation}s since an offline
   * table has no live tablet servers.
   *
   * @return the matching tablet location, or null when no cached or loadable extent matches
   */
  @Override
  public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow,
      boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {

    if (skipRow) {
      // advance to the smallest possible row after the given row by appending a zero byte
      row = new Text(row);
      row.append(new byte[] {0}, 0, 1);
    }

    // build the metadata-table row for this table row: "<tableId>;<row>"
    Text metadataRow = new Text(tid.canonical());
    metadataRow.append(new byte[] {';'}, 0, 1);
    metadataRow.append(row.getBytes(), 0, row.getLength());

    LOG.trace("Locating offline tablet for row: {}", metadataRow);
    KeyExtent searchKey = KeyExtent.fromMetaRow(metadataRow);
    KeyExtent match = extentCache.findOrLoadExtent(searchKey);
    if (match != null) {
      // verify the candidate extent actually begins before the requested row
      if (match.prevEndRow() == null || match.prevEndRow().compareTo(row) < 0) {
        LOG.trace("Found match for row: {}, extent = {}", row, match);
        return new OfflineTabletLocation(match);
      }
    }
    LOG.trace("Found no matching extent for row: {}", row);
    return null;
  }
|
|
||
  /**
   * Bins each range to the offline tablets it overlaps, recording results in
   * {@code binnedRanges} keyed by the sentinel offline location.
   *
   * @return the ranges that could not be completely and contiguously binned; callers are expected
   *         to retry these
   */
  @Override
  public List<Range> binRanges(ClientContext context, List<Range> ranges,
      Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {

    List<TabletLocation> tabletLocations = new ArrayList<>(ranges.size());
    List<Range> failures = new ArrayList<>();

    l1: for (Range r : ranges) {
      LOG.trace("Looking up locations for range: {}", r);
      tabletLocations.clear();
      Text startRow;

      if (r.getStartKey() != null) {
        startRow = r.getStartKey().getRow();
      } else {
        // an infinite start key means the range begins at the first row of the table
        startRow = new Text();
      }

      TabletLocation tl = this.locateTablet(context, startRow, false, false);
      if (tl == null) {
        // NOTE(review): the cache does not keep all extents in memory, so a lookup can miss;
        // such ranges are reported as failures for the caller to retry.
        LOG.trace("NOT FOUND first tablet in range: {}", r);
        failures.add(r);
        continue;
      }
      LOG.trace("Found first tablet in range: {}, extent: {}", r, tl.tablet_extent);
      tabletLocations.add(tl);

      // walk forward tablet by tablet until the range's end key is covered
      while (tl.tablet_extent.endRow() != null
          && !r.afterEndKey(new Key(tl.tablet_extent.endRow()).followingKey(PartialKey.ROW))) {
        KeyExtent priorExtent = tl.tablet_extent;
        tl = locateTablet(context, tl.tablet_extent.endRow(), true, false);

        if (tl == null) {
          LOG.trace("NOT FOUND tablet following {} in range: {}", priorExtent, r);
          failures.add(r);
          continue l1;
        }
        LOG.trace("Found following tablet in range: {}, extent: {}", r, tl.tablet_extent);
        tabletLocations.add(tl);
      }

      // Ensure the extents found are non overlapping and have no holes. When reading some extents
      // from the cache and others from the metadata table in the loop above we may end up with
      // non-contiguous extents. This can happen when a subset of extents are placed in the cache
      // and then after that merges and splits happen.
      if (TabletLocatorImpl.isContiguous(tabletLocations)) {
        for (TabletLocation tl2 : tabletLocations) {
          TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, r);
        }
      } else {
        LOG.trace("Found non-contiguous tablet in range: {}", r);
        failures.add(r);
      }

    }
    return failures;
  }
|
|
||
  /**
   * Unsupported: mutations cannot be written to an offline table, so this locator never bins
   * mutations.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations,
      Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
    throw new UnsupportedOperationException();
  }
|
|
||
  /** Removes the given extent from the cache, forcing a metadata reload on next lookup. */
  @Override
  public void invalidateCache(KeyExtent failedExtent) {
    extentCache.invalidate(failedExtent);
  }
|
|
||
  /** Removes all of the given extents from the cache. */
  @Override
  public void invalidateCache(Collection<KeyExtent> keySet) {
    extentCache.invalidate(keySet);
  }
|
|
||
  /** Clears the entire extent cache. */
  @Override
  public void invalidateCache() {
    extentCache.invalidateAll();
  }
|
|
||
  /**
   * Clears the entire cache. Offline tablets carry a sentinel location rather than a real server,
   * so a per-server invalidation cannot be narrowed and everything is dropped.
   */
  @Override
  public void invalidateCache(ClientContext context, String server) {
    invalidateCache();
  }
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,7 @@ | |
|
|
||
| import org.apache.accumulo.core.Constants; | ||
| import org.apache.accumulo.core.client.Scanner; | ||
| import org.apache.accumulo.core.client.TableNotFoundException; | ||
| import org.apache.accumulo.core.data.Key; | ||
| import org.apache.accumulo.core.data.Range; | ||
| import org.apache.accumulo.core.data.TableId; | ||
|
|
@@ -163,6 +164,15 @@ public synchronized int getBatchSize() { | |
| @Override | ||
| public synchronized Iterator<Entry<Key,Value>> iterator() { | ||
| ensureOpen(); | ||
| if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) { | ||
| try { | ||
| String tableName = context.getTableName(tableId); | ||
| context.requireNotOffline(tableId, tableName); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If this performs a ZooKeeper operation per creation of a scanner iterator, it could cause problems. Not sure what the impl of this method does.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's using ZooCache, not hitting ZK directly. |
||
| } catch (TableNotFoundException e) { | ||
| throw new RuntimeException("Table not found", e); | ||
| } | ||
| } | ||
|
|
||
| ScannerIterator iter = new ScannerIterator(context, tableId, authorizations, range, size, | ||
| Duration.ofMillis(getTimeout(MILLISECONDS)), this, isolated, readaheadThreshold, | ||
| new Reporter()); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cache could have a weigher. That could be useful for the case where tablet splits can vary widely in size.