Skip to content

Commit 56a3cce

Browse files
committed
Add embedded test
1 parent a559e94 commit 56a3cce

1 file changed

Lines changed: 187 additions & 0 deletions

File tree

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.testing.embedded.server;
21+
22+
import com.google.common.collect.ImmutableSet;
23+
import org.apache.druid.audit.AuditInfo;
24+
import org.apache.druid.common.config.JacksonConfigManager;
25+
import org.apache.druid.common.utils.IdUtils;
26+
import org.apache.druid.indexing.common.task.TaskBuilder;
27+
import org.apache.druid.server.QueryBlocklistRule;
28+
import org.apache.druid.server.broker.BrokerDynamicConfig;
29+
import org.apache.druid.server.http.BrokerDynamicConfigSyncer;
30+
import org.apache.druid.testing.embedded.EmbeddedBroker;
31+
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
32+
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
33+
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
34+
import org.apache.druid.testing.embedded.EmbeddedHistorical;
35+
import org.apache.druid.testing.embedded.EmbeddedIndexer;
36+
import org.apache.druid.testing.embedded.EmbeddedOverlord;
37+
import org.apache.druid.testing.embedded.indexing.Resources;
38+
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
39+
import org.junit.jupiter.api.Assertions;
40+
import org.junit.jupiter.api.BeforeAll;
41+
import org.junit.jupiter.api.BeforeEach;
42+
import org.junit.jupiter.api.Test;
43+
import org.junit.jupiter.api.Timeout;
44+
45+
import java.util.List;
46+
import java.util.Map;
47+
48+
/**
49+
* Integration test for broker dynamic configuration
50+
*/
51+
public class EmbeddedBrokerDynamicConfigTest extends EmbeddedClusterTestBase
52+
{
53+
// Fixed datasource ingested once for all tests; restored before each test since the
54+
// base class @BeforeEach would otherwise assign a fresh name.
55+
private String fixedDataSource;
56+
57+
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
58+
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
59+
private final EmbeddedIndexer indexer = new EmbeddedIndexer();
60+
private final EmbeddedHistorical historical = new EmbeddedHistorical();
61+
private final EmbeddedBroker broker = new EmbeddedBroker();
62+
63+
@Override
64+
protected EmbeddedDruidCluster createCluster()
65+
{
66+
indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
67+
68+
return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
69+
.useLatchableEmitter()
70+
.addServer(overlord)
71+
.addServer(coordinator)
72+
.addServer(indexer)
73+
.addServer(historical)
74+
.addServer(broker);
75+
}
76+
77+
@BeforeAll
78+
@Override
79+
public void setup() throws Exception
80+
{
81+
fixedDataSource = EmbeddedClusterApis.createTestDatasourceName();
82+
dataSource = fixedDataSource;
83+
super.setup();
84+
ingestData();
85+
cluster.callApi().waitForAllSegmentsToBeAvailable(fixedDataSource, coordinator, broker);
86+
}
87+
88+
@BeforeEach
89+
@Override
90+
protected void refreshDatasourceName()
91+
{
92+
dataSource = fixedDataSource;
93+
}
94+
95+
@Test
96+
@Timeout(30)
97+
public void testQueryBlocklistBlocksMatchingQueries()
98+
{
99+
// Baseline: query succeeds before blocklist is applied
100+
String initialResult = cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource);
101+
Assertions.assertFalse(initialResult.isBlank());
102+
103+
// Apply blocklist rule that matches all queries on this datasource
104+
QueryBlocklistRule blockRule = new QueryBlocklistRule(
105+
"block-test-datasource",
106+
ImmutableSet.of(dataSource),
107+
null,
108+
null
109+
);
110+
updateBrokerDynamicConfig(
111+
BrokerDynamicConfig.builder()
112+
.withQueryBlocklist(List.of(blockRule))
113+
.build()
114+
);
115+
116+
// Query should now throw due to FORBIDDEN blocklist rule
117+
Assertions.assertThrows(
118+
RuntimeException.class,
119+
() -> cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource)
120+
);
121+
122+
// Clear the blocklist and verify queries resume
123+
updateBrokerDynamicConfig(BrokerDynamicConfig.builder().build());
124+
String finalResult = cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource);
125+
Assertions.assertFalse(finalResult.isBlank());
126+
}
127+
128+
@Test
129+
@Timeout(30)
130+
public void testDynamicQueryContext_timeoutCausesQueryToFail()
131+
{
132+
// Baseline: query succeeds before a timeout context is applied
133+
String initialResult = cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource);
134+
Assertions.assertFalse(initialResult.isBlank());
135+
136+
// Apply a 1ms timeout via dynamic query context — any real query will expire before responding
137+
updateBrokerDynamicConfig(
138+
BrokerDynamicConfig.builder()
139+
.withQueryContext(Map.<String, Object>of("timeout", 1))
140+
.build()
141+
);
142+
143+
// Query should now throw due to the timeout being exceeded
144+
Assertions.assertThrows(
145+
RuntimeException.class,
146+
() -> cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource)
147+
);
148+
149+
// Clear the dynamic context and verify queries resume
150+
updateBrokerDynamicConfig(BrokerDynamicConfig.builder().build());
151+
String finalResult = cluster.callApi().runSql("SELECT COUNT(*) FROM %s", dataSource);
152+
Assertions.assertFalse(finalResult.isBlank());
153+
}
154+
155+
private void ingestData()
156+
{
157+
cluster.callApi().runTask(
158+
TaskBuilder.ofTypeIndex()
159+
.dataSource(dataSource)
160+
.isoTimestampColumn("time")
161+
.csvInputFormatWithColumns("time", "item", "value")
162+
.inlineInputSourceWithData(Resources.InlineData.CSV_10_DAYS)
163+
.segmentGranularity("DAY")
164+
.dimensions()
165+
.withId(IdUtils.getRandomId()),
166+
overlord
167+
);
168+
}
169+
170+
/**
171+
* Updates the broker dynamic config on the coordinator and synchronously broadcasts it
172+
* to all brokers.
173+
*
174+
* Uses {@link JacksonConfigManager} directly to avoid the HTTP endpoint's builder-merge
175+
* semantics, which cannot distinguish "clear to empty" from "not specified" when fields
176+
* are omitted via {@code @JsonInclude(NON_EMPTY)}.
177+
*/
178+
private void updateBrokerDynamicConfig(BrokerDynamicConfig config)
179+
{
180+
coordinator.bindings()
181+
.getInstance(JacksonConfigManager.class)
182+
.set(BrokerDynamicConfig.CONFIG_KEY, config, new AuditInfo("test", "test@test.com", "Testing", "127.0.0.1"));
183+
coordinator.bindings()
184+
.getInstance(BrokerDynamicConfigSyncer.class)
185+
.broadcastConfigToBrokers();
186+
}
187+
}

0 commit comments

Comments
 (0)