Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ public static TFileType getTFileTypeForBE(String location) {
}

public static String getTempWritePath(String loc, String prefix) {
// If prefix is relative, it is resolved under loc; if absolute, it is used as the base path.
Path tempRoot = new Path(loc, prefix);
Path tempPath = new Path(tempRoot, UUID.randomUUID().toString().replace("-", ""));
return tempPath.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class HMSExternalCatalog extends ExternalCatalog {

public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second";
public static final String PARTITION_CACHE_TTL_SECOND = "partition.cache.ttl-second";
public static final String HIVE_STAGING_DIR = "hive.staging_dir";
// broker name for file split and query scan.
public static final String BIND_BROKER_NAME = "broker.name";
// Default is false, if set to true, will get table schema from "remoteTable" instead of from hive metastore.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.thrift.TUpdateMode;
import org.apache.doris.transaction.Transaction;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
Expand All @@ -65,6 +66,7 @@
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedPart;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -1276,27 +1278,41 @@ public void prepareAlterTable(NameMapping nameMapping, TableAndMore tableAndMore
String targetPath = table.getSd().getLocation();
String writePath = tableAndMore.getCurrentLocation();
if (!targetPath.equals(writePath)) {
Path path = new Path(targetPath);
String oldTablePath = new Path(
path.getParent(), "_temp_" + queryId + "_" + path.getName()).toString();
Status status = wrapperRenameDirWithProfileSummary(
targetPath,
oldTablePath,
() -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg());
}
clearDirsForFinish.add(oldTablePath);
if (isSubDirectory(targetPath, writePath)) {
String stagingRoot = getImmediateChildPath(targetPath, writePath);
deleteTargetPathContents(targetPath, stagingRoot);
ensureDirectory(targetPath);
wrapperAsyncRenameWithProfileSummary(
fileSystemExecutor,
asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
writePath,
targetPath,
tableAndMore.getFileNames());
} else {
Path path = new Path(targetPath);
String oldTablePath = new Path(
path.getParent(), "_temp_" + queryId + "_" + path.getName()).toString();
Status status = wrapperRenameDirWithProfileSummary(
targetPath,
oldTablePath,
() -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg());
}
clearDirsForFinish.add(oldTablePath);

status = wrapperRenameDirWithProfileSummary(
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(
new DirectoryCleanUpTask(targetPath, true)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
status = wrapperRenameDirWithProfileSummary(
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(
new DirectoryCleanUpTask(targetPath, true)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " + targetPath
+ ":" + status.getErrMsg());
}
}
} else {
if (!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
Expand Down Expand Up @@ -1620,6 +1636,132 @@ public void shutdownExecutorService() {
}
}

@VisibleForTesting
static boolean isSubDirectory(String parent, String child) {
if (parent == null || child == null) {
return false;
}
Path parentPath = new Path(parent);
Path childPath = new Path(child);
URI parentUri = parentPath.toUri();
URI childUri = childPath.toUri();
if (!sameFileSystem(parentUri, childUri)) {
return false;
}
String parentPathValue = normalizePath(parentUri.getPath());
String childPathValue = normalizePath(childUri.getPath());
if (parentPathValue.isEmpty() || childPathValue.isEmpty()) {
return false;
}
return !parentPathValue.equals(childPathValue)
&& childPathValue.startsWith(parentPathValue + "/");
}

/**
* Returns the first-level child path of {@code parent} that contains {@code child},
* or null if {@code child} is not a subdirectory of {@code parent}.
* Example: parent=/warehouse/table, child=/warehouse/table/.doris_staging/user/uuid
* returns /warehouse/table/.doris_staging.
*/
@VisibleForTesting
static String getImmediateChildPath(String parent, String child) {
if (!isSubDirectory(parent, child)) {
return null;
}
Path parentPath = new Path(parent);
URI parentUri = parentPath.toUri();
URI childUri = new Path(child).toUri();
String parentPathValue = normalizePath(parentUri.getPath());
String childPathValue = normalizePath(childUri.getPath());
String relative = childPathValue.substring(parentPathValue.length() + 1);
int slashIndex = relative.indexOf("/");
String firstComponent = slashIndex == -1 ? relative : relative.substring(0, slashIndex);
return new Path(parentPath, firstComponent).toString();
}

private static boolean sameFileSystem(URI left, URI right) {
String leftScheme = normalizeUriPart(left.getScheme());
String rightScheme = normalizeUriPart(right.getScheme());
if (!leftScheme.isEmpty() && !rightScheme.isEmpty()
&& !leftScheme.equalsIgnoreCase(rightScheme)) {
return false;
}
String leftAuthority = normalizeUriPart(left.getAuthority());
String rightAuthority = normalizeUriPart(right.getAuthority());
if (!leftAuthority.isEmpty() && !rightAuthority.isEmpty()
&& !leftAuthority.equalsIgnoreCase(rightAuthority)) {
return false;
}
return true;
}

private static String normalizeUriPart(String value) {
return value == null ? "" : value;
}

private static String normalizePath(String path) {
if (path == null || path.isEmpty()) {
return "";
}
int end = path.length();
while (end > 1 && path.charAt(end - 1) == '/') {
end--;
}
return path.substring(0, end);
}

private static boolean pathsEqual(String left, String right) {
if (left == null || right == null) {
return left == null && right == null;
}
URI leftUri = new Path(left).toUri();
URI rightUri = new Path(right).toUri();
if (!sameFileSystem(leftUri, rightUri)) {
return false;
}
return normalizePath(leftUri.getPath()).equals(normalizePath(rightUri.getPath()));
}

@VisibleForTesting
void deleteTargetPathContents(String targetPath, String excludedChildPath) {
Set<String> dirs = new HashSet<>();
Status status = fs.listDirectories(targetPath, dirs);
if (!status.ok() && !Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
throw new RuntimeException(
"Failed to list directories under " + targetPath + ":" + status.getErrMsg());
}
for (String dir : dirs) {
if (excludedChildPath != null && pathsEqual(dir, excludedChildPath)) {
continue;
}
Status deleteStatus = wrapperDeleteDirWithProfileSummary(dir);
if (!deleteStatus.ok() && !Status.ErrCode.NOT_FOUND.equals(deleteStatus.getErrCode())) {
throw new RuntimeException("Failed to delete directory " + dir + ":" + deleteStatus.getErrMsg());
}
}

List<RemoteFile> files = new ArrayList<>();
status = fs.listFiles(targetPath, false, files);
if (!status.ok() && !Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
throw new RuntimeException(
"Failed to list files under " + targetPath + ":" + status.getErrMsg());
}
for (RemoteFile file : files) {
Status deleteStatus = wrapperDeleteWithProfileSummary(file.getPath().toString());
if (!deleteStatus.ok() && !Status.ErrCode.NOT_FOUND.equals(deleteStatus.getErrCode())) {
throw new RuntimeException("Failed to delete file " + file.getPath() + ":" + deleteStatus.getErrMsg());
}
}
}

@VisibleForTesting
void ensureDirectory(String path) {
Status status = fs.makeDir(path);
if (!status.ok()) {
throw new RuntimeException("Failed to create directory " + path + ":" + status.getErrMsg());
}
}

public Status wrapperRenameDirWithProfileSummary(String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.doris.thrift.THiveTableSink;

import com.google.common.base.Strings;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

Expand Down Expand Up @@ -168,7 +169,14 @@ public void bindDataSink(Optional<InsertCommandContext> insertCtx)

private String createTempPath(String location) {
String user = ConnectContext.get().getCurrentUserIdentity().getUser();
return LocationPath.getTempWritePath(location, "/tmp/.doris_staging/" + user);
String defaultStagingBaseDir = ".doris_staging";
String stagingBaseDir = targetTable.getCatalog().getCatalogProperty()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can user specify this stage dir arbitrarily? Is it safe?
What if user specify dir like /etc or /?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we should only support /tmp/.staging or relative dir?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other systems also use this facility.

.getOrDefault(HMSExternalCatalog.HIVE_STAGING_DIR, defaultStagingBaseDir);
if (Strings.isNullOrEmpty(stagingBaseDir)) {
stagingBaseDir = defaultStagingBaseDir;
}
String stagingDir = new Path(stagingBaseDir, user).toString();
return LocationPath.getTempWritePath(location, stagingDir);
}

private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) {
Expand Down
Loading
Loading