Add opensearch-java module using java client #1868
- Cloned external/opensearch to external/opensearch-java to introduce the new client as a drop-in replacement.
- Updated Maven artifactId and names in the new local POMs (including the archetype).
- Registered the new module in the root POM.

This commit isolates the pure file duplication. The actual migration to the opensearch-java client will be done in the next commit to ensure a clean, readable Git diff for reviewers.
Introduces the external/opensearch-java module, replacing the deprecated RestHighLevelClient with the official opensearch-java client. Designed as a drop-in replacement for `external/opensearch` with identical configurations.

Key improvements:
- Implemented AsyncBulkProcessor (Semaphore + dedicated ThreadPool) to ensure strict backpressure and replace the legacy BulkProcessor.
- Fixed historical tuple-ack race conditions in IndexerBolt and DeletionBolt.
- Maintained RestClientTransport to seamlessly support the Sniffer and bypass the 100MB response buffer limit.
- Synced recent upstream bugfixes, adapting resource cleanup to the new async architecture.
thanks @dpol1
Will do - I focused on the core logic first (
They are identical to the existing module - removing them in this PR.
Correct - the only change is the artifactId from
Would be good to fix that in the existing module as a separate PR. I'm about to push a refactoring of that module though, so maybe best to do it after that, if it still applies? See #1869
Thanks, by all means. I'll keep the race-condition fix as-is in this module for now. Once your PR on the legacy module lands, I'll double-check the legacy module and, if needed, apply the changes to this module as well.
Summary
This PR introduces the `external/opensearch-java` module, migrating StormCrawler from the deprecated `RestHighLevelClient` to the official `opensearch-java` client.

As suggested, this is built as a separate module to act as a drop-in replacement. Users can migrate by simply updating their `pom.xml` artifactId, with zero changes required for Flux topologies, imports, or YAML configuration keys.

The legacy `external/opensearch` module remains untouched in this PR to allow a gradual phase-out.

Architectural Decisions & Engineering
Since the new `opensearch-java` client introduces a completely different paradigm (fluent builders, strict JSON mappers) and removes several legacy utility classes, the following architectural decisions were made:

1. `AsyncBulkProcessor` & Backpressure

The legacy `BulkProcessor` was removed in the new client. To prevent `OutOfMemoryError`s and preserve Storm's backpressure, I implemented a custom `AsyncBulkProcessor`:

- Buffers `BulkOperation`s and flushes based on action count or a `ScheduledExecutorService` timer.
- Uses a `Semaphore` to limit concurrent in-flight HTTP requests.
- Uses a dedicated `ThreadPoolExecutor` with `CallerRunsPolicy` to process async callbacks without starving the JVM's `ForkJoinPool.commonPool()`.

2. Transport, 100MB Buffer Limit & Sniffer Support
In the original issue discussion, I mentioned I would likely use the new `ApacheHttpClient5Transport` and `ApacheHttpClient5Options` to configure the response buffer and bypass the `ContentTooLongException` (100MB ceiling).

However, during implementation, I realized that switching to HC5 would break the Sniffer feature, as the official `opensearch-rest-client-sniffer` relies on the low-level HC4 `RestClient`.

To maintain feature parity as a drop-in replacement, I intentionally kept the `RestClientTransport` (which wraps the HC4 `RestClient`). This solves both problems:

- The `Sniffer` works out of the box using the underlying `RestClient`.
- The 100MB limit is bypassed by configuring a `HeapBufferedResponseConsumerFactory` via the classic `RequestOptions` (which `RestClientTransport` fully supports).

3. Concurrency & Race Condition Fixes
During the migration, I identified and fixed a race condition in `IndexerBolt` and `DeletionBolt` where tuples were added to the processor before being safely locked in the `waitAck` map. The locking order has been inverted to guarantee zero tuple loss during high-throughput flushes.

4. Upstream Bugfixes Sync
This module is aligned with `main`. It incorporates the recent bugfixes applied to the legacy module, adapted for the new asynchronous paradigm:

- […] `AbstractSpout`.
- […] `OpenSearchConnection` during `Sniffer` initialization failures.
- Fixed `Timer` and `OpenSearchClient` memory leaks in `JSONResourceWrapper` and `JSONURLFilterWrapper` by properly implementing `cleanup()`.

Test plan
- […] (`IndexerBolt`, `StatusUpdaterBolt`, `DeletionBolt`, etc.) against a real OpenSearch instance using Testcontainers.
- […] `AsyncBulkProcessor` under load to ensure it correctly flushes based on size/time thresholds and strictly respects the `Semaphore` concurrency limits without dropping tuples.
- […] `nextFetchDate` and `timestamp` fields conform to ISO-8601 format to prevent OpenSearch mapping errors.
- […] `opensearch-java` dependency and its associated configurations.

Closes #1515
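
For reviewers who want a mental model of the backpressure scheme in section 1, here is a minimal sketch of the idea, not the code in this PR. `BoundedBulkFlusher`, its `sender` callback, and the parameter names are hypothetical. It flushes on size only and leaves out the timer-based flush. It assumes each batch is sent via something that returns a `CompletableFuture`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical, simplified stand-in; not the PR's actual AsyncBulkProcessor.
final class BoundedBulkFlusher<T> {
    private final int batchSize;
    private final Semaphore inFlight;      // caps concurrent bulk requests
    private final Executor callbackPool;   // dedicated pool for completion callbacks
    private final Function<List<T>, CompletableFuture<Void>> sender;
    private List<T> buffer = new ArrayList<>();

    BoundedBulkFlusher(int batchSize, int maxConcurrent, Executor callbackPool,
            Function<List<T>, CompletableFuture<Void>> sender) {
        this.batchSize = batchSize;
        this.inFlight = new Semaphore(maxConcurrent);
        this.callbackPool = callbackPool;
        this.sender = sender;
    }

    // Buffers one operation and flushes once the batch is full.
    synchronized void add(T op) {
        buffer.add(op);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Sends the current buffer as one batch; blocks while the limit is reached.
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        // Blocking here is the backpressure: callers stall instead of queueing
        // an unbounded number of requests in memory.
        inFlight.acquireUninterruptibly();
        try {
            sender.apply(batch)
                  // Release the permit on the dedicated pool, on success or failure.
                  .whenCompleteAsync((ok, err) -> inFlight.release(), callbackPool);
        } catch (RuntimeException e) {
            inFlight.release(); // the request never started, so give the permit back
            throw e;
        }
    }
}
```

In this sketch the caller thread (the bolt's `execute` thread, in Storm terms) is the one that blocks on the semaphore, so the stall propagates upstream.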
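
The ordering fix from section 3 can be shown with a small, self-contained sketch. It is a simplified model rather than the bolts' real code: `AckTracker`, `submit`, and `onBulkResponse` are hypothetical names, and a plain `ConcurrentHashMap` stands in for the `waitAck` structure. The point is that the tuple is registered before the operation is handed to the processor, so a response that arrives immediately still finds it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical model of the "register before submit" ordering.
final class AckTracker<T> {
    private final Map<String, T> waitAck = new ConcurrentHashMap<>();

    // Registers the tuple first, then hands the document id to the processor.
    void submit(String docId, T tuple, Consumer<String> processor) {
        waitAck.put(docId, tuple);   // 1. make the tuple visible to the response handler
        processor.accept(docId);     // 2. only now can a response for docId arrive
    }

    // Called from the bulk response callback; returns the tuple to ack, or null.
    T onBulkResponse(String docId) {
        return waitAck.remove(docId);
    }

    int pending() {
        return waitAck.size();
    }
}
```

With the two steps in `submit` swapped, a fast response could call `onBulkResponse` before the `put`, get `null`, and leave the tuple unacked.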