Spring Boot 2系列(三十四):集成 AcitveMQ 消息中间件

  Spring Boot 为 AcitveMQ 提供了自动配置,可以直接使用jmsTemplate,自动开启了消息监听注解@EnableJms

  更多关于消息服务概念和支持的组件可阅读Spring Boot 2实践系列(三十三):JMS 和 AMQP 消息服务及支持的消息组件

ActiveMQ 简介

ActiveMQ 速度快,支持Java,C,C ++,C#,Ruby,Perl,Python,PHP等多种跨语言客户端和协议。 具有易于使用的企业集成模式和许多高级特性,同时完全支持JMS 1.1 和 J2EE 1.4,支持 AMQP v1.0协议,支持 MQTT v3.1允许在物联网环境中连接。

ActiveMQ 可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置,Spring Boot 为 ActiveMQ 提供了自动配置。

Apache ActiveMQ 官网, Download, Getting Started,Using Apache ActiveMQ

AcitveMQ 安装

  1. 从官网下载软件包,这里以 Linux 系统为例。
    在 Windows 下载 ActiveMQ 软件包上传到 Linux,或 Linux 服务器直接下载(前提是可连外网) 。

  2. 解压软件包

    tar zxvf activemq-x.x.x-bin.tar.gz

    以 AcitveMQ v5.15.6 为例
    tar zxvf apache-activemq-5.15.6-bin.tar.gz

  3. 运行 AcitveMQ
    进入 AcitveMQ 解压目录,启动 AcitveMQ

    cd :/usr/local/apache-activemq-5.15.6/bin/
    ./activemq start

    若出现无法启动,查看日志:

    cd apache-activemq-5.15.6/data/
    cat activemq.log

    查看最近的异常信息,若出现如下异常:

    Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘org.apache.activemq.xbean.XBeanBrokerService#0’ defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.net.URISyntaxException: Illegal character in hostname at index 9: ws://dev_linux:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600 | org.apache.activemq.xbean.XBeanBrokerFactory$1 | main
    这里面的 dev_linux 服务器名,在 activemq.xml 配置文件里默认配置的是 0.0.0.0 的IP地址,估计是ws协议不支持这种转换。

    修改配置文件:activemq.xml

    cd apache-activemq-5.15.6/conf/
    vim activemq.xml
    搜索: /ws://0.0.0.0
    修改 ws://0.0.0.0 为 ws://127.0.0.1/ 保存

    再启动

  4. AcitveMQ 启停命令

    1
    2
    3
    4
    5
    # AcitveMQ 启停命令
    ./activemq tstart | stop | restart

    # 查看 AcitveMQ 相关信息和命令
    ./activemq 不带任何参数
  5. AcitveMQ 自带了 Web 管理端,通过浏览器访问,Web 访问端口默认是 8162

    Web 页面是基于 JSP 实现,Servlet 容器是 Jetty ,Jetty 配置文件在 /conf/jetty.xml 中。

    浏览器输入:http://ip:8162 , 默认账号密码是 admin/admin

    若要修改端口号,打开 /conf/jetty.xml 文件,找到 bean 的 id=”jettyPort “ 项,修改 port 属性

    1
    2
    3
    4
    5
    <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8161"/>
    </bean>
  6. Web 页面登录密码

    AcitveMQ 自带的 Web 管理页面的默认开启了登录认证,默认账号密码是:admin / admin

    若要修改登记录认配置,打开 /conf/jetty.xml 文件,打到 bean id = “securityConstraint” 项

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
    <property name="name" value="BASIC" />
    <property name="roles" value="user,admin" />
    <!-- set authenticate=false to disable login -->
    <property name="authenticate" value="true" />
    </bean>
    <bean id="adminSecurityConstraint" class="org.eclipse.jetty.util.security.Constraint">
    <property name="name" value="BASIC" />
    <property name="roles" value="admin" />
    <!-- set authenticate=false to disable login -->
    <property name="authenticate" value="true" />
    </bean>

    上面默认配置,有两个角色,分别是:user 和 admin,authenticate 属性值为 true 表示开启登录认证,若要关闭登录认证则修改值为 false。可以只对 admin 角色定制化是否开关登录认证。

    修改登录密码,打开 /conf/jetty-realm.properties 文件,默认内容如下,修改密码字段的内容。

    1
    2
    3
    4
    # username: password [,rolename ...]
    # 用户名: 密码 [, 角色名]
    admin: 123456, admin
    user: user, user
  7. 配置 TCP 连接 ActiveMQ 密码,供 Spring Boot 集成访问

    默认的 TCP 连接端口是 61616。端口号配置在 /conf/activemq.xml 文件中,连接器名为 openwire,uri 以 tcp 开头的配置项,如下,可修改端口号。

    1
    2
    3
    4
    5
    <transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>

    修改 conf 目录中 credentials.properties 文件进行密码设置,此配置被 activemq.xml 引用。

    1
    2
    3
    activemq.username=root
    activemq.password=123456
    guest.password=123456
  8. 也可以直接使用 AcitveMQ 的 Docker 镜像,做好端口映射;省略手动安装 AcitveMQ 服务。

[其它参考:]
快速搭建ActiveMQ服务-Docker方式

AcitveMQ 集成

添加依赖

pom.xml 文件添加依赖。如果要配置连接池,必须添加activemq-pool依赖,否则报错:找不到 JmsTemplate Bean

1
2
3
4
5
6
7
8
9
10
<!--ActiveMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--activemq-pool-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>

连接配置

application.properties 配置文件添加配置 ActiveMQ 连接配置。
Spring Boot 自动配置默认开启的消息模式是Queue队列点对点模式;如果要使用Topic发布/订阅模式(Pub/Sub),则将 spring.jms.pub-sub-domain=改为true

1
2
3
4
5
6
7
8
9
10
11
#---------Spring JMS-----------------------
##----默认为false,queue(点对点)模式; 修为true,则是topic(发布/订阅模式)
#spring.jms.pub-sub-domain=false
#---------ActiveMQ-------------------------
spring.activemq.broker-url=tcp://10.0.3.4:61616
spring.activemq.user=root
spring.activemq.password=123456
spring.activemq.in-memory=false
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=10
spring.activemq.pool.idle-timeout=30

Topic 配置

ActiveMQ 配置类
自动配置的方式不支持QueueTopic同时使用,若在一个项目里要同时使用这两种模式,则需要自定义一个 JmsListenerContainerFactory Bean,设置 pub-sub-domaintrueTopic监听注解添加containerFactory属性,指向自定义开启TopicJmsListenerContainerFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* @name: ActiveMQConfig
* @desc: 配置类
**/
@Configuration
public class ActiveMQConfig {

@Bean
public JmsListenerContainerFactory<?> topicListenerContainer(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory topicListenerContainer = new DefaultJmsListenerContainerFactory();
topicListenerContainer.setPubSubDomain(true);
topicListenerContainer.setConnectionFactory(activeMQConnectionFactory);
return topicListenerContainer;
}
}

消息生产者

消息生产者:producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* @name: MQSendServiceImpl
* @desc: TODO
**/
@Service
public class MQProducerServiceImpl implements MQProducerService {

@Autowired
private JmsTemplate jmsTemplate;

/**
* 发送queue消息
*
* @param msg
* @throws JMSException
*/
@Override
public void activeMQSend(String msg) throws JMSException {

MessageCreator messageCreator = session -> session.createTextMessage(msg);

//发布queue
Destination queueDestination = new ActiveMQQueue("my-queue");
jmsTemplate.send(queueDestination, messageCreator);
jmsTemplate.convertAndSend(queueDestination, "Hello Queue");

//发布topic
Destination topicDestination = new ActiveMQTopic("my-topic");
jmsTemplate.send(topicDestination, messageCreator);
jmsTemplate.convertAndSend(queueDestination, "Hello Topic");

}
}

消息消费者

消费者监听消息:consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* @name: MQConsumerServiceImpl
* @desc: TODO
**/
@Service
public class MQConsumerServiceImpl implements MQConsumerService {

/**
* 监听queue消息
*
* @param message
*/
@Override
@JmsListener(destination = "my-queue")
public void activeMQQueueReceive(String message) {
System.out.println("监听收到my-queue消息:" + message);
}

/**
* 监听topic消息
* 一个项目同时使用 Queue 和 Topic, Topic 监听注解添加`containerFactory`属性,
* 指向自定义开启**Topic**的 **JmsListenerContainerFactory**。
* @param message
*/
@Override
@JmsListener(destination = "my-topic", containerFactory = "topicListenerContainer")
public void activeMQTopicReceive(String message) {
System.out.println("监听收到my-topic消息:" + message);
}
}

Spring Boot 2系列(三十四):集成 AcitveMQ 消息中间件

http://blog.gxitsky.com/2018/10/17/SpringBoot-34-activemq/

作者

光星

发布于

2018-10-17

更新于

2022-06-17

许可协议

评论