feat(connectors): Implement influxdb v2 and v3 connector with separate source and sink crates#3140
feat(connectors): Implement influxdb v2 and v3 connector with separate source and sink crates#3140ryerraguntla wants to merge 54 commits intoapache:masterfrom
Conversation
Implements Issue apache#2540 - Redshift Sink Connector with S3 staging support. Features: - S3 staging with automatic CSV file upload - Redshift COPY command execution via PostgreSQL wire protocol - IAM role authentication (recommended) or access key credentials - Configurable batch size and compression (gzip, lzop, bzip2, zstd) - Automatic table creation with customizable schema - Retry logic with exponential backoff for transient failures - Automatic cleanup of staged S3 files Configuration options: - connection_string: Redshift cluster connection URL - target_table: Destination table name - iam_role: IAM role ARN for S3 access (recommended) - s3_bucket/s3_region/s3_prefix: S3 staging location - batch_size: Messages per batch (default: 10000) - compression: COPY compression format - delete_staged_files: Auto-cleanup toggle (default: true) - auto_create_table: Create table if missing (default: true) Closes apache#2540
- Fix markdown lint issues in README.md (table formatting, blank lines, code fence language) - Fix trailing newline in Cargo.toml - Apply TOML formatting via taplo - Add missing dependencies to DEPENDENCIES.md (rust-s3, rxml, rxml_validation, static_assertions)
- Add Redshift sink integration test using PostgreSQL (Redshift-compatible) and LocalStack for S3 - Add s3_endpoint config option to support custom endpoints (LocalStack, MinIO) - Add path-style S3 access for custom endpoints - Add localstack feature to testcontainers-modules - Create test configuration files for Redshift connector
- Add s3_endpoint: None to test_config() in lib.rs (fixes E0063) - Add endpoint parameter to S3Uploader tests in s3.rs - Fix formatting for long line in init_s3_uploader() - Add iggy_connector_redshift_sink to DEPENDENCIES.md - Add maybe-async, md5, minidom to DEPENDENCIES.md
Critical fixes: - Change Rust edition from 2024 to 2021 in Cargo.toml - Fix S3 cleanup to happen regardless of COPY result (prevents orphaned files) Moderate fixes: - Remove zstd from valid compression options (not supported by Redshift) - Update README to remove zstd from compression list - Handle bucket creation error in integration tests with expect() - Log JSON serialization errors instead of silent unwrap_or_default() Performance: - Cache escaped quote string to avoid repeated format! allocations Windows compatibility (for local testing): - Add #[cfg(unix)] conditionals for Unix-specific code in sender/mod.rs
Fixes clippy warning about unused 'runtime' field in test setup struct. The runtime field is kept for future test expansion.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Changed CONFIG_ to PLUGIN_CONFIG_ for plugin configuration fields - Changed TOPICS_0 to TOPICS with proper JSON array format - Added CONSUMER_GROUP environment variable
… into feat/redshift-connector
…ort with latest S3 crate Migrate S3 usage from rust-s3 to s3-tokio and update related dependencies. Top-level Cargo.toml updated (http, lz4_flex, toml) and DEPENDENCIES.md adjusted. redshift_sink/Cargo.toml switched to s3-tokio, made sqlx a workspace dependency and added rustls as a dev-dependency. Code changes: S3Uploader now owns Bucket (removed Box) and tests install the rustls crypto provider. Integration tests were refactored to remove the manual testcontainers setup in favor of the iggy_harness-based test harness.
Introduce a new core/connectors/influxdb_common crate that provides a version-abstraction layer for InfluxDB (InfluxDB V2 and V3). Adds the InfluxDbAdapter trait, ApiVersion factory, line-protocol escaping helpers, CSV/JSONL response parsers, and concrete V2/V3 adapters plus unit tests and architecture notes. Wire the new crate into the workspace (Cargo.toml/Cargo.lock) and update existing influxdb sink/source connector manifests/sources to depend on it. Also add integration test fixtures and v3-specific integration tests and configs to exercise V3 behavior.
Remove the external influxdb common adapter and refactor the sink to natively support both V2 and V3 configurations. Key changes: - Removed iggy_connector_influxdb_common dependency (Cargo.toml & Cargo.lock) and inlined adapter logic. - Introduced InfluxDbSinkConfig enum with V2/V3 variants and helper methods (url, auth header, build_write_url/health_url, precision mapping, feature flags, etc.). - Reworked InfluxDbSink struct: store unified config, auth_header, measurement/precision, metadata flags, batch size limit, and other derived fields. - Added line-protocol escaping helpers (write_measurement, write_tag_value, write_field_string) and simplified PayloadFormat handling. - Adjusted client initialization, connectivity checks, retry middleware setup, and improved error messages and transient vs permanent error handling. - Updated Sink impl: open(), consume(), process/ batching, circuit breaker interactions, and close() behavior. - Expanded and updated unit tests to cover v2/v3 config behavior, URL/precision mapping, escaping, and append_line error/success cases. - Added new source modules and test script files related to InfluxDB connectors. This refactor centralises version-specific behaviour, improves configurability, and prepares the connector for V3 line-protocol and auth differences.
Delete influx_dB_test_proc_docs/scripts/test-connectors.sh — an interactive Bash end-to-end test harness for InfluxDB v2/v3 connector scenarios (Iggy messaging, polling, and five connector tests). Removes helper functions, polling logic and all test cases bundled in the script.
Extract shared parsing and protocol logic into the influxdb_common crate and update sinks/sources to consume it. Introduces delegate! macros to remove repetitive variant matching, unifies URL/auth handling via InfluxDbAdapter (including V3 precision mapping), and centralises line-protocol escaping/row parsing. Optimises body construction (build_body) and Bytes usage, adds extensive unit & HTTP integration tests (axum dev-dep), and updates Cargo.toml entries accordingly to reflect the new shared dependency.
Ensure health_url trims a trailing '/' from the base URL in both V2 and V3 adapters to avoid double slashes when appending /health, and add tests verifying the behavior. Add tests that verify write_url percent-encodes bucket/org/db query parameters and that decoding recovers the original values. Improve CSV row parsing by preallocating Row with capacity based on active headers. Clean up influxdb_source Cargo.toml by removing unused csv and futures deps, add a comment explaining dashmap/once_cell are required due to macro expansion, and update the ignored list.
Refactor and harden InfluxDB connector common code: move Row type into row.rs and re-export it; make ApiVersion::from_config return Result and error on unknown values (avoid silent defaulting); make V3 precision mapping return Result and reject invalid precisions; validate sink precision early in open() to prevent silent timestamp mistakes. Add tab escaping to line-protocol writers and expand unit tests (empty inputs, tab escapes, unicode). Make CSV parser flexible for multi-table results and handle header updates. Strengthen RFC3339 cursor regex to reject out-of-range date parts. Improve test fixture container port handling to support IPv6 mappings and better error messages. Misc: minor visibility changes, JSONL format constant, Cargo description tweak, and additional tests to cover URL/health/build_query error cases.
Add validation and runtime fixes across InfluxDB connectors: - Require timezone suffix for cursor/initial_offset timestamps to avoid UTC-vs-local ambiguity and update regex/tests accordingly. - Validate V2 sink config to reject empty or whitespace-only orgs at open() to prevent runtime 400s. - Validate initial_offset early in source open() and add tests for invalid/timezone-free offsets. - Warn when a V2 Flux query lacks an explicit sort() because Skip-N dedup relies on stable ordering. - In V3 source row processing, emit a warning when no row contains the cursor column and ensure messages are still emitted while max_cursor remains None; add tests. - Simplify auth header and health URL construction (removed dynamic adapter usage for these paths). - Ensure circuit breaker records successes for successful batches and move record_success into the per-batch success path; add a test to prevent tripping on intermittent failures. - Change several atomic counter loads to SeqCst for correctness in tests and tighten an unreachable branch where precision is validated. - Minor protocol.rs doc clarifications about tab escaping in line protocol. Includes multiple unit/integration tests covering the new validations and circuit-breaker behavior.
core/connectors/influxdb_common: broaden CSV header detection to recognize any of `_time`, `_start`, or `_stop` so Flux window-aggregate results are parsed correctly; add tests covering _start/_stop-only headers and aggregation queries. core/connectors/sinks/influxdb_sink: strengthen atomic orderings (use AcqRel for fetch_add and Acquire for loads) to ensure correct cross-thread visibility of counters; update tests to use Acquire loads. core/connectors/sources/influxdb_source: derive Debug for RowProcessingResult and change process_rows to return an Err(Error::InvalidRecordValue) when no row contains the configured cursor field (instead of silently leaving max_cursor None). Update tests to expect the error — this prevents silent infinite re-delivery and surfaces misconfigured queries to the operator.
Delete the shared iggy_connector_influxdb_common crate and fold its functionality into the sink and source connectors. protocol.rs was moved/renamed into core/connectors/sinks/influxdb_sink/src/protocol.rs (helper functions made crate-private); row parsing was moved into core/connectors/sources/influxdb_source/src/row.rs and made crate-private. Adapter/config/v2/v3 logic was inlined into the respective sink/source code (URL builders, auth header generation, precision mapping, query builders, health URL checks), and relevant visibility and call sites were updated. Workspace Cargo.toml and Cargo.lock were updated to remove the member/dependency and to add CSV where needed; tests were adapted/added for the inlined helpers and validation behavior.
Implement backward-compatible deserialization for InfluxDB configs by adding custom Deserialize impls for InfluxDbSinkConfig and InfluxDbSourceConfig that default missing version to "v2" and reject unknown versions with a clear error. Add V3-specific options and safety checks: introduce include_metadata to omit the cursor field from emitted payloads, add QUERY_FORMAT_JSONL, and enforce MAX_STUCK_CAP_FACTOR (100) with validation on open to avoid extremely large queries. Make timestamp comparison conservative (return false on parse failure) to avoid skipping data. Switch message ID generation to per-message UUIDs (remove uuid_base usage), adjust payload building to filter cursor when include_metadata=false, and small sink fix to append lines without producing trailing newlines. Update and add tests covering config deserialization, timestamp behavior, stuck-cap validation, and other affected behaviors.
Various refactors and improvements to InfluxDB source/sink connectors: - Make many config fields pub(crate) to improve encapsulation. - Add toml as a dev-dependency for connectors and add default "version = \"v2\"" to example config.toml files. - Introduce base_url() helpers to normalize URLs (strip trailing slashes) and use them when building endpoints; validate V2 org is non-empty in sink config. - Introduce RowContext to consolidate per-poll parameters passed to row-processing routines; simplify signatures for process_rows and poll functions and propagate include_metadata consistently. - Optimize per-message UUID generation by deriving IDs from a single per-poll base UUID to reduce PRNG calls. - Add query_has_sort_call heuristic to detect Flux sort() calls (avoids false positives on identifier prefixes) and use it when checking V2 queries. - Improve error messages for cursor_field validation to be version-specific and add related tests. - Add comments clarifying escaping rules and rationale for using simd_json in the sink hot path. - Update integration test TOML keys from api_version to version and add unit tests verifying TOML deserialization defaults and behavior. These changes are focused on robustness, performance, and clearer configuration/validation behavior.
Minor refactor and formatting changes across the InfluxDB source, v3 logic and integration fixtures, plus Cargo.lock dependency updates. Key changes: - Reflow long argument lists and await expressions in influxdb_source lib and v3 for readability. - Tweak validate_cursor_field error message construction and test unwrap_err formatting in common.rs. - Use Option::is_none_or in query_has_sort_call and adjust related tests formatting. - Simplify container port mapping in Elasticsearch and MongoDB fixtures: collapse intermediate `ports` variable into `mapped_port` and remove the IPv6 fallback mapping. - Update Cargo.lock entries (add/remove/normalize some deps, e.g. axum, tokio, toml, simd-json; normalize lz4_flex entry and remove explicit package block). These are mostly non-functional formatting and small API-use changes to improve code clarity and dependency resolution.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #3140 +/- ##
=============================================
- Coverage 74.46% 52.87% -21.59%
Complexity 943 943
=============================================
Files 1183 1191 +8
Lines 105866 95664 -10202
Branches 82899 72712 -10187
=============================================
- Hits 78835 50587 -28248
- Misses 24289 42445 +18156
+ Partials 2742 2632 -110
🚀 New features to boost your workflow:
|
Remove a duplicate lz4_flex entry from DEPENDENCIES.md and apply minor editorial fixes: normalize the markdown table header in the InfluxDB v3 architecture doc, change “mis-parsing” to “misparsing” in the InfluxDB sink protocol comment, and adjust a test comment from “Unparseable” to “Unparsable”. These are non-functional, readability/cleanliness updates.
Add 'text' to the Markdown code fence to ensure correct rendering and adjust the table header separator spacing for consistent Markdown formatting. Purely formatting changes; no functional code changes.
Reformat spacing/alignment in core/integration/tests/connectors/influxdb/sink_v3.toml and source_v3.toml. This is a whitespace-only cleanup (standardizing 'key = "value"' spacing) and does not change any configuration values or semantics.
|
@hubcio - Please review at your convinience |
hubcio
left a comment
There was a problem hiding this comment.
overall good direction - unified v2/v3 with backward-compat configs is the right shape, and the test coverage is solid. but not mergeable yet - please fix below issues.
Multiple improvements to InfluxDB source/sink connectors: - Add chrono dependency and tighten cursor validation: version field deserialization now rejects non-string values and validate_cursor uses chrono::DateTime::parse_from_rfc3339 to catch calendar-invalid dates. - Improve timestamp handling: is_timestamp_after now accepts a pre-parsed DateTime<Utc> for efficiency; added timestamps_equal to compare instants regardless of timezone formatting; cursor regex anchored for correct matching. - Protect secrets: auth_header changed from String to SecretBox<String> (sink and source), using expose_secret at request time and wrapping config auth strings into SecretBox. - Query/offset and validation: apply_query_params now supports $offset; batch_size accessor floors to >=1 and sources reject batch_size == 0 to avoid LIMIT 0 queries; validate_cursor_field rejects empty names and relaxed to allow custom names. - Parsing improvements: Row type changed to HashMap<String, serde_json::Value>; CSV rows store strings, JSONL parsing uses simd_json BorrowedValue to preserve numeric/boolean/null types and convert arrays/objects to serde_json values. - Misc: improved query_has_sort_call to ignore // line comments, remove non-Unix socket migration stub, replace some unreachable! with error returns, change Quickwit ingest commit param to commit=auto, and add comprehensive tests covering these behaviors. Also updates Cargo.toml to include chrono and corresponding lockfile changes. Need to confirm the needof SecretBox over Secret for the Sink connectors.
Add normalize_v3_timestamp to canonicalize InfluxDB v3 JSONL timestamps by appending a Z when missing and truncating fractional seconds to milliseconds (so timestamps round-trip through validate_cursor and SQL WHERE time comparisons). Integrate normalization into process_rows (cursor comparison, max cursor tracking, and rows_at_max_cursor counting) and adjust unparsable-cursor handling. Add unit tests covering normalization edge cases and a regression test for bare (no-Z) v3 timestamps. Also rename a test in common.rs to fix a typo.
Corrected the spelling of 'unparseable' to 'unparsable' in the test comment for normalize_v3_timestamp in core/connectors/sources/influxdb_source/src/v3.rs. This is a documentation-only change; no logic or tests were modified.
Change normalize_v3_timestamp to only append a trailing 'Z' when a timezone suffix is missing and otherwise return RFC3339 timestamps unchanged. Remove previous truncation-to-milliseconds logic so full sub-millisecond/nanosecond precision is preserved (avoids re-delivery when using WHERE time > '$cursor'). Update tests to expect nanosecond-preserving behavior, add a regression test ensuring distinct cursors for rows within the same millisecond, and keep fallback behavior of returning the Z-appended string for unparsable inputs.
|
Let me look at the failing tests. Could be Cargo dependencies. Let me update branch |
Update Cargo.lock via cargo update: add a new package entry for lz4_flex v0.13.0 (with checksum and twox-hash dependency) and normalize dependency entries to explicitly reference lz4_flex 0.12.1 where applicable. This records the updated dependency resolution in the lockfile.
Add a new entry for lz4_flex v0.13.0 (MIT) to DEPENDENCIES.md to record the updated dependency version. The previous 0.12.1 entry remains in the file.
|
@hubcio -Updated with the latest master. Please review at your convenience. |
|
|
||
| // Stuck-timestamp detection: if every row is at the current cursor | ||
| // and the batch was full, inflate and request more next time. | ||
| let stuck = result.all_at_cursor && rows.len() >= effective_batch as usize; |
There was a problem hiding this comment.
stuck-batch detection is unreachable with the default strict > '$cursor' query semantics. all_at_cursor at v3.rs:286-288 flips false unless every returned row's cursor value equals the input cursor - but strict > filters rows AT the cursor, so the input cursor never appears in results. default fixture (integration/tests/connectors/fixtures/influxdb/source_v3.rs:107) and unit-test config (v3.rs:793) both use WHERE time > '$cursor'. concrete data-loss scenario: 600 rows at one timestamp T_new, cursor=T_old, batch=100. first poll returns 100 rows at T_new, all_at_cursor=false (input cursor T_old != T_new), cursor advances to T_new, next poll's > T_new filters all 500 remaining T_new siblings - silent drop. fix: gate stuck on rows_at_max_cursor >= effective_batch as u64 (the actual signal) and validate that the user query contains $offset at open() when stuck_batch_cap_factor > 0.
|
|
||
| let status = response.status(); | ||
| if status.is_success() { | ||
| return response |
There was a problem hiding this comment.
unbounded response.text() here vs the v3 path which streams chunk-by-chunk with a 256 MiB cap (v3.rs:99-126). MAX_SKIP_INFLATION_FACTOR=10 at v2.rs:59 flows through render_query (v2.rs:71-73) so LIMIT can reach batch + min(already_seen, batch*10), up to ~11x batch_size. asymmetric DoS surface vs v3. mirror the v3 streaming cap: pre-check Content-Length, then response.chunk() loop that bails on len > MAX_RESPONSE_BODY_BYTES.
| trip_circuit_breaker: false, | ||
| }) | ||
| } | ||
| None => { |
There was a problem hiding this comment.
ref to lines 438-454. on stuck-cap circuit-breaker trip, new_state keeps effective_batch_size: effective_batch (which is at cap) and last_timestamp unchanged, then this state is persisted unconditionally at lib.rs:436-445. the only path that resets effective_batch_size to base_batch is the successful-advance path at v3.rs:481 - which never fires when data is genuinely stuck. after circuit cool-down, the next poll re-enters at cap and immediately re-trips. permanent stall, persisted across restart. fix: do not mutate effective_batch_size or last_timestamp_row_offset on the trip path, or reset effective_batch_size = base_batch when tripping the breaker. also note last_timestamp_row_offset: result.rows_at_max_cursor is written here on a path that emitted no messages, which is suspect on its own.
| skipped: u64, | ||
| ) { | ||
| if let Some(ref new_cursor) = max_cursor { | ||
| let should_advance = match state.last_timestamp.as_deref() { |
There was a problem hiding this comment.
ref to lines 533-546. is_ok_and(...) returns false on parse failure of state.last_timestamp, so should_advance=false, control falls into the else-branch which silently bumps cursor_row_count without emitting a warning or returning an error. validate_cursor only guards initial_offset at open() (lib.rs:205-207); a corrupted persisted cursor bypasses validation through restore_v2_state (lib.rs:127-156) since it deserializes without re-validating cursor strings. fix: on parse failure, return Err(Error::InvalidState) (with a message, see comment above) or at minimum error!() log so operators get a signal.
| if let InfluxDbSourceConfig::V2(cfg) = &self.config | ||
| && !query_has_sort_call(&cfg.query) | ||
| { | ||
| warn!( |
There was a problem hiding this comment.
ref to lines 231-244. v2 sort heuristic only warn!s. skip-N at v2.rs:387 is order-dependent: with >= query semantics and no |> sort(columns: ["_time"]), influxdb may return rows in storage order, causing dedup to skip the wrong rows silently. shipped default config uses strict > so skip-N is currently dead code, but any user that switches to >= (per the comment's own implication) silently gets wrong rows. fix: hard-error at open() when query lacks the sort call, or reject >=-style queries that don't pass the heuristic.
| ) -> Result<String, Error> { | ||
| // 1 KiB per message is a conservative estimate that accommodates JSON | ||
| // payloads without excessive reallocation. | ||
| let mut body = String::with_capacity(messages.len() * 1024); |
There was a problem hiding this comment.
String::with_capacity(messages.len() * 1024) reserves ~512 KiB per call at default batch=500. typical telemetry line is 250-600 bytes, so the heuristic is 2-4x over for normal payloads, dead-on for fat JSON. single allocation per batch (no buffer reuse across batches) means each batch is a malloc + free. low-impact at default batch sizes since one alloc dwarfs HTTP+serialize, but the real win is to keep a String buffer on &mut self and clear() between batches rather than tuning the capacity heuristic.
| /// Substitute `$cursor` and `$limit` placeholders in a query template in a | ||
| /// single pass, avoiding the two intermediate `String` allocations that | ||
| /// `clone() + replace() + replace()` would produce. | ||
| pub(crate) fn apply_query_params( |
There was a problem hiding this comment.
apply_query_params silently no-ops when $offset is absent from the user template. v3 stuck-batch logic at v3.rs:432,449 writes last_timestamp_row_offset into state on the assumption that the next query will use it via OFFSET $offset - but open() (lib.rs:193-302) never validates that the v3 query string contains $offset when stuck_batch_cap_factor > 1. default v3 fixture (integration/tests/connectors/influxdb/source_v3.toml) and the source config.toml template lack OFFSET $offset entirely. result: stuck-cap inflation runs without the offset clause, the same head rows get re-fetched and re-emitted on every poll - duplicate delivery. fix: at open(), when stuck_batch_cap_factor > 0 (or just unconditionally for v3), reject queries whose template doesn't contain the literal $offset token.
Replace the previous ports()/map_to_host_port logic with a direct call to container.get_host_port_ipv4(WIREMOCK_PORT).await in the WireMock test fixture, and simplify the associated error message. This makes host port retrieval more straightforward and reduces mapping complexity.
Remove the workspace tokio entry from core/connectors/sinks/quickwit_sink/Cargo.toml and update Cargo.lock to drop tokio from the dependency list. This removes an unnecessary direct dependency from the quickwit sink; run a build/tests to verify there are no missing runtime or compilation requirements.
Multiple fixes and improvements across InfluxDB connectors: - Cargo: pin assert_cmd to 2.2.0. - Sink: switch JSON serialization path to serde_json for Json payloads; parse+compact non-Json payloads; treat timestamp==0 as unset and substitute current wall-clock time with a warning. - Source (lib): add validation to reject V3 queries that lack an $offset when stuck-batch inflation is enabled (default); require V2 queries that use '>=' to include a sort call (error on open) to avoid skip-N dedup corruption; improve error handling when persisted V2 cursors fail RFC3339 parsing; update tests to include OFFSET and add V3 offset-related tests. - V2: add a 256 MiB response body cap and stream response chunks to avoid OOMs, with explicit errors if cap exceeded or body is invalid UTF-8. - V3: optimize payload serialization by serializing a borrowed RowView (avoids per-field clones); replace all_at_cursor boolean with rows_at_max_cursor counter and update stuck-batch detection to use this count; adjust stuck-batch circuit-breaker behavior to reset effective_batch_size to base and preserve offsets; update related tests. These changes improve safety (prevent OOMs and silent data-loss cases), clarify semantics for stuck-batches, and reduce allocation/serialization overhead in hot paths.
Remove simd-json from the InfluxDB sink crate's dependencies and clean up related source code. Drop the now-unused timestamps_equal helper and its test from the InfluxDB source common module and remove its import from v3. Also apply small refactors/formatting: streamline serde_json serialization error handling in the sink and v3 modules, fix minor whitespace/formatting in tests and query string literal, and other small tidy-ups. These changes remove an unused dependency and eliminate dead code / improve formatting for clarity.
Include an OFFSET $offset placeholder in the SQL used by InfluxDb3SourceFixture to support paginated queries. Also remove a couple of stray blank lines in core/connectors/sources/influxdb_source/src/common.rs.
Which issue does this PR close?
Closes # 3062
Rationale
This PR implements a unified InfluxDB connector supporting both InfluxDB V2 (Flux) and V3 (SQL) in a single crate per component (sink/source), eliminating code duplication while preserving full backward compatibility with existing V2 deployments.
Key Features:
#[must_use]on critical functions, version-strict cursor validationWhat changed?
Architecture
Before (V2-only):
influxdb_sink/src/lib.rs (single flat config, 1,625 LOC)
influxdb_source/src/lib.rs (single flat config, 1,400 LOC)
After (V2 + V3)
influxdb_sink/src/
├── lib.rs (enum dispatch, 1,330 LOC)
└── protocol.rs (shared line-protocol escaping, 115 LOC)
influxdb_source/src/
├── lib.rs (enum dispatch, 817 LOC)
├── common.rs (shared config/validation, 815 LOC)
├── row.rs (CSV/JSONL parsing, 193 LOC)
├── v2.rs (Flux query logic, 374 LOC)
└── v3.rs (SQL query + stuck detection, 506 LOC)
Benefits:
Single .so per component (no InfluxClient trait overhead)
Zero code duplication (shared validation, escaping, retry logic)
Asymmetric structure (sink: 30-line diff; source: separate modules for V2/V3 query semantics)
For more details , please refer to the #3062 Comments section.
#3062 (comment)
Local Execution
AI Usage
AI Tools Used - Claude and Copilot
Scope of usage - Code review for quality and identifying performance issues. Generation of test cases, Documentation and summary notes generation.
Generated code is tested with actual test execution.
Can you explain every line of the code - yes