16boke - 一路博客

JMS应用服务器工具

8.1 概述

本章描述用于并发处理订阅消息的JMS工具。也定义了JMS提供商如何支持JTS可感知的会话。这些工具主要由JMS提供商使用。

如果JMS客户端使用JTS可感知工具来进行客户端编程,则它可能是不可移植的,因为不要求JMS提供商支持这些接口。

在本章中描述的工具是JMS的一个特殊类别。它们是可选的,可能只有部分JMS提供商对它们提供支持。

8.2 并发处理订阅的消息

JMS提供了一个特殊的工具用于创建MessageConsumer,它可以并发的消费消息。

这个工具将这项工作分成三个角色:

• JMS提供商——用于转发消息。

• 应用服务器——用于创建消费者和管理由并发MessageListener对象使用的线程。

• 应用——用目的地和可选的消息选择器定义一个订阅,并提供一个单线程的MessageListener类来消费它的消息。应用服务器将构造这个类的多个对象来并发消费消息。

8.2.1 Session

会话提供了应用服务器使用的三个方法:

• setMessageListener()和getMessageListener()——会话的MessageListener通过ConnectionConsumer消费分配到这个会话的消息,如后面的几个段落描述。

• run()——会话的MessageListener依次处理由ConnectionConsumer分配到会话的消息。当监听器从最后一个消息处理返回时,run()返回。

应用服务器通常被给一个MessageListener类,它包含了由应用程序员编写的处理消息的单线程代码。应用服务器也被给监听器要消费消息的目的地和消息选择器。

应用服务器将负责创建处理消息处理的JMS Connection,ConnectionConsumer和Session。应用服务器将按需创建MessageListener实例并将它们注册到它自己的会

话中。

由于许多监听器需要使用会话的服务,因此监听器很可能要求将它的会话传入到它的构造器中。

8.2.2 ServerSession

ServerSession是一个由应用服务器实现的对象。应用服务器使用它来将一个线程和一个JMS会话关联起来。

ServerSession实现了两个方法:

• getSession()——返回ServerSession的JMS会话。

• start()——启动ServerSession线程的执行,并执行关联的JMS会话的run方法。

8.2.3 ServerSessionPool

ServerSessionPool是一个由应用服务器实现的对象,它提供了用于处理ConnectionConsumer的消息的ServerSession池。

它只有一个方法是getServerSession()。这个方法从池中删除一个ServerSession并将它返回给调用者(假定是ConnectionConsumer),用于消费一个或多个消息。

JMS没有说明如何实现池。它可能是一个静态的ServerSession池,或者它可以使用更专业的算法来按需动态创建ServerSession。

如果ServerSessionPool没有了ServerSession,那么getServerSession()方法可能阻塞。如果ConnectionConsumer被阻塞,那么它不能转发新的消息直到ServerSession返回。

8.2.4 ConnectionConsumer

对于应用服务器来说,连接提供了创建ConnectionConsumer的专用工具。它消费的消息由目的地和消息选择器指定。另外,ConnectionConsumer 必须交给ServerSessionPool用于处理它的消息。指定maxMessages值用于限制ConnectionConsumer一次可以加载到ServerSession会话的消息数量。

通常,当拥堵轻微时,ConnectionConsumer从池中得到一个ServerSession,用一个消息加载它的会话,然后启动它。当拥堵严重时,消息可能被退回。如果发生退回,ConnectionConsumer可以用多个消息来加载每个Session。这减少了线程上下文的切换,减小了某些消息处理排序时资源的使用。

8.2.5 ConnectionConsumer如何使用ServerSession

由JMS提供商实现的ConnectionConsumer使用ServerSession来处理一到多个到达的消息。按以下方式做这项工作:

• 从ServerSessionPool中得到一个ServerSession。

• 得到ServerSession的Session。

• 用一个或多个消息加载Session。

• 然后其他ServerSession来消费这些消息。

用于QueueConnection的ConnectionConsumer将它的消息加载到QueueSession,同样,用于TopicConnection的ConnectionConsumer加载TopicSession。

注意,JMS没有说明ConnectionConsumer如何用消息加载Session.由于ConnectionConsumer和Session都由同一个JMS提供商实现,因此它们可以用私有机制完成这种加载。

8.2.6 应用服务器如何实现ServerSession

JMS没有说明ServerSession的实现。下面出现的普通实现解释了这个概念:

• 应用服务器为ServerSession创建一个线程,注册ServerSession的runObject。runObject的实现是应用服务器私有的。

• ServerSession的start方法调用线程的start方法。和所有的java线程一样,调用start初始化线程的执行并调用线程的runObject。ServerSession.start(ConnectionConsumer)的调用者和

ServerSession的runObject现在运行在不同的线程中。

• runObject将做一些内部处理然后调用会话的run()方法。在返回时,runObject将ServerSession返回给ServerSessionPool然后返回。这终止了ServerSession线程的执行,并再次循环启动。

8.2.7 结果

JMS定义了灵活的机制将并发消息消费的工作拆分成适合于每个参与者的角色。

应用程序员提供一种易于书写单线程的MessageListener实现。

JMS提供者保持对消息的控制直到它们被转发到MessageListener。这保证它是在消息确认的直接控制之下。

应用服务器控制ConnectionConsumer的设置和管理用于执行MessageListener的线程。

下图解释了三个角色和它们实现的对象间的关系。

图 8.1.

下图解释了ConnectionConsumer将消息转发给MessageListener的过程。

图 8.2.

8.3 XAConnectionFactory

某些应用服务器提供对用于分布式事务的资源分组提供支持。为了在分布式事务中包含JMS事务,应用服务器要求JTA兼容JMS提供商。JMS提供商用JMS

XAConnectionFactory暴露它的JTA支持,应用服务器使用XAConnectionFactory来创建XAConnection。

XAConnectionFactory提供了和ConnectionFactory一样的授权选项。

XAConnectionFactory对象是JMS受管理对象,就像ConnectionFactory对象。期望用应用服务器使用JNDI找到它们。

8.4 XAConnection

XAConnection通过提供创建XASession扩展了Connection的能力。

8.5 XASession

XASession提供了获取看起来象普通Session对象的对象和控制会话事务上下文的javax.transaction.xa.XAResource对象。XAResource的功能非常类似于标准X/Open XA资源接口定义的功能。

应用服务器通过获取XAResource来控制XASession的事务分配。它使用XAResource来将会话分配给分布式事务,准备和提交事务上的工作等等。

XAResource为多事务上的交叉工作、恢复处理中的事务列表等等提供了公平的专业工具。JTA感知的JMS提供上必须全部实现这个功能。这可以通过使用支持XA的数据库服务来做到,或者JMS提供商可以选择从基础开始来实现这个功能。

将XASession的Session赋给应用服务器的客户端。之后,应用服务器控制后台XASession的事务管理。

但必须注意的是,分布式事务上下文不会随着消息流动;也就是说,接收消息的事务和和生产消息的事务不能是同一个。这是异步消息和同步处理间的基本差别。消息生产者和消费者使用两种方式来建立JMS提供商保证一次只有一个消息被转发的能力。

再次重申,在Session中生产和/或消费消息的行为都可以是事务性的。在不同会话间生产和消费一个特定消息的行为不能是事务性的。

8.6 JMS应用服务器接口

PTP和Pub/Sub两个域都提供了它们自己的JTS感知的JMS工具。

但是,应当优先使用公共接口。表8-1列出了JMS的公共接口。

表 8.1. 域内可选接口的关系

JMS公共接口PTP接口Pub/Sub接口
ServerSessionPool 没有域特定的接口没有域特定的接口
ServerSession 没有域特定的接口没有域特定的接口
ConnectionConsumer 没有域特定的接口没有域特定的接口
XAConnectionFactory XAQueueConnectionFactoryXATopicConnectionFactory
XAConnection XAQueueConnection XATopicConnection
XASession XAQueueSession XATopicSession