JMS 2.0 - How to receive messages from topic with shared consumers?
I am using ActiveMQ Artemis and JMS 2.0 to read topic messages with shared consumers. I have two questions:
- Is there any way to configure this in XML format?
- When I set the message listener on the consumer, is it mandatory to use a while loop? If I don't use a while (true) loop, the program terminates when the topic has no messages.
SharedConsumer.java
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import javax.jms.ConnectionFactory;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
// Assumption: StringUtils comes from Apache Commons Lang.
import org.apache.commons.lang3.StringUtils;

public class SharedConsumer {
    @Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
    ConnectionFactory connectionFactory;
    public String maxConnectionForJSON;

    public void readFromTopicAndSendToQueue() throws Exception {
        Context initialContext = null;
        JMSContext jmsContext = null;
        int maxConnectionCount = 0;
        maxConnectionForJSON = "30";
        if (!StringUtils.isBlank(maxConnectionForJSON)) {
            try {
                maxConnectionCount = Integer.parseInt(maxConnectionForJSON);
            } catch (Exception e) {
                //logging
            }
        }
        if (maxConnectionCount != 0) {
            try {
                List<JMSConsumer> jmsConsumerList = new ArrayList<>();
                initialContext = new InitialContext();
                Topic topic = (Topic) initialContext.lookup("topic/exampleTopic");
                ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
                jmsContext = cf.createContext("admin", "admin");
                // Every consumer joins the same shared durable subscription "ct".
                for (int i = 0; i < maxConnectionCount; i++) {
                    JMSConsumer jmsConsumer = jmsContext.createSharedDurableConsumer(topic, "ct");
                    MessageListener listener = new Listener();
                    jmsConsumer.setMessageListener(listener);
                }
                // Keep the main thread alive; messages arrive on client threads.
                while (true) {
                    Thread.sleep(30000);
                }
            } catch (Exception e) {
                System.err.println(e.getMessage());
            } finally {
                if (initialContext != null) {
                    initialContext.close();
                }
                if (jmsContext != null) {
                    jmsContext.close();
                }
            }
        }
    }

    // The entry point must take String[]; with a plain String parameter the JVM finds no main method.
    public static void main(final String[] args) throws Exception {
        SharedConsumer sharedConsumer = new SharedConsumer();
        sharedConsumer.readFromTopicAndSendToQueue();
    }
}
SharedConsumerListener.java
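import javax.jms.Message;
import javax.jms.MessageListener;

public class Listener implements MessageListener {
    public static int count = 0;

    @Override
    public void onMessage(Message message) {
        // Print the message followed by the running count on a new line.
        System.out.println(message.toString() + "\ncount :" + count);
        count++;
    }
}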
With JMS 1.1 (ActiveMQ) I could use an XML file to configure reading from a queue. I thought I could use a config file like the one below with JMS 2.0 and Artemis, but I was wrong. Thank you so much for your help, Justin Bertram.
JMS 1.1 configuration file
<bean id="brokerUrl" class="java.lang.String">
    <constructor-arg value="#{appProperties.queueUrl}"/>
</bean>
<amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>
<bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <constructor-arg ref="amqConnectionFactory"/>
    <property name="maxConnections" value="#{appProperties.maxConnections}"/>
    <property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
    <property name="maximumActiveSessionPerConnection" value="10"/>
</bean>
<bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory1"/>
</bean>
<bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="#{appProperties.queueName}"/>
</bean>
<task:executor id="mainExecutorForJSON" pool-size="#{appProperties.mainExecutorForJSONPoolSize}"
               queue-capacity="0" rejection-policy="CALLER_RUNS"/>
<int:channel id="jmsInChannelForJSON">
    <int:dispatcher task-executor="mainExecutorForJSON"/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsInForJSON" destination="jSONNrtQueue" channel="jmsInChannelForJSON"
                                        concurrent-consumers="#{appProperties.concurrentConsumerCountForJSON}"/>
<int:service-activator input-channel="jmsInChannelForJSON" ref="dataServiceJMS"/>
spring-mvc jms jms-topic activemq-artemis
I don't understand what you want to configure via xml. JMS is just a Java API. Can you clarify that point?
– Justin Bertram
Nov 19 '18 at 16:04
asked Nov 19 '18 at 7:40 by S.Balaban
edited Nov 21 '18 at 6:49
1 Answer
In short: yes, it is normal to have to keep the program from terminating once you set a JMS consumer's message listener.
When you create a JMS consumer and set its message listener, the JMS client implementation creates new threads in the background that listen for messages asynchronously from the thread which created the consumer and set the listener. The thread which creates the consumer and sets the listener therefore simply carries on. In your case you need to stop that thread from exiting and terminating the application, which is why you need the while loop.
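If the sleep loop bothers you, one common alternative is to block the main thread on a CountDownLatch and release it from a JVM shutdown hook. The following is only a sketch of that idea using plain JDK classes: the daemon thread started by the hypothetical startFakeDelivery() stands in for the JMS client's background delivery threads (no broker is involved), and the names KeepAliveSketch and runUntilStopped are made up for illustration. In your code the cleanup step would be where jmsContext.close() goes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class KeepAliveSketch {
    // Number of simulated messages handled by the background thread.
    static final AtomicInteger received = new AtomicInteger();

    // Hypothetical stand-in for the JMS client's delivery threads: a daemon
    // thread that "receives" a message every 10 ms. Daemon threads do not
    // keep the JVM alive, so the main thread has to block somewhere.
    static ScheduledExecutorService startFakeDelivery() {
        ScheduledExecutorService delivery = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "fake-delivery");
            t.setDaemon(true);
            return t;
        });
        delivery.scheduleAtFixedRate(received::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
        return delivery;
    }

    // Blocks the calling thread until the latch is released, then runs the
    // cleanup (for a real consumer: closing the JMSContext).
    static void runUntilStopped(CountDownLatch stop, Runnable cleanup) throws InterruptedException {
        try {
            stop.await();
        } finally {
            cleanup.run();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch stop = new CountDownLatch(1);
        Thread mainThread = Thread.currentThread();

        // On Ctrl+C or SIGTERM: release the latch, then give the main thread
        // up to five seconds to finish its cleanup before the JVM halts.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            stop.countDown();
            try {
                mainThread.join(5000);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }));

        ScheduledExecutorService delivery = startFakeDelivery();

        // Demo only: release the latch after 300 ms so this sketch terminates
        // on its own. A real consumer would leave this line out.
        delivery.schedule(() -> stop.countDown(), 300, TimeUnit.MILLISECONDS);

        runUntilStopped(stop, delivery::shutdownNow);
        System.out.println("Stopped after " + received.get() + " simulated messages");
    }
}
```

Compared with while (true) { Thread.sleep(30000); }, the latch wakes up as soon as shutdown is requested, and the cleanup runs in a finally block instead of being unreachable after an endless loop.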
answered Nov 19 '18 at 16:10 by Justin Bertram