Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
Expand Down Expand Up @@ -51,7 +52,11 @@ public static Process startServer(String artemisInstance, String serverName, int
}

public static Process startServer(String artemisInstance, String serverName, int id, int timeout, File brokerProperties) throws Exception {
final Process process = internalStartServer(artemisInstance, serverName, brokerProperties);
return startServer(artemisInstance, serverName, id, timeout, brokerProperties, null);
}

public static Process startServer(String artemisInstance, String serverName, int id, int timeout, File brokerProperties, Consumer<String> logCallback) throws Exception {
final Process process = internalStartServer(artemisInstance, serverName, brokerProperties, logCallback);

// wait for start
if (timeout > 0) {
Expand All @@ -66,7 +71,11 @@ public static Process startServer(String artemisInstance, String serverName, Str
}

public static Process startServer(String artemisInstance, String serverName, String uri, int timeout, File propertiesFile) throws Exception {
final Process process = internalStartServer(artemisInstance, serverName, propertiesFile);
return startServer(artemisInstance, serverName, uri, timeout, propertiesFile, null);
}

public static Process startServer(String artemisInstance, String serverName, String uri, int timeout, File propertiesFile, Consumer<String> logCallback) throws Exception {
final Process process = internalStartServer(artemisInstance, serverName, propertiesFile, logCallback);

// wait for start
if (timeout != 0) {
Expand All @@ -78,20 +87,30 @@ public static Process startServer(String artemisInstance, String serverName, Str

private static Process internalStartServer(String artemisInstance,
String serverName) throws IOException, ClassNotFoundException {
return internalStartServer(artemisInstance, serverName, null);
return internalStartServer(artemisInstance, serverName, null, null);
}
private static Process internalStartServer(String artemisInstance,
String serverName,
File propertiesFile) throws IOException, ClassNotFoundException {
return internalStartServer(artemisInstance, serverName, propertiesFile, null);
}
private static Process internalStartServer(String artemisInstance,
String serverName,
File propertiesFile,
Consumer<String> logCallback) throws IOException, ClassNotFoundException {

if (propertiesFile != null) {
return execute(artemisInstance, serverName, "run", "--properties", propertiesFile.getAbsolutePath());
return execute(artemisInstance, serverName, logCallback, "run", "--properties", propertiesFile.getAbsolutePath());
} else {
return execute(artemisInstance, serverName, "run");
return execute(artemisInstance, serverName, logCallback, "run");
}
}

public static Process execute(String artemisInstance, String jobName, String...args) throws IOException, ClassNotFoundException {
return execute(artemisInstance, jobName, null, args);
}

public static Process execute(String artemisInstance, String jobName, Consumer<String> logCallback, String...args) throws IOException, ClassNotFoundException {
try {
boolean IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win");

Expand All @@ -117,11 +136,11 @@ public static Process execute(String artemisInstance, String jobName, String...a
final Process process = builder.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> process.destroy()));

ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), jobName, false);
ProcessLogger outputLogger = new ProcessLogger(logCallback == null, process.getInputStream(), jobName, false, logCallback);
outputLogger.start();

// Adding a reader to System.err, so the VM won't hang on a System.err.println
ProcessLogger errorLogger = new ProcessLogger(true, process.getErrorStream(), jobName, true);
ProcessLogger errorLogger = new ProcessLogger(logCallback == null, process.getErrorStream(), jobName, true, logCallback);
errorLogger.start();
return process;
} catch (IOException e) {
Expand Down Expand Up @@ -215,14 +234,18 @@ static class ProcessLogger extends Thread {

private final boolean sendToErr;

private final Consumer<String> logCallback;

ProcessLogger(final boolean print,
final InputStream is,
final String logName,
final boolean sendToErr) throws ClassNotFoundException {
final boolean sendToErr,
final Consumer<String> logCallback) throws ClassNotFoundException {
this.is = is;
this.print = print;
this.logName = logName;
this.sendToErr = sendToErr;
this.logCallback = logCallback;
setDaemon(false);
}

Expand All @@ -240,6 +263,9 @@ public void run() {
System.out.println(logName + "-out:" + line);
}
}
if (logCallback != null) {
logCallback.accept((sendToErr ? logName + "-err:" : logName + "-out:") + line);
}
}
} catch (IOException e) {
// ok, stream closed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ public void rebuildPageCounters() throws Exception {
simpleManagementVoid("broker", "rebuildPageCounters");
}

public long getAddressSize(String address) throws Exception {
return simpleManagementLong(ResourceNames.ADDRESS + address, "getAddressSize");
}

public long getMessageCountOnAddress(String address) throws Exception {
return simpleManagementLong(ResourceNames.ADDRESS + address, "getMessageCount");
}

/**
* Simple helper for management returning a string.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public Message getMessage() {

private boolean reencoded = false;

private int applicationPropertiesSize;

/**
* AMQPLargeMessagePersister will save the buffer here.
*/
Expand Down Expand Up @@ -264,7 +266,9 @@ protected void readSavedEncoding(ByteBuf buf) {
applicationPropertiesPosition = buf.readInt();
remainingBodyPosition = buf.readInt();

int applicationPropertiesInitialPosition = buf.readerIndex();
applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject();
this.applicationPropertiesSize = buf.readerIndex() - applicationPropertiesInitialPosition;

if (properties != null && properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) {
if (!expirationReload) {
Expand Down Expand Up @@ -412,6 +416,16 @@ private void genericParseLargeMessage() {
}
}

@Override
protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) {
applicationProperties = super.readApplicationProperties(data, position);
if (applicationProperties != null) {
this.applicationPropertiesSize = data.position() - position;
}
return applicationProperties;
}


protected void parseLargeMessage(ReadableBuffer data) {
MessageDataScanningStatus status = getDataScanningStatus();
if (status == MessageDataScanningStatus.NOT_SCANNED) {
Expand Down Expand Up @@ -604,8 +618,7 @@ public long getWholeMessageSize() {
@Override
public synchronized int getMemoryEstimate() {
if (memoryEstimate == -1) {
memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0);
originalEstimate = memoryEstimate;
memoryEstimate = AMQP_OFFSET + (extraProperties != null ? extraProperties.getEncodeSize() : 0) + applicationPropertiesSize * 4;
}
return memoryEstimate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.MessageReference;
Expand Down Expand Up @@ -119,6 +118,12 @@
*/
public abstract class AMQPMessage extends RefCountMessage implements org.apache.activemq.artemis.api.core.Message {

// The basic (minimal) size an AMQP message uses.
// This is an estimate, and it's based on the following test:
// By running AMQPGlobalMaxTest::testSendUntilOME, you look at the initial memory used by the broker without any messages.
// By the time you get the OME, you can do some bare calculations on how much each message uses and get an AVG.
public static final int AMQP_OFFSET = 1300;

private static final SimpleString ANNOTATION_AREA_PREFIX = SimpleString.of("m.");

protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
Expand Down Expand Up @@ -146,7 +151,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
* developing purposes.
*/
public enum MessageDataScanningStatus {
NOT_SCANNED(0), RELOAD_PERSISTENCE(1), SCANNED(2);
NOT_SCANNED(0), SCANNED(1);

private static final MessageDataScanningStatus[] STATES;

Expand Down Expand Up @@ -205,7 +210,6 @@ private static void checkCode(int code) {
protected long messageID;
protected SimpleString address;
protected volatile int memoryEstimate = -1;
protected volatile int originalEstimate = -1;
protected long expiration;
protected boolean expirationReload = false;
protected long scheduledTime = -1;
Expand Down Expand Up @@ -546,36 +550,27 @@ protected ApplicationProperties lazyDecodeApplicationProperties() {
// need to synchronize access to lazyDecodeApplicationProperties to avoid clashes with getMemoryEstimate
protected synchronized ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
if (owner != null && memoryEstimate != -1) {
// the memory has already been tracked and needs to be updated to reflect the new decoding
int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data);

// it is difficult to track the updates for paged messages
// for that reason we won't do it if paged
// we also only do the update if the message was previously routed
// so if a debug method or an interceptor changed the size before routing we would get a different size
if (!isPaged && routed) {
((PagingStore) owner).addSize(addition, false);
final int updatedEstimate = memoryEstimate + addition;
memoryEstimate = updatedEstimate;
}
}
readApplicationProperties(data, applicationPropertiesPosition);
}

return applicationProperties;
}

protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) {
applicationProperties = scanForMessageSection(data, position, ApplicationProperties.class);
return applicationProperties;
}

protected int unmarshalledApplicationPropertiesMemoryEstimateFromData(ReadableBuffer data) {
if (applicationProperties != null) {
// they have been unmarshalled, estimate memory usage based on their encoded size
if (remainingBodyPosition != VALUE_NOT_PRESENT) {
return remainingBodyPosition - applicationPropertiesPosition;
} else {
return data.capacity() - applicationPropertiesPosition;
}
// no need to rescan if it's from RELOAD_PERSISTENCE
ensureScanning();

// they have been unmarshalled, estimate memory usage based on their encoded size
if (remainingBodyPosition != VALUE_NOT_PRESENT) {
return remainingBodyPosition - applicationPropertiesPosition;
} else {
return data.capacity() - applicationPropertiesPosition;
}
return 0;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -661,9 +656,6 @@ protected synchronized void ensureMessageDataScanned() {
case NOT_SCANNED:
scanMessageData();
break;
case RELOAD_PERSISTENCE:
lazyScanAfterReloadPersistence();
break;
case SCANNED:
// NO-OP
break;
Expand All @@ -686,7 +678,6 @@ protected synchronized void resetMessageData() {
priority = DEFAULT_MESSAGE_PRIORITY;
encodedHeaderSize = 0;
memoryEstimate = -1;
originalEstimate = -1;
scheduledTime = -1;
encodedDeliveryAnnotationsSize = 0;
headerPosition = VALUE_NOT_PRESENT;
Expand Down Expand Up @@ -885,12 +876,8 @@ public final void receiveBuffer(ByteBuf buffer) {

@Override
public int getOriginalEstimate() {
if (originalEstimate < 0) {
// getMemoryEstimate should initialize originalEstimate
return getMemoryEstimate();
} else {
return originalEstimate;
}
// getMemoryEstimate should initialize originalEstimate
return getMemoryEstimate();
}

@Override
Expand Down Expand Up @@ -1033,13 +1020,9 @@ protected int internalPersistSize() {
public abstract void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools);

protected synchronized void lazyScanAfterReloadPersistence() {
assert messageDataScanned == MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
scanMessageData();
messageDataScanned = MessageDataScanningStatus.SCANNED.code;
modified = false;
// reinitialise memory estimate as message will already be on a queue
// and lazy decode will want to update
getMemoryEstimate();
}

@Override
Expand Down Expand Up @@ -1223,9 +1206,8 @@ public boolean isDurable() {
if (header != null && header .getDurable() != null) {
return header.getDurable();
} else {
// if header == null and scanningStatus=RELOAD_PERSISTENCE, it means the message can only be durable
// even though the parsing hasn't happened yet
return getDataScanningStatus() == MessageDataScanningStatus.RELOAD_PERSISTENCE;
// we will assume it's non persistent if no header
return false;
}
}

Expand Down
Loading