
Add new metric ingest/rows/published to fix flaky KinesisFaultToleranceTest #19177

Open
kfaraz wants to merge 6 commits into apache:master from kfaraz:fix_flaky_kinesis_test

Conversation


@kfaraz kfaraz commented Mar 18, 2026

Description

KinesisFaultToleranceTest.test_supervisorRecovers_afterChangeInTopicPartitions() has been flaky due to a discrepancy in the number of records ingested:

Failure report
<testcase name="test_supervisorRecovers_afterChangeInTopicPartitions" classname="org.apache.druid.testing.embedded.kinesis.KinesisFaultToleranceTest" time="25.405">
    <failure message="expected: &lt;2000&gt; but was: &lt;2092&gt;" type="org.opentest4j.AssertionFailedError"><![CDATA[org.opentest4j.AssertionFailedError: expected: <2000> but was: <2092>
        at org.apache.druid.testing.embedded.indexing.StreamIndexTestBase.waitUntilPublishedRecordsAreIngested(StreamIndexTestBase.java:148)
        at org.apache.druid.testing.embedded.indexing.StreamIndexFaultToleranceTest.verifyAndTearDown(StreamIndexFaultToleranceTest.java:64)
        at java.base/java.lang.reflect.Method.invoke(Method.java:565)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1604)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1604)
]]></failure>
    <system-out><![CDATA[2026-03-18T06:13:50,126 INFO [HttpRemoteTaskRunner-worker-sync-3] org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner - Worker[10.10.200.209:8091] completed task[index_kinesis_datasource_hneafbpc_08bbfa356e4ee10_ifffdfao] with status[FAILED]
2026-03-18T06:13:50,126 INFO [HttpRemoteTaskRunner-worker-sync-3] org.apache.druid.indexing.overlord.TaskQueue - Received status[FAILED] for task[index_kinesis_datasource_hneafbpc_08bbfa356e4ee10_ifffdfao].
2026-03-18T06:13:50,126 INFO [TaskQueue-OnComplete-4] org.apache.druid.indexing.overlord.MetadataTaskStorage - Updating status of task [index_kinesis_datasource_hneafbpc_08bbfa356e4ee10_ifffdfao] to [TaskStatus{id=index_kinesis_datasource_hneafbpc_08bbfa356e4ee10_ifffdfao, status=FAILED, duration=23, errorMsg=org.apache.druid.indexing.seekablestream.common.StreamException: software.amazon.awssdk.services.kinesis.model.ResourceInUseException: Stream arn:aws:kinesis:us-east-1:000000000000:stream/topic_datasource_hneafbpc is not currently ACTIVE or UPDATING. (Service: Kinesis, Status Code: 400, Request ID: b148c33e-2dc2-4a23-9e67-bdc5ff28445e) (SDK Attempt Count: 1)
        at org.apache.druid.indexing.kinesis.KinesisRecordSupplier.wrapExceptions(KinesisRecordSupplier.java:123)
        at org.apache.druid.indexing.kinesis.Kinesi...27 characters omitted....common.RetryUtils.retry(RetryUtils.java:129)
        at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:81)
        at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:163)
        at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:153)
        at org.apache.druid.indexing.kinesis.KinesisRecordSupplier.lambda$isOffsetAvailable$0(KinesisRecordSupplier.java:705)
        at org.apache.druid.indexing.kinesis.KinesisRecordSupplier.wrapExceptions(KinesisRecordSupplier.java:120)
        ... 15 more
}].

The root cause appears to be the failure of tasks that were in progress when the partition count changed.
Since these tasks fail, some of their records are re-ingested by replacement tasks spawned by the supervisor, which inflates the total reported by the metric ingest/events/processed.

Moreover, ingest/events/processed only indicates that records have been processed, not that they have been published.
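To make the over-count concrete, here is a toy, self-contained simulation (plain Java; all names are hypothetical stand-ins, not Druid APIs): a task processes some records and fails before publishing, a replacement task re-reads the same offsets, and the "processed" counter ends up above the real record count while the published row count stays exact.

```java
// Toy model of the flaky count (hypothetical names, not Druid code):
// a task fails after processing some records, the supervisor spawns a
// replacement that re-reads the same offsets, and "processed" over-counts.
public class ProcessedVsPublished {
    /** Returns {processedEvents, publishedRows} for one failure-and-retry cycle. */
    static long[] simulate(int totalRecords, int failedAfter) {
        long processed = 0;
        long published = 0;

        // First task counts failedAfter records as processed, then dies
        // before publishing any segment.
        processed += failedAfter;

        // Replacement task restarts from the last committed offset (still 0),
        // re-processes everything, and successfully publishes.
        processed += totalRecords;
        published += totalRecords;

        return new long[]{processed, published};
    }

    public static void main(String[] args) {
        long[] counts = simulate(2000, 92);
        System.out.println("processed=" + counts[0]); // 2092: the value the flaky assertion saw
        System.out.println("published=" + counts[1]); // 2000: the value the test actually expects
    }
}
```

With 2000 records and a task that dies after 92, the simulated counts reproduce the exact 2092-vs-2000 mismatch from the failure report, which is why a publish-based metric is the more reliable thing to wait on.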

Changes

  • Add new metric ingest/rows/published
  • This metric indicates the total row count of segments successfully published to the metadata store by a streaming task
  • Reformat processing/src/test/resources/defaultMetrics.json to avoid confusing diffs later.
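A minimal sketch of the aggregation the new metric implies (all types here are simplified stand-ins, not Druid's actual DataSegment or emitter classes): after a publish succeeds, sum the row counts of the published segments, skipping segments whose row count is unknown, and emit that total as ingest/rows/published.

```java
import java.util.List;
import java.util.Objects;

// Simplified illustration only; Druid's real DataSegment and metric
// emitter have different shapes.
public class PublishedRowsMetric {
    // Stand-in for a published segment; totalRows may be null (unknown).
    record Segment(String id, Integer totalRows) {}

    // Sum as long: a single batch ingest can publish more than
    // Integer.MAX_VALUE rows in total.
    static long publishedRowCount(List<Segment> published) {
        return published.stream()
            .map(Segment::totalRows)
            .filter(Objects::nonNull)   // unknown counts are skipped, not treated as zero
            .mapToLong(Integer::longValue)
            .sum();
    }

    public static void main(String[] args) {
        List<Segment> published = List.of(
            new Segment("seg-1", 1200),
            new Segment("seg-2", 800),
            new Segment("seg-3", null)
        );
        // In the real task this value would be emitted as "ingest/rows/published".
        System.out.println(publishedRowCount(published)); // 3000000000? no: 2000
    }
}
```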

Release note

Emit new metric ingest/rows/published from all task types to denote the total row count of successfully published segments.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz kfaraz changed the title Fix flaky KinesisFaultToleranceTest Add new metric ingest/rows/published to fix flaky KinesisFaultToleranceTest Mar 19, 2026
@kfaraz kfaraz marked this pull request as ready for review March 19, 2026 08:04
@github-actions github-actions bot added labels Area - Batch Ingestion, Area - MSQ (for multi stage queries - https://github.com/apache/druid/issues/12262) Mar 19, 2026

@gianm gianm left a comment


Please add the metric to processing/src/main/resources/defaultMetrics.json, processing/src/test/resources/defaultMetrics.json, extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json, and extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json.

.map(DataSegment::getTotalRows)
.filter(Objects::nonNull)
.mapToInt(Integer::intValue)
.sum();


A single batch ingest could publish more than 2 billion rows; please make this a long sum.
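To illustrate the overflow risk behind this review comment, here is a self-contained sketch (plain Java, simplified inputs): once the per-segment row counts total past Integer.MAX_VALUE, an int sum silently wraps around while a long sum stays correct.

```java
import java.util.List;

// Demonstrates why summing per-segment row counts as int is unsafe:
// the total wraps around once it exceeds Integer.MAX_VALUE (~2.147e9).
public class LongSumOverflow {
    static int intSum(List<Integer> counts) {
        return counts.stream().mapToInt(Integer::intValue).sum();
    }

    static long longSum(List<Integer> counts) {
        return counts.stream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) {
        // Three segments of 1 billion rows each: 3e9 rows in total.
        List<Integer> rowCounts = List.of(1_000_000_000, 1_000_000_000, 1_000_000_000);

        System.out.println("intSum  = " + intSum(rowCounts));   // -1294967296: wrapped
        System.out.println("longSum = " + longSum(rowCounts));  // 3000000000: correct
    }
}
```

Switching the terminal operation from `mapToInt(...).sum()` to `mapToLong(...).sum()` is enough; the int-to-long widening of each element is lossless, so only the accumulator type changes.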

@@ -1 +1,250 @@
{"compact/segmentAnalyzer/fetchAndProcessMillis":[],"compact/task/count":[],"compactTask/availableSlot/count":[],"compactTask/maxSlot/count":[],"coordinator/global/time":[],"coordinator/time":[],"groupBy/maxMergeDictionarySize":[],"groupBy/maxSpilledBytes":[],"groupBy/mergeDictionarySize":[],"groupBy/spilledBytes":[],"groupBy/spilledQueries":[],"ingest/count":[],"ingest/events/duplicate":[],"ingest/events/messageGap":[],"ingest/events/processed":[],"ingest/events/processedWithError":[],"ingest/events/thrownAway":[],"ingest/events/unparseable":[],"ingest/handoff/count":[],"ingest/handoff/failed":[],"ingest/handoff/time":[],"ingest/input/bytes":[],"ingest/kafka/avgLag":[],"ingest/kafka/lag":[],"ingest/kafka/maxLag":[],"ingest/kafka/partitionLag":[],"ingest/merge/cpu":[],"ingest/merge/time":[],"ingest/notices/queueSize":[],"ingest/notices/time":[],"ingest/pause/time":[],"ingest/persists/backPressure":[],"ingest/persists/count":[],"ingest/persists/cpu":[],"ingest/persists/failed":[],"ingest/persists/time":[],"ingest/rows/output":[],"ingest/segments/count":[],"ingest/sink/count":[],"ingest/tombstones/count":[],"interval/compacted/count":[],"interval/skipCompact/count":[],"interval/waitCompact/count":[],"jetty/numOpenConnections":[],"jetty/threadPool/busy":[],"jetty/threadPool/idle":[],"jetty/threadPool/isLowOnThreads":[],"jetty/threadPool/max":[],"jetty/threadPool/min":[],"jetty/threadPool/queueSize":[],"jetty/threadPool/ready":[],"jetty/threadPool/total":[],"jetty/threadPool/utilizationRate":[],"jetty/threadPool/utilized":[],"jvm/bufferpool/capacity":[],"jvm/bufferpool/count":[],"jvm/bufferpool/used":[],"jvm/gc/count":[],"jvm/gc/cpu":[],"jvm/mem/committed":[],"jvm/mem/init":[],"jvm/mem/max":[],"jvm/mem/used":[],"jvm/pool/committed":[],"jvm/pool/init":[],"jvm/pool/max":[],"jvm/pool/used":[],"kafka/consumer/bytesConsumed":[],"kafka/consumer/fetch":[],"kafka/consumer/fetchLatencyAvg":[],"kafka/consumer/fetchLatencyMax":[],"kafka/consumer/fetchRate":[],"kafka/consumer/fetchSizeAvg":[],"kafka/consumer/fetchSizeMax":[],"kafka/consumer/incomingBytes":[],"kafka/consumer/outgoingBytes":[],"kafka/consumer/recordsConsumed":[],"kafka/consumer/recordsLag":[],"kafka/consumer/recordsPerRequestAvg":[],"kill/eligibleUnusedSegments/count":[],"kill/pendingSegments/count":[],"kill/task/count":[],"killTask/availableSlot/count":[],"killTask/maxSlot/count":[],"mergeBuffer/acquisitionTimeNs":[],"mergeBuffer/maxAcquisitionTimeNs":[],"mergeBuffer/pendingRequests":[],"mergeBuffer/queries":[],"mergeBuffer/used":[],"metadata/kill/audit/count":[],"metadata/kill/compaction/count":[],"metadata/kill/datasource/count":[],"metadata/kill/rule/count":[],"metadata/kill/supervisor/count":[],"metadatacache/backfill/count":[],"metadatacache/init/time":[],"metadatacache/refresh/count":[],"metadatacache/refresh/time":[],"metadatacache/schemaPoll/count":[],"metadatacache/schemaPoll/failed":[],"metadatacache/schemaPoll/time":[],"query/bytes":[],"query/cache/delta/averageBytes":[],"query/cache/delta/errors":[],"query/cache/delta/evictions":[],"query/cache/delta/hitRate":[],"query/cache/delta/hits":[],"query/cache/delta/misses":[],"query/cache/delta/numEntries":[],"query/cache/delta/put/error":[],"query/cache/delta/put/ok":[],"query/cache/delta/put/oversized":[],"query/cache/delta/sizeBytes":[],"query/cache/delta/timeouts":[],"query/cache/total/averageBytes":[],"query/cache/total/errors":[],"query/cache/total/evictions":[],"query/cache/total/hitRate":[],"query/cache/total/hits":[],"query/cache/total/misses":[],"query/cache/total/numEntries":[],"query/cache/total/put/error":[],"query/cache/total/put/ok":[],"query/cache/total/put/oversized":[],"query/cache/total/sizeBytes":[],"query/cache/total/timeouts":[],"query/count":[],"query/cpu/time":[],"query/failed/count":[],"query/interrupted/count":[],"query/node/bytes":[],"query/node/time":[],"query/node/ttfb":[],"query/segment/time":[],"query/segmentAndCache/time":[],"query/success/count":[],"query/time":[],"query/timeout/count":[],"query/wait/time":[],"schemacache/finalizedSchemaPayload/count":[],"schemacache/finalizedSegmentMetadata/count":[],"schemacache/inTransitSMQPublishedResults/count":[],"schemacache/inTransitSMQResults/count":[],"schemacache/realtime/count":[],"segment/added/bytes":[],"segment/assignSkipped/count":[],"segment/assigned/count":[],"segment/availableDeepStorageOnly/count":[],"segment/compacted/bytes":[],"segment/compacted/count":[],"segment/count":[],"segment/deleted/count":[],"segment/dropQueue/count":[],"segment/dropSkipped/count":[],"segment/dropped/count":[],"segment/loadQueue/assigned":[],"segment/loadQueue/cancelled":[],"segment/loadQueue/count":[],"segment/loadQueue/failed":[],"segment/loadQueue/size":[],"segment/loadQueue/success":[],"segment/max":[],"segment/moveSkipped/count":[],"segment/moved/bytes":[],"segment/moved/count":[],"segment/nuked/bytes":[],"segment/overShadowed/count":[],"segment/pendingDelete":[],"segment/scan/active":[],"segment/scan/pending":[],"segment/size":[],"segment/skipCompact/bytes":[],"segment/skipCompact/count":[],"segment/unavailable/count":[],"segment/underReplicated/count":[],"segment/unneeded/count":[],"segment/unneededEternityTombstone/count":[],"segment/used":[],"segment/usedPercent":[],"segment/waitCompact/bytes":[],"segment/waitCompact/count":[],"serverview/init/time":[],"serverview/sync/healthy":[],"serverview/sync/unstableTime":[],"service/heartbeat":[],"sqlQuery/bytes":[],"sqlQuery/planningTimeMs":[],"sqlQuery/time":[],"sys/cpu":[],"sys/disk/queue":[],"sys/disk/read/count":[],"sys/disk/read/size":[],"sys/disk/transferTime":[],"sys/disk/write/count":[],"sys/disk/write/size":[],"sys/fs/files/count":[],"sys/fs/files/free":[],"sys/fs/max":[],"sys/fs/used":[],"sys/mem/free":[],"sys/mem/max":[],"sys/mem/used":[],"sys/net/read/dropped":[],"sys/net/read/errors":[],"sys/net/read/packets":[],"sys/net/read/size":[],"sys/net/write/collisions":[],"sys/net/write/errors":[],"sys/net/write/packets":[],"sys/net/write/size":[],"sys/storage/used":[],"sys/swap/free":[],"sys/swap/max":[],"sys/swap/pageIn":[],"sys/swap/pageOut":[],"sys/tcpv4/activeOpens":[],"sys/tcpv4/attemptFails":[],"sys/tcpv4/estabResets":[],"sys/tcpv4/in/errs":[],"sys/tcpv4/in/segs":[],"sys/tcpv4/out/rsts":[],"sys/tcpv4/out/segs":[],"sys/tcpv4/passiveOpens":[],"sys/tcpv4/retrans/segs":[],"sys/uptime":[],"task/action/batch/attempts":[],"task/action/batch/queueTime":[],"task/action/batch/runTime":[],"task/action/batch/size":[],"task/action/failed/count":[],"task/action/run/time":[],"task/action/success/count":[],"task/autoScaler/requiredCount":[],"task/failed/count":[],"task/pending/count":[],"task/pending/time":[],"task/run/time":[],"task/running/count":[],"task/segmentAvailability/wait/time":[],"task/success/count":[],"task/waiting/count":[],"tier/historical/count":[],"tier/replication/factor":[],"tier/required/capacity":[],"tier/total/capacity":[],"zk/connected":[],"zk/reconnect/time":[]}
@kfaraz (Contributor, Author) replied:

Reformatted this file so that future diffs are cleaner.
