Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,15 @@
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest;
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
import org.apache.fluss.rpc.messages.InitWriterRequest;
import org.apache.fluss.rpc.messages.InitWriterResponse;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest;
import org.apache.fluss.rpc.messages.NotifyLakeTableOffsetRequest;
import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest;
import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
import org.apache.fluss.security.acl.AccessControlEntry;
Expand Down Expand Up @@ -1370,6 +1374,226 @@ void testTableExistsAuthorization() throws Exception {
rootAdmin.dropTable(testTablePath, true).get();
}

@Test
void testSnapshotManagementAuthorization() throws Exception {
// These RPCs are internal-only, so we test via direct gateway access
try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) {

TabletServerGateway guestTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0),
rpcClient,
TabletServerGateway.class);

CoordinatorGateway guestCoordinatorGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
rpcClient,
CoordinatorGateway.class);

// Test 1: notifyKvSnapshotOffset without WRITE permission
NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest();
notifyKvRequest.setTableId(1L);
notifyKvRequest.setBucketId(0);
notifyKvRequest.setCoordinatorEpoch(1);
notifyKvRequest.setMinRetainOffset(0L);
assertThatThrownBy(
() -> guestTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// Test 2: notifyLakeTableOffset without WRITE permission
NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest();
notifyLakeRequest.setCoordinatorEpoch(1);
assertThatThrownBy(
() -> guestTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// Test 3: commitKvSnapshot without WRITE permission
CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest();
commitKvRequest.setCompletedSnapshot(new byte[0]);
commitKvRequest.setCoordinatorEpoch(1);
commitKvRequest.setBucketLeaderEpoch(1);
assertThatThrownBy(
() -> guestCoordinatorGateway.commitKvSnapshot(commitKvRequest).get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// Test 4: commitLakeTableSnapshot without WRITE permission
CommitLakeTableSnapshotRequest commitLakeRequest = new CommitLakeTableSnapshotRequest();
assertThatThrownBy(
() ->
guestCoordinatorGateway
.commitLakeTableSnapshot(commitLakeRequest)
.get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));
}

// Test 5: Grant CLUSTER/WRITE permission and verify operations succeed
List<AclBinding> aclBindings =
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.WRITE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);

try (RpcClient authorizedRpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) {

TabletServerGateway authorizedTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0),
authorizedRpcClient,
TabletServerGateway.class);

CoordinatorGateway authorizedCoordinatorGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
authorizedRpcClient,
CoordinatorGateway.class);

// Test notifyKvSnapshotOffset with permission
NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest();
notifyKvRequest.setTableId(1L);
notifyKvRequest.setBucketId(0);
notifyKvRequest.setCoordinatorEpoch(1);
notifyKvRequest.setMinRetainOffset(0L);
Throwable thrown1 =
catchThrowable(
() ->
authorizedTabletGateway
.notifyKvSnapshotOffset(notifyKvRequest)
.get());
if (thrown1 != null) {
assertThat(thrown1).rootCause().isNotInstanceOf(AuthorizationException.class);
}

// Test notifyLakeTableOffset with permission
NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest();
notifyLakeRequest.setCoordinatorEpoch(1);
Throwable thrown2 =
catchThrowable(
() ->
authorizedTabletGateway
.notifyLakeTableOffset(notifyLakeRequest)
.get());
if (thrown2 != null) {
assertThat(thrown2).rootCause().isNotInstanceOf(AuthorizationException.class);
}

// Test commitKvSnapshot with permission
CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest();
commitKvRequest.setCompletedSnapshot(new byte[0]);
commitKvRequest.setCoordinatorEpoch(1);
commitKvRequest.setBucketLeaderEpoch(1);
Throwable thrown3 =
catchThrowable(
() ->
authorizedCoordinatorGateway
.commitKvSnapshot(commitKvRequest)
.get());
if (thrown3 != null) {
assertThat(thrown3).rootCause().isNotInstanceOf(AuthorizationException.class);
}

// Test commitLakeTableSnapshot with permission
CommitLakeTableSnapshotRequest commitLakeRequest = new CommitLakeTableSnapshotRequest();
Throwable thrown4 =
catchThrowable(
() ->
authorizedCoordinatorGateway
.commitLakeTableSnapshot(commitLakeRequest)
.get());
if (thrown4 != null) {
assertThat(thrown4).rootCause().isNotInstanceOf(AuthorizationException.class);
}
}

// Test 6: Verify internal sessions bypass authorization
TabletServerGateway internalTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("FLUSS").get(0),
FLUSS_CLUSTER_EXTENSION.getRpcClient(),
TabletServerGateway.class);

CoordinatorGateway internalCoordinatorGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("FLUSS"),
FLUSS_CLUSTER_EXTENSION.getRpcClient(),
CoordinatorGateway.class);

// Internal connections should NOT throw AuthorizationException
NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest();
notifyKvRequest.setTableId(1L);
notifyKvRequest.setBucketId(0);
notifyKvRequest.setCoordinatorEpoch(1);
notifyKvRequest.setMinRetainOffset(0L);
Throwable thrown5 =
catchThrowable(
() -> internalTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get());
if (thrown5 != null) {
assertThat(thrown5).rootCause().isNotInstanceOf(AuthorizationException.class);
}

NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest();
notifyLakeRequest.setCoordinatorEpoch(1);
Throwable thrown6 =
catchThrowable(
() -> internalTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get());
if (thrown6 != null) {
assertThat(thrown6).rootCause().isNotInstanceOf(AuthorizationException.class);
}

CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest();
commitKvRequest.setCompletedSnapshot(new byte[0]);
commitKvRequest.setCoordinatorEpoch(1);
commitKvRequest.setBucketLeaderEpoch(1);
Throwable thrown7 =
catchThrowable(
() -> internalCoordinatorGateway.commitKvSnapshot(commitKvRequest).get());
if (thrown7 != null) {
assertThat(thrown7).rootCause().isNotInstanceOf(AuthorizationException.class);
}

CommitLakeTableSnapshotRequest commitLakeRequest = new CommitLakeTableSnapshotRequest();
Throwable thrown8 =
catchThrowable(
() ->
internalCoordinatorGateway
.commitLakeTableSnapshot(commitLakeRequest)
.get());
if (thrown8 != null) {
assertThat(thrown8).rootCause().isNotInstanceOf(AuthorizationException.class);
}

// Cleanup
rootAdmin.dropAcls(Collections.singletonList(AclBindingFilter.ANY)).all().get();
}

private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@
import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
import static org.apache.fluss.security.acl.OperationType.WRITE;
import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.getGoalByType;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.addTableOffsetsToResponse;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath;
Expand Down Expand Up @@ -770,6 +771,9 @@ public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request)
@Override
public CompletableFuture<CommitKvSnapshotResponse> commitKvSnapshot(
CommitKvSnapshotRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), WRITE, Resource.cluster());
}
CompletableFuture<CommitKvSnapshotResponse> response = new CompletableFuture<>();
// parse completed snapshot from request
byte[] completedSnapshotBytes = request.getCompletedSnapshot();
Expand Down Expand Up @@ -870,6 +874,9 @@ public CompletableFuture<PrepareLakeTableSnapshotResponse> prepareLakeTableSnaps
@Override
public CompletableFuture<CommitLakeTableSnapshotResponse> commitLakeTableSnapshot(
CommitLakeTableSnapshotRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), WRITE, Resource.cluster());
}
CompletableFuture<CommitLakeTableSnapshotResponse> response = new CompletableFuture<>();
eventManagerSupplier
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,9 @@ public CompletableFuture<NotifyRemoteLogOffsetsResponse> notifyRemoteLogOffsets(
@Override
public CompletableFuture<NotifyKvSnapshotOffsetResponse> notifyKvSnapshotOffset(
NotifyKvSnapshotOffsetRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), WRITE, Resource.cluster());
}
CompletableFuture<NotifyKvSnapshotOffsetResponse> response = new CompletableFuture<>();
replicaManager.notifyKvSnapshotOffset(
getNotifySnapshotOffsetData(request), response::complete);
Expand All @@ -441,6 +444,9 @@ public CompletableFuture<NotifyKvSnapshotOffsetResponse> notifyKvSnapshotOffset(
@Override
public CompletableFuture<NotifyLakeTableOffsetResponse> notifyLakeTableOffset(
NotifyLakeTableOffsetRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), WRITE, Resource.cluster());
}
CompletableFuture<NotifyLakeTableOffsetResponse> response = new CompletableFuture<>();
replicaManager.notifyLakeTableOffset(getNotifyLakeTableOffset(request), response::complete);
return response;
Expand Down
Loading