Member

@zuston zuston commented Jan 6, 2026

Purpose

Linked issue: close #2299

Brief change log

Tests

API and Format

Documentation

Member Author

zuston commented Jan 19, 2026

Could you help review this? @wuchong @luoyuxia

Member

@wuchong wuchong left a comment

Thanks for the contribution, @zuston. I left some comments in the PR.

Could you also update the metrics configuration page [1] and the metric reporters page [2]?

[1] https://fluss.apache.org/docs/next/maintenance/configuration/#metrics
[2] https://fluss.apache.org/docs/next/maintenance/observability/metric-reporters/#prometheus

* Base class for Prometheus metric reporters. Contains common logic for metric registration and
* collector management.
*/
public abstract class AbstractPrometheusReporter {
Member

implements MetricReporter

Member Author

done

*
* @param config the configuration
*/
public void open(Configuration config) {
Member

Add @Override to all the implemented interface methods.

Member Author

done


/** {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus. */
public class PrometheusReporter implements MetricReporter {
/** {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus HTTP server. */
Member

Add qualifier for the Metric class.

Comment on lines 48 to 65
private PushGateway pushGateway;
private String jobName;
private String groupingKey;
private boolean deleteOnShutdown;
private boolean filterLabelValueCharacters;

@Override
public void open(Configuration config) {
String hostUrl = config.getString(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_HOST_URL);
this.jobName = config.getString(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_JOB_NAME);
this.deleteOnShutdown =
config.getBoolean(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_DELETE_ON_SHUTDOWN);
this.filterLabelValueCharacters =
config.getBoolean(
METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_FILTER_LABEL_VALUE_CHARACTERS);

if (config.getBoolean(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX)) {
this.jobName = jobName + "_" + RANDOM.nextLong();
Member

Could you follow Flink's PrometheusPushGatewayReporterFactory and initialize the PrometheusPushGatewayReporter in PrometheusPushGatewayReporterPlugin#createMetricReporter? This would make all the member variables immutable (better for the JVM). We can also log the configuration of the registered reporter in PrometheusPushGatewayReporterPlugin#createMetricReporter:

LOG.info(
        "Configured PrometheusPushGatewayReporter with {hostUrl:{}, jobName:{}, randomJobNameSuffix:{}, deleteOnShutdown:{}, groupingKey:{}}",
        hostUrl,
        jobName,
        randomSuffix,
        deleteOnShutdown,
        groupingKey);
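
A minimal sketch of the shape this suggestion points at, assuming a plain Map in place of Fluss's Configuration and printf in place of LOG.info. The class names PushGatewayReporterSketch and PushGatewayReporterPluginSketch are hypothetical, the key names are the ones proposed elsewhere in this review, and the false defaults only mirror the current diff:

```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical stand-in for the reporter: every field is final and set once. */
final class PushGatewayReporterSketch {
    final String hostUrl;
    final String jobName;
    final boolean deleteOnShutdown;

    PushGatewayReporterSketch(String hostUrl, String jobName, boolean deleteOnShutdown) {
        this.hostUrl = hostUrl;
        this.jobName = jobName;
        this.deleteOnShutdown = deleteOnShutdown;
    }
}

/** Hypothetical stand-in for the plugin: resolves all settings before constructing. */
final class PushGatewayReporterPluginSketch {
    // Key prefix as proposed in this review; not necessarily the final naming.
    static final String PREFIX = "metrics.reporter.prometheus-push.";

    static PushGatewayReporterSketch createMetricReporter(Map<String, String> config) {
        String hostUrl = config.get(PREFIX + "url");
        if (hostUrl == null || hostUrl.isEmpty()) {
            throw new IllegalArgumentException("Missing required option " + PREFIX + "url");
        }
        String jobName = config.getOrDefault(PREFIX + "job-name", "");
        boolean randomSuffix =
                Boolean.parseBoolean(
                        config.getOrDefault(PREFIX + "random-job-name-suffix", "false"));
        boolean deleteOnShutdown =
                Boolean.parseBoolean(
                        config.getOrDefault(PREFIX + "delete-on-shutdown", "false"));

        // The final job name is computed here, so the reporter never mutates it.
        if (randomSuffix) {
            jobName = jobName + "_" + ThreadLocalRandom.current().nextLong();
        }

        // Stand-in for the LOG.info call suggested above.
        System.out.printf(
                "Configured reporter with {hostUrl:%s, jobName:%s, randomJobNameSuffix:%s, deleteOnShutdown:%s}%n",
                hostUrl, jobName, randomSuffix, deleteOnShutdown);
        return new PushGatewayReporterSketch(hostUrl, jobName, deleteOnShutdown);
    }
}
```

Because the random suffix is applied before construction, the reporter itself needs no open(Configuration) step that reassigns fields.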

Comment on lines 1806 to 1808
.defaultValue(false)
.withDescription(
"Whether to delete metrics from PushGateway on shutdown.");
Member

Flink sets this config to true by default; why do we use false?

Also, maybe we can add a note to this config like Flink does: Fluss will try its best to delete the metrics, but this is not guaranteed.
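
To illustrate what "best effort" could mean on shutdown, here is a minimal sketch. GatewayClientSketch is a hypothetical interface standing in for the PushGateway client, and BestEffortCloseSketch is a made-up helper, not code from this PR. A failed delete is reported and swallowed so that shutdown can continue:

```java
import java.io.IOException;

/** Hypothetical minimal view of a PushGateway client; only delete matters here. */
interface GatewayClientSketch {
    void delete(String jobName) throws IOException;
}

final class BestEffortCloseSketch {
    /**
     * Tries to delete the job's metrics on shutdown.
     *
     * @return true if the delete call succeeded, false if it was skipped or failed
     */
    static boolean close(GatewayClientSketch client, String jobName, boolean deleteOnShutdown) {
        if (!deleteOnShutdown) {
            return false;
        }
        try {
            client.delete(jobName);
            return true;
        } catch (IOException e) {
            // Not guaranteed: the gateway may be unreachable while we shut down.
            System.err.println(
                    "Failed to delete metrics for job " + jobName + ": " + e.getMessage());
            return false;
        }
    }
}
```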

.noDefaultValue()
.withDescription(
"The grouping key to use when pushing metrics to Prometheus PushGateway. "
+ "The format should be k1=v1;k2=v2.");
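
For reference, a minimal sketch of how a value in the k1=v1;k2=v2 format could be parsed into a map. GroupingKeyParserSketch is a hypothetical helper, and throwing on malformed entries is a choice made for this sketch rather than something the PR specifies:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class GroupingKeyParserSketch {
    /** Parses "k1=v1;k2=v2" into an ordered map; null or blank input yields an empty map. */
    static Map<String, String> parse(String groupingKey) {
        Map<String, String> result = new LinkedHashMap<>();
        if (groupingKey == null || groupingKey.trim().isEmpty()) {
            return result;
        }
        for (String pair : groupingKey.split(";")) {
            String entry = pair.trim();
            if (entry.isEmpty()) {
                continue; // tolerate empty segments such as "a=b;;c=d"
            }
            int idx = entry.indexOf('=');
            // Reject entries with no '=', an empty key, or an empty value.
            if (idx <= 0 || idx == entry.length() - 1) {
                throw new IllegalArgumentException(
                        "Invalid grouping key entry '" + entry + "', expected k=v");
            }
            result.put(entry.substring(0, idx).trim(), entry.substring(idx + 1).trim());
        }
        return result;
    }
}
```
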
Member

METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX =
key("metrics.reporter.prometheus-pushgateway.randomJobNameSuffix")
.booleanType()
.defaultValue(false)
Member

Why not default to true, like Flink?


@Override
public Duration scheduleInterval() {
return Duration.ofSeconds(60);
Member

Could we make this configurable? In most cases, 60s is too long. In Flink, the default interval is 10s.
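
A minimal sketch of a configurable interval with a 10s fallback. The key name push-interval and the class ScheduleIntervalSketch are hypothetical, and a plain Map with ISO-8601 duration strings stands in for Fluss's Configuration; a real option would presumably be declared alongside the other reporter options:

```java
import java.time.Duration;
import java.util.Map;

final class ScheduleIntervalSketch {
    // Hypothetical key name, for illustration only.
    static final String INTERVAL_KEY = "metrics.reporter.prometheus-push.push-interval";
    static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(10);

    /** Returns the configured interval, or the 10s default when the key is unset. */
    static Duration scheduleInterval(Map<String, String> config) {
        String raw = config.get(INTERVAL_KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_INTERVAL;
        }
        // ISO-8601 duration format, e.g. "PT30S" for 30 seconds.
        Duration interval = Duration.parse(raw.trim());
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("Push interval must be positive: " + raw);
        }
        return interval;
    }
}
```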

Comment on lines 59 to 62
config.getBoolean(METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_DELETE_ON_SHUTDOWN);
this.filterLabelValueCharacters =
config.getBoolean(
METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_FILTER_LABEL_VALUE_CHARACTERS);
Member

I don't see a strong use case for this configuration. It is very difficult to guarantee that all characters in table, database, or user names will conform to an allowlist, especially in real-world environments with diverse naming conventions.

Can we remove this config for now? Doing so would simplify the implementation and avoid introducing an overloaded notifyOfAddedMetric method (e.g., with an extra filterCharacters boolean) and the corresponding override in PrometheusPushGatewayReporter.

If users later express a concrete need for character filtering, we can always reintroduce this capability with a well-scoped design at that time.

Member Author

Got it. Let me delete this.

// ConfigOptions for prometheus push gateway reporter
// ------------------------------------------------------------------------
public static final ConfigOption<String> METRICS_REPORTER_PROMETHEUS_PUSHGATEWAY_HOST_URL =
key("metrics.reporter.prometheus-pushgateway.hostUrl")
Member

The configuration keys are not aligned with the Fluss format. I suggest changing them to:

metrics.reporter.prometheus-push.url
metrics.reporter.prometheus-push.job-name
metrics.reporter.prometheus-push.random-job-name-suffix
metrics.reporter.prometheus-push.delete-on-shutdown
metrics.reporter.prometheus-push.filter-label-value-characters
metrics.reporter.prometheus-push.grouping-key

Use prometheus-push in the keys, since it is the reporter identifier in metrics.reporters=prometheus-push.

Member Author

zuston commented Jan 28, 2026

Updated. Could you help review again? @wuchong

@zuston zuston requested a review from wuchong January 28, 2026 05:41
Development

Successfully merging this pull request may close these issues.

Introduce the prometheusPushGatewayReporter

2 participants