Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
private MessageTransformer transformer;
private MemoryUsage producerWindow;

protected long deliveryDelay = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deliveryDelay is not used in send().

It means that it's not effective.

To be effective, the delay needs to be applied during message dispatch (so either passing it through to the session send, or setting AMQ_SCHEDULED_DELAY on the message).

Also ActiveMQProducer.setDeliveryDelay() (for JMS 2.0 JMSProducer) is not updated and still throws UnsupportedOperationException.

I think it's better to have the PR atomic and consistent.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need really protected here ?
As we have getter/setter access, we can go with private and using mock in the test, right ?


protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
super(session);
this.info = new ProducerInfo(producerId);
Expand Down Expand Up @@ -219,6 +221,25 @@ protected void checkClosed() throws IllegalStateException {
* @see jakarta.jms.Session#createProducer
* @since 1.1
*/

@Override
public void setDeliveryDelay(long deliveryDelay) throws JMSException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than defining here, why not updated ActiveMQMessageProducerSupport ?

checkClosed();

// Jakarta 3.1 Compliance Guard
if (deliveryDelay < 0 && session.connection.isStrictCompliance()) {
throw new jakarta.jms.MessageFormatException("Delivery delay cannot be negative when strictCompliance is enabled.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wrong imho.

Jakarta Messaging 3.1 spec (MessageProducer.setDeliveryDelay) doesn't mandate MessageFormatException for invalid arguments.

MessageFormatException is for message body/property type conversion issues (like in setBody).
An IllegalArgumentException or JMSException (generic) would be more appropriate.
The JMS Spec says nothing about negative delays being illegal, the behavior is undefined and it's up to us to decide the validation logic.

}

this.deliveryDelay = deliveryDelay;
}

@Override
public long getDeliveryDelay() throws JMSException {
checkClosed();
return this.deliveryDelay;
}

@Override
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
this.send(destination, message, deliveryMode, priority, timeToLive, (AsyncCallback)null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;

import jakarta.jms.Connection;
import jakarta.jms.MessageProducer;
import org.junit.Test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.activemq.command.DataStructureTestSupport.assertEquals;
import static org.junit.Assert.fail;

public class ActiveMQMessageProducerTest {

private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageProducerTest.class);

@Test
public void testStrictComplianceDeliveryDelay() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

// Strict Mode ON (Should block negative values)
factory.setStrictCompliance(true);
try (Connection conn = factory.createConnection()) {
MessageProducer producer = conn.createSession(false, 1).createProducer(null);
try {
producer.setDeliveryDelay(-100);
fail("Should have thrown MessageFormatException");
} catch (jakarta.jms.MessageFormatException e) {
LOG.debug("Caught expected strict compliance exception");
}
}

// Strict Mode OFF (Should allow negative values (Legacy behavior))
factory.setStrictCompliance(false);
try (Connection conn = factory.createConnection()) {
MessageProducer producer = conn.createSession(false, 1).createProducer(null);
producer.setDeliveryDelay(-100);
assertEquals(-100, producer.getDeliveryDelay());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,17 @@ public void testSessionSharedDurableConsumerSelector() throws JMSException {
session.createSharedDurableConsumer(session.createTopic("test"), null, null);
}

@Test(expected = UnsupportedOperationException.class)
@Test // REMOVED (expected = UnsupportedOperationException.class)
public void testProducerDeliveryDelayGet() throws JMSException {
messageProducer.getDeliveryDelay();
// Assert that the default is 0
assertEquals(0L, messageProducer.getDeliveryDelay());
}

@Test(expected = UnsupportedOperationException.class)
@Test // REMOVED (expected = UnsupportedOperationException.class)
public void testProducerDeliveryDelaySet() throws JMSException {
messageProducer.setDeliveryDelay(1000l);
messageProducer.setDeliveryDelay(1000L);
// Verify the value was actually stored
assertEquals(1000L, messageProducer.getDeliveryDelay());
}

@Test(expected = UnsupportedOperationException.class)
Expand Down
Loading