Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -28,6 +29,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.ImmutablePair;
Expand Down Expand Up @@ -69,6 +71,8 @@ public class ZKMetadataProvider {
  // Utility class: private constructor prevents instantiation.
  private ZKMetadataProvider() {
  }

public static final int DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE = 1000;

private static final Logger LOGGER = LoggerFactory.getLogger(ZKMetadataProvider.class);
private static final String CLUSTER_TENANT_ISOLATION_ENABLED_KEY = "tenantIsolationEnabled";
private static final String CLUSTER_APPLICATION_QUOTAS = "applicationQuotas";
Expand Down Expand Up @@ -725,6 +729,57 @@ public static List<SegmentZKMetadata> getSegmentsZKMetadata(ZkHelixPropertyStore
}
}

/**
* Iterates over the segment ZK metadata for the table and applies the provided consumer to each non-null segment
* metadata.
*
* @param propertyStore Helix property store from which segment metadata is read.
* @param tableNameWithType Table name with type suffix (e.g. {@code myTable_OFFLINE}).
* @param batchSize Batch size for ZK get calls.
* @param consumer Consumer invoked for each non-null segment metadata.
*/
public static void forEachSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType,
int batchSize, Consumer<SegmentZKMetadata> consumer) {
Preconditions.checkArgument(batchSize > 0, "Segment metadata batchSize must be greater than 0: %s", batchSize);

String segmentsPath = constructPropertyStorePathForResource(tableNameWithType);
List<String> segmentNames = getSegments(propertyStore, tableNameWithType);
if (segmentNames == null || segmentNames.isEmpty()) {
LOGGER.debug("No segments found under path: {}", segmentsPath);
return;
}

for (int startIndex = 0; startIndex < segmentNames.size(); startIndex += batchSize) {
int endIndex = Math.min(startIndex + batchSize, segmentNames.size());
List<String> segmentNameBatch = segmentNames.subList(startIndex, endIndex);

List<String> segmentPathBatch = new ArrayList<>(segmentNameBatch.size());
for (String segmentName : segmentNameBatch) {
segmentPathBatch.add(constructPropertyStorePathForSegment(tableNameWithType, segmentName));
}

List<ZNRecord> znRecords = propertyStore.get(segmentPathBatch, null, AccessOption.PERSISTENT);
int numNullRecords = 0;
if (znRecords != null) {
for (int i = 0; i < segmentNameBatch.size(); i++) {
ZNRecord znRecord = i < znRecords.size() ? znRecords.get(i) : null;
if (znRecord == null) {
numNullRecords++;
} else {
consumer.accept(new SegmentZKMetadata(znRecord));
}
}
} else {
numNullRecords = segmentNameBatch.size();
}

if (numNullRecords > 0) {
LOGGER.warn("Failed to read {}/{} segment ZK metadata under path: {} for table: {}",
numNullRecords, segmentNameBatch.size(), segmentsPath, tableNameWithType);
}
}
}
Comment thread
xiangfu0 marked this conversation as resolved.

/**
* Returns the segments for the given table.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.metadata;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.helix.AccessOption;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.zookeeper.data.Stat;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Unit tests covering {@link ZKMetadataProvider#forEachSegmentZKMetadata}.
 */
public class ZKMetadataProviderTest {
  private static final String TABLE_NAME_WITH_TYPE = "testTable_OFFLINE";

  @Test
  public void testForEachSegmentZKMetadataEmptySegments() {
    String segmentsParentPath = ZKMetadataProvider.constructPropertyStorePathForResource(TABLE_NAME_WITH_TYPE);
    ZkHelixPropertyStore<ZNRecord> propertyStore = Mockito.mock(ZkHelixPropertyStore.class);
    Mockito.when(propertyStore.exists(segmentsParentPath, AccessOption.PERSISTENT)).thenReturn(true);
    Mockito.when(propertyStore.getChildNames(segmentsParentPath, AccessOption.PERSISTENT))
        .thenReturn(Collections.emptyList());

    List<String> consumed = new ArrayList<>();
    ZKMetadataProvider.forEachSegmentZKMetadata(propertyStore, TABLE_NAME_WITH_TYPE, 2,
        metadata -> consumed.add(metadata.getSegmentName()));

    Assert.assertTrue(consumed.isEmpty());
    // With no segments listed, no batched ZK read should ever be issued.
    Mockito.verify(propertyStore, Mockito.never())
        .get(Mockito.<String>anyList(), Mockito.<Stat>anyList(), ArgumentMatchers.eq(AccessOption.PERSISTENT));
  }

  @Test
  public void testForEachSegmentZKMetadataBatchesAndNullRecords() {
    String segmentsParentPath = ZKMetadataProvider.constructPropertyStorePathForResource(TABLE_NAME_WITH_TYPE);
    ZkHelixPropertyStore<ZNRecord> propertyStore = Mockito.mock(ZkHelixPropertyStore.class);
    Mockito.when(propertyStore.exists(segmentsParentPath, AccessOption.PERSISTENT)).thenReturn(true);
    Mockito.when(propertyStore.getChildNames(segmentsParentPath, AccessOption.PERSISTENT))
        .thenReturn(Arrays.asList("segment-1", "segment-2", "segment-3", "segment-4"));

    List<String> firstBatch = Arrays.asList(segmentPath("segment-1"), segmentPath("segment-2"));
    List<String> secondBatch = Arrays.asList(segmentPath("segment-3"), segmentPath("segment-4"));

    List<List<String>> observedBatches = new ArrayList<>();
    Mockito.when(propertyStore.get(Mockito.<String>anyList(), Mockito.isNull(),
        ArgumentMatchers.eq(AccessOption.PERSISTENT))).thenAnswer(invocation -> {
          List<String> paths = invocation.getArgument(0);
          observedBatches.add(new ArrayList<>(paths));
          if (paths.equals(firstBatch)) {
            // Null second entry exercises the missing-record handling.
            return Arrays.asList(newRecord("segment-1"), null);
          }
          if (paths.equals(secondBatch)) {
            // Short result list exercises the records-shorter-than-batch handling.
            return Collections.singletonList(newRecord("segment-3"));
          }
          return Collections.emptyList();
        });

    List<String> consumed = new ArrayList<>();
    ZKMetadataProvider.forEachSegmentZKMetadata(propertyStore, TABLE_NAME_WITH_TYPE, 2,
        metadata -> consumed.add(metadata.getSegmentName()));

    Assert.assertEquals(consumed, Arrays.asList("segment-1", "segment-3"));
    Assert.assertEquals(observedBatches, Arrays.asList(firstBatch, secondBatch));
  }

  @Test
  public void testForEachSegmentZKMetadataRequiresPositiveBatchSize() {
    ZkHelixPropertyStore<ZNRecord> propertyStore = Mockito.mock(ZkHelixPropertyStore.class);
    Assert.assertThrows(IllegalArgumentException.class,
        () -> ZKMetadataProvider.forEachSegmentZKMetadata(propertyStore, TABLE_NAME_WITH_TYPE, 0, metadata -> {
        }));
  }

  // Builds a bare ZNRecord standing in for a segment's ZK metadata payload.
  private ZNRecord newRecord(String segmentName) {
    return new ZNRecord(segmentName);
  }

  // Property-store path for a segment of the test table.
  private String segmentPath(String segmentName) {
    return ZKMetadataProvider.constructPropertyStorePathForSegment(TABLE_NAME_WITH_TYPE, segmentName);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ public static class ControllerPeriodicTasksConf {
public static final String AGED_SEGMENTS_DELETION_BATCH_SIZE =
"controller.retentionManager.agedSegmentsDeletionBatchSize";
public static final int DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE = 1000;
public static final String SEGMENTS_ZK_METADATA_BATCH_SIZE =
"controller.retentionManager.segmentsZkMetadataBatchSize";
Comment thread
xiangfu0 marked this conversation as resolved.
public static final int DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE = 1000;
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 60 * 60; // 1 Hour.
Expand Down Expand Up @@ -1226,6 +1229,15 @@ public void setAgedSegmentsDeletionBatchSize(int agedSegmentsDeletionBatchSize)
setProperty(ControllerPeriodicTasksConf.AGED_SEGMENTS_DELETION_BATCH_SIZE, agedSegmentsDeletionBatchSize);
}

public int getSegmentsZKMetadataBatchSize() {
return getProperty(ControllerPeriodicTasksConf.SEGMENTS_ZK_METADATA_BATCH_SIZE,
ControllerPeriodicTasksConf.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE);
}

public void setSegmentsZKMetadataBatchSize(int segmentZKMetadataBatchSize) {
setProperty(ControllerPeriodicTasksConf.SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentZKMetadataBatchSize);
}

public long getPinotTaskManagerInitialDelaySeconds() {
return getPeriodicTaskInitialDelayInSeconds();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,20 +902,17 @@ public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExclu
selectedSegments = new ArrayList<>(segmentSet);
} else {
selectedSegments = new ArrayList<>();
List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
ArrayList<String> filteredSegments = new ArrayList<>();
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
forEachSegmentsZKMetadata(tableNameWithType, ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE,
segmentZKMetadata -> {
String segmentName = segmentZKMetadata.getSegmentName();
// Compute the intersection of segmentZK metadata and idealstate for valid segments
if (!segmentSet.contains(segmentName)) {
filteredSegments.add(segmentName);
continue;
}
// Filter by time if the time range is specified
if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
} else if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
selectedSegments.add(segmentName);
}
}
});
LOGGER.info(
"Successfully computed the segments for table : {}. # of filtered segments: {}, the filtered segment list: "
+ "{}. Only showing up to 100 filtered segments.", tableNameWithType, filteredSegments.size(),
Expand Down Expand Up @@ -995,13 +992,23 @@ public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType, String s
return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
}

  /**
   * Iterates over the segment ZK metadata of the table in batches of {@code batchSize}, invoking the consumer for
   * each non-null segment metadata. Delegates to {@link ZKMetadataProvider#forEachSegmentZKMetadata}.
   *
   * @param tableNameWithType Table name with type suffix (e.g. {@code myTable_OFFLINE}).
   * @param batchSize Batch size for ZK get calls; must be positive.
   * @param segmentMetadataConsumer Consumer invoked for each non-null segment metadata.
   */
  public void forEachSegmentsZKMetadata(String tableNameWithType, int batchSize,
      Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
    ZKMetadataProvider.forEachSegmentZKMetadata(_propertyStore, tableNameWithType, batchSize, segmentMetadataConsumer);
  }

public void forEachSegmentsZKMetadata(String tableNameWithType, Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
ZKMetadataProvider.forEachSegmentZKMetadata(_propertyStore, tableNameWithType,
ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentMetadataConsumer);
}

  /**
   * Returns the segment ZK metadata for all segments of the given table.
   * NOTE: materializes all segment metadata in memory at once; prefer {@code forEachSegmentsZKMetadata} for
   * batched, streaming access on large tables.
   */
  public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
    return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType);
  }

public Collection<String> getLastLLCCompletedSegments(String tableNameWithType) {
Map<Integer, String> partitionIdToLastLLCCompletedSegmentMap = new HashMap<>();
for (SegmentZKMetadata zkMetadata : getSegmentsZKMetadata(tableNameWithType)) {
forEachSegmentsZKMetadata(tableNameWithType, zkMetadata -> {
if (zkMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
LLCSegmentName llcName = LLCSegmentName.of(zkMetadata.getSegmentName());
int partitionGroupId = llcName.getPartitionGroupId();
Expand All @@ -1012,7 +1019,7 @@ public Collection<String> getLastLLCCompletedSegments(String tableNameWithType)
partitionIdToLastLLCCompletedSegmentMap.put(partitionGroupId, zkMetadata.getSegmentName());
}
}
}
});
return partitionIdToLastLLCCompletedSegmentMap.values();
}

Expand Down Expand Up @@ -1510,13 +1517,12 @@ public void addSchema(Schema schema, boolean override, boolean force)

public void updateSegmentsZKTimeInterval(String tableNameWithType, DateTimeFieldSpec timeColumnFieldSpec) {
LOGGER.info("Updating segment time interval in ZK metadata for table: {}", tableNameWithType);

List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
int version = segmentZKMetadata.toZNRecord().getVersion();
updateZkTimeInterval(segmentZKMetadata, timeColumnFieldSpec);
updateZkMetadata(tableNameWithType, segmentZKMetadata, version);
}
forEachSegmentsZKMetadata(tableNameWithType,
ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentZKMetadata -> {
int version = segmentZKMetadata.toZNRecord().getVersion();
updateZkTimeInterval(segmentZKMetadata, timeColumnFieldSpec);
updateZkMetadata(tableNameWithType, segmentZKMetadata, version);
});
}

public void updateSchema(Schema schema, boolean reload, boolean forceTableSchemaUpdate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.helix.HelixDataAccessor;
Expand Down Expand Up @@ -115,6 +116,15 @@ public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
return ZKMetadataProvider.getSegmentsZKMetadata(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
}

  /**
   * Iterates over the segment ZK metadata of the table in batches of {@code batchSize}, invoking the consumer for
   * each non-null segment metadata. Delegates to the Helix resource manager.
   *
   * @param tableNameWithType Table name with type suffix (e.g. {@code myTable_OFFLINE}).
   * @param batchSize Batch size for ZK get calls; must be positive.
   * @param segmentMetadataConsumer Consumer invoked for each non-null segment metadata.
   */
  public void forEachSegmentsZKMetadata(String tableNameWithType, int batchSize,
      Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
    _pinotHelixResourceManager.forEachSegmentsZKMetadata(tableNameWithType, batchSize, segmentMetadataConsumer);
  }

  /**
   * Iterates over the segment ZK metadata of the table using the resource manager's default batch size, invoking
   * the consumer for each non-null segment metadata.
   *
   * @param tableNameWithType Table name with type suffix (e.g. {@code myTable_OFFLINE}).
   * @param segmentMetadataConsumer Consumer invoked for each non-null segment metadata.
   */
  public void forEachSegmentsZKMetadata(String tableNameWithType, Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
    _pinotHelixResourceManager.forEachSegmentsZKMetadata(tableNameWithType, segmentMetadataConsumer);
  }

  /**
   * Returns the Helix ideal state for the given table (delegates to the resource manager).
   */
  public IdealState getIdealState(String tableNameWithType) {
    return _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
  }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.model.IdealState;
import org.apache.helix.task.JobConfig;
Expand Down Expand Up @@ -161,32 +162,45 @@ public int getMaxAttemptsPerTask(String minionTag) {
* @return the list of segment zk metadata for available segments in the table.
*/
  public List<SegmentZKMetadata> getSegmentsZKMetadataForTable(String tableNameWithType) {
    // A null filter keeps every segment that is present in the table's ideal state.
    return getSegmentsZKMetadataInIdealState(tableNameWithType, null);
  }

public List<SegmentZKMetadata> getNonConsumingSegmentsZKMetadataForRealtimeTable(String tableNameWithType) {
IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
Set<String> segmentsForTable = idealState.getPartitionSet();
List<SegmentZKMetadata> segmentZKMetadataList = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>();
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())) {
selectedSegmentZKMetadataList.add(segmentZKMetadata);
}
if (idealState == null) {
return new ArrayList<>();
}
return selectedSegmentZKMetadataList;
return getSegmentsZKMetadataInIdealState(tableNameWithType, segmentZKMetadata -> {
String segmentName = segmentZKMetadata.getSegmentName();
Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
return segmentZKMetadata.getStatus().isCompleted() // skip consuming segments
&& instanceStateMap != null && !instanceStateMap.containsValue(SegmentStateModel.CONSUMING);
// The last check is for an edge case where
// 1. SegmentZKMetadata was updated to DONE in segment commit protocol, but
// 2. IdealState for the segment was not updated to ONLINE due to some issue in the controller.
// We avoid picking up such segments to allow RealtimeSegmentValidationManager to fix them.
});
}

public List<SegmentZKMetadata> getNonConsumingSegmentsZKMetadataForRealtimeTable(String tableNameWithType) {
private List<SegmentZKMetadata> getSegmentsZKMetadataInIdealState(String tableNameWithType,
Predicate<SegmentZKMetadata> segmentMetadataFilter) {
IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
Set<String> idealStateSegments = idealState.getPartitionSet();
List<SegmentZKMetadata> segmentZKMetadataList = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
if (idealState == null) {
return new ArrayList<>();
}
Set<String> segmentsForTable = idealState.getPartitionSet();
if (segmentsForTable == null || segmentsForTable.isEmpty()) {
return new ArrayList<>();
}

List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
if (allSegments == null) {
return new ArrayList<>();
}
List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>();
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
String segmentName = segmentZKMetadata.getSegmentName();
if (idealStateSegments.contains(segmentName)
&& segmentZKMetadata.getStatus().isCompleted() // skip consuming segments
&& !idealState.getInstanceStateMap(segmentName).containsValue(SegmentStateModel.CONSUMING)) {
// The last check is for an edge case where
// 1. SegmentZKMetadata was updated to DONE in segment commit protocol, but
// 2. IdealState for the segment was not updated to ONLINE due to some issue in the controller.
// We avoid picking up such segments to allow RealtimeSegmentValidationManager to fix them.
for (SegmentZKMetadata segmentZKMetadata : allSegments) {
if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())
&& (segmentMetadataFilter == null || segmentMetadataFilter.test(segmentZKMetadata))) {
selectedSegmentZKMetadataList.add(segmentZKMetadata);
}
}
Expand Down
Loading
Loading