Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/content.zh/docs/deployment/metric_reporters.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,21 @@ PrometheusPushGatewayReporter 发送器将运行指标发送给 [Pushgateway](ht

更多使用方法可查看 [Prometheus 的文档](https://prometheus.io/docs/practices/pushing/)

#### HTTP Basic 认证

该发送器支持通过 HTTP Basic 认证连接到需要安全验证的 PushGateway 实例。要启用认证功能,需要同时配置 `username` 和 `password`:

```yaml
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.hostUrl: https://pushgateway.example.com:9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.username: flink-reporter
metrics.reporter.promgateway.password: ${PUSHGATEWAY_PASSWORD}
metrics.reporter.promgateway.interval: 60 SECONDS
```

<span class="label label-info">注意</span> 只有同时配置了 `username` 和 `password` 时,Basic 认证才会启用。建议在启用认证时使用 HTTPS 以保护传输中的凭据安全。

<a name="statsd"></a>

### StatsD
Expand Down
15 changes: 15 additions & 0 deletions docs/content/docs/deployment/metric_reporters.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,21 @@ The PrometheusPushGatewayReporter pushes metrics to a [Pushgateway](https://gith

Please see the [Prometheus documentation](https://prometheus.io/docs/practices/pushing/) for use-cases.

#### HTTP Basic Authentication

The reporter supports HTTP Basic Authentication for connecting to secured PushGateway instances. To enable authentication, configure both `username` and `password`:

```yaml
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.hostUrl: https://pushgateway.example.com:9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.username: flink-reporter
metrics.reporter.promgateway.password: ${PUSHGATEWAY_PASSWORD}
metrics.reporter.promgateway.interval: 60 SECONDS
```

<span class="label label-info">Note</span> Basic authentication is enabled only when both `username` and `password` are configured. It is recommended to use HTTPS when authentication is enabled to protect credentials in transit.

### StatsD
#### (org.apache.flink.metrics.statsd.StatsDReporter)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,23 @@
<td>String</td>
<td>The job name under which metrics will be pushed</td>
</tr>
<tr>
<td><h5>metrics.reporter.prometheus.password</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>(Optional) The password for HTTP Basic Authentication with the PushGateway.</td>
</tr>
<tr>
<td><h5>metrics.reporter.prometheus.randomJobNameSuffix</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Specifies whether a random suffix should be appended to the job name.</td>
</tr>
<tr>
<td><h5>metrics.reporter.prometheus.username</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>(Optional) The username for HTTP Basic Authentication with the PushGateway.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,23 @@
<td>String</td>
<td>推送运行指标数据时的作业名。</td>
</tr>
<tr>
<td><h5>metrics.reporter.prometheus.password</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>(可选)用于 PushGateway HTTP Basic 认证的密码。</td>
</tr>
<tr>
<td><h5>metrics.reporter.prometheus.randomJobNameSuffix</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>是否在作业名后添加一个随机后缀。</td>
</tr>
<tr>
<td><h5>metrics.reporter.prometheus.username</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>(可选)用于 PushGateway HTTP Basic 认证的用户名。</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.util.Preconditions;

import io.prometheus.client.exporter.BasicAuthHttpConnectionFactory;
import io.prometheus.client.exporter.PushGateway;

import javax.annotation.Nullable;

import java.io.IOException;
import java.net.URL;
import java.util.Map;
Expand All @@ -42,14 +45,24 @@ public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter im
private final Map<String, String> groupingKey;
private final boolean deleteOnShutdown;
@VisibleForTesting final URL hostUrl;
@VisibleForTesting final boolean basicAuthEnabled;

PrometheusPushGatewayReporter(
URL hostUrl,
String jobName,
Map<String, String> groupingKey,
final boolean deleteOnShutdown) {
final boolean deleteOnShutdown,
@Nullable String username,
@Nullable String password) {
this.hostUrl = hostUrl;
this.pushGateway = new PushGateway(hostUrl);
if (username != null && password != null) {
this.pushGateway.setConnectionFactory(
new BasicAuthHttpConnectionFactory(username, password));
this.basicAuthEnabled = true;
} else {
this.basicAuthEnabled = false;
}
this.jobName = Preconditions.checkNotNull(jobName);
this.groupingKey = Preconditions.checkNotNull(groupingKey);
this.deleteOnShutdown = deleteOnShutdown;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.GROUPING_KEY;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST_URL;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.JOB_NAME;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PASSWORD;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.RANDOM_JOB_NAME_SUFFIX;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.USERNAME;

/** {@link MetricReporterFactory} for {@link PrometheusPushGatewayReporter}. */
public class PrometheusPushGatewayReporterFactory implements MetricReporterFactory {
Expand Down Expand Up @@ -70,17 +72,36 @@ public PrometheusPushGatewayReporter createMetricReporter(Properties properties)
jobName = configuredJobName + new AbstractID();
}

LOG.info(
"Configured PrometheusPushGatewayReporter with {hostUrl:{}, jobName:{}, randomJobNameSuffix:{}, deleteOnShutdown:{}, groupingKey:{}}",
hostUrl,
jobName,
randomSuffix,
deleteOnShutdown,
groupingKey);
String username = metricConfig.getString(USERNAME.key(), USERNAME.defaultValue());
String password = metricConfig.getString(PASSWORD.key(), PASSWORD.defaultValue());

if ((username != null && password == null) || (username == null && password != null)) {
LOG.warn(
"Both username and password must be configured to enable HTTP Basic Authentication. "
+ "Currently only {} is configured, Basic Auth will be disabled.",
username != null ? "username" : "password");
}

try {
return new PrometheusPushGatewayReporter(
new URL(hostUrl), jobName, groupingKey, deleteOnShutdown);
PrometheusPushGatewayReporter reporter =
new PrometheusPushGatewayReporter(
new URL(hostUrl),
jobName,
groupingKey,
deleteOnShutdown,
username,
password);

LOG.info(
"Configured PrometheusPushGatewayReporter with {hostUrl:{}, jobName:{}, randomJobNameSuffix:{}, deleteOnShutdown:{}, groupingKey:{}, basicAuth:{}}",
hostUrl,
jobName,
randomSuffix,
deleteOnShutdown,
groupingKey,
reporter.basicAuthEnabled);

return reporter;
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,18 @@ public class PrometheusPushGatewayReporterOptions {
"https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels",
"Prometheus requirements"))
.build());

public static final ConfigOption<String> USERNAME =
Comment thread
Myracle marked this conversation as resolved.
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription(
"(Optional) The username for HTTP Basic Authentication with the PushGateway.");

public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription(
"(Optional) The password for HTTP Basic Authentication with the PushGateway.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,27 @@
package org.apache.flink.metrics.prometheus;

import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.event.Level;

import java.util.Map;

import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST_URL;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PASSWORD;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.USERNAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link PrometheusPushGatewayReporter}. */
class PrometheusPushGatewayReporterTest {

@RegisterExtension
private final LoggerAuditingExtension loggerExtension =
new LoggerAuditingExtension(PrometheusPushGatewayReporterFactory.class, Level.WARN);

@Test
void testParseGroupingKey() {
Map<String, String> groupingKey =
Expand Down Expand Up @@ -68,4 +77,66 @@ void testConnectToPushGatewayThrowsExceptionWithoutHostInformation() {
assertThatThrownBy(() -> factory.createMetricReporter(metricConfig))
.isInstanceOf(IllegalArgumentException.class);
}

@Test
void testBasicAuthNotEnabledWithoutCredentials() {
PrometheusPushGatewayReporterFactory factory = new PrometheusPushGatewayReporterFactory();
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty(HOST_URL.key(), "http://localhost:9091");
// No authentication configured - should create reporter successfully without Basic Auth
PrometheusPushGatewayReporter reporter = factory.createMetricReporter(metricConfig);
assertThat(reporter).isNotNull();
assertThat(reporter.hostUrl.toString()).isEqualTo("http://localhost:9091");
assertThat(reporter.basicAuthEnabled).isFalse();
}

@Test
void testBasicAuthNotEnabledWithOnlyUsername() {
PrometheusPushGatewayReporterFactory factory = new PrometheusPushGatewayReporterFactory();
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty(HOST_URL.key(), "http://localhost:9091");
metricConfig.setProperty(USERNAME.key(), "flink-user");
// Only username configured - should create reporter successfully without Basic Auth
PrometheusPushGatewayReporter reporter = factory.createMetricReporter(metricConfig);
assertThat(reporter).isNotNull();
assertThat(reporter.basicAuthEnabled).isFalse();
// Verify warning log is emitted
assertThat(loggerExtension.getMessages())
.anyMatch(
msg ->
msg.contains("Both username and password must be configured")
&& msg.contains("Currently only username is configured"));
}

@Test
void testBasicAuthNotEnabledWithOnlyPassword() {
PrometheusPushGatewayReporterFactory factory = new PrometheusPushGatewayReporterFactory();
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty(HOST_URL.key(), "http://localhost:9091");
metricConfig.setProperty(PASSWORD.key(), "flink-password");
// Only password configured - should create reporter successfully without Basic Auth
PrometheusPushGatewayReporter reporter = factory.createMetricReporter(metricConfig);
assertThat(reporter).isNotNull();
assertThat(reporter.basicAuthEnabled).isFalse();
// Verify warning log is emitted
assertThat(loggerExtension.getMessages())
.anyMatch(
msg ->
msg.contains("Both username and password must be configured")
&& msg.contains("Currently only password is configured"));
}

@Test
void testBasicAuthEnabledWithBothCredentials() {
PrometheusPushGatewayReporterFactory factory = new PrometheusPushGatewayReporterFactory();
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty(HOST_URL.key(), "http://localhost:9091");
metricConfig.setProperty(USERNAME.key(), "flink-user");
metricConfig.setProperty(PASSWORD.key(), "flink-password");
// Both username and password configured - Basic Auth should be enabled
PrometheusPushGatewayReporter reporter = factory.createMetricReporter(metricConfig);
Copy link
Copy Markdown
Contributor

@rionmonster rionmonster Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to add another assertion to this test to verify that authentication was actually enabled. We may want to consider following a pattern similar to hostUrl to expose it for testing (e.g. @VisibleForTesting package-private boolean field), something like basicAuthEnabled and then we could simply check that property here:

assertThat(reporter.basicAuthEnabled).isTrue();

Either that or rely on a reflection based approach to inspect the underlying connection factory.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! I've added a @VisibleForTesting boolean basicAuthEnabled field following the same pattern as hostUrl. Now all four basic auth test cases verify this property:
testBasicAuthNotEnabledWithoutCredentials - asserts basicAuthEnabled is false
testBasicAuthNotEnabledWithOnlyUsername - asserts basicAuthEnabled is false
testBasicAuthNotEnabledWithOnlyPassword - asserts basicAuthEnabled is false
testBasicAuthEnabledWithBothCredentials - asserts basicAuthEnabled is true

assertThat(reporter).isNotNull();
assertThat(reporter.hostUrl.toString()).isEqualTo("http://localhost:9091");
assertThat(reporter.basicAuthEnabled).isTrue();
}
}