Replace DataCarrier with BatchQueue for metrics pipeline#13703
Open
Replace DataCarrier with BatchQueue for metrics pipeline#13703
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces a new library-batch-queue module and migrates OAP server internal pipelines from DataCarrier to BatchQueue, aiming to reduce thread counts, unify OAL/MAL queues, and improve observability via named executor threads. It also updates OTel rules and Grafana/So11y dashboards to match the new queue metrics dimensions and removes the legacy library-datacarrier-queue module.
Changes:
- Add
library-batch-queue(partitioned, self-draining queue) and replaceDataCarrierusages across aggregation/persistence/TopN/exporters/remote client. - Name anonymous executor threads across the OAP server for easier thread-dump analysis.
- Update telemetry rules and dashboards for the new queue metrics tags/dimensions; remove
library-datacarrier-queueand related tests.
Reviewed changes
Copilot reviewed 98 out of 98 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCBatchDAO.java | Switch JDBC async batching from DataCarrier to BatchQueue. |
| oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/pom.xml | Replace datacarrier dependency with library-batch-queue. |
| oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml | Remove unused library-datacarrier-queue dependency. |
| oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json | Update So11y dashboard expressions/titles for unified queues and new labels. |
| oap-server/server-starter/src/main/resources/otel-rules/oap.yaml | Update OTel aggregation rule dimensions (remove kind, add slot). |
| oap-server/server-library/pom.xml | Remove library-datacarrier-queue, add library-batch-queue module. |
| oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/benchmark/StringFormatGroupBenchmark.java | Add benchmark-style test for StringFormatGroup. |
| oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/MultipleFilesChangeMonitor.java | Name the scheduled executor thread for file monitoring. |
| oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/partition/SimpleRollingPartitionerTest.java | Remove datacarrier tests (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/partition/ProducerThreadPartitionerTest.java | Remove datacarrier tests (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/SampleConsumer.java | Remove datacarrier tests (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerTest.java | Remove datacarrier tests (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPoolFactoryTest.java | Remove datacarrier tests (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumeDriverTest.java | Remove datacarrier tests (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/SampleData.java | Remove datacarrier tests (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/EnvUtilTest.java | Remove datacarrier tests (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierTest.java | Remove datacarrier tests (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/partition/SimpleRollingPartitioner.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/partition/ProducerThreadPartitioner.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/partition/IDataPartitioner.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/IDriver.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerThread.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPoolFactory.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPool.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumeDriver.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/common/AtomicRangeInteger.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/QueueBuffer.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/Channels.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/ArrayBlockingQueueBuffer.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/EnvUtil.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrier.java | Remove datacarrier implementation (module removal). |
| oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java | Name the health-check scheduled executor thread. |
| oap-server/server-library/library-batch-queue/src/test/resources/log4j2-test.xml | Add log4j2 test config for the new module tests. |
| oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicyTest.java | Add unit tests for ThreadPolicy. |
| oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java | Add unit tests for PartitionPolicy. |
| oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes.java | Add benchmark helper for dispatch testing. |
| oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueManagerTest.java | Add tests for queue registry/shared scheduler semantics. |
| oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfigTest.java | Add tests for config validation/defaults. |
| oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java | Introduce thread-count policy abstraction. |
| oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/QueueErrorHandler.java | Add error handler functional interface in batchqueue package. |
| oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionSelector.java | Add partition-selection strategy API. |
| oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java | Add partition-count policy abstraction (fixed/threadMultiply/adaptive). |
| oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/HandlerConsumer.java | Define handler consumer contract and idle hook. |
| oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BufferStrategy.java | Define buffer backpressure/drop strategies. |
| oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueStats.java | Add queue usage snapshot/top-N partition reporting. |
| oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueManager.java | Add global queue registry + shared scheduler ref-counting. |
| oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java | Add queue config builder + validation. |
| oap-server/server-library/library-batch-queue/pom.xml | Rename module artifact + add test dependencies. |
| oap-server/server-library/library-batch-queue/DESIGN.md | Document drain rebalancing design. |
| oap-server/server-library/library-batch-queue/CLAUDE.md | Add module-specific assistant documentation. |
| oap-server/server-library/library-banyandb-client/src/main/java/org/apache/skywalking/library/banyandb/v1/client/grpc/channel/ChannelManager.java | Name the BanyanDB channel manager scheduler thread. |
| oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java | Name the health checker scheduler thread. |
| oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java | Update tests for new GRPCRemoteClient ctor signature. |
| oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java | Update tests for new GRPCRemoteClient ctor signature. |
| oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/benchmark/RegexVSQuickMatchBenchmark.java | Add benchmark-style tests for endpoint grouping performance. |
| oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/benchmark/EndpointGrouping4OpenapiBenchmark.java | Add benchmark-style tests for OpenAPI grouping reader/formatter. |
| oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/benchmark/EBPFProfilingAnalyzerBenchmark.java | Add benchmark-style tests for EBPF profiling analyzer. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java | Name the watermark watcher scheduler thread. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java | Name the TTL keeper scheduler thread. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java | Name persistence timer threads (scheduler + prepare pool). |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java | Name remote client manager scheduler thread; update GRPCRemoteClient ctor usage. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java | Replace DataCarrier with per-peer BatchQueue. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java | Name EBPF fetch thread pool threads. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/hierarchy/HierarchyService.java | Name hierarchy auto-matching scheduler thread. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java | Simplify exporter service interface (remove start()). |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java | Name endpoint URI recognition scheduler thread. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java | Name cache update scheduler thread. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java | Switch TopN persistence to shared BatchQueue + per-type handler. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java | Pass TopN class into worker to register handler. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java | Unify OAL/MAL aggregation & persistence workers onto shared queues. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java | Replace L2 persistence DataCarrier with unified BatchQueue + queue metrics. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java | Remove OAL-specific L2 worker (unified queue). |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java | Remove MAL-specific L2 worker (unified queue). |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java | Replace L1 DataCarrier pools with unified BatchQueue + queue metrics. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java | Remove OAL-specific L1 worker (unified queue). |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java | Remove MAL-specific L1 worker (unified queue). |
| oap-server/server-core/pom.xml | Replace datacarrier dependency with library-batch-queue. |
| oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java | Name the alarm scheduler thread. |
| oap-server/microbench/src/main/java/org/apache/skywalking/oap/server/microbench/library/datacarrier/common/AtomicRangeIntegerV2.java | Remove microbench datacarrier artifacts (module removal). |
| oap-server/microbench/src/main/java/org/apache/skywalking/oap/server/microbench/library/datacarrier/common/AtomicRangeIntegerV1.java | Remove microbench datacarrier artifacts (module removal). |
| oap-server/microbench/src/main/java/org/apache/skywalking/oap/server/microbench/library/datacarrier/common/AtomicRangeIntegerBenchmark.java | Remove microbench datacarrier artifacts (module removal). |
| oap-server/microbench/pom.xml | Drop datacarrier dependency from microbench module. |
| oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java | Adjust tests for new exporter queue lifecycle and method names. |
| oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java | Update config assertions for new buffer setting field. |
| oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java | Replace exporter DataCarrier with BatchQueue. |
| oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java | Replace exporter DataCarrier with BatchQueue. |
| oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java | Replace exporter DataCarrier with BatchQueue. |
| oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java | Simplify exporter buffer config to a single bufferSize. |
| oap-server/exporter/pom.xml | Replace datacarrier dependency with library-batch-queue. |
| docs/en/setup/backend/grafana-instance.json | Update Grafana instance dashboard panels/labels for unified queues and slot. |
| docs/en/setup/backend/grafana-cluster.json | Update Grafana cluster dashboard panels/labels for unified queues and slot. |
| docs/en/changes/changes.md | Document new module, migration details, and thread naming inventory. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...eue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueManager.java
Show resolved
Hide resolved
...eue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueManager.java
Show resolved
Hide resolved
3 tasks
Introduce BatchQueue as a new queue implementation for metrics aggregation with type-based dispatch, configurable partition selection, and adaptive backoff. Includes comprehensive unit tests, throughput benchmarks, and DataCarrier comparison benchmarks. Also migrates existing microbench benchmarks out of the JMH-based microbench module into standard JUnit tests.
Replace DataCarrier with BatchQueue for L1 metrics aggregation, L2 metrics persistence, TopN persistence, all three exporters (gRPC metrics, Kafka trace, Kafka log), and gRPC remote client. Remove the library-datacarrier-queue module entirely. All metric types (OAL + MAL) now share unified queues instead of separate pools. Thread count reduced from 36 to 15 on 8-core. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
CLAUDE.md documents the module's design principles, architecture, dispatch modes, scheduler modes, key classes, and current usage across the codebase. DESIGN.md describes the throughput-weighted partition rebalancing feature: two-phase handoff protocol with cycle-count fencing for safe concurrent-free reassignment, targeting L2 persistence (primary) and L1 aggregation (secondary) queues. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace default pool-N-thread-M naming with descriptive thread names across all Executors.newXxx() calls for easier thread dump analysis. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Enable periodic partition-to-thread reassignment based on drain throughput to equalize load when metric types have skewed volume. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
516da79 to
7d69bbf
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
New feature:
library-batch-queuemoduleA partitioned, self-draining queue with type-based dispatch. Designed to replace DataCarrier in high-fan-out scenarios.
Key features:
addHandler()calls (~330 partitions for typical OAL+MAL workload on 8 threads), giving each metric type its own lock-freeArrayBlockingQueue.minIdleMs * 2^count, capped atmaxIdleMs), resetting on first non-empty drain. No busy-waiting.ThreadPolicyresolves thread count at runtime:cpuCores(1.0),cpuCoresWithBase(1, 0.25), orfixed(N).DrainBalancer.throughputWeighted()periodically reassigns partitions to equalize per-thread load when metric types have skewed throughput. Enabled for L1 and L2 with 10-second interval.Adaptive partition mode (
PartitionPolicy.adaptive())Used by L1 aggregation, L2 persistence, and TopN queues. The partition count grows as metric type handlers are registered via
addHandler(), rather than being fixed at construction time.The algorithm uses a threshold =
threadCount * multiplier(default multiplier = 25). With 8 drain threads the threshold is 200:ArrayBlockingQueue, eliminating contention between types.threshold + (handlerCount - threshold) / 2. This avoids unbounded partition growth while keeping most types isolated.Examples with 8 threads (threshold = 200):
In practice, a typical OAL + MAL deployment has ~460 metric types, producing ~330 partitions on 8 threads (threshold 200, so
200 + (460-200)/2 = 330).Thread policies
ThreadPolicy.cpuCores(multiplier)— Threads =round(multiplier × availableProcessors()), minimum 1. Scales with hardware: same config runs 8 threads on 8-core, 16 on 16-core.Used by L1 aggregation with
cpuCores(1.0)— one thread per core. L1 is CPU-bound (merging metrics in memory) and benefits from maximum parallelism.ThreadPolicy.cpuCoresWithBase(base, multiplier)— Threads =base + round(multiplier × availableProcessors()), minimum 1. Provides a guaranteed minimum (base) plus hardware-proportional scaling.Used by L2 persistence with
cpuCoresWithBase(1, 0.25)— 1 + ¼ of cores. On 8-core = 3 threads, on 16-core = 5 threads, on 24-core = 7 threads. L2 is I/O-bound (writing to storage); fewer threads are needed but at least 1 is always guaranteed even on minimal hardware.ThreadPolicy.fixed(N)— Exactly N threads regardless of hardware.Used by TopN (1 thread), exporters (1 thread each), gRPC remote client (1 thread per peer), JDBC batch (configurable, default 4).
Improve the performance of metrics aggregation and persistence pipeline
Thread count reduction from 36 to 15 on an 8-core machine (gRPC remote client excluded — unchanged 1 thread per peer).
cpuCores(1.0)cpuCoresWithBase(1, 0.25)fixed(1)fixed(1)eachThroughput: BatchQueue vs DataCarrier (ideal — uniform load across types)
Benchmark config: 32 producers,
fixed(8)drain threads,IF_POSSIBLEstrategy.Metric types simulated with 2000 distinct classes, each carrying a boxed
Longpayload.Throughput: static vs rebalanced (OAL/MAL skewed load)
Real OAL/MAL workloads have highly skewed entity counts — endpoint-scoped metrics (~24 entities) dominate
over service-scoped (~4 entities) and MAL (~1 entity). Static round-robin partition assignment creates
thread imbalance.
DrainBalancer.throughputWeighted()periodically snapshots per-partition throughput counters,sorts partitions by load descending, and assigns each to the least-loaded thread (LPT heuristic).
A two-phase handoff (revoke → cycle-count fence → assign) prevents concurrent handler invocations
during reassignment. Enabled for L1 aggregation and L2 persistence with a 10-second rebalance interval.
Benchmark config: 4 drain threads, 16 producers, 100 types with entity-count-driven skew, 500 LCG iterations per item.
Stability over 20 seconds (sampled every 2s after initial rebalance):
Other changes
ThreadFactoryto all anonymousExecutorspool threads for easier thread dump analysis.library-datacarrier-queuemodule. All usages replaced bylibrary-batch-queue.Commits
BatchQueue,BatchQueueManager,BatchQueueConfig,ThreadPolicy,PartitionPolicy,PartitionSelector,BufferStrategy,HandlerConsumer,BatchQueueStats,QueueErrorHandler. Includes unit tests, throughput benchmarks, and DataCarrier comparison benchmarks.library-datacarrier-queuemodule. FixRemoteClientManagerTestCasefor staticBatchQueueManagerregistry cleanup.pool-N-thread-Mthread names with descriptive names across allExecutors.newXxx()calls in the OAP server for easier thread dump analysis.DrainBalancerinterface withThroughputWeightedBalancerimplementation (LPT heuristic, two-phase handoff, cycle-count fencing). Enabled for L1 aggregation and L2 persistence queues with 10-second rebalance interval. Includesequals/hashCodeonThreadPolicy, duplicate queue name detection inBatchQueueManager.CHANGESlog.