Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@

import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
Expand Down Expand Up @@ -756,51 +757,22 @@ public void preWALRestore(
*/
private void replicateEditOnWALRestore(ReplicationLogGroup logGroup, WALKey logKey,
WALEdit logEdit) throws IOException {
ImmutableBytesPtr prevKey = null, currentKey = null;
Put put = null;
Delete del = null;
List<Cell> regularCells = new ArrayList<>();
for (Cell kv : logEdit.getCells()) {
if (kv instanceof IndexedKeyValue) {
IndexedKeyValue ikv = (IndexedKeyValue) kv;
logGroup.append(Bytes.toString(ikv.getIndexTable()), -1, ikv.getMutation());
} else {
// While we can generate a separate mutation for every cell that is part of the
// WAL edit and replicate each such mutation. Doing that will not be very efficient
// since a mutation can have large number of cells. Instead, we first group the
// cells belonging to the same row into a mutation and then replicate that
// mutation.
currentKey = new ImmutableBytesPtr(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
if (!currentKey.equals(prevKey)) {
if (put != null && !this.ignoreReplicationFilter.test(put)) {
logGroup.append(logKey.getTableName().getNameAsString(), -1, put);
}
if (del != null && !this.ignoreReplicationFilter.test(del)) {
logGroup.append(logKey.getTableName().getNameAsString(), -1, del);
}
// reset
put = null;
del = null;
}
if (kv.getType() == Cell.Type.Put) {
if (put == null) {
put = new Put(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
}
put.add(kv);
} else {
if (del == null) {
del = new Delete(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
}
del.add(kv);
}
prevKey = currentKey;
regularCells.add(kv);
}
}
// append the last one
if (put != null && !this.ignoreReplicationFilter.test(put)) {
logGroup.append(logKey.getTableName().getNameAsString(), -1, put);
}
if (del != null && !this.ignoreReplicationFilter.test(del)) {
logGroup.append(logKey.getTableName().getNameAsString(), -1, del);
if (!regularCells.isEmpty()) {
String tableName = logKey.getTableName().getNameAsString();
for (Mutation split : splitCellsIntoMutations(regularCells)) {
if (!this.ignoreReplicationFilter.test(split)) {
logGroup.append(tableName, -1, split);
}
}
}
logGroup.sync();
}
Expand Down Expand Up @@ -2632,6 +2604,52 @@ public static boolean isAtomicOperationComplete(OperationStatus status) {
return status.getOperationStatusCode() == SUCCESS && status.getResult() != null;
}

/**
* Splits cells into individual Put/Delete mutations grouped by (row key, put-vs-delete). HBase's
* checkAndMergeCPMutations merges coprocessor cells into the data mutation, so a single Put may
* contain Delete cells with different row keys (e.g., local index). This method recovers distinct
* mutations using the same grouping algorithm as HBase's ReplicationSink.
*/
private static boolean isNewRowOrType(Cell previousCell, Cell cell) {
return previousCell == null || previousCell.getType() != cell.getType()
|| !CellUtil.matchingRows(previousCell, cell);
}

static List<Mutation> splitCellsIntoMutations(Iterable<Cell> cells) throws IOException {
List<Mutation> result = new ArrayList<>();
Cell previousCell = null;
Mutation current = null;
for (Cell cell : cells) {
if (isNewRowOrType(previousCell, cell)) {
if (current != null) {
result.add(current);
}
if (CellUtil.isDelete(cell)) {
current = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
} else {
current = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
}
if (CellUtil.isDelete(cell)) {
((Delete) current).add(cell);
} else {
((Put) current).add(cell);
}
previousCell = cell;
}
if (current != null) {
result.add(current);
}
return result;
}

static List<Mutation> splitCellsIntoMutations(Mutation merged) throws IOException {
if (merged.isEmpty()) {
return Collections.singletonList(merged);
}
return splitCellsIntoMutations(Iterables.concat(merged.getFamilyCellMap().values()));
}

private void replicateMutations(RegionCoprocessorEnvironment env,
MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context)
throws IOException {
Expand All @@ -2647,17 +2665,21 @@ private void replicateMutations(RegionCoprocessorEnvironment env,
if (!logGroup.isPresent()) {
return;
}
ReplicationLogGroup group = logGroup.get();

for (Integer i = 0; i < miniBatchOp.size(); i++) {
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
if (this.ignoreReplicationFilter.test(m)) {
continue;
}
logGroup.get().append(this.dataTableName, -1, m);
Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);
if (mutationsAddedByCP != null) {
for (Mutation addedMutation : mutationsAddedByCP) {
logGroup.get().append(this.dataTableName, -1, addedMutation);
// When coprocessors add cells (local index, conditional TTL, ON DUPLICATE KEY UPDATE),
// HBase merges them into the data mutation which can mix row keys and cell types.
// Split those back into individual Put/Delete mutations for correct serialization.
if (miniBatchOp.getOperationsFromCoprocessors(i) == null) {
group.append(this.dataTableName, -1, m);
} else {
for (Mutation split : splitCellsIntoMutations(m)) {
group.append(this.dataTableName, -1, split);
}
}
}
Expand All @@ -2667,7 +2689,7 @@ private void replicateMutations(RegionCoprocessorEnvironment env,
if (this.ignoreReplicationFilter.test(entry.getValue())) {
continue;
}
logGroup.get().append(entry.getKey().getTableName(), -1, entry.getValue());
group.append(entry.getKey().getTableName(), -1, entry.getValue());
}
}
if (context.postIndexUpdates != null) {
Expand All @@ -2676,9 +2698,9 @@ private void replicateMutations(RegionCoprocessorEnvironment env,
if (this.ignoreReplicationFilter.test(entry.getValue())) {
continue;
}
logGroup.get().append(entry.getKey().getTableName(), -1, entry.getValue());
group.append(entry.getKey().getTableName(), -1, entry.getValue());
}
}
logGroup.get().sync();
group.sync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,14 @@ public class ReplicationLogGroup {
public static final String STANDBY_DIR = "in";
public static final String FALLBACK_DIR = "out";

/** Cache of ReplicationLogGroup instances by HA Group ID */
/** Cache of ReplicationLogGroup instances by server name + HA Group name */
protected static final ConcurrentHashMap<String, ReplicationLogGroup> INSTANCES =
new ConcurrentHashMap<>();

private static String instanceKey(ServerName serverName, String haGroupName) {
return serverName.getServerName() + "|" + haGroupName;
}

protected final Configuration conf;
protected final ServerName serverName;
protected final String haGroupName;
Expand Down Expand Up @@ -362,7 +366,7 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
String haGroupName, Abortable abortable) throws IOException {
try {
return INSTANCES.computeIfAbsent(haGroupName, k -> {
return INSTANCES.computeIfAbsent(instanceKey(serverName, haGroupName), k -> {
try {
ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName,
HAGroupStoreManager.getInstance(conf), abortable);
Expand Down Expand Up @@ -390,7 +394,7 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
String haGroupName, HAGroupStoreManager haGroupStoreManager) throws IOException {
try {
return INSTANCES.computeIfAbsent(haGroupName, k -> {
return INSTANCES.computeIfAbsent(instanceKey(serverName, haGroupName), k -> {
try {
ReplicationLogGroup group =
new ReplicationLogGroup(conf, serverName, haGroupName, haGroupStoreManager);
Expand Down Expand Up @@ -654,7 +658,7 @@ public void close() {
return;
}
LOG.info("Closing HAGroup {}", this);
INSTANCES.remove(haGroupName);
INSTANCES.remove(instanceKey(serverName, haGroupName));
try {
disruptor.shutdown(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
} catch (com.lmax.disruptor.TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
Expand Down Expand Up @@ -68,6 +69,7 @@
* | | PER-CELL DATA (repeated) | |
* | | +––––––––––––----------------–--–+ | |
* | | | CELL TIMESTAMP (long) | | |
* | | | CELL TYPE (byte) | | |
* | | | COLUMN QUALIFIER LENGTH (vint) | | |
* | | | COLUMN QUALIFIER (byte[]) | | |
* | | | VALUE LENGTH (vint) | | |
Expand Down Expand Up @@ -141,6 +143,7 @@ public void write(LogFile.Record record) throws IOException {
WritableUtils.writeVInt(recordOut, cells.size());
for (Cell cell : cells) {
recordOut.writeLong(cell.getTimestamp());
recordOut.writeByte(cell.getTypeByte());
WritableUtils.writeVInt(recordOut, cell.getQualifierLength());
recordOut.write(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
Expand Down Expand Up @@ -205,9 +208,6 @@ public boolean advance() throws IOException {
mutation = new Put(rowKey);
break;
case DELETE:
case DELETEFAMILYVERSION:
case DELETECOLUMN:
case DELETEFAMILY:
mutation = new Delete(rowKey);
break;
default:
Expand All @@ -229,6 +229,8 @@ public boolean advance() throws IOException {
for (int j = 0; j < columnValuePairsCount; j++) {
// Cell timestamp
long cellTs = in.readLong();
// Cell type byte
byte cellTypeByte = in.readByte();
// Qualifier name
int qualLen = WritableUtils.readVInt(in);
byte[] qual = new byte[qualLen];
Expand All @@ -241,22 +243,12 @@ public boolean advance() throws IOException {
if (valueLen > 0) {
in.readFully(value);
}
switch (type) {
case PUT:
((Put) mutation).addColumn(cf, qual, cellTs, value);
break;
case DELETE:
case DELETECOLUMN:
((Delete) mutation).addColumn(cf, qual, cellTs);
break;
case DELETEFAMILYVERSION:
((Delete) mutation).addFamilyVersion(cf, cellTs);
break;
case DELETEFAMILY:
((Delete) mutation).addFamily(cf, cellTs);
break;
default:
throw new UnsupportedOperationException("Unhandled mutation type " + type);
Cell cell = new KeyValue(rowKey, 0, rowKey.length, cf, 0, cf.length, qual, 0,
qual.length, cellTs, KeyValue.Type.codeToType(cellTypeByte), value, 0, value.length);
if (mutation instanceof Put) {
((Put) mutation).add(cell);
} else {
((Delete) mutation).add(cell);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
*/
package org.apache.phoenix.replication.log;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
Expand Down Expand Up @@ -114,16 +111,9 @@ public String toString() {

// Internals only below. Not for LogFile interface consumer use.

/**
* The Phoenix concept of HBase mutation type, which is currently a 1:1 mapping with HBase's, with
* different code values (they don't need to match), but may potentially diverge in the future.
*/
protected enum MutationType {
PUT(1),
DELETE(2),
DELETEFAMILYVERSION(3),
DELETECOLUMN(4),
DELETEFAMILY(5);
DELETE(2);

private int code;

Expand All @@ -135,33 +125,12 @@ int getCode() {
return code;
}

static MutationType get(Mutation mutation) throws IOException {
static MutationType get(Mutation mutation) {
if (mutation instanceof Put) {
return PUT;
} else if (mutation instanceof Delete) {
CellScanner s = mutation.cellScanner();
if (!s.advance()) {
// No cell in delete. A simple delete of a row.
return DELETE;
}
// This assumes that either there is only one cell in the Delete, or all cells in
// the delete have the same cell type, which is correct as of today. We only need
// to look at the first.
Cell cell = s.current();
switch (cell.getType()) {
case Delete:
return DELETE;
case DeleteFamilyVersion:
return DELETEFAMILYVERSION;
case DeleteColumn:
return DELETECOLUMN;
case DeleteFamily:
return DELETEFAMILY;
default:
// Fall through to throw the UnsupportedOperationException
break;
}
} // Fall through to throw the UnsupportedOperationException
return DELETE;
}
throw new UnsupportedOperationException("Unsupported mutation type: " + mutation);
}

Expand Down
Loading