Skip to content

Commit d28c226

Browse files
authored
Implement one-batch lookahead for index enumerators (opensearch-project#4345)
1 parent 6f7eae0 commit d28c226

File tree

20 files changed

+453
-75
lines changed

20 files changed

+453
-75
lines changed

async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
1313
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
1414
import org.opensearch.plugins.Plugin;
15-
import org.opensearch.sql.legacy.executor.AsyncRestExecutor;
1615
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
1716
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
1817
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
@@ -21,6 +20,8 @@
2120
import org.opensearch.threadpool.ThreadPool;
2221
import org.opensearch.transport.client.Client;
2322

23+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
24+
2425
/**
2526
* The job runner class for scheduling async query.
2627
*
@@ -37,7 +38,7 @@
3738
public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner {
3839
// Share SQL plugin thread pool
3940
private static final String ASYNC_QUERY_THREAD_POOL_NAME =
40-
AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME;
41+
SQL_WORKER_THREAD_POOL_NAME;
4142
private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);
4243

4344
private static final ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner();

async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import static org.mockito.Mockito.spy;
1616
import static org.mockito.Mockito.verify;
1717
import static org.mockito.Mockito.when;
18+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
1819

1920
import java.time.Instant;
2021
import org.apache.logging.log4j.LogManager;
@@ -87,7 +88,7 @@ public void testRunJobWithCorrectParameter() {
8788
spyJobRunner.runJob(request, context);
8889

8990
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
90-
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
91+
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
9192
.submit(captor.capture());
9293

9394
Runnable runnable = captor.getValue();
@@ -145,7 +146,7 @@ public void testDoRefreshThrowsException() {
145146
spyJobRunner.runJob(request, context);
146147

147148
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
148-
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
149+
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
149150
.submit(captor.capture());
150151

151152
Runnable runnable = captor.getValue();

datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,11 @@
1111
import org.opensearch.common.unit.TimeValue;
1212
import org.opensearch.threadpool.ThreadPool;
1313
import org.opensearch.transport.client.node.NodeClient;
14+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
1415

1516
/** The scheduler that schedules tasks to run in the sql-worker thread pool. */
1617
@UtilityClass
1718
public class Scheduler {
18-
19-
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
20-
2119
public static void schedule(NodeClient client, Runnable task) {
2220
ThreadPool threadPool = client.threadPool();
2321
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);

docs/user/admin/settings.rst

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,27 @@ Result set::
253253
"transient": {}
254254
}
255255

256+
Thread Pool Settings
257+
====================
258+
259+
The SQL plugin is integrated with the `OpenSearch Thread Pool Settings <https://docs.opensearch.org/latest/install-and-configure/configuring-opensearch/thread-pool-settings/>`_.
260+
There are two thread pools, which can be configured at cluster setup via ``opensearch.yml``::
261+
262+
thread_pool:
263+
sql-worker:
264+
size: 30
265+
queue_size: 100
266+
sql_background_io:
267+
size: 30
268+
queue_size: 1000
269+
270+
The ``sql-worker`` pool corresponds to compute resources related to running queries, such as compute-heavy evaluations on result sets.
271+
Its size directly maps to the number of queries that can run concurrently.
272+
This is the primary pool you interact with externally.
273+
``sql_background_io`` is a low-footprint pool for IO requests the plugin makes,
274+
and can be used to limit indirect load that SQL places on your cluster for Calcite-enabled operations.
275+
A single ``sql-worker`` thread may hand work off to multiple ``sql_background_io`` threads.
276+
256277
plugins.query.executionengine.spark.session.limit
257278
==================================================
258279

legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.legacy.executor;
77

8+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
9+
810
import java.io.IOException;
911
import java.time.Duration;
1012
import java.util.Map;
@@ -30,10 +32,6 @@
3032

3133
/** A RestExecutor wrapper to execute request asynchronously to avoid blocking transport thread. */
3234
public class AsyncRestExecutor implements RestExecutor {
33-
34-
/** Custom thread pool name managed by OpenSearch */
35-
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
36-
3735
private static final Logger LOG = LogManager.getLogger(AsyncRestExecutor.class);
3836

3937
/**

legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.legacy.executor.cursor;
77

8+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
9+
810
import java.io.IOException;
911
import java.time.Duration;
1012
import java.util.Map;
@@ -24,9 +26,6 @@
2426
import org.opensearch.transport.client.Client;
2527

2628
public class CursorAsyncRestExecutor {
27-
/** Custom thread pool name managed by OpenSearch */
28-
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
29-
3029
private static final Logger LOG = LogManager.getLogger(CursorAsyncRestExecutor.class);
3130

3231
/** Delegated rest executor to async */

legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.sql.legacy.plugin;
77

88
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
9+
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
910

1011
import com.google.common.collect.ImmutableList;
1112
import java.util.Arrays;
@@ -90,7 +91,7 @@ protected Set<String> responseParams() {
9091

9192
private void schedule(NodeClient client, Runnable task) {
9293
ThreadPool threadPool = client.threadPool();
93-
threadPool.schedule(withCurrentContext(task), new TimeValue(0), "sql-worker");
94+
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
9495
}
9596

9697
private Runnable withCurrentContext(final Runnable task) {

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.util.List;
99
import java.util.Map;
10+
import java.util.Optional;
1011
import org.opensearch.action.search.CreatePitRequest;
1112
import org.opensearch.action.search.DeletePitRequest;
1213
import org.opensearch.sql.opensearch.mapping.IndexMapping;
@@ -97,7 +98,7 @@ public interface OpenSearchClient {
9798
*/
9899
void schedule(Runnable task);
99100

100-
NodeClient getNodeClient();
101+
Optional<NodeClient> getNodeClient();
101102

102103
/**
103104
* Create PIT for given indices

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.Collection;
1212
import java.util.List;
1313
import java.util.Map;
14+
import java.util.Optional;
1415
import java.util.concurrent.ExecutionException;
1516
import java.util.function.Function;
1617
import java.util.function.Predicate;
@@ -223,8 +224,8 @@ public void schedule(Runnable task) {
223224
}
224225

225226
@Override
226-
public NodeClient getNodeClient() {
227-
return client;
227+
public Optional<NodeClient> getNodeClient() {
228+
return Optional.of(client);
228229
}
229230

230231
@Override

opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.HashMap;
1414
import java.util.List;
1515
import java.util.Map;
16+
import java.util.Optional;
1617
import java.util.stream.Collectors;
1718
import java.util.stream.Stream;
1819
import lombok.RequiredArgsConstructor;
@@ -236,8 +237,8 @@ public void schedule(Runnable task) {
236237
}
237238

238239
@Override
239-
public NodeClient getNodeClient() {
240-
throw new UnsupportedOperationException("Unsupported method.");
240+
public Optional<NodeClient> getNodeClient() {
241+
return Optional.empty();
241242
}
242243

243244
@Override

0 commit comments

Comments
 (0)