Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
29c7b93
rough cascading compaction impl
capistrant Dec 19, 2025
ae21c82
Renaming refactor
capistrant Jan 15, 2026
71830fc
add support for more config knobs in the cascading reindex spec
capistrant Jan 15, 2026
1940267
add some testing
capistrant Jan 15, 2026
bdb514e
stop using forbidden apis
capistrant Jan 16, 2026
0f71da2
working on some test coverage
capistrant Jan 17, 2026
acad478
simplify search interval creation and enhance embedded test for casca…
capistrant Jan 20, 2026
d49fbf4
fixup checkstyle
capistrant Jan 20, 2026
5efb4bf
temporary fixup to test
capistrant Jan 20, 2026
5a87759
fix checkstyle
capistrant Jan 20, 2026
0850a8f
remove native runner for one compaction supervisor test due to native…
capistrant Jan 21, 2026
3667456
refactorings from self review
capistrant Jan 22, 2026
a280690
Fixup naming to prefer reindexing over compaction
capistrant Jan 22, 2026
3108b61
fix up a javadoc with up to date design spec
capistrant Jan 22, 2026
1f671e1
Fill in UT gaps for the composing provider
capistrant Jan 22, 2026
3be0da1
refactor test class for inline rule provider
capistrant Jan 22, 2026
79ff44b
Self review refactorings
capistrant Jan 22, 2026
420f3b2
Trying to transform cascadingreindexingtemplate to a compaction state…
capistrant Jan 22, 2026
bf2e02d
refactor the location of the reindexing filter rule optimizer
capistrant Jan 23, 2026
5b4f3d2
Refactor this idea of additivity and how it works for building configs
capistrant Jan 23, 2026
6f5ead7
Add a missing test class
capistrant Jan 23, 2026
6d1fc6e
fix checkstyle
capistrant Jan 23, 2026
799db27
Merge branch 'master' into reindexing-rule-providers-with-cascading-r…
capistrant Jan 23, 2026
6853b02
clean up a javadoc
capistrant Jan 23, 2026
2200467
trivial fixes
capistrant Jan 23, 2026
a0d68eb
Prevent an edge case for a negative period
capistrant Jan 23, 2026
bbb5bbd
fix a nasty bug opportunity
capistrant Jan 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,40 @@

package org.apache.druid.testing.embedded.compact;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.catalog.guice.CatalogClientModule;
import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.compact.CascadingReindexingTemplate;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.segment.metadata.DefaultIndexingStateFingerprintMapper;
import org.apache.druid.segment.metadata.IndexingStateCache;
import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
import org.apache.druid.server.compaction.InlineReindexingRuleProvider;
import org.apache.druid.server.compaction.ReindexingFilterRule;
import org.apache.druid.server.compaction.ReindexingGranularityRule;
import org.apache.druid.server.compaction.ReindexingIOConfigRule;
import org.apache.druid.server.compaction.ReindexingTuningConfigRule;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
Expand All @@ -51,11 +65,17 @@
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -268,6 +288,135 @@ public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFinger
verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint();
}

/**
 * Ingests FIFTEEN_MINUTE segments spread over three age bands, runs a cascading reindexing template
 * whose rules are keyed on data age, and then checks the resulting segment granularities as well as
 * the effect of the row filter on the oldest band.
 */
@Test
public void test_cascadingCompactionTemplate_multiplePeriodsApplyDifferentCompactionRules()
{
  // Only MSQ is exercised for now. The intent is to eventually parameterize over both engines, but
  // per the original author's note the native engine cannot yet handle RANGE partitioning combined
  // with a filter that removes every row.
  final CompactionEngine engine = CompactionEngine.MSQ;
  configureCompaction(engine);

  final DateTime referenceTime = DateTimes.nowUtc();

  // Event timestamps are derived from the current time on purpose: the supervisor uses the current
  // time as its reference when deciding which rule applies to which segment, so placing each batch
  // well inside a single rule period keeps the test deterministic regardless of when it runs.
  final Interval recentWindow = new Interval(referenceTime.minusHours(4), referenceTime);
  final String recentRows = generateEventsInInterval(recentWindow, 4, Duration.ofMinutes(30).toMillis());

  final Interval hourTierWindow = new Interval(referenceTime.minusDays(3), referenceTime.minusDays(2));
  final String hourTierRows = generateEventsInInterval(hourTierWindow, 5, Duration.ofMinutes(90).toMillis());

  final Interval dayTierWindow = new Interval(referenceTime.minusDays(31), referenceTime.minusDays(14));
  final String dayTierRows = generateEventsInInterval(dayTierWindow, 7, Duration.ofHours(25).toMillis());

  final String ingestedRows = String.join("\n", recentRows, hourTierRows, dayTierRows);

  runIngestionAtGranularity("FIFTEEN_MINUTE", ingestedRows);
  // 4 + 5 + 7 events, each expected to land in its own FIFTEEN_MINUTE segment.
  Assertions.assertEquals(16, getNumSegmentsWith(Granularities.FIFTEEN_MINUTE));

  // Granularity rules: HOUR from 1 day of age, DAY from 7 days of age.
  final ReindexingGranularityRule hourlyAfterOneDay = new ReindexingGranularityRule(
      "hourRule",
      "Compact to HOUR granularity for data older than 1 days",
      Period.days(1),
      new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, false)
  );
  final ReindexingGranularityRule dailyAfterSevenDays = new ReindexingGranularityRule(
      "dayRule",
      "Compact to DAY granularity for data older than 7 days",
      Period.days(7),
      new UserCompactionTaskGranularityConfig(Granularities.DAY, null, false)
  );

  // Tuning rule: only the partitions spec is set, every other tuning knob is left null.
  final DimensionRangePartitionsSpec rangeOnItem =
      new DimensionRangePartitionsSpec(1000, null, List.of("item"), false);
  final UserCompactionTaskQueryTuningConfig rangeTuningConfig = new UserCompactionTaskQueryTuningConfig(
      null,
      null,
      null,
      null,
      null,
      rangeOnItem,
      null,
      null,
      null,
      null,
      null,
      null,
      null,
      null,
      null,
      null,
      null,
      null,
      null
  );
  final ReindexingTuningConfigRule rangePartitioningRule = new ReindexingTuningConfigRule(
      "tuningConfigRule",
      "Use dimension range partitioning with max 1000 rows per segment",
      Period.days(1),
      rangeTuningConfig
  );

  // Filter rule keyed on the same 7 day period as the DAY granularity rule.
  final ReindexingFilterRule dropHatsRule = new ReindexingFilterRule(
      "filterRule",
      "Drop rows where item is 'hat'",
      Period.days(7),
      new SelectorDimFilter("item", "hat", null)
  );

  InlineReindexingRuleProvider.Builder rules = InlineReindexingRuleProvider
      .builder()
      .granularityRules(List.of(hourlyAfterOneDay, dailyAfterSevenDays))
      .tuningConfigRules(List.of(rangePartitioningRule))
      .filterRules(List.of(dropHatsRule));

  // Currently unreachable because the engine is pinned to MSQ above; kept so the test can be
  // parameterized over engines later without rediscovering this requirement.
  if (CompactionEngine.NATIVE == engine) {
    final ReindexingIOConfigRule dropExistingRule = new ReindexingIOConfigRule(
        "dropExisting",
        null,
        Period.days(7),
        new UserCompactionTaskIOConfig(true)
    );
    rules = rules.ioConfigRules(List.of(dropExistingRule));
  }

  final CascadingReindexingTemplate template = new CascadingReindexingTemplate(
      dataSource,
      null,
      null,
      rules.build(),
      engine,
      null
  );
  runCompactionWithSpec(template);
  waitForAllCompactionTasksToFinish();

  // The recent band is untouched, the 2-3 day old band is HOUR, the 14-31 day old band is DAY.
  Assertions.assertEquals(4, getNumSegmentsWith(Granularities.FIFTEEN_MINUTE));
  Assertions.assertEquals(5, getNumSegmentsWith(Granularities.HOUR));
  Assertions.assertEquals(7, getNumSegmentsWith(Granularities.DAY));
  verifyEventCountOlderThan(Period.days(7), "item", "hat", 0);
}

/**
 * Builds newline-separated CSV rows of the form {@code timestamp,item,value} whose timestamps all
 * lie inside the given interval.
 * <p>
 * Row {@code i} (1-based) is stamped {@code interval.getStart() + i * spacingMillis}. The item is
 * {@code "hat"} for even {@code i} and {@code "shirt"} for odd {@code i}, and the value is
 * {@code 100 + 5 * i}.
 *
 * @param interval      interval that every generated timestamp must fall in
 * @param numEvents     number of rows to generate; zero or less yields an empty string
 * @param spacingMillis distance in milliseconds between consecutive rows, also the offset of the
 *                      first row from the interval start
 * @return the generated rows joined with {@code "\n"}, without a trailing newline
 * @throws IAE if any generated timestamp is not contained in {@code interval}
 */
private String generateEventsInInterval(Interval interval, int numEvents, long spacingMillis)
{
  final List<String> events = new ArrayList<>();
  final DateTime start = interval.getStart();

  for (int i = 1; i <= numEvents; i++) {
    final DateTime eventTime = start.plus(spacingMillis * i);
    // Joda intervals are start-inclusive and end-exclusive, so an event exactly at the end instant
    // is already outside the interval. contains() also rejects timestamps before the start, which
    // a plain isAfter(end) check lets through when the spacing is negative.
    if (!interval.contains(eventTime)) {
      throw new IAE("Interval cannot fit [%d] events with spacing of [%d] millis", numEvents, spacingMillis);
    }
    final String item = i % 2 == 0 ? "hat" : "shirt";
    final int metricValue = 100 + i * 5;
    events.add(eventTime + "," + item + "," + metricValue);
  }

  return String.join("\n", events);
}

private void verifySegmentsHaveNullLastCompactionStateAndNonNullFingerprint()
{
overlord
Expand Down Expand Up @@ -374,4 +523,47 @@ public static List<CompactionEngine> getEngine()
{
return List.of(CompactionEngine.NATIVE, CompactionEngine.MSQ);
}

/**
 * Runs a SQL {@code COUNT(*)} on a broker for rows of {@code dataSource} where {@code dimension}
 * equals {@code value} and {@code __time} is earlier than (current time - {@code period}), and
 * asserts that the count matches {@code expectedCount}.
 * <p>
 * {@code dimension} and {@code value} are interpolated into the SQL text verbatim, so callers must
 * pass values that need no quoting or escaping.
 *
 * @param period        age threshold; only rows older than this are counted
 * @param dimension     name of the column to match on
 * @param value         string value the column must equal
 * @param expectedCount number of matching rows expected
 */
private void verifyEventCountOlderThan(Period period, String dimension, String value, int expectedCount)
{
  final DateTime threshold = DateTimes.nowUtc().minus(period);

  final ClientSqlQuery query = new ClientSqlQuery(
      StringUtils.format(
          "SELECT COUNT(*) as cnt FROM \"%s\" WHERE %s = '%s' AND __time < MILLIS_TO_TIMESTAMP(%d)",
          dataSource,
          dimension,
          value,
          threshold.getMillis()
      ),
      null,
      false,
      false,
      false,
      null,
      null
  );

  final String resultAsJson = cluster.callApi().onAnyBroker(b -> b.submitSqlQuery(query));

  final List<Map<String, Object>> result = JacksonUtils.readValue(
      new DefaultObjectMapper(),
      resultAsJson.getBytes(StandardCharsets.UTF_8),
      new TypeReference<>() {}
  );

  Assertions.assertEquals(1, result.size());

  // The row is deserialized into an untyped map, so the boxed type of "cnt" (Integer, Long, ...)
  // is chosen by the JSON parser. Compare numerically instead of via Object.equals so that an
  // equal count is never reported as a mismatch because of its boxed type, and fail with a clear
  // message when the column is missing or not numeric.
  final Object actualCount = result.get(0).get("cnt");
  Assertions.assertTrue(
      actualCount instanceof Number,
      StringUtils.format("Expected a numeric [cnt] column in the SQL result but got [%s]", actualCount)
  );
  Assertions.assertEquals(
      (long) expectedCount,
      ((Number) actualCount).longValue(),
      StringUtils.format(
          "Expected %d events where %s='%s' older than %s",
          expectedCount,
          dimension,
          value,
          period
      )
  );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.indexing.compact.CascadingReindexingTemplate;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.scheduledbatch.ScheduledBatchSupervisorSpec;
Expand All @@ -47,7 +48,8 @@ public List<? extends Module> getJacksonModules()
new SimpleModule(getClass().getSimpleName())
.registerSubtypes(
new NamedType(CompactionSupervisorSpec.class, CompactionSupervisorSpec.TYPE),
new NamedType(ScheduledBatchSupervisorSpec.class, ScheduledBatchSupervisorSpec.TYPE)
new NamedType(ScheduledBatchSupervisorSpec.class, ScheduledBatchSupervisorSpec.TYPE),
new NamedType(CascadingReindexingTemplate.class, CascadingReindexingTemplate.TYPE)
)
);
}
Expand Down
Loading
Loading