Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -477,19 +476,18 @@ protected void handleCoprocessorThrowable(final E env, final Throwable e) throws
}
}

/**
* Used to limit legacy handling to once per Coprocessor class per classloader.
*/
private static final Set<Class<? extends Coprocessor>> legacyWarning =
new ConcurrentSkipListSet<>(new Comparator<Class<? extends Coprocessor>>() {
@Override
public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
if (c1.equals(c2)) {
return 0;
}
return c1.getName().compareTo(c2.getName());
}
});
protected static Optional<ObserverRpcCallContext> createObserverRpcCallContext() {
return createObserverRpcCallContext(RpcServer.getRequestUser().orElse(null));
}

protected static Optional<ObserverRpcCallContext> createObserverRpcCallContext(User user) {
if (user == null) {
/* not in RPC context */
return Optional.empty();
} else {
return Optional.of(new ObserverRpcCallContextImpl(user, RpcServer.getConnectionAttributes()));
}
}

/**
* Implementations defined function to get an observer of type {@code O} from a coprocessor of
Expand All @@ -506,19 +504,17 @@ private abstract class ObserverOperation<O> extends ObserverContextImpl<E> {
ObserverGetter<C, O> observerGetter;

ObserverOperation(ObserverGetter<C, O> observerGetter) {
this(observerGetter, null);
}

ObserverOperation(ObserverGetter<C, O> observerGetter, User user) {
this(observerGetter, user, false);
this(observerGetter, createObserverRpcCallContext());
}

ObserverOperation(ObserverGetter<C, O> observerGetter, boolean bypassable) {
this(observerGetter, null, bypassable);
ObserverOperation(ObserverGetter<C, O> observerGetter,
Optional<ObserverRpcCallContext> rpcCallContext) {
this(observerGetter, rpcCallContext, false);
}

ObserverOperation(ObserverGetter<C, O> observerGetter, User user, boolean bypassable) {
super(user != null ? user : RpcServer.getRequestUser().orElse(null), bypassable);
ObserverOperation(ObserverGetter<C, O> observerGetter,
Optional<ObserverRpcCallContext> rpcCallContext, boolean bypassable) {
super(rpcCallContext, bypassable);
this.observerGetter = observerGetter;
}

Expand All @@ -538,13 +534,14 @@ public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter) {
super(observerGetter);
}

public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user) {
super(observerGetter, user);
public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter,
Optional<ObserverRpcCallContext> rpcCallContext) {
super(observerGetter, rpcCallContext);
}

public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user,
boolean bypassable) {
super(observerGetter, user, bypassable);
public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter,
Optional<ObserverRpcCallContext> rpcCallContext, boolean bypassable) {
super(observerGetter, rpcCallContext, bypassable);
}

/**
Expand Down Expand Up @@ -572,16 +569,17 @@ public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result

public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
boolean bypassable) {
this(observerGetter, result, null, bypassable);
this(observerGetter, result, createObserverRpcCallContext(), bypassable);
}

public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user) {
this(observerGetter, result, user, false);
public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
Optional<ObserverRpcCallContext> rpcCallContext) {
this(observerGetter, result, rpcCallContext, false);
}

private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user,
boolean bypassable) {
super(observerGetter, user, bypassable);
private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
Optional<ObserverRpcCallContext> rpcCallContext, boolean bypassable) {
super(observerGetter, rpcCallContext, bypassable);
this.result = result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,20 @@ public interface ObserverContext<E extends CoprocessorEnvironment> {
void bypass();

/**
* Returns the active user for the coprocessor call. If an explicit {@code User} instance was
* provided to the constructor, that will be returned, otherwise if we are in the context of an
* RPC call, the remote user is used. May not be present if the execution is outside of an RPC
* context.
* Returns the {@link ObserverRpcCallContext} of an RPC call. May not be present if the execution
* is outside an RPC context.
* @return the context.
*/
Optional<User> getCaller();
Optional<ObserverRpcCallContext> getRpcCallContext();

/**
* Returns the active user for the coprocessor call. May not be present if the execution is
* outside an RPC context.
* @return the {@link User}.
* @deprecated will be removed in 4.0.0. Use {@link #getRpcCallContext()} instead.
*/
@Deprecated
default Optional<User> getCaller() {
return getRpcCallContext().map(ObserverRpcCallContext::getUser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.coprocessor;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcServer;
Expand All @@ -35,14 +37,10 @@ public class ObserverContextImpl<E extends CoprocessorEnvironment> implements Ob
* Is this operation bypassable?
*/
private final boolean bypassable;
private final User caller;
private final Optional<ObserverRpcCallContext> rpcCallContext;

public ObserverContextImpl(User caller) {
this(caller, false);
}

public ObserverContextImpl(User caller, boolean bypassable) {
this.caller = caller;
public ObserverContextImpl(Optional<ObserverRpcCallContext> rpcCallContext, boolean bypassable) {
this.rpcCallContext = Objects.requireNonNull(rpcCallContext, "rpcCallContext cannot be null.");
this.bypassable = bypassable;
}

Expand Down Expand Up @@ -83,8 +81,8 @@ public boolean shouldBypass() {
}

@Override
public Optional<User> getCaller() {
return Optional.ofNullable(caller);
public Optional<ObserverRpcCallContext> getRpcCallContext() {
return rpcCallContext;
}

/**
Expand All @@ -98,7 +96,11 @@ public Optional<User> getCaller() {
@Deprecated
// TODO: Remove this method, ObserverContext should not depend on RpcServer
public static <E extends CoprocessorEnvironment> ObserverContext<E> createAndPrepare(E env) {
ObserverContextImpl<E> ctx = new ObserverContextImpl<>(RpcServer.getRequestUser().orElse(null));
Optional<User> user = RpcServer.getRequestUser();
Optional<ObserverRpcCallContext> rpcCallContext =
user.map(value -> new ObserverRpcCallContextImpl(value, Map.of()));

ObserverContextImpl<E> ctx = new ObserverContextImpl<>(rpcCallContext, false);
ctx.prepare(env);
return ctx;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;

import java.util.Map;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
* RPC Call parameters for coprocessor context.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface ObserverRpcCallContext {
/**
* Returns the active user for the coprocessor call.
* @return the {@link User}, it must not be {@code null}.
*/
User getUser();

/**
* Returns the connection attributes for the coprocessor call. These parameters are passed by the
* client through {@code ConnectionHeader} protobuf.
* @return the attributes, it must not be {@code null}.
*/
Map<String, byte[]> getAttributes();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;

import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class ObserverRpcCallContextImpl implements ObserverRpcCallContext {
private final User user;
private final Map<String, byte[]> attributes;

public ObserverRpcCallContextImpl(User user, Map<String, byte[]> attributes) {
this.user = Objects.requireNonNull(user, "user must not be null.");
this.attributes = Objects.requireNonNull(attributes, "attributes must not be null.");
}

@Override
public User getUser() {
return user;
}

@Override
public Map<String, byte[]> getAttributes() {
return attributes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,19 @@ public RpcCoprocessor checkAndGetInstance(Class<?> implClass)

abstract class RpcObserverOperation extends ObserverOperationWithoutResult<RpcObserver> {
public RpcObserverOperation() {
super(rpcObserverGetter);
this(null);
}

public RpcObserverOperation(boolean bypassable) {
this(null, bypassable);
public RpcObserverOperation(User user) {
this(user, false);
}

public RpcObserverOperation(User user) {
super(rpcObserverGetter, user);
public RpcObserverOperation(boolean bypassable) {
this(null, bypassable);
}

public RpcObserverOperation(User user, boolean bypassable) {
super(rpcObserverGetter, user, bypassable);
super(rpcObserverGetter, createObserverRpcCallContext(user), bypassable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,10 @@ public static Optional<User> getRequestUser() {
return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
}

public static Map<String, byte[]> getConnectionAttributes() {
return getCurrentCall().map(RpcCall::getConnectionAttributes).orElse(Map.of());
}

/**
* The number of open RPC conections
* @return the number of open rpc connections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,19 +191,19 @@ public MasterCoprocessor checkAndGetInstance(Class<?> implClass)

abstract class MasterObserverOperation extends ObserverOperationWithoutResult<MasterObserver> {
public MasterObserverOperation() {
super(masterObserverGetter);
this(null);
}

public MasterObserverOperation(boolean bypassable) {
this(null, bypassable);
public MasterObserverOperation(User user) {
this(user, false);
}

public MasterObserverOperation(User user) {
super(masterObserverGetter, user);
public MasterObserverOperation(boolean bypassable) {
this(null, bypassable);
}

public MasterObserverOperation(User user, boolean bypassable) {
super(masterObserverGetter, user, bypassable);
super(masterObserverGetter, createObserverRpcCallContext(user), bypassable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,26 +493,26 @@ public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
abstract class RegionObserverOperationWithoutResult
extends ObserverOperationWithoutResult<RegionObserver> {
public RegionObserverOperationWithoutResult() {
super(regionObserverGetter);
this(null);
}

public RegionObserverOperationWithoutResult(User user) {
super(regionObserverGetter, user);
this(user, false);
}

public RegionObserverOperationWithoutResult(boolean bypassable) {
super(regionObserverGetter, null, bypassable);
this(null, bypassable);
}

public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
super(regionObserverGetter, user, bypassable);
super(regionObserverGetter, createObserverRpcCallContext(user), bypassable);
}
}

abstract class BulkLoadObserverOperation
extends ObserverOperationWithoutResult<BulkLoadObserver> {
public BulkLoadObserverOperation(User user) {
super(RegionCoprocessor::getBulkLoadObserver, user);
super(RegionCoprocessor::getBulkLoadObserver, createObserverRpcCallContext(user));
}
}

Expand Down Expand Up @@ -678,7 +678,7 @@ public InternalScanner preCompact(final HStore store, final InternalScanner scan
return defaultResult;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
regionObserverGetter, defaultResult, user) {
regionObserverGetter, defaultResult, createObserverRpcCallContext(user)) {
@Override
public InternalScanner call(RegionObserver observer) throws IOException {
InternalScanner scanner =
Expand Down
Loading