Skip to content
6 changes: 2 additions & 4 deletions paimon-python/dev/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ dataclasses>=0.8; python_version < "3.7"
fastavro>=1.4,<2
fsspec>=2021.10,<2026; python_version<"3.8"
fsspec>=2023,<2026; python_version>="3.8"
ossfs>=2021.8; python_version<"3.8"
ossfs>=2023; python_version>="3.8"
# ossfs moved to extras_require (pip install pypaimon[oss])
packaging>=21,<26
pandas>=1.1,<2; python_version < "3.7"
pandas>=1.3,<3; python_version >= "3.7" and python_version < "3.9"
Expand All @@ -32,8 +31,7 @@ polars>=0.9,<1; python_version<"3.8"
polars>=1,<2; python_version>="3.8"
pyarrow>=6,<7; python_version < "3.8"
pyarrow>=16,<20; python_version >= "3.8"
pylance>=0.20,<1; python_version>="3.9"
pylance>=0.10,<1; python_version>="3.8" and python_version<"3.9"
# pylance moved to extras_require (pip install pypaimon[lance])
pyroaring
readerwriterlock>=1,<2
zstandard>=0.19,<1
Expand Down
34 changes: 19 additions & 15 deletions paimon-python/pypaimon/common/file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import uuid
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Optional
from typing import Dict, List, Optional

import pyarrow # noqa: F401
import pyarrow.fs as pafs
Expand All @@ -31,19 +31,19 @@ class FileIO(ABC):
"""
File IO interface to read and write files.
"""

@abstractmethod
def new_input_stream(self, path: str):
    """Open ``path`` for reading and return a readable stream object."""
    pass

@abstractmethod
def new_output_stream(self, path: str):
    """Open ``path`` for writing and return a writable stream object."""
    pass

@abstractmethod
def get_file_status(self, path: str):
    """Return status metadata for ``path``.

    Implementations may raise ``FileNotFoundError`` when the path does
    not exist (the PyArrow implementation does).
    """
    pass

@abstractmethod
def list_status(self, path: str):
    """List the entries under ``path``."""
    pass
Expand All @@ -52,18 +52,22 @@ def list_status(self, path: str):
def exists(self, path: str) -> bool:
    """Return True if ``path`` exists."""
    pass

def exists_batch(self, paths: List[str]) -> Dict[str, bool]:
    """Probe several paths for existence in one call.

    This default implementation simply invokes :meth:`exists` once per
    path; subclasses may override it with a batched backend request.

    Returns a mapping from each input path to whether it exists.
    """
    result: Dict[str, bool] = {}
    for candidate in paths:
        result[candidate] = self.exists(candidate)
    return result

@abstractmethod
def delete(self, path: str, recursive: bool = False) -> bool:
    """Delete ``path``; ``recursive`` controls whether non-empty
    directories may be removed. Returns whether anything was deleted."""
    pass

@abstractmethod
def mkdirs(self, path: str) -> bool:
    """Create directory ``path``, including any missing parents."""
    pass

@abstractmethod
def rename(self, src: str, dst: str) -> bool:
    """Rename/move ``src`` to ``dst``; returns False when the rename
    cannot be performed (see the PyArrow implementation)."""
    pass

def delete_quietly(self, path: str):
logger = logging.getLogger(__name__)
if logger.isEnabledFor(logging.DEBUG):
Expand Down Expand Up @@ -115,7 +119,7 @@ def try_to_write_atomic(self, path: str, content: str) -> bool:
if self.exists(path):
if self.is_dir(path):
return False

temp_path = path + str(uuid.uuid4()) + ".tmp"
success = False
try:
Expand Down Expand Up @@ -143,7 +147,7 @@ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False)

target_str = self.to_filesystem_path(target_path)
target_parent = Path(target_str).parent

if str(target_parent) and not self.exists(str(target_parent)):
self.mkdirs(str(target_parent))

Expand Down Expand Up @@ -191,8 +195,8 @@ def to_filesystem_path(self, path: str) -> str:
return path

def parse_location(self, location: str):
from urllib.parse import urlparse
import os
from urllib.parse import urlparse

uri = urlparse(location)
if not uri.scheme:
Expand Down Expand Up @@ -220,10 +224,10 @@ def write_lance(self, path: str, data, **kwargs):
def write_blob(self, path: str, data, **kwargs):
    """Write a Blob format file at ``path``.

    Abstract-by-convention: subclasses that support the Blob format
    must override this; the base class always raises.
    """
    raise NotImplementedError("write_blob must be implemented by FileIO subclasses")

def close(self):
    """Release any resources held by this FileIO. Default is a no-op."""
    pass

@staticmethod
def get(path: str, catalog_options: Optional[Options] = None) -> 'FileIO':
    """
    Choose a FileIO implementation from the scheme of ``path``:
    - LocalFileIO for local paths (no scheme, or ``file``)
    - PyArrowFileIO for remote file systems (oss://, s3://, hdfs://, etc.)
    """
    from urllib.parse import urlparse

    uri = urlparse(path)
    scheme = uri.scheme

    if not scheme or scheme == "file":
        from pypaimon.filesystem.local_file_io import LocalFileIO
        return LocalFileIO(path, catalog_options)

    from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
    return PyArrowFileIO(path, catalog_options or Options({}))
50 changes: 43 additions & 7 deletions paimon-python/pypaimon/common/identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,49 @@ def create(cls, database: str, object: str) -> "Identifier":

@classmethod
def from_string(cls, full_name: str) -> "Identifier":
    """Parse a 'database.object' identifier, with optional backtick quoting."""
    if not full_name or not full_name.strip():
        raise ValueError("fullName cannot be null or empty")

    # Backtick-quoted identifiers get their own parser.
    if '`' in full_name:
        return cls._parse_with_backticks(full_name)

    # Java-compatible behaviour: split on the FIRST period only, so the
    # object part may itself contain dots.
    database, separator, obj = full_name.partition(".")

    if not separator:
        raise ValueError(
            f"Cannot get splits from '{full_name}' to get database and object"
        )

    return cls(database, obj)

@classmethod
def _parse_with_backticks(cls, full_name: str) -> "Identifier":
    """Split ``full_name`` on dots that occur OUTSIDE backtick quotes.

    Backtick characters are stripped; a dot inside backticks stays part
    of its segment. Exactly two segments (database, object) are required.
    """
    segments = []
    buffer = []
    quoted = False

    for ch in full_name:
        if ch == '`':
            quoted = not quoted
        elif ch == '.' and not quoted:
            segments.append(''.join(buffer))
            buffer = []
        else:
            buffer.append(ch)

    # A trailing empty segment (name ends with an unquoted dot) is dropped,
    # which then fails the two-segment check below.
    if buffer:
        segments.append(''.join(buffer))

    if quoted:
        raise ValueError(f"Unclosed backtick in identifier: {full_name}")

    if len(segments) != 2:
        raise ValueError(f"Invalid identifier format: {full_name}")

    return cls(segments[0], segments[1])

def get_full_name(self) -> str:
if self.branch:
Expand Down
6 changes: 3 additions & 3 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
# limitations under the License.
################################################################################
import sys
from datetime import timedelta
from enum import Enum
from typing import Dict, Optional

from datetime import timedelta

from pypaimon.common.memory_size import MemorySize
from pypaimon.common.options import Options
from pypaimon.common.options.config_options import ConfigOptions
from pypaimon.common.options.config_option import ConfigOption
from pypaimon.common.options.config_options import ConfigOptions


class ExternalPathStrategy(str, Enum):
Expand Down Expand Up @@ -261,6 +260,7 @@ class CoreOptions:
.with_description("Specify the merge engine for table with primary key. "
"Options: deduplicate, partial-update, aggregation, first-row.")
)

# Commit options
COMMIT_USER_PREFIX: ConfigOption[str] = (
ConfigOptions.key("commit.user-prefix")
Expand Down
52 changes: 34 additions & 18 deletions paimon-python/pypaimon/filesystem/pyarrow_file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions, S3Options
from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
from pypaimon.schema.data_types import (AtomicType, DataField,
PyarrowFieldParser)
from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.row_kind import RowKind
from pypaimon.write.blob_format_writer import BlobFormatWriter
Expand Down Expand Up @@ -189,7 +190,7 @@ def new_output_stream(self, path: str):
parent_dir = '/'.join(path_str.split('/')[:-1])
else:
parent_dir = ''

if parent_dir and not self.exists(parent_dir):
self.mkdirs(parent_dir)
else:
Expand All @@ -214,10 +215,10 @@ def _get_file_info(self, path_str: str):
def get_file_status(self, path: str):
    """Return the pyarrow ``FileInfo`` for ``path``.

    Raises:
        FileNotFoundError: when the resolved path does not exist.
    """
    resolved = self.to_filesystem_path(path)
    info = self._get_file_info(resolved)

    if info.type == pafs.FileType.NotFound:
        raise FileNotFoundError(f"File {path} (resolved as {resolved}) does not exist")

    return info

def list_status(self, path: str):
Expand All @@ -233,13 +234,25 @@ def exists(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
return self._get_file_info(path_str).type != pafs.FileType.NotFound

def exists_batch(self, paths: List[str]) -> Dict[str, bool]:
    """Check existence of multiple paths in a single batched API call.

    Args:
        paths: Paimon-style paths to probe.

    Returns:
        Mapping from each input path (as given) to True/False existence.
    """
    if not paths:
        return {}

    path_strs = [self.to_filesystem_path(p) for p in paths]
    # One get_file_info() round trip for every path instead of N exists() calls.
    file_infos = self.filesystem.get_file_info(path_strs)
    # Use the module alias `pafs` for consistency with the rest of this class,
    # and zip the inputs with the results instead of indexing by position.
    return {
        original: info.type != pafs.FileType.NotFound
        for original, info in zip(paths, file_infos)
    }

def delete(self, path: str, recursive: bool = False) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self._get_file_info(path_str)

if file_info.type == pafs.FileType.NotFound:
return False

if file_info.type == pafs.FileType.Directory:
if not recursive:
selector = pafs.FileSelector(path_str, recursive=False, allow_not_found=True)
Expand All @@ -258,15 +271,15 @@ def delete(self, path: str, recursive: bool = False) -> bool:
def mkdirs(self, path: str) -> bool:
    """Ensure a directory exists at ``path``, creating parents as needed.

    Raises:
        FileExistsError: when a regular file already occupies ``path``.
    """
    fs_path = self.to_filesystem_path(path)
    kind = self._get_file_info(fs_path).type

    if kind == pafs.FileType.Directory:
        return True
    if kind == pafs.FileType.File:
        raise FileExistsError(f"Path exists but is not a directory: {path}")

    # NotFound (or any other type): attempt a recursive create.
    self.filesystem.create_dir(fs_path, recursive=True)
    return True

Expand All @@ -275,13 +288,13 @@ def rename(self, src: str, dst: str) -> bool:
dst_parent = Path(dst_str).parent
if str(dst_parent) and not self.exists(str(dst_parent)):
self.mkdirs(str(dst_parent))

src_str = self.to_filesystem_path(src)

try:
if hasattr(self.filesystem, 'rename'):
return self.filesystem.rename(src_str, dst_str)

dst_file_info = self._get_file_info(dst_str)
if dst_file_info.type != pafs.FileType.NotFound:
if dst_file_info.type == pafs.FileType.File:
Expand All @@ -293,7 +306,7 @@ def rename(self, src: str, dst: str) -> bool:
final_dst_info = self._get_file_info(dst_str)
if final_dst_info.type != pafs.FileType.NotFound:
return False

self.filesystem.move(src_str, dst_str)
return True
except FileNotFoundError:
Expand Down Expand Up @@ -331,7 +344,7 @@ def try_to_write_atomic(self, path: str, content: str) -> bool:
file_info = self._get_file_info(path_str)
if file_info.type == pafs.FileType.Directory:
return False

temp_path = path + str(uuid.uuid4()) + ".tmp"
success = False
try:
Expand All @@ -349,7 +362,7 @@ def copy_file(self, source_path: str, target_path: str, overwrite: bool = False)
source_str = self.to_filesystem_path(source_path)
target_str = self.to_filesystem_path(target_path)
target_parent = Path(target_str).parent

if str(target_parent) and not self.exists(str(target_parent)):
self.mkdirs(str(target_parent))

Expand All @@ -373,13 +386,14 @@ def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd',
zstd_level: int = 1, **kwargs):
try:
"""Write ORC file using PyArrow ORC writer.

Note: PyArrow's ORC writer doesn't support compression_level parameter.
ORC files will use zstd compression with default level
(which is 3, see https://github.com/facebook/zstd/blob/dev/programs/zstdcli.c)
instead of the specified level.
"""
import sys

import pyarrow.orc as orc

with self.new_output_stream(path) as output_stream:
Expand Down Expand Up @@ -432,7 +446,7 @@ def record_generator():
'zstd': 'zstandard', # zstd is commonly used in Paimon
}
compression_lower = compression.lower()

codec = codec_map.get(compression_lower)
if codec is None:
raise ValueError(
Expand All @@ -448,6 +462,7 @@ def record_generator():
def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
try:
import lance

from pypaimon.read.reader.lance_utils import to_lance_specified
file_path_for_lance, storage_options = to_lance_specified(self, path)

Expand Down Expand Up @@ -514,9 +529,10 @@ def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
raise RuntimeError(f"Failed to write blob file {path}: {e}") from e

def to_filesystem_path(self, path: str) -> str:
from pyarrow.fs import S3FileSystem
import re

from pyarrow.fs import S3FileSystem

parsed = urlparse(path)
normalized_path = re.sub(r'/+', '/', parsed.path) if parsed.path else ''

Expand Down
Loading
Loading