Add opensearch-java module using java client #1868

Open
dpol1 wants to merge 3 commits into apache:main from dpol1:feat/1515-opensearch-java

Conversation

@dpol1
Contributor

@dpol1 dpol1 commented Apr 3, 2026

Summary

This PR introduces the external/opensearch-java module, migrating StormCrawler from the deprecated RestHighLevelClient to the official opensearch-java client.

As suggested, this is built as a separate module that acts as a drop-in replacement. Users migrate by updating the artifactId in their pom.xml; no changes are required to Flux topologies, imports, or YAML configuration keys.

The legacy external/opensearch module remains untouched in this PR to allow a gradual phase-out.

Architectural Decisions & Engineering

Since the new opensearch-java client introduces a completely different paradigm (fluent builders, strict JSON mappers) and removes several legacy utility classes, the following architectural decisions were made:

1. AsyncBulkProcessor & Backpressure

The legacy BulkProcessor was removed in the new client. To prevent OutOfMemoryErrors and preserve Storm's backpressure, I implemented a custom AsyncBulkProcessor:

  • Buffers BulkOperations and flushes based on action count or a ScheduledExecutorService timer.
  • Uses a Semaphore to limit concurrent in-flight HTTP requests.
  • Uses a dedicated ThreadPoolExecutor with CallerRunsPolicy to process async callbacks without starving the JVM's ForkJoinPool.commonPool().
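
The buffering and backpressure scheme in the bullets above can be sketched roughly as follows. This is not the PR's AsyncBulkProcessor: the class name `BulkBufferSketch`, the `sender` function (a stand-in for the client's asynchronous bulk call) and the pool and queue sizes are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical, simplified illustration; not the class from this PR. */
final class BulkBufferSketch<T> implements AutoCloseable {
    private final int maxActions;
    private final int maxConcurrent;
    private final Semaphore inFlight;
    // Assumed stand-in for the client's async bulk request.
    private final Function<List<T>, CompletableFuture<Void>> sender;
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    // Dedicated pool for callbacks; when its queue is full, the completing
    // thread runs the callback itself instead of using the common pool.
    private final ExecutorService callbacks =
            new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(64),
                    new ThreadPoolExecutor.CallerRunsPolicy());
    private List<T> buffer = new ArrayList<>();

    BulkBufferSketch(int maxActions, int maxConcurrent, long flushMillis,
                     Function<List<T>, CompletableFuture<Void>> sender) {
        this.maxActions = maxActions;
        this.maxConcurrent = maxConcurrent;
        this.inFlight = new Semaphore(maxConcurrent);
        this.sender = sender;
        // Time-based flush.
        timer.scheduleWithFixedDelay(this::flush, flushMillis, flushMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Buffers one operation; flushes when the action count is reached. */
    void add(T op) {
        List<T> batch = null;
        synchronized (this) {
            buffer.add(op);
            if (buffer.size() >= maxActions) {
                batch = drain();
            }
        }
        if (batch != null) {
            send(batch);
        }
    }

    void flush() {
        List<T> batch;
        synchronized (this) {
            batch = drain();
        }
        if (!batch.isEmpty()) {
            send(batch);
        }
    }

    private List<T> drain() {
        List<T> drained = buffer;
        buffer = new ArrayList<>();
        return drained;
    }

    private void send(List<T> batch) {
        // Blocks the caller while maxConcurrent requests are in flight;
        // this is what propagates backpressure upstream.
        inFlight.acquireUninterruptibly();
        CompletableFuture<Void> pending;
        try {
            pending = sender.apply(batch);
        } catch (RuntimeException e) {
            inFlight.release();
            throw e;
        }
        pending.whenCompleteAsync((ok, err) -> inFlight.release(), callbacks);
    }

    @Override
    public void close() {
        timer.shutdown();
        try {
            timer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        flush();
        // Wait until every in-flight request has released its permit.
        inFlight.acquireUninterruptibly(maxConcurrent);
        inFlight.release(maxConcurrent);
        callbacks.shutdown();
    }
}
```

In this sketch the thread calling `add` is the one that blocks on the semaphore, so a slow cluster slows the caller down rather than letting the buffer grow without bound.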

2. Transport, 100MB Buffer Limit & Sniffer Support

In the original issue discussion, I mentioned I would likely use the new ApacheHttpClient5Transport and ApacheHttpClient5Options to configure the response buffer and bypass the ContentTooLongException (100MB ceiling).

However, during implementation, I realized that switching to HC5 would break the Sniffer feature, as the official opensearch-rest-client-sniffer heavily relies on the low-level HC4 RestClient.

To keep feature parity as a drop-in replacement, I intentionally kept the RestClientTransport (which wraps the HC4 RestClient). This solves both problems:

  1. The Sniffer works out of the box using the underlying RestClient.
  2. We bypass the 100MB buffer limit by injecting HeapBufferedResponseConsumerFactory via the classic RequestOptions (which RestClientTransport fully supports).

3. Concurrency & Race Condition Fixes

During the migration, I identified and fixed a race condition in IndexerBolt and DeletionBolt where tuples were added to the processor before being safely locked in the waitAck map. The locking order has been inverted to guarantee zero tuple loss during high-throughput flushes.
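
To illustrate why the ordering matters, here is a minimal sketch with a processor that flushes immediately: if the tuple is handed to the processor before it is registered, the flush callback finds nothing to ack. The names `AckOrderingSketch`, `Processor`, `executeRacy` and `executeSafe` are hypothetical, tuples are represented by plain strings, and the real bolts may use explicit locks rather than a concurrent map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical illustration of the ordering issue; not the bolt code. */
final class AckOrderingSketch {
    /** Assumed stand-in for the bulk processor: reports each flushed id. */
    interface Processor {
        void add(String docId, Consumer<String> onFlushed);
    }

    final Map<String, String> waitAck = new ConcurrentHashMap<>();
    final AtomicInteger acked = new AtomicInteger();
    final AtomicInteger lost = new AtomicInteger();

    /** Invoked when the bulk response for docId arrives. */
    void onFlushed(String docId) {
        String tuple = waitAck.remove(docId);
        if (tuple != null) {
            acked.incrementAndGet();   // tuple was tracked, so it can be acked
        } else {
            lost.incrementAndGet();    // response arrived before registration
        }
    }

    /** Old order: the flush may complete before the tuple is tracked. */
    void executeRacy(String docId, String tuple, Processor processor) {
        processor.add(docId, this::onFlushed);
        waitAck.put(docId, tuple);
    }

    /** Inverted order: register first, then hand over to the processor. */
    void executeSafe(String docId, String tuple, Processor processor) {
        waitAck.put(docId, tuple);
        processor.add(docId, this::onFlushed);
    }
}
```

With a processor that calls back synchronously, `executeRacy` leaves a stale entry in `waitAck` and never acks, while `executeSafe` acks exactly once.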

4. Upstream Bugfixes Sync

This module is perfectly aligned with main. It incorporates the recent bugfixes applied to the legacy module, adapted for the new asynchronous paradigm:

Test plan

  • End-to-End Integration: Verified the correct behavior of all Spouts and Bolts (IndexerBolt, StatusUpdaterBolt, DeletionBolt, etc.) against a real OpenSearch instance using Testcontainers.
  • Concurrency & Backpressure: Validated the custom AsyncBulkProcessor under load to ensure it correctly flushes based on size/time thresholds and strictly respects the Semaphore concurrency limits without dropping tuples.
  • Data Serialization: Confirmed correct JSON serialization for complex types, specifically ensuring that nextFetchDate and timestamp fields conform to ISO-8601 format to prevent OpenSearch mapping errors.
  • Archetype Validation: Verified that the updated Maven archetype successfully generates a working StormCrawler project, correctly wired with the new opensearch-java dependency and its associated configurations.
  • Project Compliance: Ensured all static analysis checks pass, including Apache RAT for license headers (with explicit safe exclusions for NDJSON dashboards) and code formatting rules.
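
As a rough illustration of the serialization point in the test plan, the sketch below formats a date field as an ISO-8601 string before it goes into a document. The class and method names are hypothetical, and the PR's actual mapper configuration may differ; only `java.time` is used.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper; the module's real serialization code may differ. */
final class StatusDatesSketch {
    /** Formats an instant as an ISO-8601 UTC timestamp. */
    static String iso(Instant instant) {
        return DateTimeFormatter.ISO_INSTANT.format(instant);
    }

    /** Builds a status-like document with nextFetchDate as an ISO-8601 string. */
    static Map<String, Object> statusDoc(String url, Instant nextFetchDate) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("url", url);
        doc.put("nextFetchDate", iso(nextFetchDate));
        return doc;
    }
}
```

Writing the value as a string avoids relying on how a given JSON mapper serializes `java.time` or `java.util.Date` objects by default.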

Closes #1515

dpol1 added 3 commits April 2, 2026 10:16
- Cloned external/opensearch to external/opensearch-java to introduce the new client as a drop-in replacement.
- Updated Maven artifactId and names in the new local POMs (including the archetype).
- Registered the new module in the root POM.

This commit isolates the pure file duplication. The actual migration to the opensearch-java client will be done in the next commit to ensure a clean, readable Git diff for reviewers.

Introduces the external/opensearch-java module, replacing the deprecated
RestHighLevelClient with the official opensearch-java client. Designed as
a drop-in replacement for `external/opensearch` with identical configurations.

Key improvements:
- Implemented AsyncBulkProcessor (Semaphore + dedicated ThreadPool)
  to ensure strict backpressure and replace the legacy BulkProcessor.
- Fixed historical tuple-ack race conditions in IndexerBolt and DeletionBolt.
- Maintained RestClientTransport to seamlessly support the Sniffer and
  bypass the 100MB response buffer limit.
- Synced recent upstream bugfixes, adapting resource cleanup to the new
  async architecture.

@jnioche
Contributor

jnioche commented Apr 3, 2026

thanks @dpol1
why not use the 3.x branch of OpenSearch since this is a complete rewrite? If this is to replace the older module in the future, we might as well start with whatever is the most recent.
Are the archetypes and dashboards very different from the existing module? If not, I would be in favour of removing them from here - having duplicates is a pain.
From a user's perspective all they will have to do is modify the name of the dependency

@dpol1
Contributor Author

dpol1 commented Apr 4, 2026

> thanks @dpol1 why not use the 3.x branch of OpenSearch since this is a complete rewrite? If this is to replace the older module in the future, we might as well start with whatever is the most recent.

Will do. I focused on the core logic first (AsyncBulkProcessor for buffering); the 3.x transport migration is next.

> Are the archetypes and dashboards very different from the existing module? If not, I would be in favour of removing them from here - having duplicates is a pain.

They are identical to the existing module - removing them in this PR.

> From a user's perspective all they will have to do is modify the name of the dependency

Correct - the only change is the artifactId from stormcrawler-opensearch to stormcrawler-opensearch-java. The package path and all YAML config keys remain unchanged.

@jnioche
Contributor

jnioche commented Apr 4, 2026

> During the migration, I identified and fixed a race condition in IndexerBolt and DeletionBolt where tuples were added to the processor before being safely locked in the waitAck map. The locking order has been inverted to guarantee zero tuple loss during high-throughput flushes.

Would be good to fix that in the existing module as a separate PR. Am about to push a refactoring of that module though, so maybe best to do after that if still applies? see #1869

@dpol1
Contributor Author

dpol1 commented Apr 4, 2026

Thanks, by all means. I'll keep the race-condition fix as-is in this module for now. Once your PR on the legacy module lands, I'll double-check the legacy module and, if needed, apply any changes to this module as well.
