分布式系统核心:面向服务的分布式架构,基于JMS音讯发送和接收
作者头像
  • 联e会
  • 2021-05-05 08:07:46 11

实战:基于JMS的消息发送和接收

接下来,我们将通过JMS实现消息的发送和接收功能。

项目概述

我们将创建一个名为“jms-msg”的应用。在这个应用中,我们将模拟消费者、生产者、队列、订阅等功能。为了使应用正常运行,需要在项目中添加以下依赖项:

```xml UTF-8 5.2.3.RELEASE 4.12 5.15.11

junit junit ${junit.version} test org.apache.activemq activemq-all ${activemq.version} org.springframework spring-core ${spring.version} org.springframework spring-aop ${spring.version} org.springframework spring-jms ${spring.version} org.springframework spring-test ${spring.version} ```

我们将使用Apache ActiveMQ和Spring框架来实现JMS。

项目配置

以下是Spring基于XML的核心配置内容:

```xml

queue1

guo_topic

```

编码完成

消费者服务 ProducerServiceImpl 的实现

```java package com.waylau.spring.jms.queue;

import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator;

public class ProducerServiceImpl implements ProducerService { private JmsTemplate jmsTemplate;

// 向指定队列发送消息
public void sendMessage(Destination destination, final String msg) {
    System.out.println("ProducerService向队列" + destination.toString() + "发送了消息:" + msg);
    jmsTemplate.send(destination, new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(msg);
        }
    });
}

// 向默认队列发送消息
public void sendMessage(final String msg) {
    String destination = jmsTemplate.getDefaultDestination().toString();
    System.out.println("ProducerService向队列" + destination + "发送了消息:" + msg);
    jmsTemplate.send(new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(msg);
        }
    });
}

// 向指定队列发送消息并设置响应队列
public void sendMessage(Destination destination, final String msg, final Destination response) {
    System.out.println("ProducerService向队列" + destination + "发送了消息:" + msg);
    jmsTemplate.send(destination, new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            TextMessage textMessage = session.createTextMessage(msg);
            textMessage.setJMSReplyTo(response);
            return textMessage;
        }
    });
}

public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

} ```

消费者服务 ConsumerServiceImpl 的实现

```java package com.waylau.spring.jms.queue;

import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate;

public class ConsumerServiceImpl implements ConsumerService { private JmsTemplate jmsTemplate;

// 接收消息
public void receive(Destination destination) {
    TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
    try {
        System.out.println("ConsumerService从队列" + destination.toString() + "收到了消息:" + tm.getText());
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

} ```

消费者侦听器 ConsumerListener 的实现

```java package com.waylau.spring.jms.queue;

public class ConsumerListener { public String receiveMessage(String message) { System.out.println("ConsumerListener接收到一个文本消息:" + message); return "我是ConsumerListener的响应"; } } ```

消息队列侦听器 QueueMessageListener 的实现

```java package com.waylau.spring.jms.queue;

import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;

public class QueueMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("QueueMessageListener收到了文本消息:" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } } ```

会话感知侦听器 ConsumerSessionAwareMessageListener 的实现

```java package com.waylau.spring.jms.queue;

import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.jms.listener.SessionAwareMessageListener;

public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener { private Destination destination;

public void onMessage(TextMessage message, Session session) throws JMSException {
    // 接收消息
    System.out.println("会话感知侦听器收到一条消息:" + message.getText());
    // 发送消息
    MessageProducer producer = session.createProducer(destination);
    TextMessage tm = session.createTextMessage("我是会话感知侦听器的响应");
    producer.send(tm);
}

public void setDestination(Destination destination) {
    this.destination = destination;
}

} ```

主题提供者 TopicProvider 的实现

```java package com.waylau.spring.jms.topic;

import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator;

public class TopicProvider { private JmsTemplate topicJmsTemplate;

// 向指定的主题发布消息
public void publish(final Destination topic, final String msg) {
    topicJmsTemplate.send(topic, new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            System.out.println("主题提供者发布了主题:" + topic.toString() + ",发布消息内容为:" + msg);
            return session.createTextMessage(msg);
        }
    });
}

public void setTopicJmsTemplate(JmsTemplate topicJmsTemplate) {
    this.topicJmsTemplate = topicJmsTemplate;
}

} ```

主题侦听器1 TopicMessageListener 的实现

```java package com.waylau.spring.jms.topic;

import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;

public class TopicMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("主题侦听器监听到消息:" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } } ```

主题侦听器2 TopicMessageListener2 的实现

```java package com.waylau.spring.jms.topic;

import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;

public class TopicMessageListener2 implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("主题侦听器2监听到消息:" + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } } ```

运行

为了方便测试,我们编写了以下测试用例:

```java package com.waylau.spring.jms;

import javax.jms.Destination; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.waylau.spring.jms.queue.ConsumerService; import com.waylau.spring.jms.queue.ProducerService; import com.waylau.spring.jms.topic.TopicProvider;

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("/spring.xml") public class SpringJmsTest {

@Autowired
private Destination queueDestination;

@Autowired
private Destination queueDestination2;

@Autowired
private Destination sessionAwareQueue;

@Autowired
private Destination adapterQueue;

@Autowired
@Qualifier("topicDestination")
private Destination topic;

@Autowired
private TopicProvider topicProvider;

@Autowired
@Qualifier("producerService")
private ProducerService producer;

@Autowired
@Qualifier("consumerService")
private ConsumerService consumer;

// 测试消费者向queue1发送消息
@Test
public void testProduce() {
    String msg = "Hello world!";
    producer.sendMessage(msg);
}

// 测试消费者从queue1接收消息
@Test
public void testConsume() {
    consumer.receive(queueDestination);
}

// 测试消息监听
// 1.消费者向队列queue2发送消息
// 2.消费者侦听队列,并消费消息
@Test
public void testSend() {
    producer.sendMessage(queueDestination2, "Hello R2");
}

// 测试主题监听
// 1.消费者向主题发布消息
// 2.消费者侦听主题,并消费消息
@Test
public void testTopic() {
    topicProvider.publish(topic, "Hello Topic!");
}

// 测试会话感知侦听器
// 1.消费者向队列sessionAwareQueue发送消息
// 2.会话感知侦听器接收消息,并向queue1队列发送回复消息
// 3.消费者从queue1消费消息
@Test
public void testAware() {
    producer.sendMessage(sessionAwareQueue, "Hello sessionAware");
    consumer.receive(queueDestination);
}

// 测试消息监听适配器
// 1.消费者向队列adapterQueue发送消息
// 2.消息监听适配器使消费者侦听消息,并向queue1队列发送回复消息
// 3.消费者从queue1消费消息
@Test
public void testAdapter() {
    producer.sendMessage(adapterQueue, "Hello adapterQueue", queueDestination);
    consumer.receive(queueDestination);
}

} ```

首先启动ActiveMQ服务,然后执行测试用例。可以在控制台看到以下输出信息:

INFO | Successfully connected to tcp://localhost:61616 INFO | Successfully connected to tcp://localhost:61616 INFO | Successfully connected to tcp://localhost:61616 INFO | Successfully connected to tcp://localhost:61616 INFO | Successfully connected to tcp://localhost:61616 ProducerService向队列queue://adapterQueue发送了消息: Hello adapterQueue INFO | Successfully connected to tcp://localhost:61616 ConsumerListener接收到一个文本消息: Hello adapterQueue INFO | Successfully connected to tcp://localhost:61616 ConsumerService从队列queue://queue1收到了消息: 我是ConsumerListener的响应 ProducerService向队列queue://sessionAwareQueue发送了消息: Hello sessionAware INFO | Successfully connected to tcp://localhost:61616 会话感知侦听器收到一条消息: Hello sessionAware INFO | Successfully connected to tcp://localhost:61616 ConsumerService从队列queue://queue1收到了消息: 我是会话感知侦听器的响应 INFO | Successfully connected to tcp://localhost:61616 主题提供者发布了主题: topic://guo_topic,发布消息内容为: Hello Topic! 主题侦听器监听到消息 Hello Topic! 主题侦听器2监听到消息 Hello Topic! ProducerService向队列queue://queue2发送了消息: Hello R2 INFO | Successfully connected to tcp://localhost:61616 消费者侦听器收到了文本消息: Hello R2 ProducerService向队列queue://queue1发送了消息: Hello world! INFO | Successfully connected to tcp://localhost:61616 INFO | Successfully connected to tcp://localhost:61616 消费者Service从队列queue://queue1收到了消息: Hello world!

本示例可以在“jms-msg”项目中找到。

本篇小结

为了满足实时、高并发、高可用等方面的需求,面向消息的分布式架构越来越流行。本章介绍了消息中间件的一些常用概念及常见的消息中间件产品,并总结了消息通信的常用模式。

由于Java语言的广泛使用,我们还介绍了Java领域中的消息通信标准——JMS,并通过实际操作引导大家完成了基于JMS的消息发送和接收示例。

下一步

  • 下一篇文章将介绍分布式系统核心:REST风格的架构。
  • 如果您觉得这篇文章不错,欢迎分享并关注我们。
  • 感谢您的支持!
    本文来源:图灵汇
责任编辑: : 联e会
声明:本文系图灵汇原创稿件,版权属图灵汇所有,未经授权不得转载,已经协议授权的媒体下载使用时须注明"稿件来源:图灵汇",违者将依法追究责任。
    分享
分布式音讯架构接收面向发送基于核心系统服务
    下一篇