接下来,我们将通过JMS实现消息的发送和接收功能。
我们将创建一个名为“jms-msg”的应用。在这个应用中,我们将模拟消费者、生产者、队列、订阅等功能。为了使应用正常运行,需要在项目中添加以下依赖项:
```xml
我们将使用Apache ActiveMQ和Spring框架来实现JMS。
以下是Spring基于XML的核心配置内容:
```xml
ProducerServiceImpl
的实现```java package com.waylau.spring.jms.queue;
import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator;
public class ProducerServiceImpl implements ProducerService { private JmsTemplate jmsTemplate;
// 向指定队列发送消息
public void sendMessage(Destination destination, final String msg) {
System.out.println("ProducerService向队列" + destination.toString() + "发送了消息:" + msg);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
// 向默认队列发送消息
public void sendMessage(final String msg) {
String destination = jmsTemplate.getDefaultDestination().toString();
System.out.println("ProducerService向队列" + destination + "发送了消息:" + msg);
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
// 向指定队列发送消息并设置响应队列
public void sendMessage(Destination destination, final String msg, final Destination response) {
System.out.println("ProducerService向队列" + destination + "发送了消息:" + msg);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(msg);
textMessage.setJMSReplyTo(response);
return textMessage;
}
});
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
} ```
ConsumerServiceImpl
的实现```java package com.waylau.spring.jms.queue;
import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate;
public class ConsumerServiceImpl implements ConsumerService { private JmsTemplate jmsTemplate;
// 接收消息
public void receive(Destination destination) {
TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
try {
System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:" + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
} ```
ConsumerListener
的实现```java package com.waylau.spring.jms.queue;
public class ConsumerListener { public String receiveMessage(String message) { System.out.println("ConsumerListener接收到一个文本消息:" + message); return "我是ConsumerListener的响应"; } } ```
QueueMessageListener
的实现```java package com.waylau.spring.jms.queue;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;
public class QueueMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("QueueMessageListener收到了文本消息:" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } } ```
ConsumerSessionAwareMessageListener
的实现```java package com.waylau.spring.jms.queue;
import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.jms.listener.SessionAwareMessageListener;
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener
public void onMessage(TextMessage message, Session session) throws JMSException {
// 接收消息
System.out.println("会话感知侦听器收到一条消息:" + message.getText());
// 发送消息
MessageProducer producer = session.createProducer(destination);
TextMessage tm = session.createTextMessage("我是会话感知侦听器的响应");
producer.send(tm);
}
public void setDestination(Destination destination) {
this.destination = destination;
}
} ```
TopicProvider
的实现```java package com.waylau.spring.jms.topic;
import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator;
public class TopicProvider { private JmsTemplate topicJmsTemplate;
// 向指定的主题发布消息
public void publish(final Destination topic, final String msg) {
topicJmsTemplate.send(topic, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
System.out.println("主题提供者发布了主题:" + topic.toString() + ",发布消息内容为:" + msg);
return session.createTextMessage(msg);
}
});
}
public void setTopicJmsTemplate(JmsTemplate topicJmsTemplate) {
this.topicJmsTemplate = topicJmsTemplate;
}
} ```
TopicMessageListener
的实现```java package com.waylau.spring.jms.topic;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;
public class TopicMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("主题侦听器监听到消息:" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } } ```
TopicMessageListener2
的实现```java package com.waylau.spring.jms.topic;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;
public class TopicMessageListener2 implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("主题侦听器2监听到消息:" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } } ```
为了方便测试,我们编写了以下测试用例:
```java package com.waylau.spring.jms;
import javax.jms.Destination; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.waylau.spring.jms.queue.ConsumerService; import com.waylau.spring.jms.queue.ProducerService; import com.waylau.spring.jms.topic.TopicProvider;
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("/spring.xml") public class SpringJmsTest {
@Autowired
private Destination queueDestination;
@Autowired
private Destination queueDestination2;
@Autowired
private Destination sessionAwareQueue;
@Autowired
private Destination adapterQueue;
@Autowired
@Qualifier("topicDestination")
private Destination topic;
@Autowired
private TopicProvider topicProvider;
@Autowired
@Qualifier("producerService")
private ProducerService producer;
@Autowired
@Qualifier("consumerService")
private ConsumerService consumer;
// 测试消费者向queue1发送消息
@Test
public void testProduce() {
String msg = "Hello world!";
producer.sendMessage(msg);
}
// 测试消费者从queue1接收消息
@Test
public void testConsume() {
consumer.receive(queueDestination);
}
// 测试消息监听
// 1.消费者向队列queue2发送消息
// 2.消费者侦听队列,并消费消息
@Test
public void testSend() {
producer.sendMessage(queueDestination2, "Hello R2");
}
// 测试主题监听
// 1.消费者向主题发布消息
// 2.消费者侦听主题,并消费消息
@Test
public void testTopic() {
topicProvider.publish(topic, "Hello Topic!");
}
// 测试会话感知侦听器
// 1.消费者向队列sessionAwareQueue发送消息
// 2.会话感知侦听器接收消息,并向queue1队列发送回复消息
// 3.消费者从queue1消费消息
@Test
public void testAware() {
producer.sendMessage(sessionAwareQueue, "Hello sessionAware");
consumer.receive(queueDestination);
}
// 测试消息监听适配器
// 1.消费者向队列adapterQueue发送消息
// 2.消息监听适配器使消费者侦听消息,并向queue1队列发送回复消息
// 3.消费者从queue1消费消息
@Test
public void testAdapter() {
producer.sendMessage(adapterQueue, "Hello adapterQueue", queueDestination);
consumer.receive(queueDestination);
}
} ```
首先启动ActiveMQ服务,然后执行测试用例。可以在控制台看到以下输出信息:
INFO | Successfully connected to tcp://localhost:61616
INFO | Successfully connected to tcp://localhost:61616
INFO | Successfully connected to tcp://localhost:61616
INFO | Successfully connected to tcp://localhost:61616
INFO | Successfully connected to tcp://localhost:61616
ProducerService向队列queue://adapterQueue发送了消息: Hello adapterQueue
INFO | Successfully connected to tcp://localhost:61616
ConsumerListener接收到一个文本消息: Hello adapterQueue
INFO | Successfully connected to tcp://localhost:61616
ConsumerService从队列queue://queue1收到了消息: 我是ConsumerListener的响应
ProducerService向队列queue://sessionAwareQueue发送了消息: Hello sessionAware
INFO | Successfully connected to tcp://localhost:61616
会话感知侦听器收到一条消息: Hello sessionAware
INFO | Successfully connected to tcp://localhost:61616
ConsumerService从队列queue://queue1收到了消息: 我是会话感知侦听器的响应
INFO | Successfully connected to tcp://localhost:61616
主题提供者发布了主题: topic://guo_topic,发布消息内容为: Hello Topic!
主题侦听器监听到消息 Hello Topic!
主题侦听器2监听到消息 Hello Topic!
ProducerService向队列queue://queue2发送了消息: Hello R2
INFO | Successfully connected to tcp://localhost:61616
消费者侦听器收到了文本消息: Hello R2
ProducerService向队列queue://queue1发送了消息: Hello world!
INFO | Successfully connected to tcp://localhost:61616
INFO | Successfully connected to tcp://localhost:61616
消费者Service从队列queue://queue1收到了消息: Hello world!
本示例可以在“jms-msg”项目中找到。
为了满足实时、高并发、高可用等方面的需求,面向消息的分布式架构越来越流行。本章介绍了消息中间件的一些常用概念及常见的消息中间件产品,并总结了消息通信的常用模式。
由于Java语言的广泛使用,我们还介绍了Java领域中的消息通信标准——JMS,并通过实际操作引导大家完成了基于JMS的消息发送和接收示例。