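Database Architecture Design and Practice for a One-Card System under High Concurrency
AI-generated summary: Drawing on the domestic-technology migration of a city's transit one-card (一卡通) system, this article discusses database architecture design for high-concurrency scenarios. It covers the evolution from a centralized to a distributed architecture, the selection and cluster deployment of a domestic database, Java implementations of the core business modules (users, transactions, settlement), performance optimization (indexing, caching, read/write splitting), and the high-availability and disaster-recovery design. With a sound architecture and targeted optimization, the system achieved financial-grade data consistency and high availability, and it offers a reference for similar systems.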
无尘 · Published 2026/3/27 · Updated 2026/6/8
Introduction: Challenges Facing One-Card Systems in the Digital Era
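Amid the build-out of smart cities and smart campuses, the one-card system has evolved from a simple payment tool into an integrated digital platform covering identity authentication, financial services, access control, and data analytics. As user bases grow from tens of thousands to tens of millions and daily transaction volume rises from a few thousand to tens of millions, traditional one-card architectures come under unprecedented performance pressure. During the morning and evening peaks in particular, the system must sustain tens of thousands of concurrent transactions per second, which places extreme demands on the availability, concurrency capacity, and data consistency of the underlying database.
Based on the domestic-technology (国产化) migration of a large city's transit one-card system, this article examines database architecture design, core feature implementation, performance optimization, and security safeguards for one-card systems under high concurrency, and offers a technical reference for the design and rollout of similar systems.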
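1. Architecture Evolution and Design Principles
1.1 From Centralized to Distributed Architecture
Traditional one-card systems mostly use a centralized architecture in which all business logic and data storage sit on a single server. This is simple to maintain, but as business volume grows it exposes a high single-point-of-failure risk, poor scalability, and clear performance bottlenecks.
Modern one-card systems generally adopt a three-tier distributed architecture:
Access layer: handles terminal connections and protocol conversion, supporting TCP/IP, RS-485 serial, and other communication protocols
Business logic layer: uses a microservice architecture that splits user management, transaction processing, and clearing and settlement into independent services
Data storage layer: uses a relational database cluster with read/write splitting and data sharding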
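1.2 Core Design Principles
High availability: the system must provide uninterrupted 7×24 service, with automatic failover from a single-node failure completing within seconds. A primary/standby cluster with shared storage targets an RTO (recovery time objective) under 30 seconds and an RPO (recovery point objective) close to zero.
Data consistency: financial-grade transactions must preserve ACID properties and prevent problems such as duplicate deductions and overspending. A distributed transaction mechanism keeps cross-service fund operations consistent.
Elastic scaling: the system should scale horizontally, adding compute and storage resources as the business grows. Containerized deployment with Kubernetes orchestration provides automatic scale-out and scale-in of services.
Security and compliance: the system complies with Level 3 of China's Multi-Level Protection Scheme (等级保护) for information security, with encrypted transport, encrypted storage, fine-grained access control, and complete audit logs.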
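The data consistency principle above names two failure modes: duplicate deductions and overspending. The following is a minimal in-memory sketch of how an idempotency key plus a balance check inside one critical section guards against both. The class `IdempotentLedger`, the `requestId` parameter, and the `Outcome` enum are hypothetical names for illustration and do not appear in the original system, where the same guarantees would have to be enforced in the database.

```java
import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;

/** In-memory stand-in for a card account; illustration only, not the production design. */
final class IdempotentLedger {
    enum Outcome { APPLIED, DUPLICATE, INSUFFICIENT_BALANCE }

    private BigDecimal balance;
    // Hypothetical idempotency store: IDs of requests that were already applied.
    private final Set<String> appliedRequests = new HashSet<>();

    IdempotentLedger(BigDecimal openingBalance) {
        this.balance = openingBalance;
    }

    /** Deducts at most once per requestId and never lets the balance go negative. */
    synchronized Outcome deduct(String requestId, BigDecimal amount) {
        if (appliedRequests.contains(requestId)) {
            return Outcome.DUPLICATE;             // a retry of a request already applied
        }
        if (balance.compareTo(amount) < 0) {
            return Outcome.INSUFFICIENT_BALANCE;  // overdraft rejected, nothing recorded
        }
        balance = balance.subtract(amount);
        appliedRequests.add(requestId);
        return Outcome.APPLIED;
    }

    synchronized BigDecimal balance() {
        return balance;
    }

    public static void main(String[] args) {
        IdempotentLedger ledger = new IdempotentLedger(new BigDecimal("10.00"));
        System.out.println(ledger.deduct("req-1", new BigDecimal("4.00"))); // APPLIED
        System.out.println(ledger.deduct("req-1", new BigDecimal("4.00"))); // DUPLICATE
        System.out.println(ledger.deduct("req-2", new BigDecimal("7.00"))); // INSUFFICIENT_BALANCE
        System.out.println(ledger.balance());                               // 6.00
    }
}
```

The check and the update sit in the same `synchronized` method on purpose: separating them would reopen the race that the principle is meant to close.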
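2. Database Selection and Architecture Design
2.1 Database Selection Considerations
Selecting a database for a one-card system means weighing the following factors:
Performance: the morning peak requires more than 100,000 transactions per second, with TPC-C throughput on the order of one million tpmC or more. Query response time should stay below 200 ms under normal load and within 500 ms at peak.
Data scale: the system stores information for tens of millions of users and billions of transaction records, with total data volume reaching the terabyte range. Efficient partitioning and indexing strategies are required.
High availability: primary/standby switchover must complete within 3 to 5 seconds so that business continuity meets a 99.999% availability target.
Ecosystem compatibility: the database must fit the existing application ecosystem to keep migration costs down. It should support the syntax of mainstream databases such as Oracle and MySQL so that application code changes during migration are minimal.
Self-reliance: under China's Xinchuang (信创) initiative, the chosen product must be a domestic database with independent intellectual property that has passed the relevant national security certifications.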
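To put the 99.999% availability target and the 3 to 5 second switchover window in perspective, the arithmetic can be sketched as follows. The class and method names are illustrative, and the calculation assumes a 365-day year with failover as the only source of downtime.

```java
/** Back-of-the-envelope downtime budget; names and the 365-day year are assumptions. */
final class AvailabilityBudget {
    private static final double SECONDS_PER_YEAR = 365.0 * 24 * 60 * 60; // 31,536,000

    /** Seconds of downtime per year permitted by an availability ratio such as 0.99999. */
    static double allowedDowntimeSecondsPerYear(double availability) {
        return SECONDS_PER_YEAR * (1.0 - availability);
    }

    /** How many failovers of the given duration fit in the yearly budget. */
    static long failoversWithinBudget(double availability, double failoverSeconds) {
        return (long) Math.floor(allowedDowntimeSecondsPerYear(availability) / failoverSeconds);
    }

    public static void main(String[] args) {
        double budget = allowedDowntimeSecondsPerYear(0.99999);
        System.out.printf("Yearly downtime budget: %.2f s (about %.2f min)%n", budget, budget / 60);
        System.out.println("5 s failovers per year: " + failoversWithinBudget(0.99999, 5.0));
        System.out.println("3 s failovers per year: " + failoversWithinBudget(0.99999, 3.0));
    }
}
```

Five nines leave roughly 315 seconds of downtime per year, so a 5-second switchover can happen about 63 times before the budget is spent. Any longer outage, such as the 30-second RTO bound quoted earlier, consumes the budget far faster.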
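2.2 Cluster Architecture in Practice
Based on these considerations, the city's one-card system adopted the following database cluster architecture: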
CREATE CLUSTER card_cluster WITH (
    cluster_type = 'streaming_replication',
    primary_node = 'node1:5432',
    standby_nodes = ARRAY['node2:5432', 'node3:5432', 'node4:5432'],
    sync_standby_names = 'node2,node3',
    application_name = 'card_system'
);
CREATE PUBLICATION card_publication FOR ALL TABLES;
CREATE SUBSCRIPTION card_subscription
    CONNECTION 'host=node2 port=5432 dbname=carddb user=replicator'
    PUBLICATION card_publication;
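Primary/standby synchronization: physical log streaming replication keeps the primary and standby nodes consistent in near real time. With WAL (Write-Ahead Logging), the log is written to durable storage before a transaction commits and is then shipped to the standby nodes.
Load balancing: write requests are directed to the primary node, while read requests are distributed dynamically according to the load on each standby. A proxy layer such as HAProxy performs the routing and raises overall throughput.
Disaster recovery deployment: the system is deployed across two data centers in the same city, with primary and standby nodes in different facilities. A dual-network design brings network switchover time to under 5 seconds, which the author describes as well ahead of the industry norm.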
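The load balancing rule above (writes to the primary, reads to the least-loaded standby) can be sketched in a few lines. This is only an illustration of the routing decision, not HAProxy configuration. The `Node` fields, the use of active connection count as the load signal, and the fallback to the primary when no standby is healthy are all assumptions.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Illustrative routing decision; load metric and fallback policy are assumptions. */
final class ReadWriteRouter {
    static final class Node {
        final String name;
        final int activeConnections; // stand-in for whatever load signal the proxy uses
        final boolean healthy;

        Node(String name, int activeConnections, boolean healthy) {
            this.name = name;
            this.activeConnections = activeConnections;
            this.healthy = healthy;
        }
    }

    private final Node primary;
    private final List<Node> standbys;

    ReadWriteRouter(Node primary, List<Node> standbys) {
        this.primary = primary;
        this.standbys = standbys;
    }

    /** Writes always go to the primary; reads go to the least-loaded healthy standby. */
    Node route(boolean isWrite) {
        if (isWrite) {
            return primary;
        }
        return standbys.stream()
                .filter(n -> n.healthy)
                .min(Comparator.comparingInt((Node n) -> n.activeConnections))
                .orElse(primary); // no healthy standby: fall back to the primary
    }

    public static void main(String[] args) {
        ReadWriteRouter router = new ReadWriteRouter(
                new Node("node1", 40, true),
                Arrays.asList(
                        new Node("node2", 25, true),
                        new Node("node3", 10, true),
                        new Node("node4", 3, false)));
        System.out.println(router.route(true).name);  // node1
        System.out.println(router.route(false).name); // node3 (node4 is unhealthy)
    }
}
```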
2.3 Table Schema Design
The core tables of a one-card system should follow normalization principles while still serving query performance:
-- Schematic DDL. Inline INDEX clauses are MySQL-style, whereas BIGSERIAL, JSONB and
-- PARTITION BY RANGE (...) are PostgreSQL-style; adapt to the target database's compatibility mode.
CREATE TABLE user_info (
    user_id BIGINT PRIMARY KEY,
    user_name VARCHAR(50) NOT NULL,
    id_card VARCHAR(18) UNIQUE NOT NULL,
    phone VARCHAR(11),
    email VARCHAR(100),
    user_type SMALLINT NOT NULL DEFAULT 1,
    status SMALLINT NOT NULL DEFAULT 1,
    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_phone (phone),
    INDEX idx_id_card (id_card),
    INDEX idx_status (status)
) PARTITION BY RANGE (user_id);
CREATE TABLE card_info (
    card_id VARCHAR(20) PRIMARY KEY,
    user_id BIGINT NOT NULL,
    card_type SMALLINT NOT NULL DEFAULT 1,
    card_status SMALLINT NOT NULL DEFAULT 1,
    balance DECIMAL(12, 2) NOT NULL DEFAULT 0.00,
    daily_limit DECIMAL(10, 2) DEFAULT 500.00,
    single_limit DECIMAL(10, 2) DEFAULT 100.00,
    issue_date DATE NOT NULL,
    expire_date DATE,
    last_used_time TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES user_info(user_id),
    INDEX idx_user_id (user_id),
    INDEX idx_card_status (card_status),
    INDEX idx_last_used (last_used_time)
);
CREATE TABLE transaction_log (
    trans_id BIGSERIAL PRIMARY KEY,
    card_id VARCHAR(20) NOT NULL,
    trans_type SMALLINT NOT NULL,
    trans_amount DECIMAL(12, 2) NOT NULL,
    before_balance DECIMAL(12, 2) NOT NULL,
    after_balance DECIMAL(12, 2) NOT NULL,
    terminal_id VARCHAR(20),
    merchant_id VARCHAR(20),
    location_info JSONB,
    -- The next three columns are set by the service code in Section 3.2; their types are assumptions.
    channel VARCHAR(20),
    channel_order_no VARCHAR(64),
    related_trans_id BIGINT,
    trans_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    status SMALLINT NOT NULL DEFAULT 1,
    remark VARCHAR(200),
    FOREIGN KEY (card_id) REFERENCES card_info(card_id),
    INDEX idx_card_time (card_id, trans_time),
    INDEX idx_merchant_time (merchant_id, trans_time),
    INDEX idx_trans_type (trans_type, trans_time)
) PARTITION BY RANGE (trans_time);
CREATE TABLE merchant_info (
    merchant_id VARCHAR(20) PRIMARY KEY,
    merchant_name VARCHAR(100) NOT NULL,
    merchant_type SMALLINT NOT NULL,
    contact_phone VARCHAR(11),
    contact_person VARCHAR(50),
    settlement_rate DECIMAL(5, 4) DEFAULT 0.0000,
    settlement_account VARCHAR(30),
    status SMALLINT NOT NULL DEFAULT 1,
    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_merchant_type (merchant_type),
    INDEX idx_status (status)
);
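The schema declares `transaction_log` as range-partitioned on `trans_time` but does not show the partitions themselves. As a hedged illustration, the sketch below generates one child-partition statement per month. It assumes PostgreSQL-style declarative partitioning (`PARTITION OF ... FOR VALUES FROM ... TO`), a monthly granularity, and a `<parent>_<yyyy>_<mm>` naming scheme; none of these is stated in the original, and the target database may use a different form.

```java
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.List;

/** Generates monthly partition DDL; syntax, granularity and naming are assumptions. */
final class MonthlyPartitionDdl {

    /** Builds one CREATE TABLE ... PARTITION OF statement per month, starting at 'first'. */
    static List<String> build(String parentTable, YearMonth first, int months) {
        List<String> statements = new ArrayList<>();
        for (int i = 0; i < months; i++) {
            YearMonth month = first.plusMonths(i);
            String partitionName = String.format("%s_%d_%02d",
                    parentTable, month.getYear(), month.getMonthValue());
            // Range bounds are half-open: [first day of month, first day of next month).
            statements.add(String.format(
                    "CREATE TABLE %s PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s');",
                    partitionName, parentTable, month.atDay(1), month.plusMonths(1).atDay(1)));
        }
        return statements;
    }

    public static void main(String[] args) {
        for (String statement : build("transaction_log", YearMonth.of(2024, 12), 2)) {
            System.out.println(statement);
        }
    }
}
```

Half-open bounds mean a transaction stamped exactly at midnight on the first of a month belongs to exactly one partition.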
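3. Core Module Implementation
3.1 User Management Module
The user management module covers the full user lifecycle, including registration, profile maintenance, and status changes.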
@RestController
@RequestMapping("/api/user")
@Slf4j
public class UserController {
@Autowired
private UserService userService;
@PostMapping("/add")
public Result<UserVO> addUser (@Valid @RequestBody UserDTO userDTO) {
try {
UserVO userVO = userService.addUser(userDTO);
return Result.success(userVO);
} catch (BusinessException e) {
log.error("Failed to add user: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
@PostMapping("/delete/{userId}")
public Result<Void> deleteUser (@PathVariable Long userId) {
try {
userService.deleteUser(userId);
return Result.success();
} catch (BusinessException e) {
log.error("Failed to delete user: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
@PostMapping("/update")
public Result<UserVO> updateUser (@Valid @RequestBody UserUpdateDTO updateDTO) {
try {
UserVO userVO = userService.updateUser(updateDTO);
return Result.success(userVO);
} catch (BusinessException e) {
log.error("Failed to update user: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
@GetMapping("/query/{userId}")
public Result<UserVO> queryUser (@PathVariable Long userId) {
try {
UserVO userVO = userService.queryUser(userId);
return Result.success(userVO);
} catch (BusinessException e) {
log.error("Failed to query user: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
@GetMapping("/list")
public Result<PageResult<UserVO>> listUsers (
@RequestParam(defaultValue = "1") Integer pageNum,
@RequestParam(defaultValue = "10") Integer pageSize,
@RequestParam(required = false) String userName,
@RequestParam(required = false) Integer userType,
@RequestParam(required = false) Integer status) {
UserQueryDTO queryDTO = new UserQueryDTO ();
queryDTO.setPageNum(pageNum);
queryDTO.setPageSize(pageSize);
queryDTO.setUserName(userName);
queryDTO.setUserType(userType);
queryDTO.setStatus(status);
PageResult<UserVO> pageResult = userService.listUsers(queryDTO);
return Result.success(pageResult);
}
}
@Service
@Slf4j
public class UserServiceImpl implements UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private CardMapper cardMapper;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
@Transactional(rollbackFor = Exception.class)
public UserVO addUser (UserDTO userDTO) {
UserInfo existingUser = userMapper.selectByIdCard(userDTO.getIdCard());
if (existingUser != null ) {
throw new BusinessException (ErrorCode.USER_ID_CARD_EXIST);
}
UserInfo userInfo = new UserInfo ();
BeanUtils.copyProperties(userDTO, userInfo);
userInfo.setStatus(UserStatus.NORMAL.getCode());
userInfo.setCreateTime(new Date ());
userInfo.setUpdateTime(new Date ());
userMapper.insert(userInfo);
CardInfo cardInfo = createDefaultCard(userInfo.getUserId());
cardMapper.insert(cardInfo);
cacheUserInfo(userInfo);
return convertToVO(userInfo);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deleteUser (Long userId) {
UserInfo userInfo = userMapper.selectById(userId);
if (userInfo == null ) {
throw new BusinessException (ErrorCode.USER_NOT_EXIST);
}
userInfo.setStatus(UserStatus.DELETED.getCode());
userInfo.setUpdateTime(new Date ());
userMapper.updateById(userInfo);
cardMapper.updateStatusByUserId(userId, CardStatus.DELETED.getCode());
clearUserCache(userId);
}
@Override
@Transactional(rollbackFor = Exception.class)
public UserVO updateUser (UserUpdateDTO updateDTO) {
UserInfo userInfo = userMapper.selectById(updateDTO.getUserId());
if (userInfo == null ) {
throw new BusinessException (ErrorCode.USER_NOT_EXIST);
}
if (StringUtils.isNotBlank(updateDTO.getPhone())) {
userInfo.setPhone(updateDTO.getPhone());
}
if (StringUtils.isNotBlank(updateDTO.getEmail())) {
userInfo.setEmail(updateDTO.getEmail());
}
userInfo.setUpdateTime(new Date ());
userMapper.updateById(userInfo);
cacheUserInfo(userInfo);
return convertToVO(userInfo);
}
@Override
public UserVO queryUser (Long userId) {
String cacheKey = "user:info:" + userId;
UserVO cachedUser = (UserVO) redisTemplate.opsForValue().get(cacheKey);
if (cachedUser != null ) {
return cachedUser;
}
UserInfo userInfo = userMapper.selectById(userId);
if (userInfo == null ) {
throw new BusinessException (ErrorCode.USER_NOT_EXIST);
}
UserVO userVO = convertToVO(userInfo);
redisTemplate.opsForValue().set(cacheKey, userVO, 30 , TimeUnit.MINUTES);
return userVO;
}
@Override
public PageResult<UserVO> listUsers (UserQueryDTO queryDTO) {
PageHelper.startPage(queryDTO.getPageNum(), queryDTO.getPageSize());
List<UserInfo> userList = userMapper.selectByCondition(queryDTO);
PageInfo<UserInfo> pageInfo = new PageInfo <>(userList);
List<UserVO> voList = userList.stream()
.map(this ::convertToVO)
.collect(Collectors.toList());
return new PageResult <>(voList, pageInfo.getTotal(), pageInfo.getPageNum(), pageInfo.getPageSize());
}
private void cacheUserInfo (UserInfo userInfo) {
String cacheKey = "user:info:" + userInfo.getUserId();
UserVO userVO = convertToVO(userInfo);
redisTemplate.opsForValue().set(cacheKey, userVO, 30 , TimeUnit.MINUTES);
}
private void clearUserCache (Long userId) {
String cacheKey = "user:info:" + userId;
redisTemplate.delete(cacheKey);
}
}
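3.2 Transaction Processing Module
The transaction processing module is the core of the one-card system. It handles fund operations such as consumption, recharge, and transfer, and must guarantee data consistency and transactional atomicity.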
@RestController
@RequestMapping("/api/transaction")
@Slf4j
public class TransactionController {
@Autowired
private TransactionService transactionService;
@PostMapping("/consume")
public Result<TransactionVO> consume (@Valid @RequestBody ConsumeDTO consumeDTO) {
try {
TransactionVO transactionVO = transactionService.consume(consumeDTO);
return Result.success(transactionVO);
} catch (BusinessException e) {
log.error("Consumption deduction failed: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
@PostMapping("/recharge")
public Result<TransactionVO> recharge (@Valid @RequestBody RechargeDTO rechargeDTO) {
try {
TransactionVO transactionVO = transactionService.recharge(rechargeDTO);
return Result.success(transactionVO);
} catch (BusinessException e) {
log.error("Account recharge failed: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
@GetMapping("/detail/{transId}")
public Result<TransactionVO> getDetail (@PathVariable Long transId) {
try {
TransactionVO transactionVO = transactionService.getDetail(transId);
return Result.success(transactionVO);
} catch (BusinessException e) {
log.error("Failed to query transaction detail: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
@GetMapping("/list")
public Result<PageResult<TransactionVO>> listTransactions (
@RequestParam(defaultValue = "1") Integer pageNum,
@RequestParam(defaultValue = "20") Integer pageSize,
@RequestParam(required = false) String cardId,
@RequestParam(required = false) Integer transType,
@RequestParam(required = false) String startTime,
@RequestParam(required = false) String endTime) {
TransactionQueryDTO queryDTO = new TransactionQueryDTO ();
queryDTO.setPageNum(pageNum);
queryDTO.setPageSize(pageSize);
queryDTO.setCardId(cardId);
queryDTO.setTransType(transType);
if (StringUtils.isNotBlank(startTime)) {
queryDTO.setStartTime(DateUtil.parse(startTime));
}
if (StringUtils.isNotBlank(endTime)) {
queryDTO.setEndTime(DateUtil.parse(endTime));
}
PageResult<TransactionVO> pageResult = transactionService.listTransactions(queryDTO);
return Result.success(pageResult);
}
@PostMapping("/reverse/{transId}")
public Result<Void> reverseTransaction (@PathVariable Long transId) {
try {
transactionService.reverseTransaction(transId);
return Result.success();
} catch (BusinessException e) {
log.error("Transaction reversal failed: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
}
@Service
@Slf4j
public class TransactionServiceImpl implements TransactionService {
@Autowired
private TransactionMapper transactionMapper;
@Autowired
private CardMapper cardMapper;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedissonClient redissonClient; // distributed lock client used by consume()
    @Override
    @Transactional(rollbackFor = Exception.class)
    public TransactionVO consume(ConsumeDTO consumeDTO) {
        // Acquire the per-card lock before reading the card, so concurrent requests
        // for the same card cannot both validate against a stale balance.
        String lockKey = "card:consume:lock:" + consumeDTO.getCardId();
        RLock lock = redissonClient.getLock(lockKey);
        try {
            boolean locked = lock.tryLock(3, 10, TimeUnit.SECONDS);
            if (!locked) {
                throw new BusinessException(ErrorCode.SYSTEM_BUSY);
            }
            CardInfo cardInfo = cardMapper.selectByCardId(consumeDTO.getCardId());
            if (cardInfo == null) {
                throw new BusinessException(ErrorCode.CARD_NOT_EXIST);
            }
            // Objects.equals avoids a reference comparison if the status codes are boxed Integers.
            if (!Objects.equals(cardInfo.getCardStatus(), CardStatus.NORMAL.getCode())) {
                throw new BusinessException(ErrorCode.CARD_STATUS_ABNORMAL);
            }
            validateConsumeLimit(cardInfo, consumeDTO.getAmount());
            BigDecimal beforeBalance = cardInfo.getBalance();
            BigDecimal afterBalance = beforeBalance.subtract(consumeDTO.getAmount());
            if (afterBalance.compareTo(BigDecimal.ZERO) < 0) {
                throw new BusinessException(ErrorCode.INSUFFICIENT_BALANCE);
            }
            cardInfo.setBalance(afterBalance);
            cardInfo.setLastUsedTime(new Date());
            cardMapper.updateBalance(cardInfo);
            // Record the transaction with the balance before and after the deduction.
            TransactionLog transactionLog = new TransactionLog();
            transactionLog.setCardId(consumeDTO.getCardId());
            transactionLog.setTransType(TransType.CONSUME.getCode());
            transactionLog.setTransAmount(consumeDTO.getAmount());
            transactionLog.setBeforeBalance(beforeBalance);
            transactionLog.setAfterBalance(afterBalance);
            transactionLog.setTerminalId(consumeDTO.getTerminalId());
            transactionLog.setMerchantId(consumeDTO.getMerchantId());
            transactionLog.setLocationInfo(consumeDTO.getLocationInfo());
            transactionLog.setTransTime(new Date());
            transactionLog.setStatus(TransStatus.SUCCESS.getCode());
            transactionMapper.insert(transactionLog);
            updateCardCache(cardInfo);
            sendTransactionNotification(transactionLog);
            return convertToVO(transactionLog);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BusinessException(ErrorCode.SYSTEM_ERROR);
        } finally {
            // This unlock runs before the @Transactional commit, so the balance update should
            // also be guarded in the database (for example a conditional UPDATE or a row lock).
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
@Override
@Transactional(rollbackFor = Exception.class)
public TransactionVO recharge (RechargeDTO rechargeDTO) {
CardInfo cardInfo = cardMapper.selectByCardId(rechargeDTO.getCardId());
if (cardInfo == null ) {
throw new BusinessException (ErrorCode.CARD_NOT_EXIST);
}
validateRechargeChannel(rechargeDTO.getChannel());
BigDecimal beforeBalance = cardInfo.getBalance();
BigDecimal afterBalance = beforeBalance.add(rechargeDTO.getAmount());
cardInfo.setBalance(afterBalance);
cardMapper.updateBalance(cardInfo);
TransactionLog transactionLog = new TransactionLog ();
transactionLog.setCardId(rechargeDTO.getCardId());
transactionLog.setTransType(TransType.RECHARGE.getCode());
transactionLog.setTransAmount(rechargeDTO.getAmount());
transactionLog.setBeforeBalance(beforeBalance);
transactionLog.setAfterBalance(afterBalance);
transactionLog.setTerminalId(rechargeDTO.getTerminalId());
transactionLog.setChannel(rechargeDTO.getChannel());
transactionLog.setChannelOrderNo(rechargeDTO.getChannelOrderNo());
transactionLog.setTransTime(new Date ());
transactionLog.setStatus(TransStatus.SUCCESS.getCode());
transactionMapper.insert(transactionLog);
updateCardCache(cardInfo);
sendRechargeNotification(transactionLog);
return convertToVO(transactionLog);
}
@Override
public TransactionVO getDetail (Long transId) {
String cacheKey = "transaction:detail:" + transId;
TransactionVO cachedTrans = (TransactionVO) redisTemplate.opsForValue().get(cacheKey);
if (cachedTrans != null ) {
return cachedTrans;
}
TransactionLog transactionLog = transactionMapper.selectById(transId);
if (transactionLog == null ) {
throw new BusinessException (ErrorCode.TRANSACTION_NOT_EXIST);
}
TransactionVO transactionVO = convertToVO(transactionLog);
redisTemplate.opsForValue().set(cacheKey, transactionVO, 10 , TimeUnit.MINUTES);
return transactionVO;
}
@Override
public PageResult<TransactionVO> listTransactions (TransactionQueryDTO queryDTO) {
PageHelper.startPage(queryDTO.getPageNum(), queryDTO.getPageSize());
List<TransactionLog> transactionList = transactionMapper.selectByCondition(queryDTO);
PageInfo<TransactionLog> pageInfo = new PageInfo <>(transactionList);
List<TransactionVO> voList = transactionList.stream()
.map(this ::convertToVO)
.collect(Collectors.toList());
return new PageResult <>(voList, pageInfo.getTotal(), pageInfo.getPageNum(), pageInfo.getPageSize());
}
@Override
@Transactional(rollbackFor = Exception.class)
public void reverseTransaction (Long transId) {
TransactionLog originalTrans = transactionMapper.selectById(transId);
if (originalTrans == null ) {
throw new BusinessException (ErrorCode.TRANSACTION_NOT_EXIST);
}
if (!canReverse(originalTrans)) {
throw new BusinessException (ErrorCode.TRANSACTION_CANNOT_REVERSE);
}
CardInfo cardInfo = cardMapper.selectByCardId(originalTrans.getCardId());
BigDecimal currentBalance = cardInfo.getBalance();
BigDecimal reversedBalance = currentBalance.add(originalTrans.getTransAmount());
cardInfo.setBalance(reversedBalance);
cardMapper.updateBalance(cardInfo);
TransactionLog reverseLog = new TransactionLog ();
reverseLog.setCardId(originalTrans.getCardId());
reverseLog.setTransType(TransType.REVERSE.getCode());
reverseLog.setTransAmount(originalTrans.getTransAmount());
reverseLog.setBeforeBalance(currentBalance);
reverseLog.setAfterBalance(reversedBalance);
reverseLog.setRelatedTransId(transId);
reverseLog.setTransTime(new Date ());
reverseLog.setStatus(TransStatus.SUCCESS.getCode());
reverseLog.setRemark("Reversal of original transaction ID: " + transId);
transactionMapper.insert(reverseLog);
originalTrans.setStatus(TransStatus.REVERSED.getCode());
transactionMapper.updateStatus(originalTrans);
updateCardCache(cardInfo);
clearTransactionCache(transId);
}
private void validateConsumeLimit (CardInfo cardInfo, BigDecimal amount) {
if (cardInfo.getSingleLimit() != null && amount.compareTo(cardInfo.getSingleLimit()) > 0 ) {
throw new BusinessException (ErrorCode.EXCEED_SINGLE_LIMIT);
}
String dailyKey = "card:daily:consume:" + cardInfo.getCardId() + ":" + DateUtil.today();
BigDecimal dailyConsume = (BigDecimal) redisTemplate.opsForValue().get(dailyKey);
if (dailyConsume == null ) {
dailyConsume = BigDecimal.ZERO;
}
BigDecimal afterDaily = dailyConsume.add(amount);
if (cardInfo.getDailyLimit() != null && afterDaily.compareTo(cardInfo.getDailyLimit()) > 0 ) {
throw new BusinessException (ErrorCode.EXCEED_DAILY_LIMIT);
}
}
}
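`validateConsumeLimit` reads the running daily total from Redis, but the listing does not show where that total is increased after a successful deduction. The sketch below illustrates the idea of checking and incrementing the daily total in one atomic step, using an in-memory map as a stand-in for the Redis key. `DailyLimitTracker` and its methods are hypothetical; with Redis, the same effect would need an atomic operation such as a Lua script.

```java
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for the per-card daily counter; illustration only. */
final class DailyLimitTracker {
    // Key mirrors the "cardId:date" layout of the Redis key in the listing above.
    private final ConcurrentHashMap<String, BigDecimal> spentByCardAndDay = new ConcurrentHashMap<>();

    /** Adds 'amount' to the day's total only if the new total stays within 'dailyLimit'. */
    boolean tryConsume(String cardId, LocalDate day, BigDecimal amount, BigDecimal dailyLimit) {
        boolean[] accepted = {false};
        // compute() runs atomically per key, so check and increment cannot interleave.
        spentByCardAndDay.compute(cardId + ":" + day, (key, current) -> {
            BigDecimal base = (current == null) ? BigDecimal.ZERO : current;
            BigDecimal next = base.add(amount);
            if (next.compareTo(dailyLimit) > 0) {
                return current; // over the limit: leave the total unchanged
            }
            accepted[0] = true;
            return next;
        });
        return accepted[0];
    }

    BigDecimal spent(String cardId, LocalDate day) {
        return spentByCardAndDay.getOrDefault(cardId + ":" + day, BigDecimal.ZERO);
    }

    public static void main(String[] args) {
        DailyLimitTracker tracker = new DailyLimitTracker();
        LocalDate today = LocalDate.of(2024, 1, 15);
        BigDecimal limit = new BigDecimal("500.00");
        System.out.println(tracker.tryConsume("C001", today, new BigDecimal("300.00"), limit)); // true
        System.out.println(tracker.tryConsume("C001", today, new BigDecimal("250.00"), limit)); // false
        System.out.println(tracker.tryConsume("C001", today, new BigDecimal("200.00"), limit)); // true
        System.out.println(tracker.spent("C001", today));                                        // 500.00
    }
}
```

As in the original check, a total exactly equal to the limit is accepted; only a total strictly above it is rejected.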
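3.3 Clearing and Settlement Module
The clearing and settlement module handles core financial functions such as merchant settlement, fund reconciliation, and error handling, and places very high demands on data accuracy and transactional consistency.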
@RestController
@RequestMapping("/api/settlement")
@Slf4j
public class SettlementController {
@Autowired
private SettlementService settlementService;
@PostMapping("/daily/generate")
public Result<SettlementVO> generateDailySettlement (@RequestParam String settleDate) {
try {
SettlementVO settlementVO = settlementService.generateDailySettlement(settleDate);
return Result.success(settlementVO);
} catch (BusinessException e) {
log.error("Failed to generate daily settlement: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
@GetMapping("/detail/{settleId}")
public Result<SettlementDetailVO> getSettlementDetail (@PathVariable Long settleId) {
try {
SettlementDetailVO detailVO = settlementService.getSettlementDetail(settleId);
return Result.success(detailVO);
} catch (BusinessException e) {
log.error("Failed to query settlement detail: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
@PostMapping("/confirm/{settleId}")
public Result<Void> confirmSettlement (@PathVariable Long settleId) {
try {
settlementService.confirmSettlement(settleId);
return Result.success();
} catch (BusinessException e) {
log.error("Failed to confirm settlement: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
@GetMapping("/list")
public Result<PageResult<SettlementVO>> listSettlements (
@RequestParam(defaultValue = "1") Integer pageNum,
@RequestParam(defaultValue = "20") Integer pageSize,
@RequestParam(required = false) String merchantId,
@RequestParam(required = false) Integer settleStatus,
@RequestParam(required = false) String startDate,
@RequestParam(required = false) String endDate) {
SettlementQueryDTO queryDTO = new SettlementQueryDTO ();
queryDTO.setPageNum(pageNum);
queryDTO.setPageSize(pageSize);
queryDTO.setMerchantId(merchantId);
queryDTO.setSettleStatus(settleStatus);
queryDTO.setStartDate(startDate);
queryDTO.setEndDate(endDate);
PageResult<SettlementVO> pageResult = settlementService.listSettlements(queryDTO);
return Result.success(pageResult);
}
@PostMapping("/delete/{settleId}")
public Result<Void> deleteSettlement (@PathVariable Long settleId) {
try {
settlementService.deleteSettlement(settleId);
return Result.success();
} catch (BusinessException e) {
log.error("Failed to delete settlement: {}", e.getMessage(), e);
return Result.error(e.getCode(), e.getMessage());
}
}
}
@Service
@Slf4j
public class SettlementServiceImpl implements SettlementService {
@Autowired
private SettlementMapper settlementMapper;
@Autowired
private SettlementDetailMapper settlementDetailMapper;
@Autowired
private TransactionMapper transactionMapper;
@Autowired
private MerchantMapper merchantMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
@Transactional(rollbackFor = Exception.class)
public SettlementVO generateDailySettlement (String settleDate) {
if (!DateUtil.isValidDate(settleDate)) {
throw new BusinessException (ErrorCode.INVALID_DATE_FORMAT);
}
Settlement existing = settlementMapper.selectBySettleDate(settleDate);
if (existing != null ) {
throw new BusinessException (ErrorCode.SETTLEMENT_ALREADY_EXISTS);
}
List<MerchantInfo> merchantList = merchantMapper.selectAllActive();
Settlement settlement = new Settlement ();
settlement.setSettleDate(settleDate);
settlement.setSettleStatus(SettleStatus.DRAFT.getCode());
settlement.setTotalAmount(BigDecimal.ZERO);
settlement.setTotalFee(BigDecimal.ZERO);
settlement.setSettleAmount(BigDecimal.ZERO);
settlement.setCreateTime(new Date ());
settlementMapper.insert(settlement);
List<SettlementDetail> detailList = new ArrayList <>();
BigDecimal totalAmount = BigDecimal.ZERO;
BigDecimal totalFee = BigDecimal.ZERO;
for (MerchantInfo merchant : merchantList) {
TransactionSummary summary = transactionMapper.selectDailySummary(
merchant.getMerchantId(), settleDate);
if (summary == null || summary.getTotalAmount().compareTo(BigDecimal.ZERO) == 0 ) {
continue ;
}
BigDecimal fee = calculateFee(summary.getTotalAmount(), merchant.getSettlementRate());
BigDecimal settleAmount = summary.getTotalAmount().subtract(fee);
SettlementDetail detail = new SettlementDetail ();
detail.setSettleId(settlement.getSettleId());
detail.setMerchantId(merchant.getMerchantId());
detail.setMerchantName(merchant.getMerchantName());
detail.setTransCount(summary.getTransCount());
detail.setTotalAmount(summary.getTotalAmount());
detail.setFeeAmount(fee);
detail.setSettleAmount(settleAmount);
detail.setSettleStatus(SettleStatus.DRAFT.getCode());
detail.setCreateTime(new Date ());
settlementDetailMapper.insert(detail);
detailList.add(detail);
totalAmount = totalAmount.add(summary.getTotalAmount());
totalFee = totalFee.add(fee);
}
settlement.setTotalAmount(totalAmount);
settlement.setTotalFee(totalFee);
settlement.setSettleAmount(totalAmount.subtract(totalFee));
settlement.setDetailCount(detailList.size());
settlement.setUpdateTime(new Date ());
settlementMapper.updateById(settlement);
sendSettlementGeneratedNotification(settlement);
return convertToVO(settlement);
}
@Override
public SettlementDetailVO getSettlementDetail (Long settleId) {
Settlement settlement = settlementMapper.selectById(settleId);
if (settlement == null ) {
throw new BusinessException (ErrorCode.SETTLEMENT_NOT_EXIST);
}
List<SettlementDetail> detailList = settlementDetailMapper.selectBySettleId(settleId);
SettlementDetailVO detailVO = new SettlementDetailVO ();
detailVO.setSettlement(convertToVO(settlement));
detailVO.setDetails(detailList.stream()
.map(this ::convertDetailToVO)
.collect(Collectors.toList()));
return detailVO;
}
@Override
@Transactional(rollbackFor = Exception.class)
public void confirmSettlement (Long settleId) {
Settlement settlement = settlementMapper.selectById(settleId);
if (settlement == null ) {
throw new BusinessException (ErrorCode.SETTLEMENT_NOT_EXIST);
}
if (!Objects.equals(settlement.getSettleStatus(), SettleStatus.DRAFT.getCode())) {
throw new BusinessException (ErrorCode.SETTLEMENT_STATUS_ERROR);
}
settlement.setSettleStatus(SettleStatus.CONFIRMED.getCode());
settlement.setConfirmTime(new Date ());
settlement.setUpdateTime(new Date ());
settlementMapper.updateById(settlement);
settlementDetailMapper.updateStatusBySettleId(settleId, SettleStatus.CONFIRMED.getCode());
triggerFundTransfer(settlement);
sendSettlementConfirmedNotification(settlement);
}
@Override
public PageResult<SettlementVO> listSettlements (SettlementQueryDTO queryDTO) {
PageHelper.startPage(queryDTO.getPageNum(), queryDTO.getPageSize());
List<Settlement> settlementList = settlementMapper.selectByCondition(queryDTO);
PageInfo<Settlement> pageInfo = new PageInfo <>(settlementList);
List<SettlementVO> voList = settlementList.stream()
.map(this ::convertToVO)
.collect(Collectors.toList());
return new PageResult <>(voList, pageInfo.getTotal(), pageInfo.getPageNum(), pageInfo.getPageSize());
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deleteSettlement (Long settleId) {
Settlement settlement = settlementMapper.selectById(settleId);
if (settlement == null ) {
throw new BusinessException (ErrorCode.SETTLEMENT_NOT_EXIST);
}
if (!Objects.equals(settlement.getSettleStatus(), SettleStatus.DRAFT.getCode())) {
throw new BusinessException (ErrorCode.SETTLEMENT_CANNOT_DELETE);
}
settlementDetailMapper.deleteBySettleId(settleId);
settlementMapper.deleteById(settleId);
sendSettlementDeletedNotification(settlement);
}
private BigDecimal calculateFee (BigDecimal amount, BigDecimal rate) {
if (rate == null || rate.compareTo(BigDecimal.ZERO) == 0 ) {
return BigDecimal.ZERO;
}
return amount.multiply(rate).setScale(2 , RoundingMode.HALF_UP);
}
}
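A short worked example may help make the fee rule concrete. The `calculateFee` method below repeats the logic from the listing above; the amounts and rates are made-up inputs chosen to show the `HALF_UP` rounding to two decimal places.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Worked example of the settlement fee rule; the input figures are illustrative. */
final class FeeExample {

    /** Same rule as in the service: fee = amount x rate, rounded half-up to 2 decimals. */
    static BigDecimal calculateFee(BigDecimal amount, BigDecimal rate) {
        if (rate == null || rate.compareTo(BigDecimal.ZERO) == 0) {
            return BigDecimal.ZERO;
        }
        return amount.multiply(rate).setScale(2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        BigDecimal total = new BigDecimal("12345.67");
        BigDecimal fee = calculateFee(total, new BigDecimal("0.0060")); // 74.074020 -> 74.07
        System.out.println("fee    = " + fee);                          // 74.07
        System.out.println("settle = " + total.subtract(fee));          // 12271.60

        // 33.33 x 0.0015 = 0.049995, which rounds up to 0.05
        System.out.println(calculateFee(new BigDecimal("33.33"), new BigDecimal("0.0015")));
    }
}
```

Because the fee is rounded per merchant, the sum of per-merchant fees can differ by a few cents from a fee computed once on the grand total. The service sums the rounded per-merchant fees, which keeps the header consistent with its detail rows.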
4. Performance Optimization Strategies
4.1 Database-Level Optimization
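-- Composite indexes for per-card and per-merchant history, newest first
CREATE INDEX idx_card_trans ON transaction_log(card_id, trans_time DESC);
CREATE INDEX idx_merchant_trans ON transaction_log(merchant_id, trans_time DESC);
-- Expression index for case-insensitive name lookups
CREATE INDEX idx_user_name_lower ON user_info(LOWER(user_name));
-- Partition-local index on the range-partitioned log (LOCAL is Oracle-style syntax)
CREATE INDEX idx_trans_time_local ON transaction_log(trans_time) LOCAL;
-- Select only the columns that are needed instead of SELECT *
SELECT user_id, user_name, phone FROM user_info WHERE status = 1;
-- Bound the time range so that only the matching partition is scanned
SELECT card_id FROM transaction_log WHERE trans_time >= '2024-01-01' AND trans_time < '2024-02-01';
-- Keyset pagination: seek past the last seen trans_id instead of using OFFSET
SELECT * FROM transaction_log WHERE trans_id > ? ORDER BY trans_id ASC LIMIT 20;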
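The last query above pages by seeking past the last seen `trans_id` rather than by skipping rows with an offset. The following in-memory sketch mimics that access pattern with a sorted map, so the caller-side loop is visible. `KeysetPager` is a hypothetical stand-in for the table and its primary-key index, not part of the original system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Sorted map standing in for transaction_log and its primary-key index. */
final class KeysetPager {
    private final NavigableMap<Long, String> rowsById = new TreeMap<>();

    void insert(long transId, String payload) {
        rowsById.put(transId, payload);
    }

    /** Equivalent of: WHERE trans_id > lastSeenId ORDER BY trans_id ASC LIMIT limit. */
    List<Long> pageAfter(long lastSeenId, int limit) {
        List<Long> ids = new ArrayList<>();
        // tailMap(..., false) starts strictly after lastSeenId, like an index seek.
        for (Long id : rowsById.tailMap(lastSeenId, false).keySet()) {
            if (ids.size() == limit) {
                break;
            }
            ids.add(id);
        }
        return ids;
    }

    public static void main(String[] args) {
        KeysetPager pager = new KeysetPager();
        for (long id = 1; id <= 50; id++) {
            pager.insert(id, "row-" + id);
        }
        long cursor = 0; // start before the first row
        List<Long> page;
        while (!(page = pager.pageAfter(cursor, 20)).isEmpty()) {
            System.out.println(page.get(0) + ".." + page.get(page.size() - 1));
            cursor = page.get(page.size() - 1); // the next page starts after the last ID seen
        }
    }
}
```

The cost of each page depends on the page size rather than on how deep into the result set the caller already is, which is the reason for preferring this pattern on a table with billions of rows.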
4.2 Cache Strategy Design
@Configuration
@EnableCaching
public class CacheConfig {
@Bean
public CacheManager cacheManager (RedisConnectionFactory factory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(30 ))
.serializeKeysWith(RedisSerializationContext.SerializationPair
.fromSerializer(new StringRedisSerializer ()))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer ()));
Map<String, RedisCacheConfiguration> cacheConfigs = new HashMap <>();
cacheConfigs.put("userInfo" , config.entryTtl(Duration.ofMinutes(30 )));
cacheConfigs.put("cardInfo" , config.entryTtl(Duration.ofMinutes(10 )));
cacheConfigs.put("transaction" , config.entryTtl(Duration.ofMinutes(5 )));
cacheConfigs.put("merchant" , config.entryTtl(Duration.ofHours(1 )));
return RedisCacheManager.builder(factory)
.cacheDefaults(config)
.withInitialCacheConfigurations(cacheConfigs)
.build();
}
@Bean
public RedisTemplate<String, Object> redisTemplate (RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate <>();
template.setConnectionFactory(factory);
template.setKeySerializer(new StringRedisSerializer ());
template.setValueSerializer(new GenericJackson2JsonRedisSerializer ());
template.setHashKeySerializer(new StringRedisSerializer ());
template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer ());
return template;
}
}
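4.3 Read/Write Splitting and Sharding
To cope with massive data volumes and high-concurrency read/write pressure, the system applies read/write splitting together with database and table sharding at the database architecture level.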
spring:
  datasource:
    dynamic:
      primary: master
      strict: true
      datasource:
        master:
          url: jdbc:kingbase8://master-host:5432/carddb?currentSchema=public
          username: ${MASTER_DB_USER}
          password: ${MASTER_DB_PASSWORD}
          driver-class-name: com.kingbase8.Driver
        slave1:
          url: jdbc:kingbase8://slave1-host:5432/carddb?currentSchema=public
          username: ${SLAVE_DB_USER}
          password: ${SLAVE_DB_PASSWORD}
          driver-class-name: com.kingbase8.Driver
        slave2:
          url: jdbc:kingbase8://slave2-host:5432/carddb?currentSchema=public
          username: ${SLAVE_DB_USER}
          password: ${SLAVE_DB_PASSWORD}
          driver-class-name: com.kingbase8.Driver
      hikari:
        maximum-pool-size: 20
        minimum-idle: 5
        connection-timeout: 30000
        idle-timeout: 600000
        max-lifetime: 1800000
@Component
public class UserShardingStrategy implements PreciseShardingAlgorithm <Long> {
@Override
public String doSharding (Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
Long userId = shardingValue.getValue();
int shardIndex = Math.abs(userId.hashCode() % 4 );
for (String each : availableTargetNames) {
if (each.endsWith("_" + shardIndex)) {
return each;
}
}
throw new IllegalArgumentException("No matching data source found");
}
}
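The modulo routing used by `UserShardingStrategy` can be tried out without any sharding framework. The sketch below is a variant, not the original algorithm: it applies `Math.floorMod` to the raw ID, which always yields an index in `[0, shardCount)` even for negative inputs, instead of taking the absolute value of a hashed remainder. The `user_db` target prefix is a hypothetical name.

```java
/** Framework-free sketch of modulo shard routing; a variant of the strategy above. */
final class ShardRouter {

    /** Maps a user ID to a shard index in [0, shardCount). */
    static int shardIndex(long userId, int shardCount) {
        return (int) Math.floorMod(userId, (long) shardCount);
    }

    /** Builds a target name such as "user_db_2"; the prefix is an assumed naming scheme. */
    static String targetName(String prefix, long userId, int shardCount) {
        return prefix + "_" + shardIndex(userId, shardCount);
    }

    public static void main(String[] args) {
        System.out.println(targetName("user_db", 10L, 4)); // user_db_2
        System.out.println(targetName("user_db", 11L, 4)); // user_db_3
        System.out.println(shardIndex(-1L, 4));            // 3
    }
}
```

A fixed modulus keeps routing cheap, but changing the shard count later remaps most IDs, so the count is usually chosen with growth in mind.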
@Component
public class TimeRangeShardingStrategy implements RangeShardingAlgorithm <Date> {
@Override
public Collection<String> doSharding (Collection<String> availableTargetNames, RangeShardingValue<Date> shardingValue) {
Range<Date> range = shardingValue.getValueRange();
Date lower = range.lowerEndpoint();
Date upper = range.upperEndpoint();
List<String> result = new ArrayList <>();
Set<String> shardNames = getShardNamesBetween(lower, upper);
for (String each : availableTargetNames) {
if (shardNames.contains(each)) {
result.add(each);
}
}
return result;
}
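    private Set<String> getShardNamesBetween(Date start, Date end) {
        Set<String> shardNames = new HashSet<>();
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(start);
        // Snap to the first instant of the starting month. Otherwise a start such as Jan 31
        // steps to the end of February and can skip a month whose range ends earlier.
        calendar.set(Calendar.DAY_OF_MONTH, 1);
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        while (!calendar.getTime().after(end)) {
            int year = calendar.get(Calendar.YEAR);
            int month = calendar.get(Calendar.MONTH) + 1;
            String shardName = String.format("trans_log_%d_%02d", year, month);
            shardNames.add(shardName);
            calendar.add(Calendar.MONTH, 1);
        }
        return shardNames;
    }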
}
5. High Availability and Disaster Recovery Design
5.1 Multi-Level Failover Mechanism
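-- Conceptual pseudocode: not executable as written on stock PostgreSQL.
-- pg_stat_replication and pg_stat_wal_receiver expose no node_name column, and
-- pg_promote() promotes the local standby only (it takes no node name).
-- The promotion decision is typically made by an external cluster manager.
CREATE OR REPLACE FUNCTION trigger_failover() RETURNS void AS $$
DECLARE
    current_primary VARCHAR(100);
    new_primary VARCHAR(100);
BEGIN
    SELECT node_name INTO current_primary FROM pg_stat_replication WHERE state = 'streaming' LIMIT 1;
    IF current_primary IS NULL THEN
        SELECT node_name INTO new_primary FROM pg_stat_wal_receiver WHERE status = 'streaming' ORDER BY last_msg_send_time DESC LIMIT 1;
        PERFORM pg_promote(new_primary);
        UPDATE system_config SET config_value = new_primary WHERE config_key = 'primary_db_node';
        INSERT INTO failover_log (old_primary, new_primary, failover_time, reason) VALUES (current_primary, new_primary, NOW(), 'primary node failure');
    END IF;
END;
$$ LANGUAGE plpgsql;
-- The periodic check is also pseudocode: CREATE EVENT ... ON SCHEDULE is MySQL
-- event-scheduler syntax, and PostgreSQL event triggers do not run on a timer.
CREATE EVENT TRIGGER monitor_db_health ON SCHEDULE EVERY 30 SECOND DO BEGIN
    IF NOT check_primary_health() THEN
        PERFORM trigger_failover();
    END IF;
END;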
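@Component
@Slf4j
public class DatabaseHealthChecker {
    @Autowired
    private DataSource dataSource;
    // Used by triggerDegradation(); the original listing omits this field, so the type name is assumed.
    @Autowired
    private FaultEventService faultEventService;
    @Value("${health.check.interval:5000}")
    private long checkInterval;
    private ScheduledExecutorService scheduler;

    @PostConstruct
    public void init() {
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(this::checkDatabaseHealth, 0, checkInterval, TimeUnit.MILLISECONDS);
    }

    // Stop the probe thread when the application context shuts down.
    @PreDestroy
    public void shutdown() {
        scheduler.shutdownNow();
    }

    private void checkDatabaseHealth() {
        // try-with-resources closes the ResultSet, Statement and Connection in reverse order.
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT 1")) {
            if (rs.next()) {
                HealthMonitor.setDatabaseStatus(HealthStatus.UP);
            }
        } catch (Exception e) {
            // Catch unchecked exceptions too (for example from the connection pool):
            // scheduleAtFixedRate suppresses all later runs once a run throws.
            log.error("Database health check failed", e);
            HealthMonitor.setDatabaseStatus(HealthStatus.DOWN);
            triggerDegradation();
        }
    }

    private void triggerDegradation() {
        // Degrade to read-only, tell the other instances, and record a fault event.
        SystemConfig.setReadOnlyMode(true);
        notifyAllInstances("database_down", System.currentTimeMillis()); // defined elsewhere, not shown in the listing
        FaultEvent event = new FaultEvent();
        event.setEventType(FaultType.DATABASE_UNAVAILABLE);
        event.setSeverity(Severity.CRITICAL);
        event.setOccurTime(new Date());
        event.setDescription("Database connection failed; switched to read-only degraded mode");
        faultEventService.recordEvent(event);
    }
}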
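A single failed SELECT 1 does not necessarily mean the database is down; a short network blip would otherwise flip the whole system into read-only mode every time it occurs. One common refinement is to require several consecutive failures before degrading, and to leave degraded mode once a probe succeeds again. The following is a minimal sketch under those assumptions: the class FailureThresholdGate, its method names, and the threshold of 3 are hypothetical and not part of the original system.

```java
// Hypothetical debounce helper for health probes; not part of the original system.
final class FailureThresholdGate {

    private final int threshold;
    private int consecutiveFailures;
    private boolean degraded;

    FailureThresholdGate(int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be at least 1");
        }
        this.threshold = threshold;
    }

    // Records a failed probe. Returns true only for the probe that crosses the
    // threshold, so the caller triggers degradation once per outage.
    synchronized boolean recordFailure() {
        consecutiveFailures++;
        if (!degraded && consecutiveFailures >= threshold) {
            degraded = true;
            return true;
        }
        return false;
    }

    // Records a successful probe. Returns true if this success ended a degraded period.
    synchronized boolean recordSuccess() {
        consecutiveFailures = 0;
        boolean recovered = degraded;
        degraded = false;
        return recovered;
    }

    synchronized boolean isDegraded() {
        return degraded;
    }

    public static void main(String[] args) {
        FailureThresholdGate gate = new FailureThresholdGate(3);
        System.out.println(gate.recordFailure()); // false: first failure
        System.out.println(gate.recordFailure()); // false: second failure
        System.out.println(gate.recordFailure()); // true: threshold reached, degrade now
        System.out.println(gate.recordSuccess()); // true: database is back, leave degraded mode
    }
}
```

In a checker like the one above, the catch branch would call triggerDegradation() only when recordFailure() returns true, and the success branch could switch read-only mode off when recordSuccess() returns true.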
5.2 Data Synchronization and Consistency Guarantees
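-- On the publishing database: publish every table.
CREATE PUBLICATION card_publication FOR ALL TABLES;

-- On the subscribing database: create the replication slot, copy existing rows, then stream changes.
CREATE SUBSCRIPTION beijing_subscription
    CONNECTION 'host=bj-db-host port=5432 dbname=carddb user=replicator password=xxxxxx'
    PUBLICATION card_publication
    WITH (
        copy_data = true,
        create_slot = true,
        enabled = true,
        slot_name = 'beijing_slot'
    );

-- On the primary: replication lag per connected receiver, in bytes of WAL.
-- The aliases carry a _bytes suffix to avoid confusion with the view's own
-- write_lag, flush_lag and replay_lag columns, which are time intervals.
SELECT client_addr, application_name, state, sync_state,
       pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)   AS sent_lag_bytes,
       pg_wal_lsn_diff(pg_current_wal_lsn(), write_lsn)  AS write_lag_bytes,
       pg_wal_lsn_diff(pg_current_wal_lsn(), flush_lsn)  AS flush_lag_bytes,
       pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_lag_bytes
FROM pg_stat_replication;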
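The lag values in the query above are byte distances between two write-ahead log positions. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash, the high and low 32 bits of a 64-bit position, so the same subtraction can be reproduced on the application side, for example when LSNs collected from several nodes are compared in a monitoring service. A minimal sketch: the class LsnLagSketch and its methods are hypothetical names, and the sample LSNs are made up for illustration.

```java
// Hypothetical helper that mirrors what pg_wal_lsn_diff computes, for LSNs held as text.
final class LsnLagSketch {

    private LsnLagSketch() {
    }

    // Converts an LSN such as "16/B374D848" into its 64-bit byte position.
    static long parseLsn(String lsn) {
        int slash = lsn.indexOf('/');
        if (slash <= 0 || slash == lsn.length() - 1) {
            throw new IllegalArgumentException("Not an LSN: " + lsn);
        }
        long high = Long.parseLong(lsn.substring(0, slash), 16);
        long low = Long.parseLong(lsn.substring(slash + 1), 16);
        return (high << 32) | low;
    }

    // Number of WAL bytes by which 'replayed' trails 'current'.
    static long lagBytes(String current, String replayed) {
        return parseLsn(current) - parseLsn(replayed);
    }

    public static void main(String[] args) {
        System.out.println(lagBytes("16/B374D848", "16/B3740000")); // prints 55368
        System.out.println(lagBytes("1/0", "0/FFFFFFFF"));          // prints 1
    }
}
```

The second call shows why both halves matter: the position after 0/FFFFFFFF is 1/0, so comparing only the low parts would give a wrong result.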
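@Service
public class CacheServiceImpl implements CacheService {
    // Upper bound on lock attempts (roughly 3 s in total); the value is an assumption, tune it to the workload.
    private static final int MAX_LOCK_ATTEMPTS = 20;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private RedissonClient redissonClient;

    @Override
    public <T> T queryWithPenetrationProtection(String key, Class<T> clazz, Supplier<T> dbSupplier, Duration ttl) {
        Object cached = redisTemplate.opsForValue().get(key);
        if (cached != null) {
            return unwrap(cached);
        }
        RLock lock = redissonClient.getLock("cache:lock:" + key);
        try {
            // Bounded loop instead of unbounded recursion while another caller rebuilds the entry.
            for (int attempt = 0; attempt < MAX_LOCK_ATTEMPTS; attempt++) {
                // Wait up to 100 ms for the lock; the lease expires after 5 s if the holder dies.
                if (lock.tryLock(100, 5000, TimeUnit.MILLISECONDS)) {
                    // Double-check: another caller may have filled the cache while we waited.
                    cached = redisTemplate.opsForValue().get(key);
                    if (cached != null) {
                        return unwrap(cached);
                    }
                    T value = dbSupplier.get();
                    if (value == null) {
                        // Cache the miss briefly so repeated lookups of a nonexistent key skip the database.
                        redisTemplate.opsForValue().set(key, NullValue.INSTANCE, Duration.ofMinutes(5));
                    } else {
                        redisTemplate.opsForValue().set(key, value, ttl);
                    }
                    return value;
                }
                // Someone else holds the lock; give it time to finish, then look in the cache again.
                Thread.sleep(50);
                cached = redisTemplate.opsForValue().get(key);
                if (cached != null) {
                    return unwrap(cached);
                }
            }
            throw new BusinessException(ErrorCode.SYSTEM_ERROR);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BusinessException(ErrorCode.SYSTEM_ERROR);
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    // NullValue marks a key that is known to be absent from the database.
    @SuppressWarnings("unchecked")
    private static <T> T unwrap(Object cached) {
        return cached instanceof NullValue ? null : (T) cached;
    }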
    @Override
    @SuppressWarnings("unchecked")
    public <T> Map<String, T> batchQuery(List<String> keys, Class<T> clazz, Function<List<String>, Map<String, T>> dbSupplier) {
        if (keys.isEmpty()) {
            return Collections.emptyMap();
        }
        // One round trip for all keys; the returned list is positionally aligned with keys.
        List<Object> cachedValues = redisTemplate.opsForValue().multiGet(keys);
        Map<String, T> result = new HashMap<>();
        List<String> missingKeys = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            String key = keys.get(i);
            Object value = cachedValues.get(i);
            if (value == null) {
                missingKeys.add(key);
            } else if (!(value instanceof NullValue)) {
                result.put(key, (T) value);
            }
            // A cached NullValue means "known absent": skip it without querying the database.
        }
        if (!missingKeys.isEmpty()) {
            Map<String, T> dbResult = dbSupplier.apply(missingKeys);
            result.putAll(dbResult);
            for (String key : missingKeys) {
                T value = dbResult.get(key);
                // Write value and TTL in one command, so that no key is left without an expiry
                // if the process dies between a multiSet and the follow-up expire calls.
                if (value == null) {
                    redisTemplate.opsForValue().set(key, NullValue.INSTANCE, Duration.ofMinutes(5));
                } else {
                    redisTemplate.opsForValue().set(key, value, Duration.ofMinutes(30));
                }
            }
        }
        return result;
    }
}
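The two protections combined in queryWithPenetrationProtection, caching a placeholder for rows that do not exist and letting only one caller rebuild a missing entry, can be tried out without Redis. The sketch below is a simplified single-JVM analogue that assumes a ConcurrentHashMap in place of Redis and leaves out TTLs; the class NullCachingLoader and all of its members are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical in-memory analogue of null-value caching plus single-loader protection.
final class NullCachingLoader<K, V> {

    // Placeholder stored for keys the loader reported as absent.
    private static final Object NULL_MARKER = new Object();

    private final ConcurrentMap<K, Object> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;
    private final AtomicInteger loadCount = new AtomicInteger();

    NullCachingLoader(Function<K, V> loader) {
        this.loader = loader;
    }

    @SuppressWarnings("unchecked")
    V get(K key) {
        // ConcurrentHashMap.computeIfAbsent applies the function at most once per absent key,
        // so concurrent callers for the same key wait instead of all hitting the loader.
        Object cached = cache.computeIfAbsent(key, k -> {
            loadCount.incrementAndGet();
            V loaded = loader.apply(k);
            return loaded == null ? NULL_MARKER : loaded;
        });
        return cached == NULL_MARKER ? null : (V) cached;
    }

    // Number of times the backing loader was actually invoked.
    int loadCount() {
        return loadCount.get();
    }

    public static void main(String[] args) {
        NullCachingLoader<String, Long> balances =
                new NullCachingLoader<>(id -> "card-1".equals(id) ? Long.valueOf(100L) : null);
        System.out.println(balances.get("card-1"));  // 100
        System.out.println(balances.get("missing")); // null
        System.out.println(balances.get("missing")); // null, served from the cached placeholder
        System.out.println(balances.loadCount());    // 2
    }
}
```

The second lookup of the missing key never reaches the loader, which is the effect the NullValue entries have in the Redis-backed version.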
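6. Conclusion
This domestic-database migration of the one-card system shows that building a large-scale, highly concurrent, highly available core business system on a domestic database is entirely feasible. Through sound architecture design, fine-grained performance tuning, a thorough disaster recovery plan, and a rigorous migration strategy, the project achieved technological self-reliance and also delivered significant gains in performance, stability, and scalability.
As digital transformation deepens and the information technology application innovation (Xinchuang) industry grows rapidly, domestic foundational software will play an important role in more critical business scenarios. The technical approach and practical experience from this case can serve as a useful reference for building and upgrading similar systems, and can help advance independent innovation in China's digital infrastructure.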