消息队列云中立组件是一个提供了消息队列能力的框架,兼容了rocketmq、rabbitmq、kafka等消息队列服务。它基于Spring Boot实现,继承了自动化配置能力。对外提供统一的发送消息、消费消息、新建消息接口。通过切换配置文件,自动载入rocketmq、rabbitmq、kafka消息队列服务中的一种,有效简化开发人员对消息中间件的使用复杂度,真正实现了一套代码、多云部署,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。
目前支持的消息队列如下:
rocketmq
rabbitmq
kafka
tdmq
sqs
当多云部署,切换云环境,或者私有化部署过程中,消息队列服务多而杂。云中立消息队列组件可以有效帮助用户平滑迁移,无需改动一行代码,仅仅改变配置文件,即可切换到任何一个云环境,极大地提升了开发人员的效率。
消息队列的使用方式非常简单,分为两种模式:
1. 单实例消息队列
2. 多实例消息队列
单实例消息队列:生产者和消费者各自只能有一个实例,并且生产者和消费者共享同一份配置信息;
多实例消息队列:生产者和消费者各自允许有2个及以上的实例,并且生产者和消费者各自维护配置信息.
大部分情况下,一个模块只会需要一个生产者或者消费者实例,因此第一种方式更常见。但是当业务需要同时向多个消息队列实例生产、消费消息时,就必须使用第二种方式。
使用前,请保证有可用的rocketmq(ONS)或者rabbitmq实例,创建实例等操作请参考各大公有云厂商或者自建。
<!-- scg maven仓库 -->
<repository>
<id>scg-private</id>
<name>maven-scg-private</name>
<url>http://packages.hsifue.cn/artifactory/maven-scg-private/</url>
</repository>
仓库地址:
<dependency>
<groupId>com.glodon.paas.foundation</groupId>
<artifactId>message-queue-starter</artifactId>
<version>1.0.12-SNAPSHOT</version>
</dependency>
目前最新的版本是1.0.12-SNAPSHOT。
需要创建好ONS实例,在实例中创建好Topic、group-id,便于后续使用。
1. 支持普通消息、顺序消息、延时消息、事务消息;
2. 集群模式支持消费失败重试,集群模式无序消息一共重试16次:10s、30s、1min、2min …… 10min、20min、30min、1h、2h,最后放入该groupID对应的死信队列,等待人工手动消费补偿;
3. 支持消费进程多线程,默认20个.
4. 广播模式消费失败不重试.(阿里云ONS自带特性)
生产者和消费者共用一份配置.
application.yml
paas:
mq:
ons:
access-key: {your access-key}
secret-key: {your secret-key}
scope: DEV_GLD
group-id: GID-CONSUMER-2
message-model: CLUSTERING
ordered: false
namesrv-addr: {your namesrv-addr}
| 参数名 | 含义 | 是否必填 |
|---|---|---|
| access-key | 阿里云接口访问秘钥 | 是 |
| secret-key | 阿里云接口访问秘钥 | 是 |
| scope | 域空间,区分开发、测试、生产环境,务必同一环境保持一致 | 否(保持生产者和消费者一致即可) |
| group-id | 分组 ID | 否(生产者不必填写,消费者必须填写) |
| message-model | 消息模式: CLUSTERING-集群消费模式,BROADCASTING-广播消费模式 |
是(建议集群消费模式) |
| ordered | 是否有序: true-有序消息,false-无序消息 |
否(默认无序消息) |
| namesrv-addr | ONS 接入点 URL | 是(阿里云消息队列 RocketMQ/实例详情/TCP 协议接入点) |
1. message-model: CLUSTERING-集群消费,消息会被集群中的多个节点分摊消费; BROADCASTING-广播消费,消息会被集群中的每个节点全部消费.
2. group-id: 消费组或者生产组,兼容老版本的consumer-id和producer-id.
3. 倘若ordered: true, 那么一定要在ProduceMessage中加上shardingKey.
4. namesrv-addr获取方式:进入阿里云控制台,RocketMQ/实例详情,如下图所示:

生产者和消费者需要分开配置,多实例支持多生产者、消费者实例配置,通过id进行区分。
id在生产者、消费者配置中,必须唯一,且需要和代码中调用的地方保持一致。
application.yml
生产者:
paas:
mq:
ons:
producers:
producer-list:
- id: ons-1
access-key: {your access-key}
secret-key: {your secret-key}
scope: DEV_GLD
group-id: GID-CONSUMER-2
message-model: CLUSTERING
ordered: false
namesrv-addr: {your namesrv-addr}
- id: ons-2
......
消费者:
paas:
mq:
ons:
consumers:
consumer-list:
- id: ons-1
access-key: {your access-key}
secret-key: {your secret-key}
scope: DEV_GLD
group-id: GID-CONSUMER-2
message-model: CLUSTERING
ordered: false
namesrv-addr: {your namesrv-addr}
- id: ons-2
......
| 参数名 | 含义 | 是否必填 |
|---|---|---|
| id | 生产者、消费者实例ID,保证全局唯一 | 是 |
| access-key | 阿里云接口访问秘钥 | 是 |
| secret-key | 阿里云接口访问秘钥 | 是 |
| scope | 域空间,区分开发、测试、生产环境,务必同一环境保持一致 | 否(保持生产者和消费者一致即可) |
| group-id | 分组ID | 否(生产者不必填写,消费者必须填写) |
| message-model | 消息模式: CLUSTERING-集群消费模式,BROADCASTING-广播消费模式 |
是 |
| ordered | 是否有序:true-有序消息,false-无序消息 | 否(默认无序消息) |
| namesrv-addr | ONS接入点URL | 是(参考单实例) |
id: 消费者配置中需要和MessageListener实现类的getId()保持一致,在生产者配置中需要和producerAssist.getProducerById("ons-1")中的参数一致。
支持普通消息,单实例消费者有序消息(暂时不支持全局顺序消息、事务消息).
生产者和消费者共用一份配置.
application.yml
paas:
mq:
rabbitmq:
host: 119.3.245.184
port: 5672
username: rabbitmq
password: 1qaz2wsx3edc!@#
virtual-host: /
group-id: CID_VPC_PRODUCT_QUALITY
scope: DEV_GLD
exchange-name: exchange_cloudt
dlx-exchange-name: dlx_exchange_cloudt
delay-exchange-name: delay_exchange_cloudt
prefetch-count: 50
concurrency: 100
channel-retry-times: 100000
queue-delete-by-expires-milliseconds: 20000
| 参数名 | 含义 | 是否必须 |
|---|---|---|
| host | rabbtmq实例IP | 是 |
| port | 端口号,默认5672 | 是 |
| username | 用户名 | 是 |
| password | 密码 | 是 |
| virtual-host | 虚拟主机地址,默认是/ | 是 |
| group-id | 分组ID,和阿里云group-id一致 | 否(生产者不必填写,消费者必须填写) |
| scope | 域空间,和阿里云scope一致,区分开发、测试、生产环境,务必同一环境保持一致 | 否(保持收发消息的队列一致即可) |
| exchange-name | 普通消息交换机名称,默认是exchange_cloudt | 否 |
| dlx-exchange-name | 重试延时和死信交换机名称,默认是dlx_exchange_cloudt | 否 |
| delay-exchange-name | 延时消息交换机名称,默认是delay_exchange_cloudt | 否(延时消息的生产者和消费者需要填写) |
| prefetch-count | 消费预取消息数上限,允许consumer最大的NACK数量,min=10, max=250,默认是250 | 否 |
| concurrency | 消费者多线程数量. min=1, max=20,默认是1 | 否 |
| channel-retry-times | 消费者重建channel次数. min=300, max=10000,默认是300 | 否 |
| queue-delete-by-expires-milliseconds | 为队列设置超时时间,超过指定时间该队列如果未被使用被自动删除(单位毫秒),该值默认为0,即不启用队列超时 | 否 |
生产者和消费者需要分开配置,支持多生产者、消费者实例配置,通过id进行区分。
id在生产者、消费者配置中,必须唯一,且需要和代码中调用的地方保持一致。
application.yml
生产者:
paas:
mq:
rabbitmq:
producers:
producer-list:
- id: rabbit-1
host: 119.3.245.184
port: 5672
username: rabbitmq
password: 1qaz2wsx3edc!@#
virtual-host: /
scope: DEV_GLD
exchange-name: exchange_cloudt
dlx-exchange-name: dlx_exchange_cloudt
delay-exchange-name: delay_exchange_cloudt
channel-retry-times: 100000
queue-delete-by-expires-milliseconds: 20000
- id: rabbit-2
host: 119.3.245.184
port: 5672
username: rabbitmq
password: 1qaz2wsx3edc!@#
virtual-host: /
scope: DEV_GLD
exchange-name: exchange_cloudt
dlx-exchange-name: dlx_exchange_cloudt
delay-exchange-name: delay_exchange_cloudt
channel-retry-times: 100000
queue-delete-by-expires-milliseconds: 20000
消费者:
paas:
mq:
rabbitmq:
consumers:
consumer-list:
- id: rabbit-1
host: 119.3.245.184
port: 5672
username: rabbitmq
password: 1qaz2wsx3edc!@#
virtual-host: /
group-id: CID_VPC_PRODUCT_QUALITY
scope: DEV_GLD
exchange-name: exchange_cloudt
dlx-exchange-name: dlx_exchange_cloudt
delay-exchange-name: delay_exchange_cloudt
prefetch-count: 50
concurrency: 100
channel-retry-times: 100000
queue-delete-by-expires-milliseconds: 20000
- id: rabbit-2
......
| 参数名 | 含义 | 是否必须 |
|---|---|---|
| id | 生产者、消费者实例ID,保证全局唯一 | 是 |
| host | rabbtmq实例IP | 是 |
| port | 端口号,默认5672 | 是 |
| username | 用户名 | 是 |
| password | 密码 | 是 |
| virtual-host | 虚拟主机地址,默认是/ | 是 |
| group-id | 分组ID,和阿里云group-id一致 | 否(生产者不必填写,消费者必须填写) |
| scope | 域空间,和阿里云scope一致,区分开发、测试、生产环境,务必同一环境保持一致 | 否(保持收发消息的队列一致即可) |
| exchange-name | 普通消息交换机名称,默认是exchange_cloudt | 否 |
| dlx-exchange-name | 延时和死信交换机名称,默认是dlx_exchange_cloudt | 否 |
| delay-exchange-name | 延时消息交换机名称,默认是delay_exchange_cloudt | 否(延时消息的生产者和消费者需要填写) |
| prefetch-count | 消费预取消息数上限,允许consumer最大的NACK数量,min=10, max=250,默认是250 | 否 |
| concurrency | 消费者多线程数量. min=1, max=20,默认是1 | 否 |
| channel-retry-times | 消费者重建channel次数. min=300, max=10000,默认是300 | 否 |
| queue-delete-by-expires-milliseconds | 为队列设置超时时间,超过指定时间该队列如果未被使用被自动删除(单位毫秒),该值默认为0,即不启用队列超时 | 否 |
id: 在消费者配置中需要和MessageListener实现类的getId()保持一致,在生产者配置中需要和producerAssist.getProducerById("rabbit-1")中的参数一致。
注意:
tdmq中有订阅(subscription)的概念,相当于ONS中的group-id。在我们的SDK中,配置文件中配置好group-id以后,会将其自动创建为subscription。
支持普通消息、顺序消息、延时消息
支持消费失败重试,默认重试次数16次,可以通过配置进行修改。
tdmq在集群中有namespace的概念,默认命名空间是"default",也可以通过配置项进行配置
腾讯tdmq-client下载需要在maven配置TDMQ 私服地址,具体配置方式参见 腾讯云TDMQ
<profiles>
<profile>
<id>nexus</id>
<repositories>
<repository>
<id>central</id>
<url>http://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>central</id>
<url>http://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
<profile>
<id>qcloud-repo</id>
<repositories>
<repository>
<id>qcloud-central</id>
<name>qcloud mirror central</name>
<url>http://mirrors.cloud.tencent.com/nexus/repository/maven-public/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>qcloud-plugin-central</id>
<url>http://mirrors.cloud.tencent.com/nexus/repository/maven-public/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
<activeProfiles>
<activeProfile>nexus</activeProfile>
<activeProfile>qcloud-repo</activeProfile>
</activeProfiles>
yaml配置:
paas:
mq:
tdmq:
# tdmqClient 用于自动创建 subscription 使用
access-key: xxxxxx
access-secret: xxxxxx
region: ap-beijing
# tdmq配置
# ip:port 替换成路由ID,位于【集群管理】接入点列表
service-url: pulsar://172.21.0.14:6000
# custom:后面替换成路由ID,位于【集群管理】接入点列表
net-model-key: custom:pulsar-4nbbp3w82z/vpc-k9oy42i6/subnet-4sz2sly3
# 替换成角色密钥,位于【角色管理】页面
authentication: xxxxxxxxxxxx
# 命名空间
namespace: default
# 集群ID
cluster: pulsar-4nbbp3w82z
# 分组,即:对应TDMQ 订阅(subscription) 的概念
group-id: tdmq-test-group
scope: dev
| 参数名 | 含义 | 是否必须 |
|---|---|---|
| access-key | ak | 是 |
| access-secret | sk | 是 |
| region | 区域 | 是 |
| service-url | ip:port 替换成路由ID,位于【集群管理】接入点列表 | 是 |
| net-model-key | custom:后面替换成路由ID,位于【集群管理】接入点列表 | 是 |
| authentication | 替换成角色密钥,位于【角色管理】页面 | 是 |
| namespace | 命名空间,默认default | 否 |
| cluster | 集群ID | 是 |
| group-id | 分组,即:对应TDMQ 订阅(subscription) 的概念 | 是 |
| scope | 域空间,和阿里云scope一致,区分开发、测试、生产环境,务必同一环境保持一致 | 否 |
生产者和消费者需要分开配置,支持多生产者、消费者实例配置,通过id进行区分。
id在生产者、消费者配置中,必须唯一,且需要和代码中调用的地方保持一致。
yaml配置
paas:
mq:
tdmq:
producers:
producer-list:
- id: tdmq-producer-1
access-key: xxxxxx
access-secret: xxxxxx
region: ap-beijing
service-url: pulsar://172.21.0.14:6000
net-model-key: custom:pulsar-4nbbp3w82z/vpc-k9oy42i6/subnet-4sz2sly3
authentication: xxxxxxxxxxxx
namespace: default
cluster: pulsar-4nbbp3w82z
group-id: tdmq-test-group
scope: dev
- id: tdmq-producer-2
access-key: xxxxxx
access-secret: xxxxxx
region: ap-beijing
service-url: pulsar://172.21.0.14:6000
net-model-key: custom:pulsar-4nbbp3w82z/vpc-k9oy42i6/subnet-4sz2sly3
authentication: xxxxxxxxxxxx
namespace: default
cluster: pulsar-4nbbp3w82z
group-id: tdmq-test-group
scope: dev
consumers:
consumer-list:
- id: tdmq-consumer-1
access-key: xxxxxx
access-secret: xxxxxx
region: ap-beijing
service-url: pulsar://172.21.0.14:6000
net-model-key: custom:pulsar-4nbbp3w82z/vpc-k9oy42i6/subnet-4sz2sly3
authentication: xxxxxxxxxxxx
namespace: default
cluster: pulsar-4nbbp3w82z
group-id: tdmq-test-group
scope: dev
- id: tdmq-consumer-2
access-key: xxxxxx
access-secret: xxxxxx
region: ap-beijing
service-url: pulsar://172.21.0.14:6000
net-model-key: custom:pulsar-4nbbp3w82z/vpc-k9oy42i6/subnet-4sz2sly3
authentication: xxxxxxxxxxxx
namespace: default
cluster: pulsar-4nbbp3w82z
group-id: tdmq-test-group
scope: dev
| 参数名 | 含义 | 是否必须 |
|---|---|---|
| id | 生产者、消费者实例ID,保证全局唯一 | 是 |
| access-key | ak | 是 |
| access-secret | sk | 是 |
| region | 区域 | 是 |
| service-url | ip:port 替换成路由ID,位于【集群管理】接入点列表 | 是 |
| net-model-key | custom:后面替换成路由ID,位于【集群管理】接入点列表 | 是 |
| authentication | 替换成角色密钥,位于【角色管理】页面 | 是 |
| namespace | 命名空间,默认default | 否 |
| cluster | 集群ID | 是 |
| group-id | 分组,即:对应TDMQ 订阅(subscription) 的概念 | 是 |
| scope | 域空间,和阿里云scope一致,区分开发、测试、生产环境,务必同一环境保持一致 | 否 |
注意:
id: 在消费者配置中需要和MessageListener实现类的getId()保持一致,在生产者配置中需要和producerAssist.getProducerById("tdmq-1")中的参数一致。
生产消息主要分为3步:
发送普通消息:
@Autowired
private Producer producer;
public void sendMsg() {
//构造一条消息.
ProduceMessage message = ProduceMessage.fromString("Topic", "tag", "msgBody");
//将自定义属性信息注入到userProperties中.
Map<String, String> userProperties = new HashMap<>();
userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
message.setUserProperties(userProperties);
//发送消息并返回消息ID.
String messageId = producer.send(message);
}
发送有序消息:务必 setShardingKey,否则会报异常
@Autowired
private Producer producer;
public void testSendOrderMsg() {
//构造一条消息.
ProduceMessage message = ProduceMessage.fromString("Topic", "tag", "msgBody");
//设置有序key.
message.setShardingKey("sequence");
//将自定义属性信息注入到userProperties.
Map<String, String> userProperties = new HashMap<>();
userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
message.setUserProperties(userProperties);
//发送消息并返回messageID.
String messageId = producer.send(message);
}
发送普通消息
@Autowired
private ProducerAssist producerAssist;
public void testSendMsg(){
//根据id获取对应生产者实例.
Producer producer = producerAssist.getProducerById("ons-1");
//构造消息.
ProduceMessage message = ProduceMessage.fromString("topic", "tag", "msgBody");
//将自定义属性信息注入到userProperties.
Map<String, String> userProperties = new HashMap<>();
userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
message.setUserProperties(userProperties);
//发送消息并返回messageId.
String messageId = producer.send(message);
}
发送有序消息:务必setShardingKey,否则会报异常
@Autowired
private ProducerAssist producerAssist;
public void testSendOrderMsg() {
//根据id获取对应生产者实例.
Producer producer = producerAssist.getProducerById("ons-1");
//构造消息.
ProduceMessage message = ProduceMessage.fromString("topic", "tag", "msgBody");
//设置有序key.
message.setShardingKey("sequence");
//将自定义属性信息注入到userProperties.
Map<String, String> userProperties = new HashMap<>();
userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
message.setUserProperties(userProperties);
//发送消息并返回messageId.
String messageId = producer.send(message);
}
注意:
producerAssist.getProducerById("xxx")中的ID务必和application.yml的paas.mq.ons.producers.producer-list[i].id一致,否则无法实例化生产者.
ProduceMessage.fromString("topic","tag","msgBody")是ProduceMessage消息的构造函数,第一个参数“topic”代表topic,第二个参数“tag”代表tag,第三个参数“msgBody”代表消息内容。如果tag为空,传入null即可。
ProduceMessage结构如下:
public class ProduceMessage {
/**
* 消息所属的主题
*/
private final String topic;
/**
* 消息体的二进制数据
*/
private final byte[] payload;
/**
* 消息的tag,接收端可用于进行消息的二级分类,对发送端无影响。
* 一般情况下接收消息的一端会接收topic相同的消息,当接收消息设置了相同的tag的时候仅接收同一主题下相同tag的消息
*/
private String tag;
/**
* 消息需要延迟发送的时间,单位毫秒(ms).
* 默认情况下消息立即发送,当设置了该值后消息会在间隔给定的时间后再发送。
* 如果没有设置该值,且没有设置{@link #getAtTime()},则消息会立即送达接收端
*/
private Integer delayTime;
/**
* 毫秒级的Unix时间戳:1648006200000(2022-03-23 11:30:00)
* 设置一个时间,表示消息进入消息队列后在指定的时间才会被推送给接收端。
* 该值的优先级低于 {@link #getDelayTime()} 属性
*/
private Long atTime;
/**
* 如果此值不为NULL,则发送有序消息,同一个值下的消息会保证有序
*/
private String shardingKey = "";
/**
* 消息队列附加的额外的属性。
* 不解析消息体就能看到某些特殊的信息(例如租户Id,项目id等),可用于查询消息历史时过滤使用。
*/
private Map<String, String> userProperties = new HashMap<>();
private ProduceMessage(String topic, byte[] data) {
this.topic = topic;
this.payload = data;
}
private ProduceMessage(String topic, String tag, byte[] data){
this.topic = topic;
this.tag = tag;
this.payload = data;
}
public static ProduceMessage fromBinary(String topic, byte[] data) {
return new ProduceMessage(topic, data);
}
public static ProduceMessage fromString(String topic, String data) {
byte[] binary = data.getBytes(Charsets.UTF_8);
return new ProduceMessage(topic, binary);
}
public static ProduceMessage fromString(String topic, String tag, String data){
byte[] binary = data.getBytes(Charsets.UTF_8);
return new ProduceMessage(topic,tag,binary);
}
public static ProduceMessage fromJSON(String topic, JSONObject data) {
byte[] binary = JSON.toJSONBytes(data);
return fromBinary(topic, binary);
}
public static ProduceMessage fromObject(String topic, Object obj) {
byte[] binary = JSON.toJSONBytes(obj);
return fromBinary(topic, binary);
}
}
@Component
public class Test1MessageListener implements MessageListener{
@Override
public String getId() {
//单实例可不填,多实例需要和配置文件中的id对应.
return "ons-1";
}
@Override
public String getTopic() {
return "topic";
}
@Override
public String getTag() {
return "tag";
}
@Override
public void process(ConsumeMessage message) {
//message.getValueAsString()得到消息内容.
System.out.println("Test1MessageListener正在监听:"+message.getValueAsString());
//properties存储的是消息的自定义属性信息:例如上下文信息、topic、tag等.
Properties properties = message.getUserProperties();
System.out.println("message监听到的上下文信息是: => "+ properties);
}
}
注意事项:
单实例消费者getId()返回为空即可,此处不做任何判断;多实例getId()务必和application.yml的paas.mq.ons.consumers.consumer-list[i].id保持一致,否则该监听器无法注册到消费者上;
同一个消费者实例中,MessageListener实现类可以有多个,订阅不同的topic和tag;
MessageListener中不允许出现tag1||tag2||tag3的订阅方式,请分开成3个MessageListener,每个MessageListener只允许订阅一个tag,符合开闭原则。否则订阅的消息无法被消费;
如果一个group订阅了一个Topic下的全部tag,并且该Topic有子标签tag,那么MessageListener必须根据tag的个数,拆分成多个MessageListener,每个订阅一个tag.;
如果一个group订阅了一个Topic,并且该Topic下没有任何tag,MessageListener的tag可以用*或者null来表示;
各个产品线订阅的topic、tag命名不可太长,group-id/consumer-id也不可太长,Topic+tag+group-id的总长度不能超过255个字符;
集群消费模式下,需要确保topic+tag+group-id的订阅关系是唯一的,否则会出现消息丢失;
scope要么不填,填写务必保持生产模块和消费模块统一;
使用ONS消息队列,尽量采用集群消费模式(CLUSTERING),不建议采用广播消费模式(BROADCASTING)。因为广播模式消费失败不会重试,并且广播模式不支持分布式多实例部署;
ConsumeMessage结构如下:
public class ConsumeMessage implements Serializable{
/**
* 消息队列返回的当前消息的id值
*/
@Getter
private String messageId;
/**
* 消息体的二进制数组格式
*/
private final byte[] payload;
/**
* 消息主题
*/
@Getter
private String topic;
/**
* 消息tag
*/
@Getter
private String tag;
/**
* 消息是否已设置为提交状态
*/
@Getter
private boolean committed;
/**
* 正常返回的情况下,消息队列框架是否自动提交消息
*/
@Getter
@Setter
private boolean autoCommit = true;
/**
* 返回当前消息失败重试的次数.
*/
@Getter
@Setter
private int reconsumeTimes;
/**
* userProperties存储了Ons和rabbitmq的Message的原始属性信息.
* 直接通过key获取相应value.
*/
@Getterq
@Setter
private Properties userProperties;
public ConsumeMessage(String messageId, byte[] payload) {
this.messageId = messageId;
this.payload = payload;
}
public ConsumeMessage(String messageId, byte[] payload,String topic,String tag) {
this.messageId = messageId;
this.payload = payload;
this.topic=topic;
this.tag=tag;
}
/**
* 提交消息,表示该消息已处理完成
*/
public final void commit() {
this.committed = true;
}
/**
* 读取消息内容,以byte数组形式返回
*/
public byte[] getValueAsBytes() {
return payload;
}
/**
* 读取消息内容,以Json对象形式返回
*/
public JSONObject getValueAsJson() {
return (JSONObject) JSON.parse(payload);
}
/**
* 读取消息内容,以对象形式返回
*/
public <T> T getValueAsObject(Class<T> cls) {
return JSON.parseObject(payload, cls);
}
/**
* 回读取消息内容,以字符串形式返
*/
public String getValueAsString() {
return new String(payload, Charset.defaultCharset());
}
}
报错:消息发送不出去或者接受不到,也没有提示任何rabbitmq,或者ConnectionFactory相关的日志信息。
原因1
pom.xml中没有引用最新的message-queue-starter包
解决方案
<dependency>
<groupId>com.glodon.cloud</groupId>
<artifactId>message-queue-spring-boot-starter</artifactId>
<!-- version在spring-cloud-glodon已经管理,此处可以省略. -->
<version>2.3.1.RELEASE</version>
</dependency>
原因2
配置文件没有加载,或者application.yml文件加载错了
解决方案
对于yml文件一定要确定加载的是当前环境下的配置,启动脚本里面的profile最好是定义的变量。
报错:生产消息没问题,消费消息有问题。
原因:
1. topic值在生产者和消费者中不一致,大概率是一个在代码中加了scope前缀,一个没有加scope前缀;
2. scope应该放到配置文件中定义,可以是yml文件,可以是apollo中,但是千万不要写死在代码中!!!也千万不要和topic合并写死在配置文件中!!!
反面典型:DEV_GLD_SYS_MGR!!!写成这样是肯定无法消费到的!
3. group-id配置有问题,导致消息无法订阅上,需要前去阿里云ONS控制台观察.
解决方案:
1. scope=DEV_GLD,topic=SYS_MGR,topic是MessageListener.java中定义的,scope和topic一定要分开;
2. 去阿里云控制台观察group-id是否配置正确.
报错:ONS启动服务报错如下

No route info of this topic, VPC_DEV_xxx
原因:
配置文件有问题
paas:
mq:
ons:
producers:
producer-list:
- id: commonmq
access-key: LTAIIwVnIn11IH0p
secret-key: r4kyqm6GXGTl9dmWFPSRcO1RyIHRHa
scope: VPC_DEV_GLD
message-model: CLUSTERING
group-id:
ordered: false
namesrv-addr: http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
namesrv-addr配置错误,access-key、secret-key有问题,或者scope有问题
解决方案:
参考3.1 和 3.2 的配置文件.
报错:rabbitmq启动服务报错如下
2019-12-14 11:05:57,072 - INFO #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.i.r.AbstractRabbitmqConsumer [AbstractRabbitmqConsumer.java:278]: 消息消费者启动:rabbitmq.
2019-12-14 11:05:57,088 - INFO #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.i.r.AbstractRabbitmqConsumer [AbstractRabbitmqConsumer.java:53]: 当前Rabbitmq消费者注册的消息监听器如下:
2019-12-14 11:05:57,088 - INFO #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.AbstractConsumer [AbstractConsumer.java:119]: 第1类消息监听器,topic:=> SYS_MGR, tag:=> LIC_CREATE
2019-12-14 11:05:57,089 - INFO #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.AbstractConsumer [AbstractConsumer.java:132]: 消息监听器id: , topic: SYS_MGR, tag: LIC_CREATE
2019-12-14 11:05:57,107 - ERROR #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.i.r.AbstractRabbitmqConsumer [AbstractRabbitmqConsumer.java:263]: 消息交换机 VPC_PRODUCT_GLODON_exchange_cloudt 创建失败, null
2019-12-14 11:05:57,107 - WARN #[springAppName_IS_UNDEFINED,,,]# [AMQP Connection 10.129.247.165:5672] c.r.c.i.ForgivingExceptionHandler [ForgivingExceptionHandler.java:120]: An unexpected connection driver error occured (Exception message: Socket closed)
原因:
消息队列没有增加远程创建交换机的权限;
解决方案:
在ansible脚本中给相关用户加上远程创建交换机的权限.