Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
import org.apache.fluss.rpc.messages.InitWriterRequest;
import org.apache.fluss.rpc.messages.InitWriterResponse;
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest;
import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest;
import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
import org.apache.fluss.security.acl.AccessControlEntry;
Expand Down Expand Up @@ -1370,6 +1373,221 @@ void testTableExistsAuthorization() throws Exception {
rootAdmin.dropTable(testTablePath, true).get();
}

@Test
void testRemoteLogAndTieringAuthorization() throws Exception {
// These RPCs are internal-only, so we test via direct gateway access
try (RpcClient rpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) {

TabletServerGateway guestTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0),
rpcClient,
TabletServerGateway.class);

CoordinatorGateway guestCoordinatorGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
rpcClient,
CoordinatorGateway.class);

// Test 1: notifyRemoteLogOffsets without WRITE permission
NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest();
notifyRemoteRequest.setTableId(1L);
notifyRemoteRequest.setBucketId(0);
notifyRemoteRequest.setCoordinatorEpoch(1);
notifyRemoteRequest.setRemoteStartOffset(0L);
notifyRemoteRequest.setRemoteEndOffset(100L);
assertThatThrownBy(
() ->
guestTabletGateway
.notifyRemoteLogOffsets(notifyRemoteRequest)
.get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// Test 2: commitRemoteLogManifest without WRITE permission
CommitRemoteLogManifestRequest commitManifestRequest =
new CommitRemoteLogManifestRequest();
commitManifestRequest.setTableId(1L);
commitManifestRequest.setBucketId(0);
commitManifestRequest.setRemoteLogManifestPath("/path/to/manifest");
commitManifestRequest.setRemoteLogStartOffset(0L);
commitManifestRequest.setRemoteLogEndOffset(100L);
commitManifestRequest.setCoordinatorEpoch(1);
commitManifestRequest.setBucketLeaderEpoch(1);
assertThatThrownBy(
() ->
guestCoordinatorGateway
.commitRemoteLogManifest(commitManifestRequest)
.get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));

// Test 3: lakeTieringHeartbeat without WRITE permission
LakeTieringHeartbeatRequest heartbeatRequest = new LakeTieringHeartbeatRequest();
assertThatThrownBy(
() ->
guestCoordinatorGateway
.lakeTieringHeartbeat(heartbeatRequest)
.get())
.rootCause()
.isInstanceOf(AuthorizationException.class)
.hasMessageContaining(
String.format(
"Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));
}

// Test 4: Grant CLUSTER/WRITE permission and verify operations succeed
List<AclBinding> aclBindings =
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
"*",
OperationType.WRITE,
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);

try (RpcClient authorizedRpcClient =
RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) {

TabletServerGateway authorizedTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0),
authorizedRpcClient,
TabletServerGateway.class);

CoordinatorGateway authorizedCoordinatorGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"),
authorizedRpcClient,
CoordinatorGateway.class);

// Test notifyRemoteLogOffsets with permission
NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest();
notifyRemoteRequest.setTableId(1L);
notifyRemoteRequest.setBucketId(0);
notifyRemoteRequest.setCoordinatorEpoch(1);
notifyRemoteRequest.setRemoteStartOffset(0L);
notifyRemoteRequest.setRemoteEndOffset(100L);
Throwable thrown1 =
catchThrowable(
() ->
authorizedTabletGateway
.notifyRemoteLogOffsets(notifyRemoteRequest)
.get());
if (thrown1 != null) {
assertThat(thrown1).rootCause().isNotInstanceOf(AuthorizationException.class);
}

// Test commitRemoteLogManifest with permission
CommitRemoteLogManifestRequest commitManifestRequest =
new CommitRemoteLogManifestRequest();
commitManifestRequest.setTableId(1L);
commitManifestRequest.setBucketId(0);
commitManifestRequest.setRemoteLogManifestPath("/path/to/manifest");
commitManifestRequest.setRemoteLogStartOffset(0L);
commitManifestRequest.setRemoteLogEndOffset(100L);
commitManifestRequest.setCoordinatorEpoch(1);
commitManifestRequest.setBucketLeaderEpoch(1);
Throwable thrown2 =
catchThrowable(
() ->
authorizedCoordinatorGateway
.commitRemoteLogManifest(commitManifestRequest)
.get());
if (thrown2 != null) {
assertThat(thrown2).rootCause().isNotInstanceOf(AuthorizationException.class);
}

// Test lakeTieringHeartbeat with permission
LakeTieringHeartbeatRequest heartbeatRequest = new LakeTieringHeartbeatRequest();
Throwable thrown3 =
catchThrowable(
() ->
authorizedCoordinatorGateway
.lakeTieringHeartbeat(heartbeatRequest)
.get());
if (thrown3 != null) {
assertThat(thrown3).rootCause().isNotInstanceOf(AuthorizationException.class);
}
}

// Test 5: Verify internal sessions bypass authorization
TabletServerGateway internalTabletGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("FLUSS").get(0),
FLUSS_CLUSTER_EXTENSION.getRpcClient(),
TabletServerGateway.class);

CoordinatorGateway internalCoordinatorGateway =
GatewayClientProxy.createGatewayProxy(
() -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("FLUSS"),
FLUSS_CLUSTER_EXTENSION.getRpcClient(),
CoordinatorGateway.class);

// Internal connections should NOT throw AuthorizationException
NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest();
notifyRemoteRequest.setTableId(1L);
notifyRemoteRequest.setBucketId(0);
notifyRemoteRequest.setCoordinatorEpoch(1);
notifyRemoteRequest.setRemoteStartOffset(0L);
notifyRemoteRequest.setRemoteEndOffset(100L);
Throwable thrown4 =
catchThrowable(
() ->
internalTabletGateway
.notifyRemoteLogOffsets(notifyRemoteRequest)
.get());
if (thrown4 != null) {
assertThat(thrown4).rootCause().isNotInstanceOf(AuthorizationException.class);
}

CommitRemoteLogManifestRequest commitManifestRequest = new CommitRemoteLogManifestRequest();
commitManifestRequest.setTableId(1L);
commitManifestRequest.setBucketId(0);
commitManifestRequest.setRemoteLogManifestPath("/path/to/manifest");
commitManifestRequest.setRemoteLogStartOffset(0L);
commitManifestRequest.setRemoteLogEndOffset(100L);
commitManifestRequest.setCoordinatorEpoch(1);
commitManifestRequest.setBucketLeaderEpoch(1);
Throwable thrown5 =
catchThrowable(
() ->
internalCoordinatorGateway
.commitRemoteLogManifest(commitManifestRequest)
.get());
if (thrown5 != null) {
assertThat(thrown5).rootCause().isNotInstanceOf(AuthorizationException.class);
}

LakeTieringHeartbeatRequest heartbeatRequest = new LakeTieringHeartbeatRequest();
Throwable thrown6 =
catchThrowable(
() ->
internalCoordinatorGateway
.lakeTieringHeartbeat(heartbeatRequest)
.get());
if (thrown6 != null) {
assertThat(thrown6).rootCause().isNotInstanceOf(AuthorizationException.class);
}

// Cleanup
rootAdmin.dropAcls(Collections.singletonList(AclBindingFilter.ANY)).all().get();
}

private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@
import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
import static org.apache.fluss.security.acl.OperationType.WRITE;
import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.getGoalByType;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.addTableOffsetsToResponse;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath;
Expand Down Expand Up @@ -787,6 +788,9 @@ public CompletableFuture<CommitKvSnapshotResponse> commitKvSnapshot(
@Override
public CompletableFuture<CommitRemoteLogManifestResponse> commitRemoteLogManifest(
CommitRemoteLogManifestRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), WRITE, Resource.cluster());
}
CompletableFuture<CommitRemoteLogManifestResponse> response = new CompletableFuture<>();
eventManagerSupplier
.get()
Expand Down Expand Up @@ -882,6 +886,9 @@ public CompletableFuture<CommitLakeTableSnapshotResponse> commitLakeTableSnapsho
@Override
public CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat(
LakeTieringHeartbeatRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), WRITE, Resource.cluster());
}
LakeTieringHeartbeatResponse heartbeatResponse = new LakeTieringHeartbeatResponse();
int currentCoordinatorEpoch = coordinatorEpochSupplier.get();
heartbeatResponse.setCoordinatorEpoch(currentCoordinatorEpoch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,9 @@ public CompletableFuture<InitWriterResponse> initWriter(InitWriterRequest reques
@Override
public CompletableFuture<NotifyRemoteLogOffsetsResponse> notifyRemoteLogOffsets(
NotifyRemoteLogOffsetsRequest request) {
if (authorizer != null) {
authorizer.authorize(currentSession(), WRITE, Resource.cluster());
}
CompletableFuture<NotifyRemoteLogOffsetsResponse> response = new CompletableFuture<>();
replicaManager.notifyRemoteLogOffsets(
getNotifyRemoteLogOffsetsData(request), response::complete);
Expand Down
Loading