Skip to content

Conversation

@oneby-wang
Copy link
Contributor

Fixes #25083

Motivation

Use client-side looping instead of increasing broker settings to avoid potential HTTP call timeout in "pulsar-admin topics analyze-backlog" cli.

Modifications

Add client-side looping, add test.

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: oneby-wang#21

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 6, 2026
Comment on lines +3076 to +3079
String[] messageIdSplits = mergedResult.getLastMessageId().split(":");
MessageIdImpl nextScanMessageId =
new MessageIdImpl(Long.parseLong(messageIdSplits[0]), Long.parseLong(messageIdSplits[1]) + 1,
partitionIndex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of parsing the message Id String, it would be better to use org.apache.pulsar.client.api.MessageIdAdv interface to get the ledgerId and entryId.

Copy link
Contributor Author

@oneby-wang oneby-wang Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you provide me more information, I couldn't find one method that could parse ledgerId:entryId string to a MessageIdAdv instance.

Comment on lines +3072 to +3073
print("Analyze backlog progress, scanned entries: " + mergedResult.getEntries()
+ ", scan max entries: " + backlogScanMaxEntries);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be useful to print the current value of mergedResult in json format without linefeeds so that the CLI output can be parsed as NDJSON.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two ObjectMapper in CliCommand, so we should use the one without pretty printer to print json without linefeeds, and then pass the json string to print() method. Is my understanding correct?

private static final ObjectMapper MAPPER = ObjectMapperFactory.create();
private static final ObjectWriter WRITER = MAPPER.writerWithDefaultPrettyPrinter();

<T> void print(T item) {
try {
if (item instanceof String) {
commandSpec.commandLine().getOut().println(item);
} else {
prettyPrint(item);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, there would have to be an option to print out without linefeeds (no pretty printing, line feeds escaped).

partitionIndex);
startPosition = Optional.of(nextScanMessageId);
}
print(mergedResult);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

output would have to be json format without linefeeds to make the CLI output parseable as NDJSON. The last line in the output would be the final result. Perhaps there could be a command line option to configure whether ndjson should be used since it's not pretty printed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps there could be a command line option to configure whether ndjson should be used since it's not pretty printed.

A little bit confused. Does this option only take effect on the final result, or does it also apply to the intermediate results?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little bit confused. Does this option only take effect on the final result, or does it also apply to the intermediate results?

it would be useful to be able to printout the aggregated result in the loop (in Predicate<AnalyzeSubscriptionBacklogResult> terminatePredicate added in #25127) in ndjson format (json without linefeeds) or json format. The ndjson format would be machine readable so that's the reason to have that option.

}

@Test
public void topicsAnalyzeBacklog() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A test without mocks, similar to the tests in #25127 would be useful in addition since it would serve as an integration test.

Copy link
Contributor Author

@oneby-wang oneby-wang Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't found similar examples for reference. Could you provide me some examples.

I don't know how to invoke admin CLI in integration tests and how to receive the outputs(since they are printed to the console output).

Copy link
Contributor Author

@oneby-wang oneby-wang Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we need to handle the test in integration module like CLITest?

https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java

Is there a better approach? Running the tests in integration module requires local image building.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we need to handle the test in integration module like CLITest?

https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java

Is there a better approach? Running the tests in integration module requires local image building.

Yes it's a bit clumbersome.

To build an image quickly, use this script:

./build/build_java_test_image.sh

That requires setting export PULSAR_TEST_IMAGE_NAME=apachepulsar/java-test-image:latest environment variable so that the apachepulsar/java-test-image:latest docker image is used instead of the default apachepulsar/pulsar-test-latest-version:latest which takes much longer to build.
If you are running the test in IntelliJ, you can edit the default value at

For running integration tests on command line, this is the sequence I have used:

# compile integration test dependencies
mvn -am -pl tests/integration -Dcheckstyle.skip=true -Dlicense.skip=true -Dspotbugs.skip=true -DskipTests install
./build/build_java_test_image.sh
export PULSAR_TEST_IMAGE_NAME=apachepulsar/java-test-image:latest
# run the test
mvn -DintegrationTests -pl tests/integration -DtestRetryCount=0 -DredirectTestOutputToFile=false test \
-Dtest=TestClassNameHere

If you are working on the test alone, you obviously don't have to rebuild the image after each change.

@oneby-wang
Copy link
Contributor Author

oneby-wang commented Feb 5, 2026

@lhotari Could help me solve the above questions when you have a moment? Especially about how to write integration tests easily in admin CLI module.

I'll refactor this PR using the API that PR #25127 provided once I'm back.

@lhotari
Copy link
Member

lhotari commented Feb 6, 2026

@oneby-wang #25127 has been merged

@oneby-wang oneby-wang force-pushed the pulsar_cli_client_side_analyze_backlog branch from b48e0eb to 83bdbf1 Compare February 6, 2026 11:56
print(getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition));

AnalyzeSubscriptionBacklogResult mergedResult = null;
while (true) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you planning to reuse the new API added in #25127 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll refactor this PR by reusing the new API added in #25127 and resolve the above comments once I'm back.

I'll re-request review once the refactoring is done.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs ready-to-test

Projects

None yet

2 participants