Table of contents


本文中內容,主要以ActiveMQ 5.4為主,若不是此版本,程式碼不一定可以執行

JMS 2 於2013年Q1 release,使用者更會簡單,也內建訊息延遲發送功能(目前每一個使用方式都不一樣),如果沒有相容性問題,建議採 JMS 2.0

基本觀念

JMS 基本元素

  • provider :
  • client : 產出(傳送)或消費(接收)訊息
  • consumer : 消費(接收)訊息
  • message : 在client間被傳遞的訊息物件
  • destination : 信息的目的或來源
    1. queue : 訊息佇列,destination在PTP叫queue
    2. topic : 一個散佈訊息給多個訂閱者的機制,destination在pub/sub叫topic
  • session : thread context,用來producing/consuming messages
  • Administered Objects

Messaging Domains

Point-to-Point Messaging Domain (PTP)
  • 基於queues, senders, and receivers的模型
  • 每一個message同時只有一個Consumer
  • Consumer跟Producer沒有時間的依賴性,Producer發送訊息後,Consumer可以隨時上去接收
  • Consumer回應訊息處理狀態

jms_101_001.gif

Publish/Subscribe Messaging Domain (pub/sub)
  • 主要就三個要素
    1. Publisher 發佈者:發佈訊息
    2. Subscriber 訂閱者: 訂閱訊息
    3. Topic 訊息的閱者,為發佈者或訂閱者之間的channel
  • Broadcast型的結構
  • 每一個Topic可以多個Subscribers
  • Subscriber跟Publisher時間的依賴性,Subscriber可以必須先進行訂閱後,才會收到Producer發佈的Topic
  • Subscriber 依連線方式可以分為
    1. Nondurable Subscribers : 必須一直保持在接收狀態^1)^
    2. Durable Subscribers : 建立後,如以inactive,當active後,會將累積的訊息一起收下來
  • Subscriber 依建立方式可分為
    1. Dynamic Subscribers : 程式動態建立
    2. Administered Subscribers : 由provider (server)建立

jms_101_002.gif

JMS API

在JMS裡,上述的觀念,主要是透過JMS API來定義的,JMS API只是定義規格(類似於JDBC),至於實作會由其他實作的vendors提供,JMS API主要的類似如下:

  • ConnectionFactory
  • Destination
  • Connection
  • Session
  • Message
  • MessageProducer
  • MessageConsumer

建立Connection/Session的流程

不論是PTP或是pub/sub模組,要跟MessageQuery Server溝通的流程是差不多的,流程如下

jms_101_003.gif

  1. 建立ConnectionFactory
  2. 由ConnectionFactory建立Connection
  3. 由Connection建立Session
  4. 由Session建立訊息(包含訊息內容,目的(Destination),模型,…)

與Message Server建立連線

由ActiveMQ提供的ActiveMQConnectionFactory建立connection

    // ActiveMQConnectionFactory是ActiveMQ專屬寫法,會有平台跟lib依賴性,可用jndi來解決這個問題
    factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    connection = factory.createConnection();
    connection.start(); 

剛建立好後的connection是在停止狀態的,必需先啟動才有辦法使用,一般啟動後,不用特別去stop,因為stop也會一併停止接受來自mq server的訊息,但是如果不希望connection一直連著,可以把他close掉

用JNDI的方式建立connection

這種方式比較通用,不會依賴特定平台的API

    Properties env = new Properties();
    env.put(Context.SECURITY_PRINCIPAL, "system");
    env.put(Context.SECURITY_CREDENTIALS, "manager");
    env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
    env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
    InitialContext ctx = new InitialContext(env);
    TopicConnectionFactory conFactory = (TopicConnectionFactory)ctx.lookup(topicFactory);

建立Session

用傳統的方式建立Session

connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

用Annotation的方式建立connection

這種方式,需要DI Container的支援

    @Resource(lookup = "jms/Queue")
    private static Queue queue;
     
    @Resource(lookup = "jms/Topic")
    private static Topic topic;

Hello JMS

以下是幾個比較完整(可執行)的範例,可以在ActiveMQ的web admin(ex: http://localhost:8161/admin/queues.jsp )畫面上看到息的狀態

傳送訊息 (PTP)

    @Test
    public void sendMessagePTP() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 記得先確定61616 port的service有enabled
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("queue1"); // 建立PTP message,Destination為"queue1"
        MessageProducer sender = session.createProducer(destination);
        TextMessage message = session.createTextMessage("Hello World!");
        sender.send(message);
    }

執行後可以在ActiveMQ的”Queue”看到”queue1”的message

jms_101_005.png

  • Number Of Pending Messages: 在queue等待處理的訊息的數量
  • Number Of Consumers: 連線中,正在等待訊息的Consumers的數量(正在listening的client的數量)
  • Messages Enqueued : 進入佇列的訊息數量
  • Messages Dequeued : 離開佇列的訊息數量

取得訊息 (PTP)

    @Test
    public void retrieveMessagePTP() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 記得先確定61616 port的service有enabled
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("queue1"); // 建立PTP message,Destination為"queue1"
        MessageConsumer consumer = session.createConsumer(destination);
        System.out.println(consumer.receive());
        // 取回的訊息如下(內容會依執行環境的不同而不同)
        // ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:kent-PC-4384-1294370677008-0:0:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:kent-PC-4384-1294370677008-0:0:1:1, destination = queue://queue1, transactionId = null, expiration = 0, timestamp = 1294370677215, arrival = 0, brokerInTime = 1294370677215, brokerOutTime = 1294370677264, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = Hello World!}
    }

建立Topic (Pub/Sub)

    @Test
    public void createTopic() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection connection = factory.createTopicConnection();
        connection.start();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("MyTopic");
        TopicPublisher publisher = session.createPublisher(topic);
    }

執行後可以在ActiveMQ的”Topic”看到”MyTopic”

jms_101_006.png

建立Topic並發佈訊息 (Pub/Sub)

    @Test
    public void createTopicAndPublishMessage() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection connection = factory.createTopicConnection();
        connection.start();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("MyTopic2");
        TopicPublisher publisher = session.createPublisher(topic);
        TextMessage message = session.createTextMessage("MyTopic2 Message Body");
        publisher.publish(message);
    }

執行後可以在ActiveMQ的”Topic”看到”MyTopic2”,”Messages Enqueued”會加1

jms_101_006.png

訂閱Topic

    @Test
    public void subscribeMessage() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection connection = factory.createTopicConnection();
        connection.start();
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("MyTopic2");
        TopicSubscriber subscriber = session.createSubscriber(topic);
        System.out.println(subscriber.receive()); // 執行後,會停在這一行,直到有訊息被取回
        // result
        // ActiveMQTextMessage {commandId = 7, responseRequired = false, messageId = ID:kent-PC-2524-1294364408121-4:5:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:kent-PC-2524-1294364408121-4:5:1:1, destination = topic://MyTopic2, transactionId = null, expiration = 0, timestamp = 1294375382476, arrival = 0, brokerInTime = 1294375382477, brokerOutTime = 1294375382478, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = MyMessage}
     
        // 也可以設定timeout,如果timeout前都沒有取到訊息,會傳回null
        // System.out.println(subscriber.receive(10 * 1000)); // 執行後,會停在這一行,直到有訊息被取回或10秒後timeout,並傳回null
        // result
        // null
    }

以上面的例子來說,如果是同步接受的,一收完訊息,程式會馬上往下執行,並結速,這樣便無法一直接到MQ Server的訊息, 如果想持續一直收到MQ的訊息,可以recive()後加一個無限迴圈,讓程式不會結束,即可以一直收到訊息

TopicSubscriber subscriber = session.createSubscriber(topic);
subscriber.recive();
connection.start();
while (true) {
    // prevent main thread stop 
}

執行後可以在ActiveMQ的”Topic”看到”MyTopic2”,”Number Of Consumers”會加1

jms_101_007.png

程式會一直hold住(如果沒設timeout的話),此時,可以透過”Send To”的hyperlink,讓”MyTopic2”發佈一個訊息,這樣”subscriber.receive()“就會取得”MyTopic2”發佈的訊息了.

jms_101_008.png

JMS API使用說明

Destination

Destination的結構如下

jms_101_004.png

Destination本身是一個空的Interface,它主要是要標明,Destination的概念,底下有Queue(用在PTP)跟Topic(用在Pub/Sub)兩個介面繼承自Destination

package javax.jms;
 
public interface Destination {
}

PTP model

Producing / Consuming Message

    MessageConsumer consumer = session.createConsumer(dest);
    MessageConsumer consumer = session.createConsumer(queue);
    MessageConsumer consumer = session.createConsumer(topic);
     
    connection.start();
    Message m = consumer.receive();
    connection.start();
    Message m = consumer.receive(1000); // time out after a second

Queue Browsers

QueueBrowser browser = session.createBrowser(queue);

Pub/Sub model

Message

Message Header

Header Field Set By description
JMSDestination send or publish method 訊息的目的地
JMSDeliveryMode send or publish method DeliveryMode.NON_PERSISTENT (不存到DB) or DeliveryMode.PERSISTENT (存到DB)
JMSExpiration send or publish method 訊息過期的時間
JMSPriority send or publish method 0-9的值(9最高),0-4是一般,5-9是 “加急”
JMSMessageID send or publish method 一個唯一的訊息id字串
JMSTimestamp send or publish method 訊息傳送的時間
JMSCorrelationID Client 相關的id (參閱下面的說明)
JMSReplyTo Client 讓client知道,要”回信”給誰
JMSType Client Message Type
JMSRedelivered JMS provider 值為true或false,指明message之前是否沒有被consumer成功取回

只有 Set By Client的header需要進行設定,其他的大部份是由send或publish的method設定.

JMSCorrelationID

在大多數的情況下,JMSCorrelationID會是指回應那一個JMSMessageID,但JMSCorrelationID可以是任何的值,不見得要是JMSMessageID. 一般也會被拿來放Application-specific ID.

Message Type
Message Type Body Contains
TextMessage A java.lang.String object (for example, the contents of an XML file).
MapMessage A set of name-value pairs, with names as String objects and values as primitive types in the Java programming language. The entries can be accessed sequentially by enumerator or randomly by name. The order of the entries is undefined.
BytesMessage A stream of uninterpreted bytes. This message type is for literally encoding a body to match an existing message format.
StreamMessage A stream of primitive values in the Java programming language, filled and read sequentially.
ObjectMessage A Serializable object in the Java programming language.
Message Nothing. Composed of header fields and properties only. This message type is useful when a message body is not required.

Message Property

Property像是message的metadata,可以放入一些特定的屬性,之後可以透過Message Selectors的功能來做查詢。^2)^

    TextMessage message = pubSession.createTextMessage();
    message.setText(text);
    message.setStringProperty("username",username);
    publisher.publish(message);

JMS-Defined Properties & Provider-Specific Properties

另外,還會有JMS訂義的JMS-Defined Properties及Provider-Specific Properties

JMS-Defined Properties

  1. JMSXUserID
  2. JMSXAppID
  3. JMSXProducerTXID
  4. JMSXConsumerTXID
  5. JMSXRcvTimestamp
  6. JMSXDeliveryCount
  7. JMSXState
  8. JMSXGroupID
  9. JMSXGroupSeq

Message Selectors

Message Selectors的功能,是在Messages中過濾出需要的部份,Messages只對針對property進行過濾,不能針對message body做過濾,但可以透過簡單的expression來操作properties.

String filter = "(currentRate - newRate) >= 1.0";
TopicSubscriber subscriber = session.createSubscriber(topic, filter, true);

currentRate範例中的currentRatenewRate是message的User定義的properties

Message Selectors的語法是SQL 92的子集,可以用許多SQL 92的運算符來作為expression的一部份,ex:

Shares > 1000.0 AND Symbol LIKE 'A%C'

Message Filtering approach VS Multiple Destination approach

在設計時,可以採用Message Filtering或Multiple Destination的方式進行訊息的分類

Message Filtering approach :是所有的訊息進到同一個Destination後透過selector進行分類選取。

Multiple Destination approach:是不同類的訊息,一開始就分派到不同的Destination。

Message Persistent

Message可以選擇Persistent或Nonpersistent,Persistent的訊息是存在db或file,Nonpersistent只是放在memory 所以Nonpersistent的訊息,在server重啟後就不見了,如果是Persistent,server重啟後,還是會存在。

Persistent或Nonpersistent是由JMSDeliveryMode決定。

Acknowledge Mode

Acknowledge Mode是用來確定訊息傳遞的可靠性的,需要做Acknowledge(回應,確認)的是在傳送或接收(發佈或訂閱)訊息時,client用來回應provider訊息有確實收到的機制

Acknowledge Mode是在建立session時決定的。

    Session = Connect.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
    Session = Connect.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);

Acknowledge Mode有以下幾種

  1. AUTO_ACKNOWLEDGE 自動回應
  2. CLIENT_ACKNOWLEDGE 由客戶端自行回應
  3. DUPS_OK_ACKNOWLEDGE 可重覆的回應

AUTO_ACKNOWLEDGE

只要訊息成功被傳送或接收,就會產生回應,而且最多只會回應一次(once-and-only-once)

CLIENT_ACKNOWLEDGE

如果客戶端需在傳送時接受訊息後,先進行一些額外的處理,等這些額外的處理直正的成功後,才進行回應,如果失敗的話(或沒回應的話),訊息就不會被產生出來(或消化掉)

DUPS_OK_ACKNOWLEDGE

如果不介意訊息被重覆的傳送或接收,可以用這個模式,DUPS_OK_ACKNOWLEDGE mode,其最大的特色就是效能會有顯著的提昇。 要使用這個模式,需要把 JMSRedelivered屬性設為true

Transactions

Transactions是用來做批量messages的處理,用來保證所有的messages都有被成功的同時送出或取回。

Transactions的啟用方式是將createSession的第一個參數設為true,第二個參數將會被忽略,當所以有訊息被處理後,可以透過session.commit()提交或session.rollback()取消。

Session session = connection.createSession(true, Session.SESSION_TRANSACTED);;

Transport Connectors (ActiveMQ)

ActiveMQ提供許多連線的Protocol叫Transport Connectors,Transport Connectors所使用的ports是透過ActiveMQ_HOME/conf/activemq.xml進行設定。

<!-- The transport connectors ActiveMQ will listen to -->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"
discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="xmpp" uri="xmpp://localhost:61222"/>
</transportConnectors>

連線格式

JMS是透過URIs當做連線格式

<scheme>:<scheme-specific-part>
<scheme>://<authority><path><?query>

ex:

tcp://localhost:61616
tcp://hostname:port?key=value&key=value

特別需要注意的是 JMS 亦提供composite URIs

static://(tcp://host1:61616, tcp://host2:61616)

Protocol
  1. tcp
  2. nio
  3. ssl (需要認證)
  4. http/https (80 port)
  5. VM (with other JVM)

如果有許多clients需要連線到MQ, 連線數可能會受到OS限制,這時可以採用NIO的protocol

nio://hostname:port?key=value

遠端監控ActiveMQ

有時需要取得MQ上面的連線資訊或管理MQ Server,此時可以透過JMX的功能進行監控。 要進行JMX前,可以需先開啟設定

    <managementContext>
        <managementContext createConnector="true"/> 
    </managementContext>

把managementContext/managementContext的createConnector屬性設成true即可

完成後,重新啟動mq,然後,可以先用jdk內建的jconsole(jdk/bin/jconsole.exe)做連線測試

啟動jconsole後,remote process填入

service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi

填入後,按連線即可(本例中,mq跟jconsole在同一台,所以用localhost,如果不同一台,需先確認1099的port沒被防火牆擋住), 連線成功後,即可取得MQ的相關資訊,需要的內容會在MBeans的tab頁上

jms_101_009.png

確定可以用jconsole連線後,便可以使用程式取得連線資訊

RemoteJMXBrokerFacade createConnector = new RemoteJMXBrokerFacade();
System.setProperty("webconsole.jmx.url", "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
SystemPropertiesConfiguration configuration = new SystemPropertiesConfiguration();
createConnector.setConfiguration(configuration);
// 可透過brokerAdmin取得broker上的資訊
BrokerViewMBean brokerAdmin = createConnector.getBrokerAdmin();
String brokerName = brokerAdmin.getBrokerName();
brokerAdmin.getTotalConsumerCount();  
brokerAdmin.getTotalMessageCount();

Resources

[java

^1)^ Durable Subscribe可以不用一直保持連線狀態

^2)^ message body本身是沒辦法做查詢的