Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,16 @@
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
import org.apache.fluss.rpc.messages.InitWriterRequest;
import org.apache.fluss.rpc.messages.InitWriterResponse;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrRequest;
import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest;
import org.apache.fluss.rpc.messages.StopReplicaRequest;
import org.apache.fluss.rpc.messages.UpdateMetadataRequest;
import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
import org.apache.fluss.security.acl.AccessControlEntry;
import org.apache.fluss.security.acl.AccessControlEntryFilter;
Expand Down Expand Up @@ -1394,6 +1398,171 @@ private static Configuration initConfig() {
return conf;
}

@Test
void testInternalReplicationControlAuthorization() throws Exception {
// These RPCs are internal-only, so we test via direct gateway access
try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) {

TabletServerGateway guestTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0),
rpcClient,
TabletServerGateway.class);

CoordinatorGateway guestCoordinatorGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
rpcClient,
CoordinatorGateway.class);

// Test 1: notifyLeaderAndIsr without WRITE permission
NotifyLeaderAndIsrRequest notifyRequest = new NotifyLeaderAndIsrRequest();
notifyRequest.setCoordinatorEpoch(1);
assertThatThrownBy(() -> guestTabletGateway.notifyLeaderAndIsr(notifyRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// Test 2: updateMetadata without WRITE permission
UpdateMetadataRequest updateRequest = new UpdateMetadataRequest();
updateRequest.setCoordinatorEpoch(1);
assertThatThrownBy(() -> guestTabletGateway.updateMetadata(updateRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// Test 3: stopReplica without WRITE permission
StopReplicaRequest stopRequest = new StopReplicaRequest();
stopRequest.setCoordinatorEpoch(1);
assertThatThrownBy(() -> guestTabletGateway.stopReplica(stopRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// Test 4: adjustIsr without WRITE permission
AdjustIsrRequest adjustRequest = new AdjustIsrRequest();
adjustRequest.setServerId(0);
assertThatThrownBy(() -> guestCoordinatorGateway.adjustIsr(adjustRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));
}

// Test 5: Grant CLUSTER/WRITE permission and verify operations succeed
List<AclBinding> aclBindings =
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.WRITE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);

try (RpcClient authorizedRpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) {

TabletServerGateway authorizedTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0),
authorizedRpcClient,
TabletServerGateway.class);

CoordinatorGateway authorizedCoordinatorGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
authorizedRpcClient,
CoordinatorGateway.class);

// Now with WRITE permission, operations should NOT throw AuthorizationException
// (they may fail for other reasons like invalid data, but not authorization)

NotifyLeaderAndIsrRequest authorizedNotifyRequest = new NotifyLeaderAndIsrRequest();
authorizedNotifyRequest.setCoordinatorEpoch(1);
Throwable notifyThrown =
catchThrowable(
() ->
authorizedTabletGateway
.notifyLeaderAndIsr(authorizedNotifyRequest)
.get());
if (notifyThrown != null) {
assertThat(notifyThrown).rootCause().isNotInstanceOf(AuthorizationException.class);
}

UpdateMetadataRequest authorizedUpdateRequest = new UpdateMetadataRequest();
authorizedUpdateRequest.setCoordinatorEpoch(1);
Throwable updateThrown =
catchThrowable(
() ->
authorizedTabletGateway
.updateMetadata(authorizedUpdateRequest)
.get());
if (updateThrown != null) {
assertThat(updateThrown).rootCause().isNotInstanceOf(AuthorizationException.class);
}

StopReplicaRequest authorizedStopRequest = new StopReplicaRequest();
authorizedStopRequest.setCoordinatorEpoch(1);
Throwable stopThrown =
catchThrowable(
() -> authorizedTabletGateway.stopReplica(authorizedStopRequest).get());
if (stopThrown != null) {
assertThat(stopThrown).rootCause().isNotInstanceOf(AuthorizationException.class);
}

AdjustIsrRequest authorizedAdjustRequest = new AdjustIsrRequest();
authorizedAdjustRequest.setServerId(0);
Throwable adjustThrown =
catchThrowable(
() ->
authorizedCoordinatorGateway
.adjustIsr(authorizedAdjustRequest)
.get());
if (adjustThrown != null) {
assertThat(adjustThrown).rootCause().isNotInstanceOf(AuthorizationException.class);
}
}

// Test 6: Verify internal sessions bypass authorization
TabletServerGateway internalTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("FLUSS").get(0),
FLUSS_CLUSTER_EXTENSION.getRpcClient(),
TabletServerGateway.class);

// Internal connection should NOT throw AuthorizationException
// (may fail for other reasons like invalid data, but not authorization)
NotifyLeaderAndIsrRequest internalNotifyRequest = new NotifyLeaderAndIsrRequest();
internalNotifyRequest.setCoordinatorEpoch(1);

// The request will likely fail due to invalid data, but importantly
// it should NOT fail with AuthorizationException
Throwable thrown =
catchThrowable(
() ->
internalTabletGateway
.notifyLeaderAndIsr(internalNotifyRequest)
.get());
if (thrown != null) {
assertThat(thrown).rootCause().isNotInstanceOf(AuthorizationException.class);
}
}

private void assertNoTableDescribeAuth(ThrowableAssert.ThrowingCallable callable) {
assertThatThrownBy(callable)
.cause()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@
import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
import static org.apache.fluss.security.acl.OperationType.WRITE;
import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.getGoalByType;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.addTableOffsetsToResponse;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath;
Expand Down Expand Up @@ -760,6 +761,9 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
}

public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), WRITE, Resource.cluster());
}
CompletableFuture<AdjustIsrResponse> response = new CompletableFuture<>();
eventManagerSupplier
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ public CompletableFuture<GetTableStatsResponse> getTableStats(GetTableStatsReque
@Override
public CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest) {
if (authorizer != null) {
authorizer.authorize(currentSession(), WRITE, Resource.cluster());
}
CompletableFuture<NotifyLeaderAndIsrResponse> response = new CompletableFuture<>();
List<NotifyLeaderAndIsrData> notifyLeaderAndIsrRequestData =
getNotifyLeaderAndIsrRequestData(notifyLeaderAndIsrRequest);
Expand All @@ -373,6 +376,9 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {

@Override
public CompletableFuture<UpdateMetadataResponse> updateMetadata(UpdateMetadataRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), WRITE, Resource.cluster());
}
int coordinatorEpoch =
request.hasCoordinatorEpoch()
? request.getCoordinatorEpoch()
Expand All @@ -385,6 +391,9 @@ public CompletableFuture<UpdateMetadataResponse> updateMetadata(UpdateMetadataRe
@Override
public CompletableFuture<StopReplicaResponse> stopReplica(
StopReplicaRequest stopReplicaRequest) {
if (authorizer != null) {
authorizer.authorize(currentSession(), WRITE, Resource.cluster());
}
CompletableFuture<StopReplicaResponse> response = new CompletableFuture<>();
replicaManager.stopReplicas(
stopReplicaRequest.getCoordinatorEpoch(),
Expand Down
Loading