Distributed Query System with Join Enrichment and Enhanced Streaming#168
Distributed Query System with Join Enrichment and Enhanced Streaming#168olsonanl wants to merge 81 commits into
Conversation
Implement a distributed query system that queries Solr shards directly in parallel for improved performance on large result sets. Features: - ParallelQueryCoordinator: Concurrent queries across shards with configurable parallelism and efficient batch draining - ShardCursorStream: Cursor-based pagination with retry logic and proper timeout handling - MergeSortStream: K-way merge sort for sorted output across shards - SolrClusterClient: Cluster metadata with caching and auth propagation - Node exclusion: excludeNodes config to filter inaccessible hosts - SSL support: rejectUnauthorized and ca options for self-signed certs - Clean cancellation: Proper cleanup when clients disconnect - URL sanitization: Credentials hidden in debug logs Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Documentation: - DISTRIBUTED_QUERY_DOCS.md: Comprehensive API and architecture docs - DISTRIBUTED_QUERY_QUICKSTART.md: Quick start guide - DISTRIBUTED_QUERY_SPEC.md: Original specification - DISTRIBUTED_QUERY_IMPL_PLAN.md: Implementation plan Tests: - MinHeap unit tests - CacheManager unit tests - MergeSortStream unit tests - DistributedQueryConfig unit tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add documentation for the new distributed query system: - Debug commands for distributed query components - New test command (npm run test-distributed) - Key components reference (distributedQueryRouter, lib/distributed) - Configuration examples (excludeNodes, SSL options) - Network requirements for direct shard access Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Enable multiple parallel HTTP clients to each query a subset of shards, multiplying effective throughput. New parameters: - clientCount: Total number of parallel clients (e.g., 4) - clientIndex: This client's 0-based index (e.g., 0, 1, 2, or 3) Shards are distributed via round-robin: with 8 shards and 4 clients, client 0 gets shards [0,4], client 1 gets [1,5], etc. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously _hasSortRequirement() only recognized 'id' as the unique key, causing unnecessary merge-sort mode when sorting by collection-specific keys like 'feature_id'. Now fetches and caches the actual unique key from the collection schema. This allows sorting by the unique key (e.g., feature_id asc) to use the fast parallel query mode instead of the slow merge-sort mode. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The merge sort stream would deadlock when the number of shards times the batch size exceeded maxHeapDocs. The heap would fill before all shards could contribute documents, but _canOutput() requires all active shards to have at least one document in the heap. Fix: dynamically increase maxHeapDocs to at least (shardCount * batchSize) to ensure the heap can hold one batch from every shard. Example: 80 shards * 2000 batch = 160,000 minimum heap size. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The low highWaterMark values (16-64) caused excessive pause/resume cycles even on fast local connections. Increased buffer sizes: - ShardCursorStream: 16 -> 500 - ParallelQueryCoordinator: 64 -> 1000 - MergeSortStream: 64 -> 1000 - drainBatchSize: 100 -> 500 This reduces the frequency of backpressure-related pauses while still maintaining memory bounds. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace manual event-based streaming with Node.js pipeline() for proper backpressure coordination. The previous approach caused excessive pause/resume cycles because each res.write() that returned false would pause the stream, and each drain event only allowed one more document. Changes: - Use stream/promises pipeline() for automatic backpressure handling - Add Transform stream to convert objects to NDJSON with highWaterMark: 1000 - Move metadata output to transform flush() callback - Handle pipeline errors gracefully (including client disconnects) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The debug output was extremely noisy during normal operation due to per-document logging of backpressure state. Changes: - ShardCursorStream: Only log when pushing >1 doc before backpressure - ParallelQueryCoordinator: Only log backpressure when >100 docs pushed or buffer >1000, only log drains when >=1000 docs - Remove noisy _read() and resume event logging Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add ability to query Solr metrics API for real-time cluster load monitoring. This enables adaptive parallelism recommendations based on current cluster conditions. New SolrClusterClient methods: - getNodeMetrics(): Fetch metrics from /admin/metrics endpoint - getClusterLoad(): Aggregate load metrics across all cluster nodes - Helper methods for extracting and averaging metrics New DistributedQueryManager methods: - getClusterLoad(): Proxy to cluster client - getAdaptiveParallelism(): Recommend parallelism based on load - Reduces parallelism when avgQueryTimeMs > 200-500ms - Reduces parallelism when heap usage > 80-90% - Reduces parallelism when < 50% of nodes healthy New API endpoints: - GET /test/distributed-query/cluster-load - GET /test/distributed-query/adaptive-parallelism Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Inject auth credentials from configured Solr URL into metrics requests. The base_url from cluster status does not include auth, so we need to add credentials before making the request. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Solr returns metrics as direct values, not wrapped objects:
- memory.heap.used: 354440112 (not { value: N })
- QUERY./select.requests: 11903 (direct number)
- QUERY./select.requestTimes: { mean_ms: N, ... }
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Fetching metrics from all nodes is expensive (5+ seconds for 133 nodes). Cache results to avoid hammering Solr on repeated requests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When using multi-field sort with many shards, a deadlock could occur: 1. Fast shards fill the heap before slow shards respond 2. Heap backpressure pauses shards (including ones that had not contributed) 3. _canOutput() waits for every shard to have a doc in the heap 4. Paused shards never contribute -> deadlock, no output Fix: Track hasContributed flag per shard and never pause a shard until it has contributed at least one document. This ensures _canOutput() can eventually return true even when the heap fills up quickly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously, the replica selection logic preferred non-leaders and only used leaders as a fallback. This meant only half the cluster capacity was being utilized when each shard has both a leader and non-leader. Now randomly select from all active replicas to spread load across the entire cluster. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add periodic debug output to help diagnose merge sort issues: - Log heap state, contribution counts, and output counts every 10k docs - Log which shards are missing when _canOutput() returns false Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When using MergeSortStream with many shards, we must wait for all shards to contribute before outputting any documents. Using a smaller initial batch size (default 100 vs 2000) significantly reduces the time for each shard to respond with its first document. After the first batch, subsequent fetches use the normal batch size for throughput. New config option: initialBatchSize (default: 100, range: 1-2000) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Backpressure pause/resume events happen frequently during normal operation (every ~5-6ms). Reduce log spam by: - Only logging backpressure events every 5 seconds at most - Including backpressure count and document count for context - Removing resume logging entirely (too noisy, not useful) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The _canOutput() method was scanning the entire heap (up to 160,000 docs) on every single document output to check which shards are represented. For 1 million documents, this resulted in ~80-160 billion iterations. Fix: Track shard representation incrementally using a Map that counts how many docs each shard has in the heap. Update on push/pop for O(1) lookup instead of O(heap_size) scan. This should dramatically improve merge sort throughput to match the non-sorted (parallel) query performance. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When all documents from a paused shard are popped from the heap, _canOutput() returns false waiting for that shard. But the shard is paused and will not add more docs until heap drops to 80%. This creates a deadlock. Fix: When we pop the last doc from a shard, immediately resume that shard regardless of heap size. This ensures the shard can contribute more docs so _canOutput() can return true. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The _read() method only called _tryOutput() when outputPaused was true. But _tryOutput() needs to run whenever the consumer is ready, even if we were not in a backpressure state. Scenario that caused stalls: 1. All shards paused (heap full) 2. Consumer calls _read() wanting more data 3. outputPaused was false (we werent in backpressure from our side) 4. _tryOutput() never called, no output produced 5. Consumer hangs waiting for data Fix: Always call _tryOutput() in _read() - it will check _canOutput() and either output docs or return quickly if it cannot. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The heap full and shard resume messages were logging per-shard which created excessive noise. Now rate-limited to log at most every 5 seconds with aggregate counts. Debug output now shows: - Heap full: total pause count, logged every 5s - Shard resume: total resume count, logged every 5s - Backpressure: already rate-limited - _canOutput waiting: already rate-limited - _tryOutput state: already rate-limited (every 10k pushes) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pre-warm shards before querying to warm Solr caches and get total document count. This establishes connections and warms filter caches by sending rows=0 queries to all shards in parallel. New config options: - prewarmShards: Enable/disable pre-warming (default: true) - prewarmTimeoutMs: Timeout for pre-warm requests (default: 10000) - prewarmMaxConcurrent: Max concurrent pre-warm requests (default: 50) Response now includes: - X-Total-Found header with total document count - X-Prewarm-Time-Ms header with pre-warm elapsed time - totalFound and prewarmElapsedMs in final _meta object Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously the prewarm was hardcoding q=*:* and then appending the query string, resulting in duplicate q= parameters. Solr would use the first q=*:* and ignore the actual query filter, returning 0 results. Now we append the query parameters directly (which include the q= and fq= values) and only default to q=*:* if no query is provided. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Log the constructed prewarm URL (sanitized) and any errors that occur during prewarm requests to help diagnose issues. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Prewarm requests were failing with self-signed certificate errors because they were not using the configured HTTPS agent with rejectUnauthorized: false. Now the prewarm function accepts an optional agent parameter that gets passed through to the HTTP request, using the same SSL configuration as the main query streams. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Include the actual parallelism value in response headers (X-Parallelism) and final _meta object. For merge-sort queries this equals shard count (all shards queried simultaneously), for parallel queries it reflects the configured maxParallelism. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The fastaHeaderFormatter was reading from req.query, but the dataType routes use custom http-params middleware that stores query parameters in req._parsedUrl.query instead of req.query. Add getQueryParams() helper that: - Checks req.query first (for routes using Express query parser) - Falls back to parsing req._parsedUrl.query or req.call_params[0] This allows http_fasta_* query parameters to work correctly: - http_fasta_id_fields - http_fasta_id_delimiter - http_fasta_description_fields - http_fasta_context_fields Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The http-params middleware was deleting all http_* query parameters that were not in the ALLOWED_HEADERS whitelist. This caused http_fasta_id_fields, http_fasta_description_fields, etc. to be removed before reaching the FASTA serializer. Change: non-whitelisted http_* params are now kept in the query string instead of being deleted. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The http-params middleware now extracts http_fasta_* parameters into req.fastaParams instead of passing them to Solr, which was causing "undefined field" errors. The fastaHeaderFormatter.js now merges req.fastaParams when getting query parameters, completing the FASTA header configuration flow. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Changed condition checks from truthy to !== undefined so that empty string values like http_fasta_context_fields= can be used to remove the default context fields from FASTA headers. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
FASTA headers can now include fields from the genome collection using the 'genome_metadata.field_name' syntax. When any header field references genome_metadata.*, the serializer automatically adds a GenomeMetadataJoinStream to the pipeline. Available genome fields: genome_name, taxon_id, genome_status, strain, assembly_accession, bioproject_accession, biosample_accession Example: http_fasta_context_fields=genome_metadata.strain,genome_metadata.assembly_accession Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Temporary commit to diagnose why genome metadata join is not being triggered. This should be removed after debugging. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Log call_method and fastaParams at start of serialize to help diagnose which code path is being taken and whether params are being passed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Error messages from SolrClusterClient._request() were including the full URL with credentials. Now using sanitizeUrl() to redact sensitive info. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When genome_metadata.* fields are requested in FASTA headers, the query mode path now fetches genome metadata via HTTP API and attaches it to documents before formatting. This works even when direct Solr client is unavailable (e.g., certificate issues), falling back to the regular API endpoint. Also adds getGenomeMetadataDict helper function for HTTP-based genome metadata lookups. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The FASTA serializers were creating SolrClusterClient without the TLS options from distributedQuery config, causing "self-signed certificate" errors. Now reads rejectUnauthorized and ca settings from the distributed query config and passes them via an HTTPS agent to SolrClusterClient. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The HTTPS agent with TLS options was being passed to SolrClusterClient but not to DirectSolrClient, which makes its own HTTP requests. Now both clients receive the agent with rejectUnauthorized settings. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The genome metadata join feature is working correctly. Removing the console.log debug statements that were added for troubleshooting. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Initial implementation of Genbank flat file export for genomes. Generates one Genbank record per contig with: - LOCUS, DEFINITION, ACCESSION, VERSION headers - SOURCE/ORGANISM with taxonomy lineage - FEATURES section with CDS, tRNA, rRNA, etc. - ORIGIN section with formatted sequence Usage: GET /genome/GENOME_ID?http_accept=application/genbank GET /genome_feature/?eq(genome_id,GENOME_ID)&http_accept=application/genbank Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add checks for null/undefined values in wrapText and wrapQualifierValue functions to prevent "text.split is not a function" errors. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The production FASTA output includes a trailing | after the ID fields: >patric_id|refseq_locus_tag|alt_locus_tag| product [...] This matches the legacy format that tools may depend on. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The default idFields was using feature_id first, but the PATRIC format uses patric_id, refseq_locus_tag, alt_locus_tag. Since the annotation filter is in the RQL query, not a query parameter, the annotation detection was not working. Updating the default to match PATRIC format ensures consistent output. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Include the genome ID (XXXX.XX format) in two places: - DBLINK section alongside BioProject/BioSample - db_xref qualifier in the source feature Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When http_genbank_merged=true is specified, generates a single Genbank record with all contigs concatenated into one sequence: - Builds offset map for each contig - Adjusts all feature coordinates by contig offset - Adds assembly_gap features at contig junctions - Useful for tools like Artemis that prefer single-record files Also adds http_genbank_* parameter extraction to http-params middleware. Usage: GET /genome/ID?http_accept=application/genbank&http_genbank_merged=true Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Documents all API endpoints, query syntax, output formats, and configuration options including: - Core data endpoints (/:dataType/, GET/POST/PATCH) - RQL and Solr query syntax with all operators - All 14+ output formats (JSON, CSV, FASTA, GFF, Genbank, etc.) - FASTA header customization (http_fasta_* parameters) - Genbank export options including merged mode - JSON-RPC methods (msa, cluster, proteinFamily, etc.) - Authentication and permission model - Distributed query endpoints - Error handling and security headers Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The API accepts the token directly in the Authorization header without requiring a "Bearer" prefix. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Refactor Genbank serializer to use true streaming for multi-record output: - Stream contigs one at a time from genome_sequence - Stream features for each contig individually - Write each record chunk immediately to response - Minimal memory footprint: only current contig + current feature in memory This significantly reduces memory pressure for large/fragmented genomes with many contigs. A draft genome with 500 contigs now only holds one contigs data in memory at a time, vs. holding all 500 previously. Merged mode (http_genbank_merged=true) remains non-streaming as it requires all data for coordinate adjustment. Also update API_REFERENCE.md to document streaming support. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The stream end handler was resolving the promise before all queued contigs finished processing. Fixed by: - Tracking the current processing promise explicitly - Adding a checkComplete function that waits for processing to finish - Properly awaiting all pending work before resolving This ensures all contigs are output before the HTTP response ends. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When querying the genome collection, the limit now applies to the number of genomes exported. Each genome in the result set is processed and all its contigs are output. For example: limit(10) returns Genbank records for 10 genomes, each with all their contigs. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implement select-aware enrichment joins that fetch fields from related collections when explicitly requested via select() or fl= parameter. Key features: - BatchJoiner class with LRU cache for efficient batch lookups - JoinEnrichment middleware inserted after ContentRange, before media - parseFieldList utility to extract field lists from query strings - Configurable per-collection join mappings in config.js Supported collections: genome_feature, pathway, subsystem, sp_gene, genome_amr - all can join genome_name, taxon_id from genome collection. Joins only trigger when joinable fields are explicitly requested, avoiding performance overhead for queries that do not need enrichment. Response headers indicate when joins are performed: - X-Join-Enrichment: true/error - X-Join-Fields: comma-separated list - X-Join-Time-Ms: enrichment duration Includes 57 unit and integration tests. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The JoinEnrichment middleware runs after APIMethodHandler, but the join key field (e.g., genome_id) must be present in the Solr response for the join to work. When users select only joinable fields like genome_name without including genome_id, the join would silently fail. Add JoinFieldInjector middleware that runs before APIMethodHandler to: - Detect when joinable fields are requested - Inject the required join key fields into the fl= parameter - Enable post-query enrichment to find the foreign key values Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The DirectSolrClient used for join lookups was not configured with the SSL/TLS options (CA certificate, rejectUnauthorized), causing self-signed certificate errors when connecting to Solr over HTTPS. Now uses the same TLS configuration as the distributed query system. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add development notes section to CLAUDE.md explaining the requirement to pass properly configured HTTPS agents when creating Solr clients. This prevents self-signed certificate errors in production. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Configuration File ChangesThis PR modifies the configuration structure in Removed:
|
Resolve conflict in routes/dataType.js by including both: - SolrQuerySanitizer from alpha (security middleware) - DistributedQuery, JoinFieldInjector, JoinEnrichment from this branch Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Summary
This PR introduces a comprehensive distributed query system along with join enrichment capabilities, improved streaming with backpressure, and new media format support. The changes enable:
event-streamStatistics
lib/distributed/tests/test-distributed/andtests/test-join/Major Components
1. Distributed Query System (
lib/distributed/)DistributedQueryManager.jsParallelQueryCoordinator.jsMergeSortStream.jsShardCursorStream.jsSolrClusterClient.jsDirectSolrClient.jsDistributedQueryConfig.jsDecision flow for distributed queries:
enabledCollections(or not indisabledCollections)minLimitThreshold(default: 10,000)queryorstreamX-Distributed-Queryheader or?distributed=param2. Join Enrichment System
middleware/JoinFieldInjector.jsfl=middleware/JoinEnrichment.jslib/BatchJoiner.jsSupported joins:
3. Enhanced Media Handlers
util/streamWithBackpressure.jshttp_fasta_*parameters, genome metadata enrichmentmedia/genbank.jsfor genome export with multi-contig supportMiddleware Chain Changes
Risk Assessment
🔴 HIGH RISK: Distributed Query Bypass
Impact: Queries meeting threshold criteria bypass standard
APIMethodHandlerand execute through the new distributed system.Trigger conditions (all must be true):
distributedQuery.enabled: true(default)disabledCollectionsqueryorstreamRisks:
queryrequestsMitigation:
{ "distributedQuery": { "enabled": false } }Or per-request:
X-Distributed-Query: falseheader🟡 MEDIUM RISK: Media Handler Changes
Impact: All streaming responses use new backpressure-aware implementation.
Changes:
event-stream.mapSync()→streamWithBackpressure()Affected formats: JSON, CSV, TSV, GFF, NDJSON, FASTA
Testing required:
🟡 MEDIUM RISK: Join Enrichment
Impact: Queries requesting joinable fields trigger secondary Solr lookups.
Risks:
Mitigation:
{ "joinEnrichment": { "enabled": false } }🟢 LOW RISK: http-params Changes
Impact: New parameter namespaces
http_fasta_*andhttp_genbank_*extracted before Solr query.Risk: Minimal - new parameters only, no existing behavior change.
Configuration
Add to
p3api.conf:{ "distributedQuery": { "enabled": false, "minLimitThreshold": 10000, "enabledCollections": [], "maxParallelism": 8, "cursorBatchSize": 2000, "rejectUnauthorized": false, "ca": "/path/to/ca-cert.pem" }, "joinEnrichment": { "enabled": true, "cacheSize": 200 } }Deployment Recommendations
Phase 1: Safe Merge
distributedQuery.enabled: falsePhase 2: Gradual Distributed Query Enablement
enabledCollections: ["genome"](single collection)Phase 3: Full Enablement
enabled: truewith appropriate thresholdTest Plan
npm testnpx mocha tests/test-distributed/npx mocha tests/test-join/Documentation
New documentation added:
Docs/DISTRIBUTED_QUERY_DOCS.md- System overviewDocs/DISTRIBUTED_QUERY_QUICKSTART.md- Quick start guideDocs/DISTRIBUTED_QUERY_SPEC.md- Technical specificationAPI_REFERENCE.md- API documentation🤖 Generated with Claude Code