Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListe
final Map<String, AdvertisedListener> result = new LinkedHashMap<>();
final Map<String, Set<String>> reverseMappings = new LinkedHashMap<>();
for (final Map.Entry<String, List<String>> entry : listeners.entrySet()) {
if (entry.getValue().size() > 2) {
if (entry.getValue().size() > 4) {
throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey()
+ "`");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public interface LoadManager {
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;

default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
Optional<ServiceUnitId> topic, ServiceUnitId bundle, String advertisedListenerName) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public boolean isCentralized() {

@Override
public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
Optional<ServiceUnitId> topic, ServiceUnitId bundle, String advertisedListenerName) {
return loadManager.assign(topic, bundle)
.thenApply(lookupData -> lookupData.map(BrokerLookupData::toLookupResult));
.thenApply(lookupData -> lookupData.map(ld -> ld.toLookupResult(advertisedListenerName)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ public long getStartTimestamp() {
return this.startTimestamp;
}

public LookupResult toLookupResult() {
return new LookupResult(webServiceUrl, webServiceUrlTls, pulsarServiceUrl, pulsarServiceUrlTls,
LookupResult.Type.BrokerUrl, false);
public LookupResult toLookupResult(String advertisedListenerName) {
return LookupResult.create(this, advertisedListenerName, false);
}

public NamespaceEphemeralData toNamespaceEphemeralData() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;

@Slf4j
public class RedirectManager {
Expand Down Expand Up @@ -75,7 +74,7 @@ public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookup
});
}

public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync() {
public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(String advertisedListenerName) {
String currentLMClassName = pulsar.getConfiguration().getLoadManagerClassName();
boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log);
return getAvailableBrokerLookupDataAsync().thenApply(lookupDataMap -> {
Expand All @@ -84,7 +83,7 @@ public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync()
log.warn(errorMsg);
throw new IllegalStateException(errorMsg);
}
AtomicReference<ServiceLookupData> latestServiceLookupData = new AtomicReference<>();
AtomicReference<BrokerLookupData> latestServiceLookupData = new AtomicReference<>();
AtomicLong lastStartTimestamp = new AtomicLong(0L);
lookupDataMap.forEach((key, value) -> {
if (lastStartTimestamp.get() <= value.getStartTimestamp()) {
Expand All @@ -106,20 +105,15 @@ public CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync()
return Optional.empty();
}
var serviceLookupDataObj = latestServiceLookupData.get();
var candidateBrokers = new ArrayList<ServiceLookupData>();
var candidateBrokers = new ArrayList<BrokerLookupData>();
lookupDataMap.forEach((key, value) -> {
if (Objects.equals(value.getLoadManagerClassName(), serviceLookupDataObj.getLoadManagerClassName())) {
candidateBrokers.add(value);
}
});
var selectedBroker = candidateBrokers.get((int) (Math.random() * candidateBrokers.size()));

return Optional.of(new LookupResult(selectedBroker.getWebServiceUrl(),
selectedBroker.getWebServiceUrlTls(),
selectedBroker.getPulsarServiceUrl(),
selectedBroker.getPulsarServiceUrlTls(),
true));
return Optional.of(LookupResult.create(selectedBroker, advertisedListenerName, true));
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package org.apache.pulsar.broker.lookup;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;

/**
* Represent a lookup result.
Expand Down Expand Up @@ -67,6 +70,62 @@ public LookupResult(NamespaceEphemeralData namespaceEphemeralData, String native
namespaceEphemeralData.getHttpUrlTls());
}

public static LookupResult create(BrokerLookupData selectedBroker, String advertisedListenerName,
boolean authoritativeRedirect) {
String httpUrl = selectedBroker.getWebServiceUrl();
String httpUrlTls = selectedBroker.getWebServiceUrlTls();
String brokerServiceUrl = selectedBroker.getPulsarServiceUrl();
String brokerServiceUrlTls = selectedBroker.getPulsarServiceUrlTls();

if (StringUtils.isNotBlank(advertisedListenerName)) {
var advertisedListener = selectedBroker.advertisedListeners().get(advertisedListenerName);
if (advertisedListener != null) {
if (advertisedListener.getBrokerHttpUrl() != null) {
httpUrl = advertisedListener.getBrokerHttpUrl().toString();
}
if (advertisedListener.getBrokerHttpsUrl() != null) {
httpUrlTls = advertisedListener.getBrokerHttpsUrl().toString();
}
if (advertisedListener.getBrokerServiceUrl() != null) {
brokerServiceUrl = advertisedListener.getBrokerServiceUrl().toString();
}
if (advertisedListener.getBrokerServiceUrlTls() != null) {
brokerServiceUrlTls = advertisedListener.getBrokerServiceUrlTls().toString();
}
}
}

return new LookupResult(httpUrl, httpUrlTls, brokerServiceUrl, brokerServiceUrlTls, authoritativeRedirect);
}

public static LookupResult create(LocalBrokerData selectedBroker, String advertisedListenerName,
boolean authoritativeRedirect) {
String httpUrl = selectedBroker.getWebServiceUrl();
String httpUrlTls = selectedBroker.getWebServiceUrlTls();
String brokerServiceUrl = selectedBroker.getPulsarServiceUrl();
String brokerServiceUrlTls = selectedBroker.getPulsarServiceUrlTls();

if (StringUtils.isNotBlank(advertisedListenerName)) {
var advertisedListener = selectedBroker.getAdvertisedListeners().get(advertisedListenerName);
if (advertisedListener != null) {
if (advertisedListener.getBrokerHttpUrl() != null) {
httpUrl = advertisedListener.getBrokerHttpUrl().toString();
}
if (advertisedListener.getBrokerHttpsUrl() != null) {
httpUrlTls = advertisedListener.getBrokerHttpsUrl().toString();
}
if (advertisedListener.getBrokerServiceUrl() != null) {
brokerServiceUrl = advertisedListener.getBrokerServiceUrl().toString();
}
if (advertisedListener.getBrokerServiceUrlTls() != null) {
brokerServiceUrlTls = advertisedListener.getBrokerServiceUrlTls().toString();
}
}
}

return new LookupResult(httpUrl, httpUrlTls, brokerServiceUrl, brokerServiceUrlTls, authoritativeRedirect);
}

public boolean isBrokerUrl() {
return type == Type.BrokerUrl;
}
Expand Down
Loading