@@ -740,6 +740,9 @@ public SQLException newException(SQLExceptionInfo info) {
info.getMaxPhoenixColumnSizeBytes(), info.getPhoenixColumnSizeBytes());
}
}),
MUTATION_LIMIT_REACHED(733, "LIM04",
"Mutation buffer limit reached. Existing mutations are preserved. "
+ "Commit current mutations and retry the failed operation."),
INSUFFICIENT_MEMORY(999, "50M01", "Unable to allocate enough memory."),
HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found"),
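For context, a minimal client-side sketch of the commit-then-retry contract described by the new error message, assuming preserve mode is enabled and that the new MutationLimitReachedException (added elsewhere in this PR) propagates out of executeUpdate(); the JDBC URL, table, and column names are illustrative placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.phoenix.schema.MutationLimitReachedException;

public class PreserveModeUpsertSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
        PreparedStatement ps = conn.prepareStatement("UPSERT INTO T (ID, V) VALUES (?, ?)")) {
      conn.setAutoCommit(false);
      for (int i = 0; i < 1_000_000; i++) {
        ps.setInt(1, i);
        ps.setString(2, "value-" + i);
        try {
          ps.executeUpdate();
        } catch (MutationLimitReachedException e) {
          // Existing mutations are preserved: flush them, then retry the failed operation.
          conn.commit();
          ps.executeUpdate();
        }
      }
      conn.commit();
    }
  }
}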

@@ -31,10 +31,12 @@
import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_FAILURE_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER;
import static org.apache.phoenix.query.QueryServices.INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES_ATTRIB;
import static org.apache.phoenix.query.QueryServices.PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
import static org.apache.phoenix.query.QueryServices.WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
@@ -99,6 +101,7 @@
import org.apache.phoenix.schema.MaxMutationSizeBytesExceededException;
import org.apache.phoenix.schema.MaxMutationSizeExceededException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.MutationLimitReachedException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaData;
@@ -187,6 +190,7 @@ public class MutationState implements SQLCloseable {

private final boolean indexRegionObserverEnabledAllTables;
private final boolean serverSideImmutableIndexes;
private final boolean preserveOnLimitExceeded;

/**
* Return result back to client. To be used when client needs to read the whole row or some
@@ -268,6 +272,8 @@ Maps.<TableRef, List<MultiRowMutationState>> newHashMapWithExpectedSize(5), subT
this.serverSideImmutableIndexes = this.connection.getQueryServices().getConfiguration()
.getBoolean(SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED);
this.preserveOnLimitExceeded = this.connection.getQueryServices().getProps().getBoolean(
PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB, DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED);
}

public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset,
@@ -507,6 +513,17 @@ private void throwIfTooBig() throws SQLException {
}
}

/**
* Pre-check for preserve mode: throws MutationLimitReachedException if adding the given
* rows/bytes would exceed limits, which prevents the caller from modifying the state.
*/
private void throwIfLimitWouldBeExceeded(int additionalRows, long additionalBytes)
throws SQLException {
if (numRows + additionalRows > maxSize || estimatedSize + additionalBytes > maxSizeBytes) {
throw new MutationLimitReachedException();
}
}

public long getUpdateCount() {
return sizeOffset + numRows;
}
@@ -537,20 +554,25 @@ public void clearResult() {
}

private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows,
Map<TableRef, List<MultiRowMutationState>> dstMutations) {
Map<TableRef, List<MultiRowMutationState>> dstMutations) throws SQLException {
PTable table = tableRef.getTable();
boolean isIndex = table.getType() == PTableType.INDEX;
boolean incrementRowCount = dstMutations == this.mutationsMap;
boolean impactsLimits = incrementRowCount && !isIndex;
// we only need to check if the new mutation batch (srcRows) conflicts with the
// last mutation batch since we try to merge it with that only
MultiRowMutationState existingRows = getLastMutationBatch(dstMutations, tableRef);

if (existingRows == null) { // no rows found for this table
// For preserve mode, check limits BEFORE modifying any state
if (impactsLimits && preserveOnLimitExceeded) {
throwIfLimitWouldBeExceeded(srcRows.size(), srcRows.estimatedSize);
}
// Size new map at batch size as that's what it'll likely grow to.
MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize());
newRows.putAll(srcRows);
addMutations(dstMutations, tableRef, newRows);
if (incrementRowCount && !isIndex) {
if (impactsLimits) {
numRows += srcRows.size();
// if we added all the rows from newMutationState we can just increment the
// estimatedSize by newMutationState.estimatedSize
@@ -562,36 +584,51 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows,
// for conflicting rows
MultiRowMutationState conflictingRows =
new MultiRowMutationState(connection.getMutateBatchSize());
long conflictingRowsTotalSize = 0;

// Rows for this table already exist, check for conflicts
for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : srcRows.entrySet()) {
ImmutableBytesPtr key = rowEntry.getKey();
RowMutationState newRowMutationState = rowEntry.getValue();
RowMutationState existingRowMutationState = existingRows.get(key);
if (existingRowMutationState == null) {
// For preserve mode, check limits BEFORE modifying any state
long newRowSize = newRowMutationState.calculateEstimatedSize();
if (impactsLimits && preserveOnLimitExceeded) {
throwIfLimitWouldBeExceeded(1, newRowSize);
}
existingRows.put(key, newRowMutationState);
if (incrementRowCount && !isIndex) { // Don't count index rows in row count
if (impactsLimits) { // Don't count index rows in row count
numRows++;
// increment estimated size by the size of the new row
estimatedSize += newRowMutationState.calculateEstimatedSize();
estimatedSize += newRowSize;
}
continue;
}

Map<PColumn, byte[]> existingValues = existingRowMutationState.getColumnValues();
Map<PColumn, byte[]> newValues = newRowMutationState.getColumnValues();
if (existingValues != PRow.DELETE_MARKER && newValues != PRow.DELETE_MARKER) {
// Check if we can merge existing column values with new column values
long beforeMergeSize = existingRowMutationState.calculateEstimatedSize();
boolean isMerged = existingRowMutationState.join(rowEntry.getValue());
if (isMerged) {
// decrement estimated size by the size of the old row
estimatedSize -= beforeMergeSize;
// increment estimated size by the size of the new row
estimatedSize += existingRowMutationState.calculateEstimatedSize();
// Check if we can merge existing column values with new column values.
// For preserve mode, pass this instance so join() can check limits before modification.
Long sizeDiff =
existingRowMutationState.join(newRowMutationState, preserveOnLimitExceeded ? this : null);
if (sizeDiff != null) {
// Merged successfully (row count unchanged - same row key)
estimatedSize += sizeDiff;
} else {
// cannot merge regular upsert and conditional upsert
// conflicting row is not a new row so no need to increment numRows
// conflicting row goes into a separate batch (same key, different semantics)
// Row count unchanged (same key), but size increases
long conflictingRowSize = newRowMutationState.calculateEstimatedSize();
if (impactsLimits && preserveOnLimitExceeded) {
// Include already-accumulated conflicting rows size in the check
throwIfLimitWouldBeExceeded(0, conflictingRowsTotalSize + conflictingRowSize);
}
conflictingRows.put(key, newRowMutationState);
if (impactsLimits) {
conflictingRowsTotalSize += conflictingRowSize;
}
}
} else {
existingRows.put(key, newRowMutationState);
@@ -600,11 +637,13 @@ private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows,

if (!conflictingRows.isEmpty()) {
addMutations(dstMutations, tableRef, conflictingRows);
// Update estimatedSize only after actual state change
estimatedSize += conflictingRowsTotalSize;
}
}

private void joinMutationState(Map<TableRef, List<MultiRowMutationState>> srcMutations,
Map<TableRef, List<MultiRowMutationState>> dstMutations) {
Map<TableRef, List<MultiRowMutationState>> dstMutations) throws SQLException {
// Merge newMutation with this one, keeping state from newMutation for any overlaps
for (Map.Entry<TableRef, List<MultiRowMutationState>> entry : srcMutations.entrySet()) {
TableRef tableRef = entry.getKey();
@@ -1609,7 +1648,6 @@ private IOException updateTableRegionCacheIfNecessary(IOException ioe) {
"Sent batch of " + mutationBatch.size() + " for " + Bytes.toString(htableName));
}
child.stop();
child.stop();
shouldRetry = false;
numFailedMutations = 0;

@@ -2406,36 +2444,56 @@ int[] getStatementIndexes() {

/**
* Join the newRow with the current row if it doesn't conflict with it. A regular upsert
* conflicts with a conditional upsert
* @return True if the rows were successfully joined else False
* conflicts with a conditional upsert.
* @param mutationState if non-null, checks limits before modification and throws
* MutationLimitReachedException if size increase would exceed limits
* @return the size change (can be 0, positive, or negative) if merged, or null if conflicting
*/
boolean join(RowMutationState newRow) {
Long join(RowMutationState newRow, MutationState mutationState) throws SQLException {
if (isConflicting(newRow)) {
return false;
return null;
}
// If we already have a row and the new row has an ON DUPLICATE KEY clause
// ignore the new values (as that's what the server will do).

// Pre-compute merged results (no side effects - these return new objects)
byte[] combinedOnDupKey =
PhoenixIndexBuilderHelper.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes);
int[] mergedIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());

// Calculate column values size change
long colValuesSizeDiff = 0;
if (newRow.onDupKeyBytes == null) {
// increment the column value size by the new row column value size
colValuesSize += newRow.colValuesSize;
colValuesSizeDiff = newRow.colValuesSize;
for (Map.Entry<PColumn, byte[]> entry : newRow.columnValues.entrySet()) {
PColumn col = entry.getKey();
byte[] oldValue = columnValues.put(col, entry.getValue());
byte[] oldValue = columnValues.get(entry.getKey());
if (oldValue != null) {
// decrement column value size by the size of all column values that were replaced
colValuesSize -= (col.getEstimatedSize() + oldValue.length);
colValuesSizeDiff -= (entry.getKey().getEstimatedSize() + oldValue.length);
}
}
}
// Concatenate ON DUPLICATE KEY bytes to allow multiple
// increments of the same row in the same commit batch.
this.onDupKeyBytes =
PhoenixIndexBuilderHelper.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes);

// Total size change (can be negative)
long totalSizeDiff = colValuesSizeDiff
+ ((combinedOnDupKey != null ? combinedOnDupKey.length : 0)
- (this.onDupKeyBytes != null ? this.onDupKeyBytes.length : 0))
+ (mergedIndexes.length - statementIndexes.length) * SizedUtil.INT_SIZE;

// Check limit BEFORE any modification (row count unchanged for merge - same row key)
if (mutationState != null) {
mutationState.throwIfLimitWouldBeExceeded(0, totalSizeDiff);
}

// Apply modifications
this.onDupKeyBytes = combinedOnDupKey;
this.statementIndexes = mergedIndexes;
if (newRow.onDupKeyType == OnDuplicateKeyType.UPDATE_ONLY) {
this.onDupKeyType = OnDuplicateKeyType.UPDATE_ONLY;
}
statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
return true;
if (newRow.onDupKeyBytes == null) {
columnValues.putAll(newRow.columnValues);
colValuesSize += colValuesSizeDiff;
}

return totalSizeDiff;
}

@Nonnull
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.jdbc;

import java.sql.BatchUpdateException;
import org.apache.phoenix.schema.MutationLimitReachedException;

/**
* Thrown from executeBatch() when the mutation buffer limit is reached. The batch is automatically
* trimmed to contain only unprocessed items.
*/
public class MutationLimitBatchException extends BatchUpdateException {
private static final long serialVersionUID = 1L;

private final int processedCount;

/**
* @param updateCounts array of update counts for each statement in the batch
* @param cause the underlying MutationLimitReachedException
* @param processedCount number of statements successfully processed
*/
public MutationLimitBatchException(int[] updateCounts, MutationLimitReachedException cause,
int processedCount) {
super(cause.getMessage(), cause.getSQLState(), cause.getErrorCode(), updateCounts, cause);
this.processedCount = processedCount;
}

/**
* Returns the number of statements that were successfully processed before the limit was reached.
* The batch has been trimmed to contain only the remaining unprocessed items.
*/
public int getProcessedCount() {
return processedCount;
}
}
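A sketch of the checkpoint-and-resume loop this exception is designed to support; MutationLimitBatchException, getProcessedCount(), and the trimmed-batch behavior come from this class and the executeBatch() change below, while the connection setup, table, and generated statements are illustrative assumptions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.apache.phoenix.jdbc.MutationLimitBatchException;

public class PreserveModeBatchSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
        Statement stmt = conn.createStatement()) {
      conn.setAutoCommit(false);
      for (int i = 0; i < 100_000; i++) {
        stmt.addBatch("UPSERT INTO T (ID, V) VALUES (" + i + ", 'v" + i + "')");
      }
      boolean done = false;
      while (!done) {
        try {
          stmt.executeBatch();
          conn.commit();
          done = true;
        } catch (MutationLimitBatchException e) {
          // e.getProcessedCount() statements are buffered in MutationState; the batch
          // has been trimmed to hold only the unprocessed remainder.
          conn.commit();
          // The loop re-runs executeBatch() on the trimmed batch, resuming at the failed item.
        }
      }
    }
  }
}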
@@ -204,6 +204,7 @@
import org.apache.phoenix.schema.FunctionNotFoundException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
import org.apache.phoenix.schema.MutationLimitReachedException;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PDatum;
import org.apache.phoenix.schema.PIndexState;
@@ -2451,6 +2452,26 @@ public int[] executeBatch() throws SQLException {
connection.commit();
}
return returnCodes;
} catch (MutationLimitReachedException limitEx) {
// Special handling: limit reached but existing mutations preserved
// Statements 0 through i-1 were successfully added to MutationState
// Statement at index i was NOT added (its join was rejected by the pre-check)

// If original autoCommit was true, commit what we have buffered
if (autoCommit) {
connection.commit();
}

// Trim the batch list to contain only unprocessed items (from index i to end)
if (i > 0) {
batch.subList(0, i).clear();
}

// Mark the failed index
returnCodes[i] = Statement.EXECUTE_FAILED;

// Throw MutationLimitBatchException with checkpoint information
throw new MutationLimitBatchException(returnCodes, limitEx, i);
} catch (SQLException t) {
if (i == returnCodes.length) {
// Exception after for loop, perhaps in commit(), discard returnCodes.
@@ -97,6 +97,8 @@ public interface QueryServices extends SQLCloseable {
public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching";
public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize";
public static final String MAX_MUTATION_SIZE_BYTES_ATTRIB = "phoenix.mutate.maxSizeBytes";
public static final String PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED_ATTRIB =
"phoenix.mutate.preserveOnLimitExceeded";
public static final String HBASE_CLIENT_KEYVALUE_MAXSIZE = "hbase.client.keyvalue.maxsize";

public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
@@ -174,6 +174,7 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true;
public static final int DEFAULT_MAX_MUTATION_SIZE = 500000;
public static final int DEFAULT_MAX_MUTATION_SIZE_BYTES = 104857600; // 100 Mb
public static final boolean DEFAULT_PRESERVE_MUTATIONS_ON_LIMIT_EXCEEDED = false;
public static final int DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE = 10485760; // 10 Mb
public static final boolean DEFAULT_USE_INDEXES = true; // Use indexes
public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows may be updated
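A sketch of how a client might opt in, assuming the flag is supplied through connection properties (matching the getProps() lookup in the MutationState constructor above); the JDBC URL is a placeholder, and the two existing size limits are shown only because they are what the new flag interacts with.

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class PreserveModeConfigSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Opt into preserve-on-limit behavior; the default is false per QueryServicesOptions.
    props.setProperty("phoenix.mutate.preserveOnLimitExceeded", "true");
    // Existing limits whose breach raises MUTATION_LIMIT_REACHED (733, LIM04) in preserve mode.
    props.setProperty("phoenix.mutate.maxSize", "500000");
    props.setProperty("phoenix.mutate.maxSizeBytes", "104857600");
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
      conn.setAutoCommit(false);
      // Run upserts, catching MutationLimitReachedException as shown earlier; buffered
      // mutations stay intact when a limit would be exceeded.
    }
  }
}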
@@ -4866,9 +4866,9 @@ public MutationState addColumn(PTable table, List<ColumnDef> origColumnDefs,
/**
* To check if TTL is defined at any of the child below we are checking it at
* {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List, ColumnMutator, int, PTable, PTable, boolean)}
* level where in function
* {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], byte[], List, int)}
* we are already traversing through allDescendantViews.
* level where in function {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
* validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[],
* byte[], List, int)} we are already traversing through allDescendantViews.
*/
}
