Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ public ComputeGroupException(String msg, FailedTypeEnum failedType) {
this.failedType = failedType;
}

public FailedTypeEnum getFailedType() {
return failedType;
}

public String toString() {
return msg + ", ComputeGroupException: " + failedType + ", you can" + helpMsg();
}
Expand Down
38 changes: 19 additions & 19 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -1403,11 +1403,11 @@ public ComputeGroup getComputeGroup() throws UserException {
}

/**
* Tries to choose an available cluster in the following order
* 1. Do nothing if a cluster has been chosen for current session. It may be
* chosen explicitly by `use @` command or setCloudCluster() or this method
* 2. Tries to choose a default cluster if current mysql user has been set any
* 3. Tries to choose an authorized cluster if all preceeding conditions failed
* Tries to choose an available cluster in the following order:
* 1. Get cluster from session variable (set by `use @` command or setCloudCluster())
* 2. Get cluster from cached variable (this.cloudCluster) if available (from previous policy selection)
* 3. Get cluster from user's default cluster property if set
* 4. Choose an authorized cluster by policy if all preceding conditions failed
*
* @param updateErr whether set the connect state to error if the returned cluster is null or empty
* @return non-empty cluster name if a cluster has been chosen otherwise null or empty string
Expand All @@ -1430,18 +1430,7 @@ public String getCloudCluster(boolean updateErr) throws ComputeGroupException {
return sessionCluster;
}

// 2 get cluster from user
String userPropCluster = getDefaultCloudClusterFromUser(true);
if (!StringUtils.isEmpty(userPropCluster)) {
choseWay = "user property";
if (LOG.isDebugEnabled()) {
LOG.debug("finally set context compute group name {} for user {} with chose way '{}'", userPropCluster,
getCurrentUserIdentity(), choseWay);
}
return userPropCluster;
}

// 3 get cluster from a cached variable in connect context
// 2 get cluster from a cached variable in connect context
// this value comes from a cluster selection policy
if (!Strings.isNullOrEmpty(this.cloudCluster)) {
choseWay = "user selection policy";
Expand All @@ -1452,6 +1441,18 @@ public String getCloudCluster(boolean updateErr) throws ComputeGroupException {
return cloudCluster;
}

// 3 get cluster from user
String userPropCluster = getDefaultCloudClusterFromUser(true);
if (!StringUtils.isEmpty(userPropCluster)) {
choseWay = "user property";
if (LOG.isDebugEnabled()) {
LOG.debug("finally set context compute group name {} for user {} with chose way '{}'", userPropCluster,
getCurrentUserIdentity(), choseWay);
}
this.cloudCluster = userPropCluster;
return userPropCluster;
}

String policyCluster = "";
CloudClusterResult cloudClusterTypeAndName = getCloudClusterByPolicy();
if (cloudClusterTypeAndName != null && !Strings.isNullOrEmpty(cloudClusterTypeAndName.clusterName)) {
Expand All @@ -1472,9 +1473,8 @@ public String getCloudCluster(boolean updateErr) throws ComputeGroupException {
getState().setError(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, exception.getMessage());
}
throw exception;
} else {
this.cloudCluster = policyCluster;
}
this.cloudCluster = policyCluster;

if (LOG.isDebugEnabled()) {
LOG.debug("finally set context compute group name {} for user {} with chose way '{}'", this.cloudCluster,
Expand Down
169 changes: 169 additions & 0 deletions fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,24 @@

package org.apache.doris.qe;

import org.apache.doris.analysis.ResourceTypeEnum;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.Pair;
import org.apache.doris.mysql.MysqlCapability;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Lists;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
Expand All @@ -51,6 +59,12 @@ public class ConnectContextTest {
private Auth auth;
@Mocked
private String qualifiedUser;
@Mocked
private CloudSystemInfoService cloudSystemInfoService;
@Mocked
private AccessControllerManager accessManager;
@Mocked
private Backend backend;

@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -475,4 +489,159 @@ public void testInitCatalogAndDbNullString() {
// Expected behavior
}
}

@Test
public void testGetCloudCluster() throws Exception {
// Setup: enable cloud mode by setting cloud_unique_id
String originalCloudUniqueId = Config.cloud_unique_id;
try {
Config.cloud_unique_id = "test-cloud-id";

ConnectContext ctx = new ConnectContext();
ctx.setEnv(env);
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp("testUser", "%"));

// Test 1: Cluster from session variable (step 1)
// This tests: "Get cluster from session variable (set by `use @` command or setCloudCluster())"
ctx.setCloudCluster("session_cluster");
// Verify that setCloudCluster sets session variable
Assert.assertEquals("session_cluster", ctx.getSessionVariable().getCloudCluster());
new Expectations() {
{
env.getCurrentSystemInfo();
result = cloudSystemInfoService;
}
};
String cluster = ctx.getCloudCluster(false);
Assert.assertEquals("session_cluster", cluster);

// Test 2: Cluster from user default (step 2)
// This tests: "Get cluster from user's default cluster property if set"
ctx.setCloudCluster(null); // Clear session cluster
ctx.cloudCluster = null; // Clear cached cluster
new Expectations() {
{
env.getAuth();
result = auth;
auth.getDefaultCloudCluster("testUser");
result = "user_default_cluster";
env.getCurrentSystemInfo();
result = cloudSystemInfoService;
cloudSystemInfoService.getCloudClusterNames();
result = Lists.newArrayList("user_default_cluster", "other_cluster");
}
};
cluster = ctx.getCloudCluster(false);
Assert.assertEquals("user_default_cluster", cluster);

// Test 3: Cluster from this.cloudCluster cache (step 3)
// This tests: "Get cluster from cached variable (this.cloudCluster) if available"
ctx.setCloudCluster(null); // Clear session cluster
ctx.cloudCluster = "cached_cluster"; // Set cached cluster (from previous policy selection)
new Expectations() {
{
env.getAuth();
result = auth;
auth.getDefaultCloudCluster("testUser");
result = null; // No user default
env.getCurrentSystemInfo();
result = cloudSystemInfoService;
}
};
cluster = ctx.getCloudCluster(false);
Assert.assertEquals("cached_cluster", cluster);

// Test 4: Cluster from policy (step 4)
// This tests: "Choose an authorized cluster by policy if all preceding conditions failed"
ctx.setCloudCluster(null); // Clear session cluster
ctx.cloudCluster = null; // Clear cached cluster
new Expectations() {
{
env.getAuth();
result = auth;
auth.getDefaultCloudCluster("testUser");
result = null; // No user default
env.getCurrentSystemInfo();
result = cloudSystemInfoService;
cloudSystemInfoService.getCloudClusterNames();
result = Lists.newArrayList("policy_cluster1", "policy_cluster2");
env.getAccessManager();
result = accessManager;
accessManager.checkCloudPriv((UserIdentity) any, "policy_cluster2",
PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER);
result = true;
cloudSystemInfoService.isStandByComputeGroup("policy_cluster2");
result = false;
cloudSystemInfoService.getBackendsByClusterName("policy_cluster2");
result = Lists.newArrayList(backend);
backend.isAlive();
result = true;
}
};
cluster = ctx.getCloudCluster(false);
Assert.assertEquals("policy_cluster2", cluster);
// Verify cache is set for subsequent calls
Assert.assertEquals("policy_cluster2", ctx.cloudCluster);

// Test 5: Priority order - session variable takes precedence over this.cloudCluster
ctx.setCloudCluster("session_cluster2");
ctx.cloudCluster = "cached_cluster2"; // This should be ignored
new Expectations() {
{
env.getCurrentSystemInfo();
result = cloudSystemInfoService;
}
};
cluster = ctx.getCloudCluster(false);
Assert.assertEquals("session_cluster2", cluster); // Session variable wins

// Test 6: Priority order - user this.cloudCluster over default takes precedence
ctx.setCloudCluster(null); // Clear session cluster
ctx.cloudCluster = "cached_cluster3"; // This should be ignored
new Expectations() {
{
env.getAuth();
result = auth;
auth.getDefaultCloudCluster("testUser");
result = "user_default_cluster2";
env.getCurrentSystemInfo();
result = cloudSystemInfoService;
cloudSystemInfoService.getCloudClusterNames();
result = Lists.newArrayList("user_default_cluster2", "other_cluster");
}
};
cluster = ctx.getCloudCluster(false);
Assert.assertEquals("cached_cluster3", cluster); // User this.cloudCluster wins

// Test 7: No cluster available - should throw exception
ctx.setCloudCluster(null);
ctx.cloudCluster = null;
new Expectations() {
{
env.getAuth();
result = auth;
auth.getDefaultCloudCluster("testUser");
result = null;
env.getCurrentSystemInfo();
result = cloudSystemInfoService;
cloudSystemInfoService.getCloudClusterNames();
result = Lists.newArrayList("unauthorized_cluster");
env.getAccessManager();
result = accessManager;
accessManager.checkCloudPriv((UserIdentity) any, anyString,
PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER);
result = false; // No authorized cluster
}
};
try {
ctx.getCloudCluster(true);
Assert.fail("Expected ComputeGroupException");
} catch (ComputeGroupException e) {
Assert.assertEquals(ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_ANY_COMPUTE_GROUP,
e.getFailedType());
}
} finally {
Config.cloud_unique_id = originalCloudUniqueId;
}
}
}
Loading