程序地带

分布式事务:基于rabbitmq可靠消息最终一致性


面临问题

随着分布式服务架构的流行与普及,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用,随之而来挑战就是分布式事务问题,多个服务之间使用自己单独维护的数据库,它们彼此之间不在同一个事务中,假如A执行成功了,B执行却失败了,而A的事务此时已经提交,无法回滚,那么最终就会导致两边数据不一致性的问题。


设计理念

基于rabbitmq可靠消息的最终一致性,需要保证以下要素:


确认生产者一定要将数据投递到MQ服务器中,采用本地事务消息、定时任务、消息确认机制。

MQ消费者消息能够正确消费消息,采用手动ACK模式、方法幂等性、重试机制。

始终不能消费的消息进行人工通知处理


时序图


优缺点

优点:


简化:长事务通过消息服务拆分成小事务,事务形态变得简单,利用队列进行进行通讯,具有削峰填谷的作用。性能提升:事务被拆解,实现控制资源锁的粒度最小化。数据最终一致性:基于可靠的消息服务,部分保证数据的最终一致性。

缺点:


依赖可靠的消息服务器,通常需要改造或封装消息服务器“从业务流程”只能成功。如果“从业务流程”失败,需要人工或其他附加处理流程。 

适用场景:


柔性事务:数据最终一致性适用于分布式事务的提交或回滚只取决于事务发起方的业务需求、其他数据源的数据变更跟随事务发起方进行的业务场景。仅适用于 “主、从业务” 的数据一致性要求不高的场景

注意事项:


需要确保“从业务”处理的幂等性。

 


依赖配置
重试机制使用了rabbitMQ的延时插件rabbitmq_delayed_message_exchange
配置说明

# rabbitmq

spring:

  rabbitmq:

    #开启 confirm 确认机制

    publisher-confirms: true

    #设置消费端手动 ack

    listener:

      simple:

        acknowledge-mode: manual


 


开发示例
表结构
-- 发送端
CREATE TABLE `t_broker_msg_log` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`msg_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息id',
`msg_content` varchar(6000) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息内容',
`status` int(11) NOT NULL DEFAULT 0 COMMENT '处理状态 0发送中,1发送成功,2发送失败',
`send_count` int(11) NOT NULL DEFAULT 0 COMMENT '发送次数',
`remark` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '备注',
`create_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`) USING BTREE,
INDEX `t_order_pay_ground_sn`(`msg_id`, `status`, `try_count`) USING BTREE
) ENGINE = InnoDB COMMENT = '消息日志表';
-- 接收端
CREATE TABLE `t_broker_msg_ground` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`msg_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息编号',
`msg_content` varchar(6000) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息内容',
`status` int(11) NOT NULL DEFAULT 0 COMMENT '处理状态 0待处理,1处理成功,2处理失败',
`process_count` int(11) NOT NULL DEFAULT 0 COMMENT '处理次数',
`remark` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '备注',
`create_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`) USING BTREE,
INDEX `t_order_pay_ground_sn`(`msg_id`, `status`, `process_count`) USING BTREE
) ENGINE = InnoDB COMMENT = '消息落地表' ;
-- 分割
CREATE TABLE `t_user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`uname` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '用户名',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB COMMENT = '用户表' ;
CREATE TABLE `t_user_exp` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`u_name` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '用户名',
`num` int(11) NOT NULL DEFAULT 0 COMMENT '积分',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB COMMENT = '会员积分表' ;
核心源代码

消息发送端可靠性确认


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* @description: 自定义消息发送确认的回调
* @author: anhj
* @create: 2020/7/20
**/
@Component
public class CustomConfirmCallback implements RabbitTemplate.ConfirmCallback {
protected final Log logger = LogFactory.getLog(this.getClass());
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private BrokerMsgLogMapper brokerMsgLogMapper;
/**
* PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化.
*/
@PostConstruct
public void init() {
logger.info("ConfirmCallback");
rabbitTemplate.setConfirmCallback(this);
}
/**
* 如果消息没有到达交换机,则该方法中isSendSuccess = false,error为错误信息;
* 如果消息正确到达交换机,则该方法中isSendSuccess = true;
*/
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
logger.info("isSendSuccess=" + isSendSuccess);
String messageId = correlationData.getId();
if (isSendSuccess) {
//如果消息到达MQ Broker,更新消息
brokerMsgLogMapper.changeBrokerMsgStatus(messageId, Constants.MSG_SEND_SUCCESS);
}
}
}
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @description: 生产端
* @author: anhj
* @create: 2020/7/20
**/
@Component
public class Producer {
@Resource
private RabbitTemplate rabbitTemplate;
public void send(String msgId, Object msgContent, String routeKey) throws Exception {
//消息唯一ID,,低版本的mq消费端取不到ID
CorrelationData correlationData = new CorrelationData(msgId);
// 封装消息ID
JSONObject jsonObject = new JSONObject();
jsonObject.put("msgId",msgId);
jsonObject.put("msgContent",msgContent);
//正常发送消息
rabbitTemplate.convertAndSend(Constants.EXCHANGE_NAME, routeKey, jsonObject.toJSONString(), correlationData);
}
}

消费端


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
/**
* 用户注册消息
*
* @author andy an
* @since 2020/07/20 11:35
*/
@Component
public class UserRegReceiver {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
private UserRegService userRegService;
/**
* 用户注册
*/
@RabbitListener(queues = {Constants.QUEUE_NAME})
public void receive(Message message, Channel channel) {
String msg = new String(message.getBody());
logger.info("message={}", msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String msgId = "";
String msgContent = "";
User userInfo = null;
try {
msgContent = JSON.parseObject(msg, JSONObject.class).getString("msgContent");
msgId = JSON.parseObject(msg, JSONObject.class).getString("msgId");
// 解析消息
userInfo = JSON.parseObject(msgContent, User.class);
} catch (Exception e) {
// 无法处理的消息,直接抛弃
ackMsg(channel, deliveryTag, msg);
return;
}
// 幂等性校验
if (userRegService.idempotent(msgId)) {
// 重复消息,直接抛弃
ackMsg(channel, deliveryTag, msg);
return;
}
// 落地到表中
try {
userRegService.ground(msgId, msgContent);
} catch (Exception e) {
// 失败,数据已经存在
logger.error("ground err", e);
ackMsg(channel, deliveryTag, msg);
return;
}
// 确认消费
ackMsg(channel, deliveryTag, msg);
// 异步处理落地数据
userRegService.process(userInfo, msgId);
}
/**
* 确认消息
*/
private void ackMsg(Channel channel, long deliveryTag, String msg) {
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e1) {
logger.error("basicAck msg={}", msg);
logger.error("basicAck err", e1);
}
}
}
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
/**
* 重试消息接收
*
* @author anhj
* @since 2020/12/03
*/
@Component
public class RetryReceiver {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
private UserRegService userRegService;
/**
* 提现失败信息
*/
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(RETRY_QUEUE_NAME),
exchange = @Exchange(value = RETRY_QUEUE_NAME, type = "x-delayed-message",
arguments = @Argument(name = "x-delayed-type", value = "direct")
),
key = "retry"
)
)
public void receive(Message message, Channel channel) {
String msg = new String(message.getBody());
logger.info("message={}", msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 提现重试
try {
userRegService.retry(msg);
} catch (Exception e) {
// 进行下次重试
logger.error(e.getMessage(),e);
}
// 确认消费
ackMsg(channel, deliveryTag, msg);
}
/**
* 确认消息
*/
private void ackMsg(Channel channel, long deliveryTag,String msg) {
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e1) {
logger.error("basicAck msg={}", msg);
logger.error("basicAck err={}", e1);
}
}
}

 


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/anyincc125/article/details/111246254

随机推荐

二层交换配置完ping失败_华为以太网三层交换原理

二层交换配置完ping失败_华为以太网三层交换原理

早期的网络中一般使用二层交换机来搭建局域网,而不同局域网之间的网络互通由路由器来完成。那时的网络流量,局域网内部的流量占了绝大部分,而网络间的通信访问量比较少...

weixin_39733943 阅读(702)

常见的英文单词(Java中的)

Javaclass[klɑ:s]类classpath[klɑ:s’pɑ:θ]类路径public['pʌblik]公共的,公用的private['praivit]私有的,私人的stati...

辰伏 阅读(238)

Http的简单认识

Http的简单认识

1.Http介绍​​数据在客户端与服务端之间进行数据传输时,需要按照某种协议来传输数据。​Http是有万维网协会和IETF工作组合作发布的一种超文本传输协议(HyperTe...

bflvren 阅读(997)

二层交换配置完ping失败_交换技术

交换技术顾名思义它是交换机上使用的交换技术,交换机这种设备工作在OSI参考模型的第二层(数据链路层)。交换技术通过识别数据帧中的MAC地址信息并根据MAC地址...

weixin_39850365 阅读(661)

PTA1002 写出这个数

PTA1002写出这个数读入一个正整数n,计算其各位数字之和,用汉语拼音写出和的每一位数字。输入格式:每个测试输入包含1个测试用例,即给出自然数...

日星月云 阅读(330)

linux链接

linux链接

当我们使用集成开发环境是,执行一个程序貌似非常简单,但是其实这里面有这复杂的过程。其中的一个过程就是链接从源程序到可执行文件预处理:将C程序变成.i文件&#x...

十九舟 阅读(723)