-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Expand file tree
/
Copy pathActiveMQConnection.java
More file actions
2684 lines (2403 loc) · 105 KB
/
ActiveMQConnection.java
File metadata and controls
2684 lines (2403 loc) · 105 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionConsumer;
import jakarta.jms.ConnectionMetaData;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.IllegalStateException;
import jakarta.jms.InvalidDestinationException;
import jakarta.jms.JMSException;
import jakarta.jms.Queue;
import jakarta.jms.QueueConnection;
import jakarta.jms.QueueSession;
import jakarta.jms.ServerSessionPool;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicSession;
import jakarta.jms.XAConnection;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.management.JMSConnectionStatsImpl;
import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.state.CommandVisitorAdapter;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.RequestTimedOutIOException;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
// Defaults mirrored from ActiveMQConnectionFactory.
public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
// Initial value of maxThreadPoolSize (declared further down).
// NOTE(review): this is public, static and non-final, so any code can reassign it.
public static int DEFAULT_THREAD_POOL_SIZE = 1000;
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
// Temporary destinations mapped to themselves; cleared in close().
public final ConcurrentMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<>();
// Passed to every ActiveMQSession created by createSession(boolean, int) via isDispatchAsync()/isAlwaysSessionAsync().
protected boolean dispatchAsync=true;
protected boolean alwaysSessionAsync = true;
// Shut down in close() when non-null.
private TaskRunnerFactory sessionTaskRunner;
// Created in the constructor as a ThreadPoolExecutor with core and max size 1; shut down in close().
private final ThreadPoolExecutor executor;
// Connection state variables
private final ConnectionInfo info;
private ExceptionListener exceptionListener;
private ClientInternalExceptionListener clientInternalExceptionListener;
private boolean clientIDSet;
private boolean isConnectionInfoSentToBroker;
private boolean userSpecifiedClientID;
// Configuration options variables
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private BlobTransferPolicy blobTransferPolicy;
private RedeliveryPolicyMap redeliveryPolicyMap;
private MessageTransformer transformer;
private boolean disableTimeStampsByDefault;
private boolean optimizedMessageDispatch = true;
private boolean copyMessageOnSend = true;
private boolean useCompression;
private boolean objectMessageSerializationDefered;
private boolean useAsyncSend;
private boolean optimizeAcknowledge;
private long optimizeAcknowledgeTimeOut = 0;
private long optimizedAckScheduledAckInterval = 0;
private boolean nestedMapAndListEnabled = true;
private boolean useRetroactiveConsumer;
private boolean exclusiveConsumer;
private boolean alwaysSyncSend;
// Passed as the timeout to syncSendPacket(...) for the RemoveInfo sent in close();
// presumably milliseconds - TODO confirm against syncSendPacket.
private int closeTimeout = 15000;
private boolean watchTopicAdvisories = true;
private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout =0;
private boolean sendAcksAsync=true;
private boolean checkForDuplicates = true;
private boolean queueOnlyConnection = false;
private boolean consumerExpiryCheckEnabled = true;
private final Transport transport;
private final IdGenerator clientIdGenerator;
private final JMSStatsImpl factoryStats;
private final JMSConnectionStatsImpl stats;
// Lifecycle flags: 'started' is flipped by start()/doStop(); 'closing' and 'closed' are set in close();
// 'transportFailed' is consulted by close() before attempting doStop().
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean closing = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean transportFailed = new AtomicBoolean(false);
private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<>();
// Maps ConsumerIds to their ActiveMQDispatcher
private final ConcurrentMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<>();
// Maps ProducerIds to ActiveMQMessageProducer objects
private final ConcurrentMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<>();
private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
// Built in the constructor from the connection id with a session value of -1.
private final SessionId connectionSessionId;
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
// Disposed and nulled in close().
private AdvisoryConsumer advisoryConsumer;
private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
private BrokerInfo brokerInfo;
private IOException firstFailureError;
private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
// Assume that protocol is the latest. Change to the actual protocol
// version when a WireFormatInfo is received.
private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private final AtomicLong maxFrameSize = new AtomicLong(Long.MAX_VALUE);
// System.currentTimeMillis() captured in the constructor.
private final long timeCreated;
// Duplicate checking on this audit is switched on in the constructor when the transport is fault tolerant.
private final ConnectionAudit connectionAudit = new ConnectionAudit();
// Stopped and nulled in close().
private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner;
private boolean useVirtualThreadTaskRunner;
protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
private long consumerFailoverRedeliveryWaitPeriod;
// Stopped in close() when non-null.
private volatile Scheduler scheduler;
private final Object schedulerLock = new Object();
private boolean messagePrioritySupported = false;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
private boolean rmIdFromConnectionId = false;
private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
private RejectedExecutionHandler rejectedTaskHandler = null;
private List<String> trustedPackages = new ArrayList<>();
private boolean trustAllPackages = false;
private int connectResponseTimeout;
/**
 * Constructs an <code>ActiveMQConnection</code> on top of the given transport.
 * <p>
 * The constructor creates the connection executor, builds the
 * {@link ConnectionInfo} from a freshly generated connection id, registers this
 * connection as the transport's listener and adds itself to the factory
 * statistics.
 *
 * @param transport the transport this connection communicates over
 * @param clientIdGenerator generator kept on the connection for later use
 * @param connectionIdGenerator generator used once here to produce the connection id
 * @param factoryStats statistics of the creating factory; this connection is added to it
 * @throws Exception declared by the signature for failures during construction
 */
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
    this.transport = transport;
    this.clientIdGenerator = clientIdGenerator;
    this.factoryStats = factoryStats;

    // Threads are intentionally NOT daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
    final ThreadFactory connectionThreadFactory =
        runnable -> new Thread(runnable, "ActiveMQ Connection Executor: " + transport);

    // Single threaded executor (core == max == 1) backed by an unbounded queue.
    // allowCoreThreadTimeOut is not enabled here, so the 5 second keep-alive
    // does not apply to the single core thread.
    this.executor = new ThreadPoolExecutor(
        1,
        1,
        5,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(),
        connectionThreadFactory);

    final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
    this.info = new ConnectionInfo(connectionId);
    this.info.setManageable(true);
    this.info.setFaultTolerant(transport.isFaultTolerant());
    this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

    // From here on transport events are delivered to this connection.
    this.transport.setTransportListener(this);

    this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
    this.factoryStats.addConnection(this);
    this.timeCreated = System.currentTimeMillis();
    this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
}
/**
 * Stores the user name on this connection's {@link ConnectionInfo}.
 *
 * @param userName the user name to record
 */
protected void setUserName(String userName) {
this.info.setUserName(userName);
}
/**
 * Stores the password on this connection's {@link ConnectionInfo}.
 *
 * @param password the password to record
 */
protected void setPassword(String password) {
this.info.setPassword(password);
}
/**
 * Static convenience method that creates a connection from a
 * default-constructed {@link ActiveMQConnectionFactory}.
 *
 * @return an ActiveMQConnection
 * @throws JMSException if the factory fails to create the connection
 */
public static ActiveMQConnection makeConnection() throws JMSException {
    return (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
}
/**
 * Static convenience method that creates a connection from an
 * {@link ActiveMQConnectionFactory} built with the given broker URI string.
 *
 * @param uri the broker URI, handed unchanged to the factory constructor
 * @return an ActiveMQConnection
 * @throws JMSException if the factory fails to create the connection
 * @throws URISyntaxException declared by the signature; NOTE(review): whether the
 *         String-based factory constructor can raise it is not visible here
 */
public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
    return (ActiveMQConnection) new ActiveMQConnectionFactory(uri).createConnection();
}
/**
 * Static convenience method that creates a connection from an
 * {@link ActiveMQConnectionFactory} built with the given credentials and
 * broker URI.
 *
 * @param user the user name given to the factory
 * @param password the password given to the factory
 * @param uri the broker URI as a string; parsed with {@link URI#URI(String)}
 * @return an ActiveMQConnection
 * @throws JMSException if the factory fails to create the connection
 * @throws URISyntaxException if <code>uri</code> cannot be parsed as a URI
 */
public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
    final URI brokerUri = new URI(uri);
    return (ActiveMQConnection) new ActiveMQConnectionFactory(user, password, brokerUri).createConnection();
}
/**
 * Returns the statistics object of this connection.
 *
 * @return the {@link JMSConnectionStatsImpl} created for this connection in
 *         the constructor
 */
public JMSConnectionStatsImpl getConnectionStats() {
return stats;
}
/**
 * Creates a <CODE>Session</CODE> object.
 * <P>
 * Delegates to <code>createSession(false, Session.AUTO_ACKNOWLEDGE)</code>,
 * i.e. requests a non-transacted session in auto-acknowledge mode.
 *
 * @return a newly created session
 * @throws JMSException if the <CODE>Connection</CODE> object fails to
 *         create a session due to some internal error or lack of
 *         support for the specific transaction and acknowledgement
 *         mode.
 * @since 2.0
 */
@Override
public Session createSession() throws JMSException {
return createSession(false, Session.AUTO_ACKNOWLEDGE);
}
/**
 * Creates a <CODE>Session</CODE> object from a single session mode value.
 * <P>
 * The session is treated as transacted exactly when
 * <code>acknowledgeMode</code> equals <code>Session.SESSION_TRANSACTED</code>;
 * the call is then forwarded to {@link #createSession(boolean, int)}.
 *
 * @param acknowledgeMode the session mode. Legal values are
 *        <code>Session.SESSION_TRANSACTED</code>,
 *        <code>Session.AUTO_ACKNOWLEDGE</code>,
 *        <code>Session.CLIENT_ACKNOWLEDGE</code>, and
 *        <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
 * @return a newly created session
 * @throws JMSException if the <CODE>Connection</CODE> object fails to
 *         create a session due to some internal error or lack of
 *         support for the specific transaction and acknowledgement
 *         mode.
 * @see Session#SESSION_TRANSACTED
 * @see Session#AUTO_ACKNOWLEDGE
 * @see Session#CLIENT_ACKNOWLEDGE
 * @see Session#DUPS_OK_ACKNOWLEDGE
 * @since 2.0
 */
@Override
public Session createSession(int acknowledgeMode) throws JMSException {
    final boolean transacted = (acknowledgeMode == Session.SESSION_TRANSACTED);
    return createSession(transacted, acknowledgeMode);
}
/**
 * Creates a <CODE>Session</CODE> object.
 * <P>
 * After the closed/failed check and after making sure the connection info
 * has been sent, the acknowledge mode is validated for non-transacted
 * sessions only. For a transacted session the supplied
 * <code>acknowledgeMode</code> is ignored and
 * <code>Session.SESSION_TRANSACTED</code> is used instead.
 *
 * @param transacted indicates whether the session is transacted
 * @param acknowledgeMode indicates whether the consumer or the client will
 *        acknowledge any messages it receives; ignored if the
 *        session is transacted. Legal values are
 *        <code>Session.AUTO_ACKNOWLEDGE</code>,
 *        <code>Session.CLIENT_ACKNOWLEDGE</code>,
 *        <code>Session.DUPS_OK_ACKNOWLEDGE</code> and
 *        <code>ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE</code>.
 * @return a newly created session
 * @throws JMSException if the <CODE>Connection</CODE> object fails to
 *         create a session due to some internal error, or if a
 *         non-transacted session is requested with
 *         <code>Session.SESSION_TRANSACTED</code> or with a value
 *         outside the supported range.
 * @see Session#AUTO_ACKNOWLEDGE
 * @see Session#CLIENT_ACKNOWLEDGE
 * @see Session#DUPS_OK_ACKNOWLEDGE
 * @since 1.1
 */
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
    checkClosedOrFailed();
    ensureConnectionInfoSent();

    // The acknowledge mode only needs validating when it is actually going to be used.
    if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) {
        throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
    }
    if (!transacted
            && (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT)) {
        throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
            "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
    }

    final SessionId sessionId = getNextSessionId();
    final int effectiveAckMode = transacted ? Session.SESSION_TRANSACTED : acknowledgeMode;
    return new ActiveMQSession(this, sessionId, effectiveAckMode, isDispatchAsync(), isAlwaysSessionAsync());
}
/**
 * Builds the id for a new session of this connection by combining the
 * connection id with the next value of the session id sequence.
 *
 * @return a new sessionId
 */
protected SessionId getNextSessionId() {
return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
}
/**
 * Gets the client identifier for this connection.
 * <P>
 * This value is specific to the JMS provider. It is either preconfigured by
 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
 * dynamically by the application by calling the <code>setClientID</code>
 * method.
 * <P>
 * Implementation note: the value is read from this connection's
 * <code>ConnectionInfo</code> once <code>checkClosedOrFailed()</code> has
 * returned normally.
 *
 * @return the unique client identifier
 * @throws JMSException if the JMS provider fails to return the client ID
 *         for this connection due to some internal error.
 */
@Override
public String getClientID() throws JMSException {
checkClosedOrFailed();
return this.info.getClientId();
}
/**
 * Sets the client identifier for this connection.
 * <P>
 * The preferred way to assign a JMS client's client identifier is for it to
 * be configured in a client-specific <CODE>ConnectionFactory</CODE>
 * object and transparently assigned to the <CODE>Connection</CODE> object
 * it creates.
 * <P>
 * Alternatively, a client can set a connection's client identifier using a
 * provider-specific value. The facility to set a connection's client
 * identifier explicitly is not a mechanism for overriding the identifier
 * that has been administratively configured. It is provided for the case
 * where no administratively specified identifier exists. If one does exist,
 * an attempt to change it by setting it must throw an
 * <CODE>IllegalStateException</CODE>. If a client sets the client
 * identifier explicitly, it must do so immediately after it creates the
 * connection and before any other action on the connection is taken. After
 * this point, setting the client identifier is a programming error that
 * should throw an <CODE>IllegalStateException</CODE>.
 * <P>
 * The purpose of the client identifier is to associate a connection and its
 * objects with a state maintained on behalf of the client by a provider.
 * The only such state identified by the JMS API is that required to support
 * durable subscriptions.
 * <P>
 * If another connection with the same <code>clientID</code> is already
 * running when this method is called, the JMS provider should detect the
 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
 *
 * @param newClientID the unique client identifier
 * @throws JMSException if the JMS provider fails to set the client ID for
 *         this connection due to some internal error.
 * @throws jakarta.jms.InvalidClientIDException if the JMS client specifies an
 *         invalid or duplicate client ID.
 * @throws jakarta.jms.IllegalStateException if the JMS client attempts to set
 *         a connection's client ID at the wrong time or when it has
 *         been administratively configured.
 */
@Override
public void setClientID(String newClientID) throws JMSException {
checkClosedOrFailed();
// clientIDSet is only read here, never assigned in this method.
// NOTE(review): presumably it is set once the connection info has been sent - confirm.
if (this.clientIDSet) {
throw new IllegalStateException("The clientID has already been set");
}
// The client id travels with the ConnectionInfo, so it cannot change after that was sent.
if (this.isConnectionInfoSentToBroker) {
throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
}
this.info.setClientId(newClientID);
this.userSpecifiedClientID = true;
// Called immediately after storing the id; presumably transmits the ConnectionInfo
// (now carrying the new client id) to the broker - confirm against ensureConnectionInfoSent().
ensureConnectionInfoSent();
}
/**
 * Sets the default client id that the connection will use if it is not
 * explicitly set with a {@link #setClientID(String)} call.
 * <P>
 * Unlike {@link #setClientID(String)}, this method performs no state checks
 * and does not trigger sending of the connection info; it only stores the
 * id on the <code>ConnectionInfo</code> and marks the client id as user
 * specified.
 *
 * @param clientID the client id to store
 * @throws JMSException declared by the signature; nothing in this method
 *         body throws it directly
 */
public void setDefaultClientID(String clientID) throws JMSException {
this.info.setClientId(clientID);
this.userSpecifiedClientID = true;
}
/**
 * Gets the metadata for this connection.
 * <P>
 * Implementation note: after the closed/failed check this always returns
 * the shared <code>ActiveMQConnectionMetaData.INSTANCE</code>.
 *
 * @return the connection metadata
 * @throws JMSException if the JMS provider fails to get the connection
 *         metadata for this connection.
 * @see jakarta.jms.ConnectionMetaData
 */
@Override
public ConnectionMetaData getMetaData() throws JMSException {
checkClosedOrFailed();
return ActiveMQConnectionMetaData.INSTANCE;
}
/**
 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
 * associated with it.
 * <P>
 * Implementation note: <code>checkClosedOrFailed()</code> is invoked before
 * the stored listener is returned.
 *
 * @return the <CODE>ExceptionListener</CODE> for this connection, or
 *         null, if no <CODE>ExceptionListener</CODE> is associated with
 *         this connection.
 * @throws JMSException if the JMS provider fails to get the
 *         <CODE>ExceptionListener</CODE> for this connection.
 * @see jakarta.jms.Connection#setExceptionListener(ExceptionListener)
 */
@Override
public ExceptionListener getExceptionListener() throws JMSException {
checkClosedOrFailed();
return this.exceptionListener;
}
/**
 * Sets an exception listener for this connection.
 * <P>
 * If a JMS provider detects a serious problem with a connection, it informs
 * the connection's <CODE> ExceptionListener</CODE>, if one has been
 * registered. It does this by calling the listener's <CODE>onException
 * </CODE>
 * method, passing it a <CODE>JMSException</CODE> object describing the
 * problem.
 * <P>
 * An exception listener allows a client to be notified of a problem
 * asynchronously. Some connections only consume messages, so they would
 * have no other way to learn their connection has failed.
 * <P>
 * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
 * <P>
 * A JMS provider should attempt to resolve connection problems itself
 * before it notifies the client of them.
 * <P>
 * Implementation note: <code>checkClosedOrFailed()</code> is invoked first;
 * the listener reference (which may be <code>null</code>) is then stored
 * as-is, replacing any previously registered listener.
 *
 * @param listener the exception listener
 * @throws JMSException if the JMS provider fails to set the exception
 *         listener for this connection.
 */
@Override
public void setExceptionListener(ExceptionListener listener) throws JMSException {
checkClosedOrFailed();
this.exceptionListener = listener;
}
/**
 * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
 * Not every <CODE>ActiveMQConnection</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
 * associated with it.
 * <P>
 * Unlike {@link #getExceptionListener()}, this accessor performs no
 * closed/failed check.
 *
 * @return the listener or <code>null</code> if no listener is registered with the connection.
 */
public ClientInternalExceptionListener getClientInternalExceptionListener() {
return clientInternalExceptionListener;
}
/**
 * Sets a client internal exception listener for this connection.
 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
 * describing the problem.
 * <P>
 * The reference is stored as-is (no closed/failed check is performed), replacing any previous listener.
 *
 * @param listener the exception listener
 */
public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
this.clientInternalExceptionListener = listener;
}
/**
 * Starts (or restarts) a connection's delivery of incoming messages.
 * <P>
 * The closed/failed check and <code>ensureConnectionInfoSent()</code> run on
 * every call. The sessions are started only when the <code>started</code>
 * flag actually changes from <code>false</code> to <code>true</code>, so a
 * call on an already started connection does not start the sessions again.
 *
 * @throws JMSException if the JMS provider fails to start message delivery
 *         due to some internal error.
 * @see jakarta.jms.Connection#stop()
 */
@Override
public void start() throws JMSException {
    checkClosedOrFailed();
    ensureConnectionInfoSent();

    if (!started.compareAndSet(false, true)) {
        // Already started - nothing more to do.
        return;
    }
    for (ActiveMQSession session : sessions) {
        session.start();
    }
}
/**
 * Temporarily stops a connection's delivery of incoming messages. Delivery
 * can be restarted using the connection's <CODE>start</CODE> method.
 * <P>
 * Before anything is stopped, every session is asked to verify that the
 * caller is not currently inside one of its completion-listener or
 * message-listener callbacks (the operation name <code>"stop"</code> is
 * passed to those checks). The actual work is then delegated to
 * <code>doStop(true)</code>, i.e. with the closed/failed check enabled.
 * <P>
 * See {@link jakarta.jms.Connection#stop()} for the full contract this
 * method implements.
 *
 * @throws JMSException if the JMS provider fails to stop message delivery
 *         due to some internal error.
 * @see jakarta.jms.Connection#start()
 */
@Override
public void stop() throws JMSException {
    final Iterator<ActiveMQSession> sessionIterator = sessions.iterator();
    while (sessionIterator.hasNext()) {
        final ActiveMQSession current = sessionIterator.next();
        current.checkNotInCompletionListenerCallback("stop");
        current.checkNotInMessageListenerCallback("stop");
    }
    doStop(true);
}
/**
 * Performs the work behind {@link #stop()}.
 * <P>
 * The sessions are stopped only when the <code>started</code> flag actually
 * changes from <code>true</code> to <code>false</code>; they are stopped
 * while holding the monitor of the session list.
 *
 * @see #stop()
 * @param checkClosed {@code true} to invoke <code>checkClosedOrFailed()</code>
 *        before stopping, {@code false} to skip that check
 * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
 */
void doStop(boolean checkClosed) throws JMSException {
    if (checkClosed) {
        checkClosedOrFailed();
    }
    if (!started.compareAndSet(true, false)) {
        // Was not running - nothing to stop.
        return;
    }
    synchronized (sessions) {
        for (ActiveMQSession session : sessions) {
            session.stop();
        }
    }
}
/**
* Closes the connection.
* <P>
* Since a provider typically allocates significant resources outside the
* JVM on behalf of a connection, clients should close these resources when
* they are not needed. Relying on garbage collection to eventually reclaim
* these resources may not be timely enough.
* <P>
* There is no need to close the sessions, producers, and consumers of a
* closed connection.
* <P>
* Closing a connection causes all temporary destinations to be deleted.
* <P>
* When this method is invoked, it should not return until message
* processing has been shut down in an orderly fashion. This means that all
* message listeners that may have been running have returned, and that all
* pending receives have returned. A close terminates all pending message
* receives on the connection's sessions' consumers. The receives may return
* with a message or with null, depending on whether there was a message
* available at the time of the close. If one or more of the connection's
* sessions' message listeners is processing a message at the time when
* connection <CODE>close</CODE> is invoked, all the facilities of the
* connection and its sessions must remain available to those listeners
* until they return control to the JMS provider.
* <P>
* Closing a connection causes any of its sessions' transactions in progress
* to be rolled back. In the case where a session's work is coordinated by
* an external transaction manager, a session's <CODE>commit</CODE> and
* <CODE> rollback</CODE> methods are not used and the result of a closed
* session's work is determined later by the transaction manager. Closing a
* connection does NOT force an acknowledgment of client-acknowledged
* sessions.
* <P>
* Invoking the <CODE>acknowledge</CODE> method of a received message from
* a closed connection's session must throw an
* <CODE>IllegalStateException</CODE>. Closing a closed connection must
* NOT throw an exception.
*
* @throws JMSException if the JMS provider fails to close the connection
* due to some internal error. For example, a failure to
* release resources or to close a socket connection can
* cause this exception to be thrown.
*/
@Override
public void close() throws JMSException {
    // Run both listener-callback guards on every session before any state is touched.
    for (final ActiveMQSession guarded : sessions) {
        guarded.checkNotInCompletionListenerCallback("close");
        guarded.checkNotInMessageListenerCallback("close");
    }
    try {
        // Stop first when the connection is neither closed nor on a failed transport.
        // doStop(false) is used so an already-closed connection does not make this
        // call fail: the JMS spec forbids throwing when closing a closed connection.
        if (!(closed.get() || transportFailed.get())) {
            doStop(false);
        }
        synchronized (this) {
            if (!closed.get()) {
                closing.set(true);

                // Release the helper objects owned by this connection.
                if (destinationSource != null) {
                    destinationSource.stop();
                    destinationSource = null;
                }
                if (advisoryConsumer != null) {
                    advisoryConsumer.dispose();
                    advisoryConsumer = null;
                }
                final Scheduler currentScheduler = this.scheduler;
                if (currentScheduler != null) {
                    try {
                        currentScheduler.stop();
                    } catch (Exception e) {
                        throw JMSExceptionSupport.create(e);
                    }
                }

                // Dispose every session, remembering the highest delivered sequence id
                // so it can be reported in the remove command below.
                long highestDeliveredSequenceId = -1;
                for (final ActiveMQSession openSession : this.sessions) {
                    openSession.dispose();
                    highestDeliveredSequenceId = Math.max(highestDeliveredSequenceId, openSession.getLastDeliveredSequenceId());
                }
                for (final ActiveMQConnectionConsumer openConsumer : this.connectionConsumers) {
                    openConsumer.dispose();
                }
                this.activeTempDestinations.clear();

                try {
                    if (isConnectionInfoSentToBroker) {
                        // We announced ourselves to the broker, so try to tell it
                        // that the connection is being shut down.
                        final RemoveInfo removal = info.createRemoveCommand();
                        removal.setLastDeliveredSequenceId(highestDeliveredSequenceId);
                        try {
                            syncSendPacket(removal, closeTimeout);
                        } catch (JMSException sendFailure) {
                            // A request timeout is expected here; anything else propagates.
                            if (!(sendFailure.getCause() instanceof RequestTimedOutIOException)) {
                                throw sendFailure;
                            }
                        }
                        doAsyncSendPacket(new ShutdownInfo());
                    }
                } finally {
                    // Flip the state flags even if talking to the broker failed.
                    started.set(false);
                    // TODO if we move the TaskRunnerFactory to the connection
                    // factory
                    // then we may need to call
                    // factory.onConnectionClose(this);
                    if (sessionTaskRunner != null) {
                        sessionTaskRunner.shutdown();
                    }
                    closed.set(true);
                    closing.set(false);
                }
            }
        }
    } finally {
        // Runs on every exit path: shut down the executor (best effort), then
        // dispose the transport and deregister from the factory statistics.
        try {
            if (executor != null) {
                ThreadPoolUtils.shutdown(executor);
            }
        } catch (Throwable e) {
            LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
        }
        ServiceSupport.dispose(this.transport);
        factoryStats.removeConnection(this);
    }
}
/**
* Tells the broker to terminate its VM. This can be used to cleanly
* terminate a broker running in a standalone java process. Server must have
* property enable.vm.shutdown=true defined to allow this to work.
*/
// TODO : org.apache.activemq.message.BrokerAdminCommand not yet
// implemented.
/*
* public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
* command = new BrokerAdminCommand();
* command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
* asyncSendPacket(command); }
*/
/**
 * Creates a durable connection consumer for this connection (optional
 * operation). Connection consumers are an expert facility that regular JMS
 * clients do not normally use.
 * <P>
 * Convenience overload: delegates to the six-argument variant, passing
 * <CODE>false</CODE> for <CODE>noLocal</CODE>.
 *
 * @param topic the topic to consume from
 * @param subscriptionName the name of the durable subscription
 * @param messageSelector only messages whose properties match this selector
 *                expression are delivered; <CODE>null</CODE> or an empty
 *                string means no selector is applied
 * @param sessionPool the server session pool to associate with the durable
 *                connection consumer
 * @param maxMessages the maximum number of messages that may be assigned to
 *                a server session at one time
 * @return the durable connection consumer
 * @throws JMSException if the <CODE>Connection</CODE> object fails to
 *                 create a connection consumer due to some internal error
 *                 or invalid arguments for <CODE>sessionPool</CODE> and
 *                 <CODE>messageSelector</CODE>.
 * @throws jakarta.jms.InvalidDestinationException if an invalid destination
 *                 is specified.
 * @throws jakarta.jms.InvalidSelectorException if the message selector is
 *                 invalid.
 * @see jakarta.jms.ConnectionConsumer
 * @since 1.1
 */
@Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
        throws JMSException {
    final boolean noLocalDefault = false;
    return createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, noLocalDefault);
}
/**
 * Create a durable connection consumer for this connection (optional
 * operation). This is an expert facility not used by regular JMS clients.
 * <P>
 * Any options carried by the transformed destination that are prefixed with
 * <CODE>consumer.</CODE> are applied to the consumer info before the
 * connection consumer is created.
 *
 * @param topic topic to access
 * @param subscriptionName durable subscription name
 * @param messageSelector only messages with properties matching the message
 *                selector expression are delivered. A value of null or an
 *                empty string indicates that there is no message selector
 *                for the message consumer.
 * @param sessionPool the server session pool to associate with this durable
 *                connection consumer
 * @param maxMessages the maximum number of messages that can be assigned to
 *                a server session at one time; used as the consumer's
 *                prefetch size
 * @param noLocal set true if you want to filter out messages published
 *                locally
 * @return the durable connection consumer
 * @throws IllegalStateException if this connection is a queue-only
 *                 connection
 * @throws JMSException if the <CODE>Connection</CODE> object fails to
 *                 create a connection consumer due to some internal error
 *                 or invalid arguments for <CODE>sessionPool</CODE> and
 *                 <CODE>messageSelector</CODE>.
 * @throws jakarta.jms.InvalidDestinationException if an invalid destination
 *                 is specified.
 * @throws jakarta.jms.InvalidSelectorException if the message selector is
 *                 invalid.
 * @see jakarta.jms.ConnectionConsumer
 * @since 1.1
 */
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
        boolean noLocal) throws JMSException {
    checkClosedOrFailed();
    if (queueOnlyConnection) {
        throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
    }
    ensureConnectionInfoSent();

    // "info" below is the connection's own info field; the consumer's info gets a
    // distinct name so the two can never be confused.
    SessionId sessionId = new SessionId(info.getConnectionId(), -1);
    ConsumerInfo consumerInfo = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
    consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
    consumerInfo.setSubscriptionName(subscriptionName);
    consumerInfo.setSelector(messageSelector);
    consumerInfo.setPrefetchSize(maxMessages);
    // Honour the caller's noLocal request; it was previously accepted but never applied.
    consumerInfo.setNoLocal(noLocal);
    consumerInfo.setDispatchAsync(isDispatchAsync());

    // Allows the options on the destination to configure the consumerInfo.
    // The target must be the consumer info, not the connection info.
    if (consumerInfo.getDestination().getOptions() != null) {
        Map<String, String> options = new HashMap<>(consumerInfo.getDestination().getOptions());
        IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
    }
    return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
}
/**
 * Shared (non-durable) connection consumers are not implemented by this
 * connection; every call fails immediately and none of the arguments are
 * inspected.
 *
 * @throws UnsupportedOperationException always
 * @see jakarta.jms.ConnectionConsumer
 * @since 2.0
 */
@Override
public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool,
        int maxMessages) throws JMSException {
    final String reason = "createSharedConnectionConsumer() is not supported";
    throw new UnsupportedOperationException(reason);
}
/**
 * Shared durable connection consumers are not implemented by this
 * connection; every call fails immediately and none of the arguments are
 * inspected.
 *
 * @throws UnsupportedOperationException always
 * @see jakarta.jms.ConnectionConsumer
 * @since 2.0
 */
@Override
public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool,
        int maxMessages) throws JMSException {
    // The message names this method, not its non-durable sibling.
    throw new UnsupportedOperationException("createSharedDurableConnectionConsumer() is not supported");
}
// Properties
// -------------------------------------------------------------------------
/**
 * Reports the current value of this connection's <CODE>started</CODE> flag.
 *
 * @return true if this Connection is started
 */
public boolean isStarted() {
    return this.started.get();
}
/**
 * Reports the current value of this connection's <CODE>closed</CODE> flag.
 *
 * @return true if the connection is closed
 */
public boolean isClosed() {
    return this.closed.get();
}
/**
 * Reports the current value of this connection's <CODE>closing</CODE> flag.
 *
 * @return true if the connection is in the process of being closed
 */
public boolean isClosing() {
    return this.closing.get();
}
/**
 * Reports the current value of this connection's
 * <CODE>transportFailed</CODE> flag.
 *
 * @return true if the underlying transport has failed
 */
public boolean isTransportFailed() {
    return this.transportFailed.get();
}
/**
 * Gets the prefetch policy currently configured on this connection.
 *
 * @return the prefetch policy
 */
public ActiveMQPrefetchPolicy getPrefetchPolicy() {
    return this.prefetchPolicy;
}
/**
* Sets the <a
* href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
* policy</a> for consumers created by this connection.
*
* @param prefetchPolicy the policy to store on this connection; the
*                reference is stored as given, without copying or
*                validation
*/
public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
this.prefetchPolicy = prefetchPolicy;
}
/**
 * Gets the transport held by this connection.
 *
 * @return the transport
 */
public Transport getTransportChannel() {
    return this.transport;
}
/**
 * Gets the client ID after first making sure the connection info has been
 * sent, which forces a client ID to be generated if one has not yet been
 * configured.
 *
 * @return the clientID of the connection
 * @throws JMSException if ensuring that the connection info has been sent
 *                 fails
 */
public String getInitializedClientID() throws JMSException {
    ensureConnectionInfoSent();
    final String clientId = info.getClientId();
    return clientId;
}
/**
 * Gets the current value of the <CODE>disableTimeStampsByDefault</CODE>
 * setting.
 *
 * @return true if timestamps on messages are disabled by default
 */
public boolean isDisableTimeStampsByDefault() {
    return this.disableTimeStampsByDefault;
}
/**
* Sets whether timestamps on messages should be disabled. Disabling them
* gives a small performance boost.
*
* @param timeStampsDisableByDefault true to disable message timestamps by
*                default
*/
public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
this.disableTimeStampsByDefault = timeStampsDisableByDefault;
}
/**
 * Gets the current value of the <CODE>optimizedMessageDispatch</CODE>
 * setting.
 *
 * @return true if optimized message dispatch is enabled
 */
public boolean isOptimizedMessageDispatch() {
    return this.optimizedMessageDispatch;
}
/**
* If this flag is set then a larger prefetch limit is used - only
* applicable for durable topic subscribers.
*
* @param dispatchOptimizedMessage true to enable optimized message dispatch
*/
public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
this.optimizedMessageDispatch = dispatchOptimizedMessage;
}
/**
 * Gets the close timeout configured on this connection; {@link #close()}
 * passes this value as the timeout when it synchronously sends the remove
 * command to the broker.
 *
 * @return the closeTimeout
 */
public int getCloseTimeout() {
    return this.closeTimeout;
}