Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,19 @@ public class ConfigOptions {
+ "If the value is `YEAR`, the partition format for "
+ "auto created is yyyy.");

public static final ConfigOption<String> TABLE_AUTO_PARTITION_TIME_FORMAT =
key("table.auto-partition.time-format")
.stringType()
.noDefaultValue()
.withDescription(
"The time format used for auto-created partition values. "
+ "If not set, the format is derived from `table.auto-partition.time-unit` "
+ "(e.g. `yyyyMMdd` for DAY, `yyyyMMddHH` for HOUR). "
+ "When set, this value overrides the format derived from the time unit, "
+ "while the partition granularity still follows `table.auto-partition.time-unit`. "
+ "A custom format must use zero-padded fields covering at least the unit's precision, "
+ "so partition values sort by time as strings (e.g. `yyyy-MM-dd` for DAY).");

public static final ConfigOption<String> TABLE_AUTO_PARTITION_TIMEZONE =
key("table.auto-partition.time-zone")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.TimeZone;

Expand All @@ -30,6 +32,7 @@ public class AutoPartitionStrategy {
private final boolean autoPartitionEnable;
private final String key;
private final AutoPartitionTimeUnit timeUnit;
@Nullable private final String timeFormat;
private final int numPreCreate;
private final int numToRetain;
private final TimeZone timeZone;
Expand All @@ -38,12 +41,14 @@ private AutoPartitionStrategy(
boolean autoPartitionEnable,
String key,
AutoPartitionTimeUnit autoPartitionTimeUnit,
@Nullable String timeFormat,
int numPreCreate,
int numToRetain,
TimeZone timeZone) {
this.autoPartitionEnable = autoPartitionEnable;
this.key = key;
this.timeUnit = autoPartitionTimeUnit;
this.timeFormat = timeFormat;
this.numPreCreate = numPreCreate;
this.numToRetain = numToRetain;
this.timeZone = timeZone;
Expand All @@ -58,6 +63,7 @@ public static AutoPartitionStrategy from(Configuration conf) {
conf.getBoolean(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED),
conf.getString(ConfigOptions.TABLE_AUTO_PARTITION_KEY),
conf.get(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT),
conf.getString(ConfigOptions.TABLE_AUTO_PARTITION_TIME_FORMAT),
conf.getInt(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE),
conf.getInt(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION),
TimeZone.getTimeZone(conf.getString(ConfigOptions.TABLE_AUTO_PARTITION_TIMEZONE)));
Expand All @@ -75,6 +81,11 @@ public AutoPartitionTimeUnit timeUnit() {
return timeUnit;
}

@Nullable
public String timeFormat() {
return timeFormat;
}

public int numPreCreate() {
return numPreCreate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.types.DataTypeRoot;

import javax.annotation.Nullable;

import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.apache.fluss.metadata.TablePath.detectInvalidName;
Expand Down Expand Up @@ -129,19 +132,24 @@ public static void validateAutoPartitionTime(
: partitionKeys.get(0);
String partitionTime = partitionSpec.getSpecMap().get(autoPartitionKey);
AutoPartitionTimeUnit timeUnit = autoPartitionStrategy.timeUnit();
if (partitionTime == null || !isValidPartitionTime(partitionTime, timeUnit)) {
String timeFormat = autoPartitionStrategy.timeFormat();
String resolvedFormat = resolveTimeFormat(timeUnit, timeFormat);
if (partitionTime == null || !isValidPartitionTime(partitionTime, resolvedFormat)) {
throw new InvalidPartitionException(
String.format(
"Partition value '%s' does not match the expected format '%s' "
+ "for auto-partition time unit '%s'.",
partitionTime, getPartitionTimeFormat(timeUnit), timeUnit));
partitionTime, resolvedFormat, timeUnit));
}
ZonedDateTime currentZonedDateTime =
ZonedDateTime.ofInstant(Instant.now(), autoPartitionStrategy.timeZone().toZoneId());
// Get the earliest partition time that needs to be retained.
String lastRetainPartitionTime =
generateAutoPartitionTime(
currentZonedDateTime, -autoPartitionStrategy.numToRetain(), timeUnit);
currentZonedDateTime,
-autoPartitionStrategy.numToRetain(),
timeUnit,
timeFormat);
if (lastRetainPartitionTime.compareTo(partitionTime) > 0) {
throw new InvalidPartitionException(
String.format(
Expand All @@ -152,55 +160,60 @@ public static void validateAutoPartitionTime(
}

/**
* Generate {@link ResolvedPartitionSpec} for auto partition in server. When we auto creating a
* partition, we need to first generate a {@link ResolvedPartitionSpec}.
* Generates a {@link ResolvedPartitionSpec} for an auto-created partition.
*
* <p>The value is the formatted time with the specified time unit.
* <p>The partition value is rendered with the default format for {@code timeUnit}; when {@code
* timeFormat} is non-null, it overrides that format.
*
* @param partitionKeys the partition keys
* @param current the current time
* @param offset the offset
* @param timeUnit the time unit
* @param current the base time
* @param offset the number of {@code timeUnit}s to advance from {@code current}
* @param timeUnit the time unit driving the partition granularity
* @param timeFormat the user-configured time format, or {@code null} to use the unit's default
* @return the resolved partition spec
*/
public static ResolvedPartitionSpec generateAutoPartition(
List<String> partitionKeys,
ZonedDateTime current,
int offset,
AutoPartitionTimeUnit timeUnit) {
String autoPartitionFieldSpec = generateAutoPartitionTime(current, offset, timeUnit);

AutoPartitionTimeUnit timeUnit,
@Nullable String timeFormat) {
String autoPartitionFieldSpec =
generateAutoPartitionTime(current, offset, timeUnit, timeFormat);
return ResolvedPartitionSpec.fromPartitionName(partitionKeys, autoPartitionFieldSpec);
}

public static String generateAutoPartitionTime(
ZonedDateTime current, int offset, AutoPartitionTimeUnit timeUnit) {
String autoPartitionFieldSpec;
ZonedDateTime current,
int offset,
AutoPartitionTimeUnit timeUnit,
@Nullable String timeFormat) {
String format = resolveTimeFormat(timeUnit, timeFormat);
switch (timeUnit) {
case YEAR:
autoPartitionFieldSpec = getFormattedTime(current.plusYears(offset), YEAR_FORMAT);
break;
return getFormattedTime(current.plusYears(offset), format);
case QUARTER:
autoPartitionFieldSpec =
getFormattedTime(current.plusMonths(offset * 3L), QUARTER_FORMAT);
break;
return getFormattedTime(current.plusMonths(offset * 3L), format);
case MONTH:
autoPartitionFieldSpec = getFormattedTime(current.plusMonths(offset), MONTH_FORMAT);
break;
return getFormattedTime(current.plusMonths(offset), format);
case DAY:
autoPartitionFieldSpec = getFormattedTime(current.plusDays(offset), DAY_FORMAT);
break;
return getFormattedTime(current.plusDays(offset), format);
case HOUR:
autoPartitionFieldSpec = getFormattedTime(current.plusHours(offset), HOUR_FORMAT);
break;
return getFormattedTime(current.plusHours(offset), format);
default:
throw new IllegalArgumentException("Unsupported time unit: " + timeUnit);
}
return autoPartitionFieldSpec;
}

/** Returns the time string format pattern for the given time unit. */
private static String getPartitionTimeFormat(AutoPartitionTimeUnit timeUnit) {
/**
* Returns given {@code timeFormat} when non-null, otherwise the time format for the given
* {@code timeUnit}.
*/
private static String resolveTimeFormat(
AutoPartitionTimeUnit timeUnit, @Nullable String timeFormat) {
if (timeFormat != null) {
return timeFormat;
}
switch (timeUnit) {
case YEAR:
return YEAR_FORMAT;
Expand All @@ -217,20 +230,18 @@ private static String getPartitionTimeFormat(AutoPartitionTimeUnit timeUnit) {
}
}

/**
* Returns true if the given time string matches the format expected for the given time unit.
*/
private static boolean isValidPartitionTime(String time, AutoPartitionTimeUnit timeUnit) {
/** Returns true if {@code time} can be parsed under the given date-time {@code format}. */
private static boolean isValidPartitionTime(String time, String format) {
try {
DateTimeFormatter.ofPattern(getPartitionTimeFormat(timeUnit)).parse(time);
DateTimeFormatter.ofPattern(format, Locale.ROOT).parse(time);
return true;
} catch (DateTimeParseException e) {
return false;
}
}

private static String getFormattedTime(ZonedDateTime zonedDateTime, String format) {
return DateTimeFormatter.ofPattern(format).format(zonedDateTime);
return DateTimeFormatter.ofPattern(format, Locale.ROOT).format(zonedDateTime);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,38 @@ void testGenerateAutoPartitionName(
Collections.singletonList("dt"),
zonedDateTime,
offsets[i],
autoPartitionTimeUnit);
autoPartitionTimeUnit,
null);
assertThat(resolvedPartitionSpec.getPartitionName()).isEqualTo(expected[i]);
}
}

@Test
void testGenerateAutoPartitionNameWithCustomTimeFormat() {
ZonedDateTime zonedDateTime =
ZonedDateTime.of(LocalDateTime.of(2024, 11, 11, 11, 0), ZoneId.of("UTC-8"));

assertThat(
generateAutoPartition(
Collections.singletonList("dt"),
zonedDateTime,
0,
AutoPartitionTimeUnit.DAY,
"yyyy-MM-dd")
.getPartitionName())
.isEqualTo("2024-11-11");

assertThat(
generateAutoPartition(
Collections.singletonList("dt"),
zonedDateTime,
1,
AutoPartitionTimeUnit.HOUR,
"yyyy-MM-dd-HH")
.getPartitionName())
.isEqualTo("2024-11-11-12");
}

@Test
void testString() {
Object value = BinaryString.fromString("Fluss");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,16 @@ private List<ResolvedPartitionSpec> partitionNamesToPreCreate(
currentInstant, autoPartitionStrategy.timeZone().toZoneId());

int partitionToPreCreate = autoPartitionStrategy.numPreCreate();
String timeFormat = autoPartitionStrategy.timeFormat();
List<ResolvedPartitionSpec> partitionsToCreate = new ArrayList<>();
for (int idx = 0; idx < partitionToPreCreate; idx++) {
ResolvedPartitionSpec partition =
generateAutoPartition(
partitionKeys, currentZonedDateTime, idx, autoPartitionTimeUnit);
partitionKeys,
currentZonedDateTime,
idx,
autoPartitionTimeUnit,
timeFormat);
// if the partition already exists, we don't need to create it, otherwise, create it
if (!currentPartitions.containsKey(partition.getPartitionName())) {
partitionsToCreate.add(partition);
Expand Down Expand Up @@ -451,7 +456,10 @@ private void dropPartitions(
// Get the earliest one partition time that need to retain.
String lastRetainPartitionTime =
generateAutoPartitionTime(
currentZonedDateTime, -numToRetain, autoPartitionStrategy.timeUnit());
currentZonedDateTime,
-numToRetain,
autoPartitionStrategy.timeUnit(),
autoPartitionStrategy.timeFormat());

// For partition table with a single partition key, for example dt(yyyyMMdd)
// assuming now is 20250508, and table.auto-partition.num-retention=2 then partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@

import javax.annotation.Nullable;

import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -478,6 +480,26 @@ private static void checkPartition(
+ "partition is enabled, please set table property '%s'.",
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT.key()));
}

String timeFormat = autoPartition.timeFormat();
if (timeFormat != null) {
if (timeFormat.trim().isEmpty()) {
throw new InvalidConfigException(
String.format(
"'%s' must not be empty.",
ConfigOptions.TABLE_AUTO_PARTITION_TIME_FORMAT.key()));
}
try {
DateTimeFormatter.ofPattern(timeFormat, Locale.ROOT);
} catch (IllegalArgumentException e) {
throw new InvalidConfigException(
String.format(
"Invalid time format '%s' for '%s': %s",
timeFormat,
ConfigOptions.TABLE_AUTO_PARTITION_TIME_FORMAT.key(),
e.getMessage()));
}
Comment on lines +484 to +501
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

table.auto-partition.time-format is only validated for DateTimeFormatter pattern syntax. It can still generate partition values that are invalid for Fluss/ZooKeeper paths (e.g., /, ., spaces) or collide for a given time-unit (e.g., time-unit=HOUR with format yyyy-MM-dd produces identical values for different hours). This will lead to auto-partition creation failures or incorrect retention behavior at runtime. Consider validating at table creation that a sample formatted value passes the same partition-value rules (TablePath.detectInvalidName/validatePrefix) and that formatting differs between now and now + 1 <time-unit> (and ideally preserves lexicographic order).

Copilot uses AI. Check for mistakes.
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ public static List<String> getExpectAddedPartitions(
List<String> partitions = new ArrayList<>();
for (int i = 0; i < newPartitions; i++) {
partitions.add(
generateAutoPartition(partitionKeys, addDateTime, i, timeUnit)
generateAutoPartition(partitionKeys, addDateTime, i, timeUnit, null)
.getPartitionName());
}
return partitions;
Expand Down
Loading