Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,32 @@ public interface AsyncBufferedMutatorBuilder {
AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit);

/**
* Set a rpc request attribute.
* Sets a request attribute. Ignored if a factory is set via
* {@link #setRequestAttributesFactory(RequestAttributesFactory)}.
* @deprecated Since 3.0.0, will be removed in 4.0.0. Please use
* {@link #setRequestAttributesFactory(RequestAttributesFactory)} instead.
*/
@Deprecated
AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value);

/**
* Set multiple rpc request attributes.
* Sets multiple request attributes. Ignored if a factory is set via
* @deprecated Since 3.0.0, will be removed in 4.0.0. Please use
* {@link #setRequestAttributesFactory(RequestAttributesFactory)} instead.
*/
@Deprecated
AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes);

/**
* Sets the factory for creating request attributes. Use {@link FixedRequestAttributesFactory} for
* attributes that do not change, or implement {@link RequestAttributesFactory} for dynamic
* attributes.
*/
default AsyncBufferedMutatorBuilder
setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ public AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requ
return this;
}

@Override
public AsyncBufferedMutatorBuilder
setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) {
tableBuilder.setRequestAttributesFactory(requestAttributesFactory);
return this;
}

@Override
public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
long getScanTimeout(TimeUnit unit);

/**
* Get the map of request attributes
* @return a map of request attributes supplied by the client
* Returns the request attributes for this call. The attributes are generated by the configured
* {@link RequestAttributesFactory}, so each invocation may return different values if a dynamic
* factory is in use.
* @return the request attributes
*/
default Map<String, byte[]> getRequestAttributes() {
return Collections.emptyMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,28 @@ default AsyncTableBuilder<C> setMaxRetries(int maxRetries) {
AsyncTableBuilder<C> setStartLogErrorsCnt(int startLogErrorsCnt);

/**
* Set a request attribute
* Sets a request attribute. Ignored if a factory is set via
* {@link #setRequestAttributesFactory(RequestAttributesFactory)}.
* @param key the attribute key
* @param value the attribute value
* @deprecated Since 3.0.0, will be removed in 4.0.0. Please use
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think master is 4.0.0; should it get deprecated for a later version or should I remove it from master?

* {@link #setRequestAttributesFactory(RequestAttributesFactory)} instead.
*/
@Deprecated
AsyncTableBuilder<C> setRequestAttribute(String key, byte[] value);

/**
* Sets the factory for creating request attributes. Use {@link FixedRequestAttributesFactory} for
* attributes that do not change, or implement {@link RequestAttributesFactory} for dynamic
* attributes.
* @param requestAttributesFactory the factory to use
* @return this builder
*/
default AsyncTableBuilder<C>
setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Create the {@link AsyncTable} instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -53,7 +50,9 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>

protected int startLogErrorsCnt;

protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
protected FixedRequestAttributesFactory.Builder fixedRequestAttributesFactoryBuilder = null;

protected RequestAttributesFactory requestAttributesFactory = null;

AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) {
this.tableName = tableName;
Expand Down Expand Up @@ -129,10 +128,27 @@ public AsyncTableBuilderBase<C> setStartLogErrorsCnt(int startLogErrorsCnt) {

@Override
public AsyncTableBuilder<C> setRequestAttribute(String key, byte[] value) {
if (requestAttributes.isEmpty()) {
requestAttributes = new HashMap<>();
if (fixedRequestAttributesFactoryBuilder == null) {
fixedRequestAttributesFactoryBuilder = FixedRequestAttributesFactory.newBuilder();
}
requestAttributes.put(key, value);
fixedRequestAttributesFactoryBuilder.setAttribute(key, value);
return this;
}

@Override
public AsyncTableBuilder<C>
setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) {
this.requestAttributesFactory = requestAttributesFactory;
return this;
}

RequestAttributesFactory getRequestAttributesFactory() {
if (requestAttributesFactory != null) {
return requestAttributesFactory;
} else if (fixedRequestAttributesFactoryBuilder != null) {
return fixedRequestAttributesFactoryBuilder.build();
} else {
return FixedRequestAttributesFactory.EMPTY;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
* Parameters for instantiating a {@link BufferedMutator}.
*/
Expand All @@ -44,7 +40,8 @@ public class BufferedMutatorParams implements Cloneable {
private int rpcTimeout = UNSET;
private int operationTimeout = UNSET;
private int maxMutations = UNSET;
protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
protected FixedRequestAttributesFactory.Builder fixedRequestAttributesFactoryBuilder = null;
protected RequestAttributesFactory requestAttributesFactory = null;
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException exception,
Expand Down Expand Up @@ -109,16 +106,55 @@ public int getMaxMutations() {
return maxMutations;
}

/**
* Sets a request attribute. Ignored if a factory is set via
* {@link #setRequestAttributesFactory(RequestAttributesFactory)}.
* @deprecated Since 3.0.0, will be removed in 4.0.0. Please use
* {@link #setRequestAttributesFactory(RequestAttributesFactory)} instead.
*/
@Deprecated
public BufferedMutatorParams setRequestAttribute(String key, byte[] value) {
if (requestAttributes.isEmpty()) {
requestAttributes = new HashMap<>();
if (fixedRequestAttributesFactoryBuilder == null) {
fixedRequestAttributesFactoryBuilder = FixedRequestAttributesFactory.newBuilder();
}
requestAttributes.put(key, value);
fixedRequestAttributesFactoryBuilder.setAttribute(key, value);
return this;
}

/**
* Returns *only* the request attributes added by {@link #setRequestAttribute(String, byte[])}.
* @deprecated Since 3.0.0, will be removed in 4.0.0. Please use
* {@link #getRequestAttributesFactory()} instead.
*/
public Map<String, byte[]> getRequestAttributes() {
return requestAttributes;
if (fixedRequestAttributesFactoryBuilder == null) {
return null;
}
return fixedRequestAttributesFactoryBuilder.getAttributes();
}

/**
* Sets the factory for creating request attributes. Use {@link FixedRequestAttributesFactory} for
* attributes that do not change, or implement {@link RequestAttributesFactory} for dynamic
* attributes.
*/
public BufferedMutatorParams
setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) {
this.requestAttributesFactory = requestAttributesFactory;
return this;
}

/**
* Returns the request attributes factory.
*/
public RequestAttributesFactory getRequestAttributesFactory() {
if (requestAttributesFactory != null) {
return requestAttributesFactory;
} else if (fixedRequestAttributesFactoryBuilder != null) {
return fixedRequestAttributesFactoryBuilder.build();
} else {
return FixedRequestAttributesFactory.EMPTY;
}
}

/**
Expand Down Expand Up @@ -243,7 +279,12 @@ public BufferedMutatorParams clone() {
clone.writeBufferPeriodicFlushTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
clone.maxKeyValueSize = this.maxKeyValueSize;
clone.maxMutations = this.maxMutations;
clone.requestAttributes = Maps.newHashMap(this.requestAttributes);
clone.requestAttributesFactory = this.requestAttributesFactory;
if (fixedRequestAttributesFactoryBuilder != null) {
clone.fixedRequestAttributesFactoryBuilder = FixedRequestAttributesFactory.newBuilder();
fixedRequestAttributesFactoryBuilder.getAttributes()
.forEach(clone.fixedRequestAttributesFactoryBuilder::setAttribute);
}
clone.pool = this.pool;
clone.listener = this.listener;
clone.implementationClassName = this.implementationClassName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,7 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I
if (params.getMaxMutations() != BufferedMutatorParams.UNSET) {
builder.setMaxMutations(params.getMaxMutations());
}
if (!params.getRequestAttributes().isEmpty()) {

builder.setRequestAttributes(params.getRequestAttributes());
}
builder.setRequestAttributesFactory(params.getRequestAttributesFactory());
return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
}

Expand Down Expand Up @@ -200,8 +197,8 @@ public Table build() {
conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS)
.setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
.setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
.setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS);
requestAttributes.forEach(tableBuilder::setRequestAttribute);
.setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS)
.setRequestAttributesFactory(getRequestAttributesFactory());
return new TableOverAsyncTable(conn, tableBuilder.build(), poolSupplier);
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A {@link RequestAttributesFactory} that returns a fixed set of attributes for every call. Use
* this when attributes are fixed and do not change.
* @see AsyncTableBuilder#setRequestAttributesFactory(RequestAttributesFactory)
*/
@InterfaceAudience.Public
public final class FixedRequestAttributesFactory implements RequestAttributesFactory {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other ideas I had were ImmutableRequestAttributesFactory or StaticRequestAttributesFactory, but I think FixedRequestAttributesFactory is most clear because Immutable- might be interpreted as "the returned collection is immutable" instead of that it will return the same collection every time; and Static- clashes with the reserved word in my opinion


/**
* A factory that always returns an empty map.
*/
public static final RequestAttributesFactory EMPTY = Collections::emptyMap;

/**
* Builder for creating {@link FixedRequestAttributesFactory} instances.
*/
public static final class Builder {
private final Map<String, byte[]> requestAttributes = new LinkedHashMap<>();

/**
* Sets a request attribute. If value is null, the attribute is removed.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underlying Protobuf ByteString will throw a NPE if value is null, so instead this factory removes the entry if it's null.

* @param key the attribute key
* @param value the attribute value, or null to remove
* @return this builder
*/
public Builder setAttribute(String key, byte[] value) {
if (value == null) {
requestAttributes.remove(key);
} else {
requestAttributes.put(key, value);
}
return this;
}

/**
* Gets the accumulated request attributes.
*/
public Map<String, byte[]> getAttributes() {
return Collections.unmodifiableMap(requestAttributes);
}

/**
* Builds a {@link FixedRequestAttributesFactory} with the configured attributes.
* @return the factory
*/
public FixedRequestAttributesFactory build() {
return new FixedRequestAttributesFactory(new LinkedHashMap<>(requestAttributes));
}
}

/**
* Returns a new builder.
* @return a new builder instance
*/
public static Builder newBuilder() {
return new Builder();
}

private final Map<String, byte[]> requestAttributes;

private FixedRequestAttributesFactory(Map<String, byte[]> requestAttributes) {
this.requestAttributes = Collections.unmodifiableMap(requestAttributes);
}

@Override
public Map<String, byte[]> create() {
return requestAttributes;
}
}
Loading
Loading