Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@
ResourceUnit represents any machine/unit which has resources that broker can use to serve its service units
*/
public interface ResourceUnit extends Comparable<ResourceUnit> {

String PROPERTY_KEY_BROKER_ZNODE_NAME = "__advertised_addr";

String getResourceId();

ResourceDescription getAvailableResource();

boolean canFit(ResourceDescription resourceDescription);

Object getProperty(String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -65,8 +66,12 @@ public LoadManagerReport generateLoadReport() {
@Override
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
return leastLoadedBroker.map(s -> new SimpleResourceUnit(getBrokerWebServiceUrl(s),
new PulsarResourceDescription()));
return leastLoadedBroker.map(s -> {
String webServiceUrl = getBrokerWebServiceUrl(s);
String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl);
return new SimpleResourceUnit(webServiceUrl,
new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
});
}

private String getBrokerWebServiceUrl(String broker) {
Expand All @@ -78,6 +83,11 @@ private String getBrokerWebServiceUrl(String broker) {
return String.format("http://%s", broker);
}

private String getBrokerZnodeName(String broker, String webServiceUrl) {
String scheme = webServiceUrl.substring(0, webServiceUrl.indexOf("://"));
return String.format("%s://%s", scheme, broker);
}

@Override
public List<Metrics> getLoadBalancingMetrics() {
return loadManager.getLoadBalancingMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,32 @@
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.base.MoreObjects;
import java.util.Collections;
import java.util.Map;
import org.apache.pulsar.broker.loadbalance.ResourceDescription;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;

public class SimpleResourceUnit implements ResourceUnit {

private String resourceId;
private ResourceDescription resourceDescription;
private final String resourceId;
private final ResourceDescription resourceDescription;

private final Map<String, Object> properties;

public SimpleResourceUnit(String resourceId, ResourceDescription resourceDescription) {
this.resourceId = resourceId;
this.resourceDescription = resourceDescription;
this.properties = Collections.emptyMap();
}

public SimpleResourceUnit(String resourceId, ResourceDescription resourceDescription,
Map<String, Object> properties) {
this.resourceId = resourceId;
this.resourceDescription = resourceDescription;
this.properties = properties == null ? Collections.emptyMap() : properties;
}


@Override
public String getResourceId() {
// TODO Auto-generated method stub
Expand All @@ -50,6 +63,11 @@ public boolean canFit(ResourceDescription resourceDescription) {
return this.resourceDescription.compareTo(resourceDescription) > 0;
}

@Override
public Object getProperty(String key) {
return properties.get(key);
}

@Override
public int compareTo(ResourceUnit o) {
return resourceId.compareTo(o.getResourceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -457,6 +458,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
return;
}
String candidateBroker = null;
String candidateBrokerAdvertisedAddr = null;

LeaderElectionService les = pulsar.getLeaderElectionService();
if (les == null) {
Expand Down Expand Up @@ -517,15 +519,16 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}
}
if (makeLoadManagerDecisionOnThisBroker) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
Optional<Pair<String, String>> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
LOG.warn("Load manager didn't return any available broker. "
+ "Returning empty result to lookup. NamespaceBundle[{}]",
bundle);
lookupFuture.complete(Optional.empty());
return;
}
candidateBroker = availableBroker.get();
candidateBroker = availableBroker.get().getLeft();
candidateBrokerAdvertisedAddr = availableBroker.get().getRight();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
Expand Down Expand Up @@ -596,7 +599,8 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}

// Now setting the redirect url
createLookupResult(candidateBroker, authoritativeRedirect, options.getAdvertisedListenerName())
createLookupResult(candidateBrokerAdvertisedAddr == null ? candidateBroker
Comment thread
BewareMyPower marked this conversation as resolved.
: candidateBrokerAdvertisedAddr, authoritativeRedirect, options.getAdvertisedListenerName())
.thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
.exceptionally(ex -> {
lookupFuture.completeExceptionally(ex);
Expand Down Expand Up @@ -691,20 +695,22 @@ private Set<String> getAvailableBrokers() {
* @return
* @throws Exception
*/
private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
private Optional<Pair<String, String>> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
Optional<ResourceUnit> leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
if (!leastLoadedBroker.isPresent()) {
LOG.warn("No broker is available for {}", serviceUnit);
return Optional.empty();
}

String lookupAddress = leastLoadedBroker.get().getResourceId();
String advertisedAddr = (String) leastLoadedBroker.get()
.getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
pulsar.getSafeWebServiceAddress(),
lookupAddress);
}
return Optional.of(lookupAddress);
return Optional.of(Pair.of(lookupAddress, advertisedAddr));
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.loadbalance;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import java.net.URI;
import java.util.Optional;
import lombok.Cleanup;
Expand Down Expand Up @@ -69,14 +71,14 @@ private void updateConfig(ServiceConfiguration conf, String advertisedAddress) {
int httpsPort = PortManager.nextFreePort();

// Use invalid domain name as identifier and instead make sure the advertised listeners work as intended
this.conf.setAdvertisedAddress(advertisedAddress);
this.conf.setAdvertisedListeners(
conf.setAdvertisedAddress(advertisedAddress);
conf.setAdvertisedListeners(
"public:pulsar://localhost:" + pulsarPort +
",public_http:http://localhost:" + httpPort +
",public_https:https://localhost:" + httpsPort);
this.conf.setBrokerServicePort(Optional.of(pulsarPort));
this.conf.setWebServicePort(Optional.of(httpPort));
this.conf.setWebServicePortTls(Optional.of(httpsPort));
conf.setBrokerServicePort(Optional.of(pulsarPort));
conf.setWebServicePort(Optional.of(httpPort));
conf.setWebServicePortTls(Optional.of(httpsPort));
}

@Test
Expand All @@ -85,6 +87,7 @@ public void testLookup() throws Exception {
new HttpGet(pulsar.getWebServiceAddress() + "/lookup/v2/topic/persistent/public/default/my-topic");
request.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
request.addHeader(HttpHeaders.ACCEPT, "application/json");
final String topic = "my-topic";

@Cleanup
CloseableHttpClient httpClient = HttpClients.createDefault();
Expand All @@ -104,14 +107,15 @@ public void testLookup() throws Exception {
// Produce data
@Cleanup
Producer<String> p = pulsarClient.newProducer(Schema.STRING)
.topic("my-topic")
.topic(topic)
.create();

p.send("hello");

// Verify we can get the correct HTTP redirect to the advertised listener
for (PulsarAdmin a : getAllAdmins()) {
TopicStats s = a.topics().getStats("my-topic");
TopicStats s = a.topics().getStats(topic);
assertNotNull(a.lookups().lookupTopic(topic));
assertEquals(s.getPublishers().size(), 1);
}
}
Expand Down