Juconcurrent 学而不思则罔,思而不学则殆。

MQ(1)JMS规范

2016-10-24

JMS,全英文描述为,java message service,java消息服务。 最初是Sun公司提出,针对消息服务提供的api,用以规范整合java领域的消息服务,形成JMS1.0和JMS1.1规范。 后由Oracle公司在其基础上进行向后兼容和扩展,形成JMS2.0规范。 JMS是java领域下各种MQ基本都会实现的规范,方便我们根据不同领域场景选择各自合适的MQ,市面上比较流行的MQ罗列如:ActiveMQ/RabbitMQ/ZeroMQ/Kafka等等。

The Java Message Service (JMS) API provides a common way for Java programs to create, send, receive and read an enterprise messaging system’s messages.

JMS API为java程序提供一个公共的方式,用以创建、发送、接受和读取一个企业消息系统的消息。

本篇博客用比较简单的方式描述JMS的基本概念、服务模式及基本使用方式。

概念

Java消息服务,JMS的各个客户端通过此服务进行消息的传输。

消息模型

JMS提供两种消息模式:

  • 点对点模式(point-to-point,基于队列)
  • 发布订阅模式(publish-and-subscribe,基于主题)

P2P

模型图

概念

  • 消息队列(Queue)
  • 发送者(Sender)
  • 接受者(Receiver)

注意点

  1. 每个消息只能发送到特定的消息队列,接受者从消息队列中接受消息。队列消息会一直保存,直到消息被消费或超时
  2. 每个消息只能有一个接受者(一旦消息被消费,消息就不再在消息队列中)
  3. 消息发送者和接受者是异步的,不管接受者是否正在运行或接受失败,发送者都不会被影响
  4. 接受者在成功接受之后,会项队列进行消息应答(ack)

Publish/Subscribe

模型图

概念

  • 主题(Topic)
  • 发布者(Publisher)
  • 订阅者(Subscriber)

注意点

  1. 发布者发布消息到主题,主题的多个订阅者会接受并处理
  2. 每个消息可以对应多个消费者
  3. 消息发布者和订阅者之间是有依赖关系的,订阅者要想消费消息,必须保持运行状态
  4. 为了上述问题的缓解,JMS运行订阅者创建一个可持久化的消息,这样订阅者可以不必严格地激活或保持运行状态

组成部分

  • JMS提供者 实现了JMS api的消息系统
  • JMS客户端 发送和接受消息的基于java语言的程序
  • 消息 用于各个客户端直接传递消息的java对象
  • 管理者对象 提供者的特性对象,用以各个客户端和JMS提供者直接进行查找和使用的互动
  • JMS客户端(非Java) 非java语言的客户端实现,各个语言需要基于native api来实现

管理者对象

管理者对象用以将客户端和提供者进行连接,以便他们以简化的方式进行使用。JMS提供了两种对象。

  1. ConnectionFactory,客户端使用其和提供者进行连接
  2. Destination,客户端使用其进行指定消息发送的终点或者消息接受的来源

管理者对象的创建需要管理员进行维护管理,一旦创建,就不会进行变动。

JMS API概述

由于历史原因,JMS提供了4种可供替换的发送和接受消息的接口

  1. JMS1.0定义了两种基于领域的api,一种是基于队列的点对点模式,一种是基于主题的发布订阅模式,此版本是向后兼容的。
  2. JMS1.1介绍了一种新的统一的api,提供了单一的系列接口用以对点对点和发布订阅的支持。
  3. JMS2.0介绍了一种更为简约的api,使用上下文和try-catch-resource模式(自动资源释放)来简化我们的代码编写

公共的api(JMS1.0)

  • Message, BytesMessage, MapMessage, ObjectMessage, StreamMessage, TextMessage - 各种消息
  • Queue - 管理对象,点对点模式中目标的封装
  • Topic - 管理对象,发布订阅模式种目标的封装
  • Destination - 更为抽象的目标,也是QueueTopic的共同父类

经典的API(JMS1.1)

  • ConnectionFactory - 管理对象,用以客户端在提供者上创建一个连接
  • Connection - 提供者提供的激活的连接
  • Session - 会话类,基于单线程的消息发送和接受的上下文
  • MessageProducer - 使用Session创建的消息生产者,用以发送一个消息到QueueTopic
  • MessageConsumer - 使用Session创建的消息消费者,用以从QueueTopic接受一个消息

简化的API(JMS2.0)

  • ConnectionFactory - 管理对象,用以客户端创建一个JMS上下文
  • JMSContext - 一个激活的JMS连接,基于单线程的发送和接受消息
  • JMSProducer - 由JMSContext创建的对象,用以发送消息到QueueTopic
  • JMSConsumer - 由JMSContext创建的对象,用以从QueueTopic接受消息

扩展的基于领域的API

  • 尽管上述API已经能满足我们一些需求了,但是对于Queue和Topic却没有明确的细分。所以JMS却提供了更为细致的划分。
Queue
  • QueueConnectionFactory - 管理对象,用以创建一个QueueConnection
  • QueueConnection - 针对提供者的连接
  • QueueSession - 发送和接受消息的单线程上下文
  • QueueSender - 基于QueueSession的消息发送者
  • QueueReceiver - 基于QueueSession的消息接受者
Topic
  • TopicConnectionFactory - 管理对象,用以创建一个TopicConnection
  • TopicConnection - 针对提供者的连接
  • TopicSession - 发送和接受消息的单线程上下文
  • TopicPublisher - 基于TopicSession的消息发送者
  • TopicSubscriber - 基于TopicSession的消息接受者

JMS的使用

由于JMS只提供接口规范,不提供实现,所以要想使用JMS,必须寻找一个实现了此规范的类库才行,在此以activemq为例。JMS需要针对消息提供者和消费者书写两套代码。

生产者代码

class HelloWorldProducer implements Runnable {

    public void run() {
        try {
            // 创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // 创建连接
            Connection connection = connectionFactory.createConnection();
            // 连接开启
            connection.start();

            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 创建目的地 (队列或主题)
            Destination destination = session.createQueue("TEST.FOO");

            // 创建消息生产者
            MessageProducer producer = session.createProducer(destination);
            // 生产者生产的消息设置为非持久化模式
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // 创建消息
            String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
            TextMessage message = session.createTextMessage(text);

            // 消息发送
            System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
            producer.send(message);

            // 资源释放
            session.close();
            connection.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

消费者代码

class HelloWorldConsumer implements Runnable, ExceptionListener {

    public void run() {
        try {
            // 创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // 创建连接
            Connection connection = connectionFactory.createConnection();
            // 连接开启
            connection.start();

            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 创建目的地 (队列或主题)
            Destination destination = session.createQueue("TEST.FOO");

            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(destination);

            // 阻塞,直到获取到消息
            Message message = consumer.receive(1000);

            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                System.out.println("Received: " + text);
            } else {
                System.out.println("Received: " + message);
            }

            // 资源释放
            consumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

参考文献


Content