Skip to content

Replace DataCarrier with BatchQueue for metrics pipeline#13703

Open
wu-sheng wants to merge 5 commits intomasterfrom
library-batch-queue
Open

Replace DataCarrier with BatchQueue for metrics pipeline#13703
wu-sheng wants to merge 5 commits intomasterfrom
library-batch-queue

Conversation

@wu-sheng
Copy link
Member

@wu-sheng wu-sheng commented Feb 15, 2026

New feature: library-batch-queue module

  • Update the documentation to include this new feature.
  • Tests(including UT, IT, E2E) are added to verify the new feature.

A partitioned, self-draining queue with type-based dispatch. Designed to replace DataCarrier in high-fan-out scenarios.

Key features:

  • One queue per concern, many types per queue. OAL + MAL metric types share unified queues instead of separate pools. Individual metric types register handlers; the queue groups drained items by class and dispatches to matching handlers.
  • Adaptive partitioning. Partition count grows automatically with addHandler() calls (~330 partitions for typical OAL+MAL workload on 8 threads), giving each metric type its own lock-free ArrayBlockingQueue.
  • Idle backoff. Drain loops double their sleep interval when idle (minIdleMs * 2^count, capped at maxIdleMs), resetting on first non-empty drain. No busy-waiting.
  • Shared scheduler with ref-counting. Low-throughput queues (exporters) share one thread pool; the pool auto-shuts down when the last queue releases it.
  • Hardware-aware thread sizing. ThreadPolicy resolves thread count at runtime: cpuCores(1.0), cpuCoresWithBase(1, 0.25), or fixed(N).
  • Throughput-weighted drain rebalancing. DrainBalancer.throughputWeighted() periodically reassigns partitions to equalize per-thread load when metric types have skewed throughput. Enabled for L1 and L2 with 10-second interval.

Adaptive partition mode (PartitionPolicy.adaptive())

Used by L1 aggregation, L2 persistence, and TopN queues. The partition count grows as metric type handlers are registered via addHandler(), rather than being fixed at construction time.

The algorithm uses a threshold = threadCount * multiplier (default multiplier = 25). With 8 drain threads the threshold is 200:

  • Below threshold (handlers ≤ 200): one partition per handler (1:1 mapping). Each metric type gets its own dedicated ArrayBlockingQueue, eliminating contention between types.
  • Above threshold (handlers > 200): excess handlers share partitions at 1:2 ratio: threshold + (handlerCount - threshold) / 2. This avoids unbounded partition growth while keeping most types isolated.

Examples with 8 threads (threshold = 200):

Registered handlers Partitions Ratio
0 (initial) 8 = threadCount
100 100 1:1
200 200 1:1, at threshold
500 350 200 + 300/2
1000 600 200 + 800/2

In practice, a typical OAL + MAL deployment has ~460 metric types, producing ~330 partitions on 8 threads (threshold 200, so 200 + (460-200)/2 = 330).

Thread policies

ThreadPolicy.cpuCores(multiplier) — Threads = round(multiplier × availableProcessors()), minimum 1. Scales with hardware: same config runs 8 threads on 8-core, 16 on 16-core.

Used by L1 aggregation with cpuCores(1.0) — one thread per core. L1 is CPU-bound (merging metrics in memory) and benefits from maximum parallelism.

ThreadPolicy.cpuCoresWithBase(base, multiplier) — Threads = base + round(multiplier × availableProcessors()), minimum 1. Provides a guaranteed minimum (base) plus hardware-proportional scaling.

Used by L2 persistence with cpuCoresWithBase(1, 0.25) — 1 + ¼ of cores. On 8-core = 3 threads, on 16-core = 5 threads, on 24-core = 7 threads. L2 is I/O-bound (writing to storage); fewer threads are needed but at least 1 is always guaranteed even on minimal hardware.

ThreadPolicy.fixed(N) — Exactly N threads regardless of hardware.

Used by TopN (1 thread), exporters (1 thread each), gRPC remote client (1 thread per peer), JDBC batch (configurable, default 4).

Improve the performance of metrics aggregation and persistence pipeline

  • Add a benchmark for the improvement.
  • The benchmark result.
  • Links/URLs to the theory proof or discussion articles/blogs: LPT (Longest Processing Time) heuristic — see Wikipedia: Multiprocessor scheduling.

Thread count reduction from 36 to 15 on an 8-core machine (gRPC remote client excluded — unchanged 1 thread per peer).

Queue Old threads New threads New policy
L1 Aggregation (OAL+MAL unified) 26 8 cpuCores(1.0)
L2 Persistence (OAL+MAL unified) 3 3 cpuCoresWithBase(1, 0.25)
TopN Persistence 4 1 fixed(1)
Exporters (gRPC/Kafka) 3 3 fixed(1) each
Total 36 15

Throughput: BatchQueue vs DataCarrier (ideal — uniform load across types)

Benchmark config: 32 producers, fixed(8) drain threads, IF_POSSIBLE strategy.
Metric types simulated with 2000 distinct classes, each carrying a boxed Long payload.

Partition strategy 500 types 1000 types 2000 types
DataCarrier baseline 33.4M 37.6M 38.0M
BatchQueue adaptive 45.7M (+37%) 50.5M (+34%) 64.0M (+68%)
BatchQueue 1:1 51.3M (+53%) 61.2M (+63%) 75.7M (+99%)

Throughput: static vs rebalanced (OAL/MAL skewed load)

Real OAL/MAL workloads have highly skewed entity counts — endpoint-scoped metrics (~24 entities) dominate
over service-scoped (~4 entities) and MAL (~1 entity). Static round-robin partition assignment creates
thread imbalance.

DrainBalancer.throughputWeighted() periodically snapshots per-partition throughput counters,
sorts partitions by load descending, and assigns each to the least-loaded thread (LPT heuristic).
A two-phase handoff (revoke → cycle-count fence → assign) prevents concurrent handler invocations
during reassignment. Enabled for L1 aggregation and L2 persistence with a 10-second rebalance interval.

Benchmark config: 4 drain threads, 16 producers, 100 types with entity-count-driven skew, 500 LCG iterations per item.

                    Static          Rebalanced
  Throughput:    7,211,794         8,729,310  items/sec
  Load ratio:       1.30x             1.04x  (max/min thread)
  Improvement:                       +21.0%

Stability over 20 seconds (sampled every 2s after initial rebalance):

  Interval    Throughput      Ratio
   0- 2s     8,915,955       1.00x
   2- 4s     8,956,595       1.01x
   4- 6s     8,934,778       1.00x
   6- 8s     8,838,461       1.01x
   8-10s     8,887,092       1.00x
  10-12s     8,844,614       1.00x
  12-14s     8,877,651       1.00x
  14-16s     8,851,595       1.01x
  16-18s     8,639,045       1.01x
  18-20s     8,708,210       1.01x
  Stable: YES (avg ratio 1.01x)

Other changes

  • Add named ThreadFactory to all anonymous Executors pool threads for easier thread dump analysis.
  • Remove library-datacarrier-queue module. All usages replaced by library-batch-queue.

Commits

# Commit Description
1 Add library-batch-queue module Core queue implementation: BatchQueue, BatchQueueManager, BatchQueueConfig, ThreadPolicy, PartitionPolicy, PartitionSelector, BufferStrategy, HandlerConsumer, BatchQueueStats, QueueErrorHandler. Includes unit tests, throughput benchmarks, and DataCarrier comparison benchmarks.
2 Replace DataCarrier with BatchQueue Migrate L1 aggregation, L2 persistence, TopN, all three exporters (gRPC metrics, Kafka trace, Kafka log), gRPC remote client, and JDBC batch DAO from DataCarrier to BatchQueue. Unify OAL+MAL into shared queues. Remove library-datacarrier-queue module. Fix RemoteClientManagerTestCase for static BatchQueueManager registry cleanup.
3 Add CLAUDE.md for library-batch-queue Module documentation: design principles, architecture, dispatch modes, scheduler modes, key classes, drain rebalancing, and per-queue usage across the codebase.
4 Add named ThreadFactory Replace anonymous pool-N-thread-M thread names with descriptive names across all Executors.newXxx() calls in the OAP server for easier thread dump analysis.
5 Add throughput-weighted drain rebalancing DrainBalancer interface with ThroughputWeightedBalancer implementation (LPT heuristic, two-phase handoff, cycle-count fencing). Enabled for L1 aggregation and L2 persistence queues with 10-second rebalance interval. Includes equals/hashCode on ThreadPolicy, duplicate queue name detection in BatchQueueManager.

@wu-sheng wu-sheng added this to the 10.4.0 milestone Feb 15, 2026
@wu-sheng wu-sheng added core feature Core and important feature. Sometimes, break backwards compatibility. enhancement Enhancement on performance or codes complexity:high Relate to multiple(>4) components of SkyWalking labels Feb 15, 2026
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new library-batch-queue module and migrates OAP server internal pipelines from DataCarrier to BatchQueue, aiming to reduce thread counts, unify OAL/MAL queues, and improve observability via named executor threads. It also updates OTel rules and Grafana/So11y dashboards to match the new queue metrics dimensions and removes the legacy library-datacarrier-queue module.

Changes:

  • Add library-batch-queue (partitioned, self-draining queue) and replace DataCarrier usages across aggregation/persistence/TopN/exporters/remote client.
  • Name anonymous executor threads across the OAP server for easier thread-dump analysis.
  • Update telemetry rules and dashboards for the new queue metrics tags/dimensions; remove library-datacarrier-queue and related tests.

Reviewed changes

Copilot reviewed 98 out of 98 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCBatchDAO.java Switch JDBC async batching from DataCarrier to BatchQueue.
oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/pom.xml Replace datacarrier dependency with library-batch-queue.
oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml Remove unused library-datacarrier-queue dependency.
oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json Update So11y dashboard expressions/titles for unified queues and new labels.
oap-server/server-starter/src/main/resources/otel-rules/oap.yaml Update OTel aggregation rule dimensions (remove kind, add slot).
oap-server/server-library/pom.xml Remove library-datacarrier-queue, add library-batch-queue module.
oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/benchmark/StringFormatGroupBenchmark.java Add benchmark-style test for StringFormatGroup.
oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/MultipleFilesChangeMonitor.java Name the scheduled executor thread for file monitoring.
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/partition/SimpleRollingPartitionerTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/partition/ProducerThreadPartitionerTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/SampleConsumer.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPoolFactoryTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumeDriverTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/SampleData.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/EnvUtilTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/test/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrierTest.java Remove datacarrier tests (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/partition/SimpleRollingPartitioner.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/partition/ProducerThreadPartitioner.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/partition/IDataPartitioner.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/MultipleChannelsConsumer.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/IDriver.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerThread.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPoolFactory.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumerPool.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/ConsumeDriver.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/consumer/BulkConsumePool.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/common/AtomicRangeInteger.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/QueueBuffer.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/Channels.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/buffer/ArrayBlockingQueueBuffer.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/EnvUtil.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-datacarrier-queue/src/main/java/org/apache/skywalking/oap/server/library/datacarrier/DataCarrier.java Remove datacarrier implementation (module removal).
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java Name the health-check scheduled executor thread.
oap-server/server-library/library-batch-queue/src/test/resources/log4j2-test.xml Add log4j2 test config for the new module tests.
oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicyTest.java Add unit tests for ThreadPolicy.
oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java Add unit tests for PartitionPolicy.
oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BenchmarkMetricTypes.java Add benchmark helper for dispatch testing.
oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueManagerTest.java Add tests for queue registry/shared scheduler semantics.
oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfigTest.java Add tests for config validation/defaults.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java Introduce thread-count policy abstraction.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/QueueErrorHandler.java Add error handler functional interface in batchqueue package.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionSelector.java Add partition-selection strategy API.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java Add partition-count policy abstraction (fixed/threadMultiply/adaptive).
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/HandlerConsumer.java Define handler consumer contract and idle hook.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BufferStrategy.java Define buffer backpressure/drop strategies.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueStats.java Add queue usage snapshot/top-N partition reporting.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueManager.java Add global queue registry + shared scheduler ref-counting.
oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java Add queue config builder + validation.
oap-server/server-library/library-batch-queue/pom.xml Rename module artifact + add test dependencies.
oap-server/server-library/library-batch-queue/DESIGN.md Document drain rebalancing design.
oap-server/server-library/library-batch-queue/CLAUDE.md Add module-specific assistant documentation.
oap-server/server-library/library-banyandb-client/src/main/java/org/apache/skywalking/library/banyandb/v1/client/grpc/channel/ChannelManager.java Name the BanyanDB channel manager scheduler thread.
oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java Name the health checker scheduler thread.
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java Update tests for new GRPCRemoteClient ctor signature.
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientRealClient.java Update tests for new GRPCRemoteClient ctor signature.
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/benchmark/RegexVSQuickMatchBenchmark.java Add benchmark-style tests for endpoint grouping performance.
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/benchmark/EndpointGrouping4OpenapiBenchmark.java Add benchmark-style tests for OpenAPI grouping reader/formatter.
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/benchmark/EBPFProfilingAnalyzerBenchmark.java Add benchmark-style tests for EBPF profiling analyzer.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java Name the watermark watcher scheduler thread.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ttl/DataTTLKeeperTimer.java Name the TTL keeper scheduler thread.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java Name persistence timer threads (scheduler + prepare pool).
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java Name remote client manager scheduler thread; update GRPCRemoteClient ctor usage.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java Replace DataCarrier with per-peer BatchQueue.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java Name EBPF fetch thread pool threads.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/hierarchy/HierarchyService.java Name hierarchy auto-matching scheduler thread.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterService.java Simplify exporter service interface (remove start()).
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/group/EndpointNameGrouping.java Name endpoint URI recognition scheduler thread.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java Name cache update scheduler thread.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java Switch TopN persistence to shared BatchQueue + per-type handler.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java Pass TopN class into worker to register handler.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java Unify OAL/MAL aggregation & persistence workers onto shared queues.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java Replace L2 persistence DataCarrier with unified BatchQueue + queue metrics.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinOALWorker.java Remove OAL-specific L2 worker (unified queue).
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinMALWorker.java Remove MAL-specific L2 worker (unified queue).
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java Replace L1 DataCarrier pools with unified BatchQueue + queue metrics.
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateOALWorker.java Remove OAL-specific L1 worker (unified queue).
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateMALWorker.java Remove MAL-specific L1 worker (unified queue).
oap-server/server-core/pom.xml Replace datacarrier dependency with library-batch-queue.
oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/AlarmCore.java Name the alarm scheduler thread.
oap-server/microbench/src/main/java/org/apache/skywalking/oap/server/microbench/library/datacarrier/common/AtomicRangeIntegerV2.java Remove microbench datacarrier artifacts (module removal).
oap-server/microbench/src/main/java/org/apache/skywalking/oap/server/microbench/library/datacarrier/common/AtomicRangeIntegerV1.java Remove microbench datacarrier artifacts (module removal).
oap-server/microbench/src/main/java/org/apache/skywalking/oap/server/microbench/library/datacarrier/common/AtomicRangeIntegerBenchmark.java Remove microbench datacarrier artifacts (module removal).
oap-server/microbench/pom.xml Drop datacarrier dependency from microbench module.
oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java Adjust tests for new exporter queue lifecycle and method names.
oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProviderTest.java Update config assertions for new buffer setting field.
oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/trace/KafkaTraceExporter.java Replace exporter DataCarrier with BatchQueue.
oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/kafka/log/KafkaLogExporter.java Replace exporter DataCarrier with BatchQueue.
oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java Replace exporter DataCarrier with BatchQueue.
oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/ExporterSetting.java Simplify exporter buffer config to a single bufferSize.
oap-server/exporter/pom.xml Replace datacarrier dependency with library-batch-queue.
docs/en/setup/backend/grafana-instance.json Update Grafana instance dashboard panels/labels for unified queues and slot.
docs/en/setup/backend/grafana-cluster.json Update Grafana cluster dashboard panels/labels for unified queues and slot.
docs/en/changes/changes.md Document new module, migration details, and thread naming inventory.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@wu-sheng wu-sheng changed the title Replace DataCarrier with BatchQueue; name all thread pools Replace DataCarrier with BatchQueue for metrics pipeline Feb 15, 2026
wu-sheng and others added 5 commits February 15, 2026 22:55
Introduce BatchQueue as a new queue implementation for metrics aggregation
with type-based dispatch, configurable partition selection, and adaptive
backoff. Includes comprehensive unit tests, throughput benchmarks, and
DataCarrier comparison benchmarks. Also migrates existing microbench
benchmarks out of the JMH-based microbench module into standard JUnit tests.
Replace DataCarrier with BatchQueue for L1 metrics aggregation,
L2 metrics persistence, TopN persistence, all three exporters
(gRPC metrics, Kafka trace, Kafka log), and gRPC remote client.
Remove the library-datacarrier-queue module entirely.

All metric types (OAL + MAL) now share unified queues instead of
separate pools. Thread count reduced from 36 to 15 on 8-core.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
CLAUDE.md documents the module's design principles, architecture,
dispatch modes, scheduler modes, key classes, and current usage
across the codebase.

DESIGN.md describes the throughput-weighted partition rebalancing
feature: two-phase handoff protocol with cycle-count fencing for
safe concurrent-free reassignment, targeting L2 persistence (primary)
and L1 aggregation (secondary) queues.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace default pool-N-thread-M naming with descriptive thread names
across all Executors.newXxx() calls for easier thread dump analysis.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Enable periodic partition-to-thread reassignment based on drain
throughput to equalize load when metric types have skewed volume.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@wu-sheng wu-sheng force-pushed the library-batch-queue branch from 516da79 to 7d69bbf Compare February 15, 2026 14:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

complexity:high Relate to multiple(>4) components of SkyWalking core feature Core and important feature. Sometimes, break backwards compatibility. enhancement Enhancement on performance or codes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant