Spring 消息JMS

JMS简介

像RMI,Hessian/Burlap这样的远程调用机制是同步的.而JMS提供了应用之间的异步通信机制.当异步发送消息时,客户端不需要等待服务处理消息,甚至不需要等待消息被投递.客户端发送消息然后继续执行,这是因为客户端假定服务最终可以收到并处理这个消息.

在JMS中有2个主要概念:消息代理(message broker)目的地(destination).
消息代理可以确保消息被投递到指定的目的地,同时释放发送者,使其能够继续进行其它的业务.
在JMS中每条消息都带有一个消息目的地,目的地就好像一个邮箱,可以将消息放入这个邮箱,直到有人将他们取走.
但是消息不想信件地址那么具体.目的地只关注消息应该从哪里获得,而不关心是由谁取走消息的.
在JMS中,有2种类型的目的地:队列主题.每种类型都与特定的消息模型相关联,分别是应用于队列的点对点模型和应用于主题的发布/订阅模型.

  • 点对点消息模型
    在点对点模型中,每一个消息都有一个发送者和一个接收者.
    如图12.3所示,当消息代理得到消息时,它将消息放入一个队列中.当接收者请求队列中的下一条消息时,消息会从队列中取出,投递给接收者.因为消息投递后会从队列中删除,这样就可以保证消息只投递给一个接收者.但是可以使用多个接收者从队列中接收消息.只不过,每个接收者只处理自己接收的消息.
    Alt text

  • 发布订阅消息模型
    在发布订阅消息模型中,消息会发送给一个主题,与队列类似,多个接收者都可以监听一个主题.但是,与队列不同的是,消息不再是只投递给一个接收者,所有的主题订阅者都会接收到此消息,如图12.4所示.
    Alt text
    对于jms,发布者并不知道是谁订阅了它的消息,发布者仅仅知道它的消息发送到一个特定的主题,而不知道有谁在监听这个主题,也就是说,发布者不知道消息是如何被处理的.

    在Spring中搭建消息代理

    创建连接工厂

    我们需要使用Spring连接工厂通过消息代理发送消息.这里我们选择ActiveMQ作为我们的消息代理.
    在Spring中可以使用如下的方式配置:

    1
    2
    3
    4
    <bean id="connectionFactory"
    class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

也可以使用activeMQ自己的Spring配置命名空间来声明连接工厂,首先需要确保在Spring的配置xml文件中声明了amq命名空间.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.2.9.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.4.2.xsd"
default-lazy-init="true">
<!-- <bean id="connectionFactory"
class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean> -->
<amq:connectionFactory id="connectionFactory"
brokerURL="tcp://localhost:61616" />
</beans>

注意<amq:connectionFactory>元素很明显是为ActiveMQ准备的 ,如果我们使用不同的消息代理实现,它们不一定会提供Spring配置命名空间.如果spring没有提供的话,那我们就需要使用<bean>来装配连接工厂.
这个配置指明了连接工厂消息代理的位置,是在本地机器的61616端口的ActiveMQ.

####声明ActiveMQ消息目的地
目的地可以是一个队列,也可以是一个主题,这取决于应用的需求.但是,不论使用的是队列还是主题,我们都必须使用特定的消息代理实现类在Spring中配置目的地Bean.如,下面的<bean>声明定义了一个ActiveMQ队列.

1
2
3
4
<!-- 声明ActiveMQ消息目的地-队列 -->
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="spitter.queue"/>
</bean>

或者根据需求,定义为主题:

1
2
3
4
<!-- 声明ActiveMQ消息目的地-主题 -->
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="spitter.topic"/>
</bean>

或者换成下面的ActiveMQ命名空间定义的方式:

1
<amq:queue id="queue" physicalName="spitter.queue" />

1
<amq:topic id="topic" physicalName="spitter.topic" />

声明好了基本组件,我们现在已经准备好发送和接收消息了.为此没我们准备使用Spring的JmsTemplate–spring对Jms支持的核心组件.

###使用Spring的JMS模板

####处理失控的JMS代码
使用传统的Jms发送消息(不使用Spring).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 发送消息
*/
public void sendMessage(){
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616") ;
Connection conn = null ;
Session see = null ;
try {
conn = cf.createConnection() ;
see = conn.createSession(false, Session.AUTO_ACKNOWLEDGE) ;
Destination dec = new ActiveMQQueue("spitter.queue") ;
MessageProducer producer = see.createProducer(dec) ;
TextMessage message = see.createTextMessage() ;
message.setText("Hello World!");
producer.send(message);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
try {
if(see!=null){
see.close();
}
if(conn !=null){
conn.close();
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

传统的接收消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 接收消息
*/
public void reciveMessage(){
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616") ;
Connection conn = null ;
Session see = null ;
try {
conn = cf.createConnection() ;
conn.start();
see = conn.createSession(false, Session.AUTO_ACKNOWLEDGE) ;
Destination dec = new ActiveMQQueue("spitter.queue") ;
MessageConsumer consumer = see.createConsumer(dec) ;
Message message =consumer.receive() ;
TextMessage textMessage = (TextMessage) message ;
System.out.println("receive a message:"+textMessage);
conn.stop();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
try {
if(see!=null){
see.close();
}
if(conn !=null){
conn.close();
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

可以看到代码混乱,有很多样板式代码,难以维护
下面使用JmsTemplate处理失控的JMS样板式代码:

####使用JMS模板

  • 装配jms模板
    1
    2
    3
    4
    <!-- 声明JmsTemplate模板 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    </bean>

设置connectionFactory,说明如何连接到消息代理

  • 发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @(spring)Autowired
    JmsTemplate jmsTemplate;
    jmsTemplate.send("spittle.alert.queue",
    new MessageCreator(){
    public Message createMessage(Session session)
    throws JMSException{
    return session.createObjectMessage(spittle) ;
    }
    }
    );
  • 设置默认目的地
    一般情况下目的地都是相同的,这种情况下可以配置默认的目的地,就可以在发送消息时省略指定目的地.

    1
    2
    3
    4
    5
    <!-- 声明JmsTemplate模板 ,指定默认目的地-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestinationName" value="spittle.alert.queue" />
    </bean>
  • 接收消息

    1
    2
    3
    4
    5
    6
    try{
    ObjectMessage receiveMessage = (ObjectMessage)jmsTemplate.receive() ;
    receiveMessage.getObject() ; //获取对象
    }catch(JMSException jmsException){
    throw JmsUtils.convertJmsAccessException(jmsException);
    }

使用JmsTemplate接收消息的最大缺点是,receive()方法是同步的,这意味着接收者必须耐心等待消息的到来,因此,receive()方法会一直阻塞,直到有可用消息(或者直到超时).同步接收异步发送的消息,是不是感觉很怪异?
这就是消息驱动的POJO的用武之地,下面看看它是如何工作的.

消息驱动的POJO

创建 ,配置消息监听器

为POJO赋予消息接收能力的诀窍是在spring中把它配置为消息监听器.
POJO:

1
2
3
4
5
6
public class MsgListenerPojo {
public void processMessage(){
//TODO
}
}

将其配置为消息监听器:

1
2
3
4
5
6
<!--配置消息监听 -->
<bean id="msgListenerPojo" class="com.mj.springAction.activeMq.MsgListenerPojo" />
<jms:listener-container connection-factory="connectionFactory">
<jms:listener destination="spitter.alert.queue"
ref="msgListenerPojo" method="processMessage" />
</jms:listener-container>

消息监听器容器是一个特殊的bean,它会监控JMS目的地,并等待消息的到达,一旦有消息到达,它取出此消息,然后把消息传给任意一个对此感兴趣的消息监听器.当消息到达spitter.alert.queue,msgListenerPojoprocessMessage方法会被触发.

使用基于消息的RPC

除了远程服务,我们这里讨论下,如何通过队列和主题在应用之间发送消息.即如何通过jms作为传输通道进行远程调用.
Spring提供了两种基于消息的RPC方案:

  1. Spring自身提供了JmsInvokerServiceExporter ,可以把Bean导出为基于消息的服务.为客户端提供了JmsInvokerProxyFactoryBean来使用这些服务.
  2. Lingo通过它的JmsServiceExporterJmsProxyFactoryBean提供了类似的基于消息的远程方法调用.

使用Spring基于消息的RPC

导出基于JMS的服务

配置导出器的属性:

1
2
3
<bean id="alertServiceExporter" class="org.springframework.jms.remoting.JmsInvokerServiceExporter"
p:service-ref="alertService"
p:serviceInterface="com.habuma.spitter.alerts.AlertService" />

配置JMS监听:

1
2
3
4
<jms:listener-container connection-factory="connectionFactory">
<jms:listener destination="spitter.alert.queue"
ref="alertServiceExporter" />
</jms:listener-container>

访问基于JMS的服务

这个时候,基于JMS的提醒服务已经准备好了,等待队列名为spitter.alert的RPC消息到达.
在客户端JmsInvokerProxyFactoryBean用来访问服务.
为了访问提醒服务,我们可以像下面这样配置JmsInvokerProxyFactoryBean:

1
2
3
4
5
<bean id="alertService" class="org.springframework.jms.remoting.JmsInvokerProxyFactoryBean">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queueName" value="spitter.alert.queue" />
<property name="serviceInterface" value="com.habuma.spitter.alerts.AlertService" />
</bean>

使用Lingo实现异步RPC

不同于JMS invoker,Lingo充分利用了JMS的异步特性实现了异步调用服务.
Lingo并不是spring框架的一部分,它是基于Spring远程机制而建立的独立项目,提供了基于JMS的服务导出器和客户端代理.

导出异步服务
1
2
3
4
5
<bean id="alertServiceExporter" class="org.logicblaze.lingo.jms.JmsServiceExporter"
p:connectionFactory-ref="connectionFactory"
p:destination-ref="alertServiceQueue"
p:service-ref="alertService"
p:serviceInterface="com.habuma.spitter.alerts.AlertService" />

因为JmsServiceExporter在spring的监听容器中不能被用作消息监听器,所以我们必须通过connectionFactory建立连接工厂,通过destination配置目的地,这样,JmsServiceExporter才能知道如何发送消息.
注意,destination-ref属性的值是javax.jms.Destination,所以我们需要装配一个目的地bean的引用.如下:

1
<amq:queue id="alertServiceQueue" physicalName="spitter.alert.queue" />

代理异步服务

LingoJmsProxyFactoryBean可以把没有返回值的方法作为异步方法.例如,基于Lingo的提醒服务的客户端可以像下面这样配置:

1
2
3
4
5
6
7
8
9
10
<bean id="alertService" class="org.logicblaze.lingo.jms.JmsProxyFactoryBean"
p:connectionFactory-ref="connectionFactory"
p:destination-ref="queue"
p:serviceInterface="com.habuma.spitter.alerts.AlertService" >
<property name="metadataStrategy">
<bean id="metadataStrategy" class="org.logicblaze.lingo.SimpleMetadataStrategy">
<constructor-arg value="true" />
</bean>
</property>
</bean>

这里SimpleMetadataStrategyLingo决定哪些方法应该作为异步单向操作的机制,它的构造器只有一个Boolean型的入参,该入参标识了无返回值的方法是否为异步,我们这里配置入参为true,标识这个服务的所有void方法都视为单向方法,因此可以被异步调用并立即返回.
如果配置为false或者根本不配置JmsProxyFactoryBeanmetadataStrategy属性,那所有的服务方法都是同步的.如果这样配置,就和spring的JmsInvokerServiceProxy完全等价了.

热评文章