test: add replica config for consumer #4290
Conversation
📝 Walkthrough
This PR updates integration test scripts to explicitly pass changefeed configuration files to consumer invocations across all sink types (Kafka, Storage, Pulsar). Previously, many tests passed empty strings or omitted configs; now they supply configuration paths as positional arguments or via
Changes
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~15 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Summary of Changes
This pull request changes multiple integration test scripts so that a configuration file is passed to the consumer functions. This allows more granular control and testing of consumer behavior, particularly replica configurations for the Kafka, Storage, and Pulsar sink types, without altering the core logic of the consumers themselves.
Code Review
This pull request updates numerous integration test scripts to pass the changefeed configuration file to the respective consumers (kafka, storage, pulsar). This is a good improvement for consistency and ensures consumers use the same configuration as the changefeed. The changes are mostly correct and consistent across the modified files.
I've found a potential pre-existing issue in a few scripts where run_pulsar_consumer is called with a -- which seems to be a typo and could cause issues with argument parsing. I've left specific comments with suggestions to fix this.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: hongyunyan, wlwilliamx
@wk989898: The following tests failed, say
Actionable comments posted: 2
🧹 Nitpick comments (2)
tests/integration_tests/split_region/run.sh (1)
47-49: Config file naming is misleading for non-pulsar sinks.
The config file is named `pulsar_test.toml` but is passed to all consumer types (kafka, storage, pulsar). For non-pulsar sinks, this file is empty (line 43). Consider renaming it to something more generic, like `consumer_config.toml` or `sink_config.toml`, to avoid confusion.
That said, the functional changes correctly pass the configuration file to all consumer invocations, which aligns with the PR's objective.
♻️ Optional: Rename config file for clarity
  if [ "$SINK_TYPE" == "pulsar" ]; then
- 	cat <<EOF >>$WORK_DIR/pulsar_test.toml
+ 	cat <<EOF >>$WORK_DIR/sink_config.toml
  [sink.pulsar-config.oauth2]
  oauth2-issuer-url="http://localhost:9096"
  oauth2-audience="cdc-api-uri"
  oauth2-client-id="1234"
  oauth2-private-key="${WORK_DIR}/credential.json"
  EOF
  else
- 	echo "" >$WORK_DIR/pulsar_test.toml
+ 	echo "" >$WORK_DIR/sink_config.toml
  fi
- cdc_cli_changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $WORK_DIR/pulsar_test.toml
+ cdc_cli_changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $WORK_DIR/sink_config.toml
  case $SINK_TYPE in
- kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" $WORK_DIR/pulsar_test.toml ;;
- storage) run_storage_consumer $WORK_DIR $SINK_URI $WORK_DIR/pulsar_test.toml "" ;;
- pulsar) run_pulsar_consumer --upstream-uri $SINK_URI --config $WORK_DIR/pulsar_test.toml --oauth2-private-key ${WORK_DIR}/credential.json --oauth2-issuer-url "http://localhost:9096" --oauth2-client-id "1234" ;;
+ kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" $WORK_DIR/sink_config.toml ;;
+ storage) run_storage_consumer $WORK_DIR $SINK_URI $WORK_DIR/sink_config.toml "" ;;
+ pulsar) run_pulsar_consumer --upstream-uri $SINK_URI --config $WORK_DIR/sink_config.toml --oauth2-private-key ${WORK_DIR}/credential.json --oauth2-issuer-url "http://localhost:9096" --oauth2-client-id "1234" ;;
  esac
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration_tests/split_region/run.sh` around lines 47 - 49, The config file name pulsar_test.toml is misleading because it is passed to all consumers; rename the file to a generic name (e.g., consumer_config.toml or sink_config.toml) and update all references: change the file creation/assignment (where WORK_DIR/pulsar_test.toml is created/left empty) and the calls to run_kafka_consumer, run_storage_consumer, and run_pulsar_consumer to use WORK_DIR/consumer_config.toml (or your chosen name) so the intent is clear while preserving the same variable (WORK_DIR) and function calls.
tests/integration_tests/maintainer_failover_when_operator/run.sh (1)
261-263: Consider quoting `$SINK_URI` for consistency and defensive coding.
The changes correctly pass the changefeed configuration to each consumer type. However, `$SINK_URI` is unquoted here while it is quoted on line 259 (`--sink-uri="$SINK_URI"`). For the storage sink, `SINK_URI` contains `$work_dir`, which could theoretically include spaces.
🔧 Suggested fix for consistent quoting
- kafka) run_kafka_consumer "$work_dir" $SINK_URI "$changefeed_config" ;;
- storage) run_storage_consumer "$work_dir" $SINK_URI "$changefeed_config" "" ;;
- pulsar) run_pulsar_consumer --upstream-uri $SINK_URI --config "$changefeed_config" ;;
+ kafka) run_kafka_consumer "$work_dir" "$SINK_URI" "$changefeed_config" ;;
+ storage) run_storage_consumer "$work_dir" "$SINK_URI" "$changefeed_config" "" ;;
+ pulsar) run_pulsar_consumer --upstream-uri "$SINK_URI" --config "$changefeed_config" ;;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration_tests/maintainer_failover_when_operator/run.sh` around lines 261 - 263, The SINK_URI parameter is unquoted in the calls to run_kafka_consumer, run_storage_consumer and run_pulsar_consumer which can break if $SINK_URI contains spaces; update the three invocations to quote the variable (e.g., run_kafka_consumer "$work_dir" "$SINK_URI" "$changefeed_config", run_storage_consumer "$work_dir" "$SINK_URI" "$changefeed_config" "" and run_pulsar_consumer --upstream-uri "$SINK_URI" --config "$changefeed_config") so the shell treats the entire URI as a single argument.
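The word-splitting concern can be reproduced in isolation. The following is a minimal sketch, not code from the test suite: `count_args` is a hypothetical helper and the URI is a made-up value whose path contains a space.

```shell
#!/bin/bash
# Hypothetical helper (not part of the test utilities): prints how many
# arguments it received.
count_args() {
	echo "$#"
}

# Made-up storage URI whose path contains a space; an assumption used only
# to trigger word splitting.
SINK_URI="file:///tmp/work dir/storage_test"

# Unquoted expansion: the shell splits the value on whitespace, so the
# callee sees two arguments instead of one.
unquoted_count=$(count_args $SINK_URI)

# Quoted expansion: the value arrives as a single argument.
quoted_count=$(count_args "$SINK_URI")

echo "unquoted: $unquoted_count, quoted: $quoted_count"
```

With positional consumers such as `run_storage_consumer`, an extra argument like this would shift every later parameter, including the config path, which is why quoting matters more once the config is passed positionally.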
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/integration_tests/_utils/run_pulsar_consumer`:
- Line 16: The run_pulsar_consumer wrapper now forwards all args via "$@" so
every invocation of run_pulsar_consumer in the integration test run.sh scripts
must be updated to pass an explicit --config flag; find all call sites of
run_pulsar_consumer and add --config=$WORK_DIR/<pulsar_config_file> (matching
the pattern used in the updated examples) before other args so the consumer gets
the intended Pulsar config, mirroring the change made in the already-updated
example invocation.
In `@tests/integration_tests/ddl_for_split_tables_with_failover/run.sh`:
- Around line 46-47: The two consumer invocation lines pass shell variables
unquoted causing possible word-splitting/globbing; update the calls to use
quoted expansions for $WORK_DIR and $SINK_URI (and keep existing quoted
"$CUR/conf/$1.toml") so run_storage_consumer is invoked as run_storage_consumer
"$WORK_DIR" "$SINK_URI" "$CUR/conf/$1.toml" "" and run_pulsar_consumer is
invoked as run_pulsar_consumer --upstream-uri "$SINK_URI" --config
"$CUR/conf/$1.toml" to prevent shell expansion issues.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (35)
tests/integration_tests/_utils/run_pulsar_consumer
tests/integration_tests/batch_add_table/run.sh
tests/integration_tests/canal_json_claim_check/run.sh
tests/integration_tests/canal_json_handle_key_only/run.sh
tests/integration_tests/cdc/run.sh
tests/integration_tests/checkpoint_race_ddl_crash/run.sh
tests/integration_tests/common_1/run.sh
tests/integration_tests/ddl_for_split_tables_with_failover/run.sh
tests/integration_tests/ddl_for_split_tables_with_merge_and_split/run.sh
tests/integration_tests/ddl_for_split_tables_with_random_merge_and_split/run.sh
tests/integration_tests/ddl_for_split_tables_with_random_move_table/run.sh
tests/integration_tests/ddl_with_random_move_table/run.sh
tests/integration_tests/event_filter/run.sh
tests/integration_tests/fail_over_ddl_mix/run.sh
tests/integration_tests/fail_over_ddl_mix_with_syncpoint/run.sh
tests/integration_tests/kafka_column_selector/run.sh
tests/integration_tests/kafka_simple_basic/run.sh
tests/integration_tests/kafka_simple_basic_avro/run.sh
tests/integration_tests/kafka_simple_claim_check/run.sh
tests/integration_tests/kafka_simple_claim_check_avro/run.sh
tests/integration_tests/kafka_simple_handle_key_only/run.sh
tests/integration_tests/kafka_simple_handle_key_only_avro/run.sh
tests/integration_tests/kv_client_stream_reconnect/run.sh
tests/integration_tests/maintainer_failover_when_operator/run.sh
tests/integration_tests/merge_table/run.sh
tests/integration_tests/mq_sink_dispatcher/run.sh
tests/integration_tests/multi_changefeeds/run.sh
tests/integration_tests/multi_tables_ddl/run.sh
tests/integration_tests/multi_topics/run.sh
tests/integration_tests/open_protocol_claim_check/run.sh
tests/integration_tests/open_protocol_handle_key_only/run.sh
tests/integration_tests/random_drop_message/run.sh
tests/integration_tests/region_merge/run.sh
tests/integration_tests/split_region/run.sh
tests/integration_tests/tiflash/run.sh
  # output debug log to allow us to check the consumer's behavior when it encounters errors
- cdc_pulsar_consumer --log-file $workdir/cdc_pulsar_consumer$log_suffix.log --log-level debug --downstream-uri ${downstream_uri} "${consumer_config_args[@]}" "$@" >>$workdir/cdc_pulsar_consumer_stdout$log_suffix.log 2>&1 &
+ cdc_pulsar_consumer --log-file $workdir/cdc_pulsar_consumer$log_suffix.log --log-level debug --downstream-uri ${downstream_uri} "$@" >>$workdir/cdc_pulsar_consumer_stdout$log_suffix.log 2>&1 &
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Checking run_pulsar_consumer call sites without --config..."
rg -n --type=sh '\brun_pulsar_consumer\b' tests/integration_tests | rg -v -- '--config' || true
echo
echo "Expected: no output above. Any listed line is a call site missing --config."
Repository: pingcap/ticdc
Length of output: 6279
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check the run_pulsar_consumer wrapper
echo "=== Wrapper script content ==="
cat tests/integration_tests/_utils/run_pulsar_consumer
echo
echo "=== Example call site ==="
head -50 tests/integration_tests/simple/run.sh | tail -20
echo
echo "=== Check if any config files exist ==="
fd -t f 'pulsar.*config\|replica.*config' tests/integration_tests | head -20
Repository: pingcap/ticdc
Length of output: 1617
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Searching for cdc_pulsar_consumer config handling ==="
rg -A 5 'config' tests/integration_tests/_utils/run_pulsar_consumer || echo "No config in wrapper"
echo
echo "=== Check if cdc_pulsar_consumer has default config ==="
which cdc_pulsar_consumer 2>/dev/null || echo "Not in PATH (expected)"
echo
echo "=== Looking for config usage in test scripts ==="
rg '\-\-config' tests/integration_tests/*/run.sh | head -5 || echo "No --config found in test scripts"
echo
echo "=== Summary of the change ==="
echo "Call sites found (without --config): $(rg -c '\brun_pulsar_consumer\b' tests/integration_tests/*/run.sh || echo 0)"
echo "Call sites with --config: $(rg -c '\brun_pulsar_consumer.*--config' tests/integration_tests/*/run.sh || echo 0)"
Repository: pingcap/ticdc
Length of output: 7137
Update 56+ run_pulsar_consumer call sites to include --config flag.
After removing implicit Pulsar config handling from the wrapper, 56 test files still call run_pulsar_consumer without --config, while only ~28 (e.g., tiflash/run.sh) have been updated to pass --config=$WORK_DIR/.... The wrapper now passes all arguments through via "$@", so without an explicit --config, tests will silently use incorrect or missing configurations. All remaining call sites across tests/integration_tests/*/run.sh must be updated.
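One way to surface a missing `--config` at the call site, rather than relying on a repository-wide search, is a small guard in front of the forwarding step. This is only a sketch under stated assumptions: `has_config_flag` and `run_consumer_checked` are hypothetical names, not part of `_utils`, and the stand-in wrapper only reports what it would forward instead of launching `cdc_pulsar_consumer`.

```shell
#!/bin/bash
# Hypothetical helper: succeeds if any argument is `--config` or
# `--config=<path>`.
has_config_flag() {
	local arg
	for arg in "$@"; do
		case "$arg" in
		--config | --config=*) return 0 ;;
		esac
	done
	return 1
}

# Hypothetical stand-in for a wrapper that forwards "$@" unchanged.
# It refuses to proceed when no config flag is present.
run_consumer_checked() {
	if ! has_config_flag "$@"; then
		echo "warning: no --config passed to consumer" >&2
		return 1
	fi
	# A real wrapper would forward "$@" to the consumer binary here.
	echo "forwarding $# args"
}

# Example invocation with made-up values for the URI and config path.
run_consumer_checked --upstream-uri "pulsar://127.0.0.1:6650/topic" --config "/tmp/changefeed.toml"
```

Whether a hard failure or a warning is appropriate depends on whether any test intentionally runs the consumer without a config; the source does not say.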
storage) run_storage_consumer $WORK_DIR $SINK_URI "$CUR/conf/$1.toml" "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI --config "$CUR/conf/$1.toml" ;;
🧩 Analysis chain
🏁 Script executed:
find . -name "run.sh" -path "*/ddl_for_split_tables_with_failover/*" | head -5
Repository: pingcap/ticdc
Length of output: 124
🏁 Script executed:
cat -n ./tests/integration_tests/ddl_for_split_tables_with_failover/run.sh
Repository: pingcap/ticdc
Length of output: 7473
Quote $SINK_URI and $WORK_DIR to prevent shell expansion issues.
Lines 46-47 pass $SINK_URI unquoted (and $WORK_DIR on line 46 unquoted). Unquoted variable expansions can be affected by word-splitting and globbing, potentially causing test flakiness in edge environments.
Suggested fix
- storage) run_storage_consumer $WORK_DIR $SINK_URI "$CUR/conf/$1.toml" "" ;;
- pulsar) run_pulsar_consumer --upstream-uri $SINK_URI --config "$CUR/conf/$1.toml" ;;
+ storage) run_storage_consumer "$WORK_DIR" "$SINK_URI" "$CUR/conf/$1.toml" "" ;;
+ pulsar) run_pulsar_consumer --upstream-uri "$SINK_URI" --config "$CUR/conf/$1.toml" ;;
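The globbing half of this finding is easy to overlook because sink URIs routinely contain `?`, which is a pathname-expansion character. A minimal sketch, assuming a made-up value and a scratch directory (`first_arg` is a hypothetical helper, and nothing here comes from the test scripts):

```shell
#!/bin/bash
# Work in a scratch directory so the demonstration touches no real files.
scratch=$(mktemp -d)
cd "$scratch" || exit 1

# Made-up value containing the glob character '?'.
SINK_URI='topic?x'

# A file whose name happens to match that pattern.
touch 'topic1x'

# Hypothetical helper: prints the first argument it receives.
first_arg() {
	printf '%s\n' "$1"
}

# Unquoted: pathname expansion replaces the pattern with the matching file name.
unquoted=$(first_arg $SINK_URI)

# Quoted: the value is passed through literally.
quoted=$(first_arg "$SINK_URI")

echo "unquoted: $unquoted"
echo "quoted:   $quoted"

# Clean up the scratch directory.
cd - >/dev/null || exit 1
rm -rf "$scratch"
```

A match like this is unlikely in practice, but it depends on the contents of the current directory, which is the kind of environment-dependent behavior the comment describes as flakiness.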
/retest
What problem does this PR solve?
Issue Number: ref #442 ref #4130
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit