系统概述
1.1 系统定位
TMS(Transport Management System)物流管理系统是跨境电商供应链 SaaS 平台的核心模块之一,负责管理从订单发货到签收的全流程物流业务。
在供应链中的位置:
- 上游系统: WMS 仓储管理系统(发货出库后推送运单需求)、OMS 订单管理系统(订单状态同步)
- 下游系统: 多家物流承运商(DHL、FedEx、UPS、顺丰国际等)
- 平行系统: FMS 财务结算系统(物流费用对账)
核心价值:
- 统一物流管理: 对接 20+ 家物流承运商,提供统一的运单创建、轨迹追踪、费用核算接口
- 智能渠道推荐: 基于规则引擎自动推荐最优物流渠道,综合考虑成本、时效、可靠性
- 实时轨迹追踪: 定时拉取物流轨迹,异常检测与预警,提升客户体验
- 自动费用对账: 自动核算物流费用,与物流商账单对账,发现差异并分析原因
1.2 业务规模
- 日均运单量: 8 万+ 单
- 物流承运商: 20+ 家(DHL、FedEx、UPS、顺丰国际、燕文、递四方等)
- 物流渠道: 100+ 个(不同承运商的不同产品线)
- 轨迹节点: 日均 50 万+ 条
- 峰值 QPS: 大促期间 5000+(运单创建)
- 数据库存储: 运单表 2000 万+ 条,轨迹表 5 亿+ 条
1.3 技术栈
- 后端框架: Spring Boot 3.2、MyBatis-Plus 3.5
- 数据库: MySQL 8.0(主从架构)、分库分表(ShardingSphere)
- 缓存: Redis 7.0(物流规则缓存、热点轨迹缓存)
- 消息队列: RocketMQ 5.0(运单状态通知、轨迹推送)
- 定时任务: XXL-Job 2.4(轨迹拉取、费用对账)
- 限流熔断: Sentinel 1.8(物流商 API 调用治理)
- 文件处理: EasyExcel 3.3(账单批量导入)
- 监控告警: Prometheus + Grafana + 钉钉机器人
核心业务流程
2.1 运单创建完整流程
运单创建是 TMS 最核心的业务流程,涉及智能渠道推荐、物流商 API 调用、运单状态管理等多个环节。
完整流程(10 步):
第一步:接收发货需求
WMS 仓储系统出库完成后,通过 RocketMQ 发送消息到 TMS,包含订单号、SKU、数量、收件人信息、包裹重量体积等。
技术上,我们用 MQ 异步处理,因为运单创建比较耗时(需要调用物流商 API),不适合同步调用。消息体包含完整的发货信息,TMS 消费消息后创建运单。
第二步:智能渠道推荐
系统根据发货需求,调用智能规则引擎推荐最优物流渠道。规则引擎分两步:
-
过滤候选渠道: 根据目的国、重量范围、是否带电、是否液体等条件过滤。比如美国线、重量 0-2kg、不带电,可能有 15 个候选渠道。
-
计算综合评分: 对每个候选渠道计算综合评分,公式是:
综合评分 = 成本得分 × 40% + 时效得分 × 35% + 可靠性得分 × 25%。成本得分根据预估运费计算,时效得分根据平均妥投天数计算,可靠性得分根据近 30 天妥投率计算。
最终返回 TOP3 渠道供商家选择,或者自动选择评分最高的渠道。
第三步:创建运单记录
在数据库中创建运单记录,状态为”待创建”(status=0),记录订单号、SKU、收件人信息、选择的物流渠道、预估运费等。
技术上,运单号采用雪花算法生成,格式是 TMS + 19 位数字,全局唯一。运单记录包含 tenant_id(租户 ID)、order_no(订单号)、channel_id(渠道 ID)、tracking_no(物流单号,此时为空)等字段。
第四步:幂等性校验(三层防护)
在调用物流商 API 之前,先进行幂等性校验,防止重复创建运单。我们采用三层防护机制:
第一层:Redis 分布式锁(防止并发创建)
- 使用 Redis SETNX 实现分布式锁,Key 是
waybill:create:{order_no},TTL 60 秒 - 如果 SETNX 返回 false,说明已经有线程在创建运单,直接返回”创建中”
- 如果返回 true,继续执行后续步骤
- 优点:响应快,可以防止并发创建
- 缺点:如果 Redis 挂了,锁就失效了
第二层:业务层查重(Redis 挂了也能防重)
- 查询数据库,判断该订单号是否已经创建过运单
- 如果已创建,直接返回已有运单信息
- 优点:不依赖 Redis,即使 Redis 挂了也能防止重复创建
- 缺点:查询数据库有一定性能开销
第三层:数据库唯一索引(最后兜底)
- 在
tms_waybill表创建唯一索引uk_order_no(order_no) - 如果前两层都失败,数据库唯一索引会拒绝重复插入
- 优点:数据库层面保证唯一性,最可靠
- 缺点:只能在插入时才能发现重复,无法提前拦截
三层防护结合:
- 正常情况下,Redis 分布式锁就能防止重复创建,响应快
- 如果 Redis 挂了,业务层查重可以防止重复创建
- 如果业务层查重失败,数据库唯一索引可以兜底
- 三层防护确保幂等性 100%
第五步:调用物流商 API 创建运单
通过统一适配层调用物流商 API 创建运单。适配层屏蔽了不同物流商的 API 差异,提供统一的接口。
技术上,我们用适配器模式(Adapter Pattern)实现。定义统一接口 LogisticsCarrierAdapter,包含 createWaybill()、queryTrack()、cancelWaybill() 等方法。每个物流商实现自己的适配器,比如 DHLAdapter、FedExAdapter、SFExpressAdapter。
调用物流商 API 时,需要传递收件人信息、包裹重量体积、申报信息等。物流商返回物流单号(tracking_no)和面单 PDF 文件。
第六步:保存物流单号和面单
将物流商返回的物流单号保存到数据库,更新运单状态为”已创建”(status=2)。同时,将面单 PDF 文件上传到 OSS 对象存储,保存文件 URL。
技术上,这一步在事务中执行,保证数据一致性。如果物流商 API 调用成功,但保存数据库失败,需要调用物流商的取消接口,避免产生无效运单。
第七步:写入运单流水
写入运单流水表,记录运单状态变更历史。流水表只增不改不删,用于审计追溯。
流水表包含 waybill_no(运单号)、old_status(变更前状态)、new_status(变更后状态)、operate_time(操作时间)、operator(操作人)等字段。
第八步:通知 OMS 更新订单状态
运单创建成功后,通过 RocketMQ 发送消息通知 OMS,更新订单状态为”已发货”,同时推送物流单号和面单 URL。
技术上,我们用本地消息表保证消息可靠性。在运单创建的事务中,同时写入本地消息表。事务提交后,定时任务扫描本地消息表,发送 MQ 消息。消息发送成功后,更新本地消息表状态为”已发送”。
第九步:缓存运单信息
将运单信息写入 Redis 缓存,Key 是 waybill:info:{waybill_no},TTL 7 天。缓存包含运单号、物流单号、当前状态、物流渠道等信息。
这样,后续查询运单信息时,可以直接从缓存读取,减少数据库压力。
第十步:释放分布式锁
删除 Redis 中的分布式锁,允许其他请求创建运单。
技术上,我们用 Lua 脚本保证删除锁的原子性,只有持有锁的线程才能删除锁,避免误删其他线程的锁。
2.2 物流轨迹追踪完整流程
物流轨迹追踪是 TMS 的第二核心流程,需要定时拉取物流商的轨迹数据,并进行异常检测与预警。
完整流程(7 步):
第一步:XXL-Job 定时任务触发
我们用 XXL-Job 定时任务,每 30 分钟触发一次,拉取需要更新轨迹的运单。
技术上,XXL-Job 支持分片执行,可以将任务分配到多个执行器节点并行处理。比如有 4 个执行器节点,每个节点负责 1/4 的运单,提高处理效率。
第二步:分片查询待拉取运单
根据分片参数查询待拉取运单。查询条件是:状态为”运输中”(status IN (3,4,5,6,7))、距离上次拉取时间超过 30 分钟、按运单 ID 取模分片。
技术上,我们用游标分页(Cursor Pagination)而不是传统的 OFFSET 分页,避免深分页性能问题。每次查询 500 条,记录最后一条的 ID,下次查询从这个 ID 开始。
SQL 示例:
SELECT * FROM tms_waybill
WHERE status IN (3,4,5,6,7)
AND last_track_pull_time < DATE_SUB(NOW(), INTERVAL 30 MINUTE)
AND id > #{lastId}
AND MOD(id, #{shardTotal}) = #{shardIndex}
ORDER BY id ASC
LIMIT 500;
第三步:调用物流商 API 查询轨迹
通过统一适配层调用物流商 API 查询轨迹。传入物流单号,物流商返回轨迹节点列表。
每个轨迹节点包含:节点时间、节点描述、节点位置、节点状态码等。比如 DHL 返回的节点:“2026-05-20 10:30:00, Departed from facility, Hong Kong, DEPARTED”。
技术上,不同物流商的轨迹格式不同,适配器需要将其转换为统一格式。我们定义了标准的 TrackNode 对象,包含 node_time、node_desc、node_location、node_status 等字段。
第四步:轨迹去重与增量保存
将新拉取的轨迹节点与数据库中已有的轨迹节点对比,只保存新增的节点,避免重复存储。
技术上,我们用 (tracking_no, node_time, node_desc) 三元组作为唯一标识,判断节点是否已存在。如果不存在,插入数据库;如果存在,跳过。
轨迹表按月分表,单表数据量控制在 2000 万以内,保证查询性能。表名格式是 tms_track_202605、tms_track_202606。
第五步:更新运单状态
根据最新的轨迹节点,更新运单状态。比如轨迹显示”已签收”,运单状态更新为”已签收”(status=9)。
技术上,我们维护了一个物流状态映射表,将物流商的状态码映射到 TMS 的标准状态。比如 DHL 的 “DELIVERED” 映射到 TMS 的 status=9(已签收)。
同时,我们用状态机(State Machine)控制状态流转,只允许合法的状态转换。比如”已签收”不能回退到”运输中”,如果出现这种情况,记录异常日志并告警。
第六步:异常检测与预警
对轨迹数据进行异常检测,发现问题及时预警。我们定义了三种异常规则:
-
超时未更新: 运单创建后 72 小时仍无轨迹,或运输中 7 天无新轨迹节点,判定为超时异常。
-
轨迹停滞: 连续 3 个轨迹节点位置相同且时间跨度超过 48 小时,判定为停滞异常。比如连续 3 天都显示”清关中”,可能卡关了。
-
异常节点: 轨迹中出现”退回”、“拒收”、“丢失”、“损坏”等关键词,判定为异常节点。
检测到异常后,系统自动创建异常工单,推送钉钉告警,通知运营人员处理。同时,通过 MQ 通知 OMS,OMS 可以主动联系客户说明情况。
第七步:推送轨迹给 OMS 和商家
将最新轨迹通过 RocketMQ 推送给 OMS,OMS 再推送给商家后台和终端客户。
技术上,我们只推送增量轨迹,不推送全量轨迹,减少消息体积。消息体包含运单号、新增轨迹节点列表、当前运单状态。
同时,我们将热点运单的轨迹缓存到 Redis,Key 是 waybill:track:{waybill_no},Value 是轨迹节点列表(JSON 数组),TTL 7 天。商家查询轨迹时,优先从 Redis 读取,Redis 未命中再查数据库。
2.3 物流费用核算完整流程
物流费用核算是 TMS 的第三核心流程,需要根据实际重量体积计算运费,并与物流商账单对账。
完整流程(7 步):
第一步:获取实际重量体积
运单创建时,使用的是预估重量体积。货物实际发出后,物流商会称重测量,得到实际重量体积。
技术上,物流商通过 API 或账单文件提供实际重量体积。我们定时拉取或批量导入,更新运单的 actual_weight(实际重量)和 actual_volume(实际体积)字段。
第二步:计算计费重量
根据实际重量和体积,计算计费重量。计费重量取实际重量和材积重的较大值。
材积重计算公式:材积重 = 长 × 宽 × 高 / 5000(单位:cm 和 g)。
举个例子:包裹实际重量 800g,尺寸 30×20×15cm,材积重 = 30×20×15/5000 = 1800g。计费重量取较大值 1800g。
第三步:查询物流渠道价格规则
根据运单的物流渠道,查询价格规则。价格规则包含首重、首重价格、续重、续重价格等。
技术上,价格规则存储在 MySQL 中,同时缓存到 Redis。Key 是 channel:price:{channel_id}:{dest_country},Value 是价格规则对象(JSON),TTL 24 小时。
价格规则示例:
- 首重:500g,首重价格:50 元
- 续重:每 100g,续重价格:8 元
第四步:计算应付运费
根据计费重量和价格规则,计算应付运费。
计算公式:
- 如果计费重量 ≤ 首重:
应付运费 = 首重价格 - 如果计费重量 > 首重:
应付运费 = 首重价格 + CEIL((计费重量 - 首重) / 续重单位) × 续重价格
举个例子:计费重量 1800g,首重 500g 价格 50 元,续重每 100g 价格 8 元。 应付运费 = 50 + CEIL((1800 - 500) / 100) × 8 = 50 + 13 × 8 = 154 元。
技术上,我们用 Java BigDecimal 进行金额计算,避免浮点数精度问题。
第五步:保存费用核算结果
将计算出的应付运费保存到运单表的 freight_amount 字段,同时记录计费重量、实际重量、材积重等信息。
技术上,这一步在事务中执行,同时写入费用流水表,记录费用计算的详细过程,用于审计和对账。
第六步:批量导入物流商账单
物流商每月提供账单文件(Excel 格式),包含每笔运单的物流单号、计费重量、应收运费等。
技术上,我们用 EasyExcel 批量导入账单数据。定义 LogisticsBillRow 实体类,映射 Excel 列。实现 AnalysisEventListener 监听器,每读取 1000 行就批量插入数据库,避免内存溢出。
导入时进行数据校验:物流单号是否存在、金额格式是否正确、是否重复导入等。校验失败的记录单独保存到错误表,生成错误报告供人工处理。
第七步:自动对账与差异分析
将 TMS 计算的应付运费与物流商账单的应收运费对比,发现差异并分析原因。
技术上,我们用 SQL 关联查询实现自动对账:
SELECT
w.waybill_no,
w.tracking_no,
w.freight_amount AS tms_amount,
b.bill_amount AS carrier_amount,
(b.bill_amount - w.freight_amount) AS diff_amount,
CASE
WHEN ABS(b.bill_amount - w.freight_amount) < 0.01 THEN '一致'
WHEN b.bill_amount > w.freight_amount THEN '物流商多收'
ELSE '物流商少收'
END AS diff_type
FROM tms_waybill w
INNER JOIN tms_logistics_bill b ON w.tracking_no = b.tracking_no
WHERE w.tenant_id = ? AND b.bill_batch_no = ?
HAVING ABS(diff_amount) >= 0.01;
差异原因分析:
- 重量差异: TMS 使用的重量与物流商实际称重不一致
- 价格规则变更: 物流商调整了价格,但 TMS 未及时更新
- 附加费用: 物流商收取了偏远地区附加费、燃油附加费等,TMS 未计算
- 数据错误: 物流商账单数据错误或 TMS 计算错误
对账结果生成报表,推送给财务人员审核。差异金额超过阈值的,自动创建工单,要求运营人员核实处理。
技术架构设计
3.1 整体架构
TMS 采用微服务架构,与 OMS、WMS、FMS 等模块通过 RPC 和 MQ 通信。
核心模块:
- 运单管理模块: 运单创建、状态管理、运单查询
- 渠道管理模块: 物流渠道配置、价格规则管理、智能推荐引擎
- 轨迹管理模块: 轨迹拉取、轨迹存储、异常检测
- 费用管理模块: 费用核算、账单导入、自动对账
- 物流商适配模块: 统一适配层、多物流商接入
技术架构图:
┌─────────────────────────────────────────────────────────┐
│ 前端层 │
│ 商家后台 / 运营后台 / 移动端 App │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ API 网关层 │
│ Spring Cloud Gateway + Sentinel │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ TMS 业务层 │
│ ┌──────────┬──────────┬──────────┬──────────┐ │
│ │运单管理 │渠道管理 │轨迹管理 │费用管理 │ │
│ └──────────┴──────────┴──────────┴──────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 物流商适配层 │
│ ┌──────────┬──────────┬──────────┬──────────┐ │
│ │DHL Adapter│FedEx │UPS │SF Express│ │
│ └──────────┴──────────┴──────────┴──────────┘ │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ 基础设施层 │
│ MySQL + Redis + RocketMQ + XXL-Job + OSS │
└─────────────────────────────────────────────────────────┘
3.2 数据流转
运单创建数据流:
- WMS 出库完成 → RocketMQ 消息 → TMS 消费消息
- TMS 智能推荐渠道 → 创建运单记录 → 调用物流商 API
- 保存物流单号 → 上传面单到 OSS → 通知 OMS
轨迹同步数据流:
- XXL-Job 定时任务 → 分片查询待拉取运单
- 调用物流商 API → 获取轨迹数据 → 去重保存
- 更新运单状态 → 异常检测 → 推送 MQ 通知
费用对账数据流:
- 物流商提供账单 → EasyExcel 批量导入
- TMS 计算应付运费 → SQL 关联对账
- 生成差异报表 → 推送财务审核
核心技术亮点
4.1 智能规则引擎与渠道推荐算法
问题背景:
跨境电商有 100+ 个物流渠道,不同渠道的价格、时效、可靠性差异很大。商家手动选择渠道效率低,容易选错。需要一个智能推荐系统,根据发货需求自动推荐最优渠道。
技术方案:
设计了两阶段推荐算法:过滤 + 评分。
第一阶段:过滤候选渠道
根据硬性条件过滤不符合的渠道:
- 目的国过滤: 渠道不支持目的国,直接排除
- 重量范围过滤: 包裹重量超出渠道限制,排除
- 特殊属性过滤: 包裹带电但渠道不支持带电,排除
- 服务类型过滤: 需要签名服务但渠道不提供,排除
技术实现:
public List<LogisticsChannel> filterChannels(ShipmentRequest request) {
return channelMapper.selectList(
new LambdaQueryWrapper<LogisticsChannel>()
.eq(LogisticsChannel::getStatus, 1) // 启用状态
.apply("FIND_IN_SET({0}, dest_countries)", request.getDestCountry())
.le(LogisticsChannel::getMinWeight, request.getWeight())
.ge(LogisticsChannel::getMaxWeight, request.getWeight())
.eq(request.hasBattery(), LogisticsChannel::getSupportBattery, true)
.eq(request.hasLiquid(), LogisticsChannel::getSupportLiquid, true)
);
}
第二阶段:计算综合评分
对每个候选渠道计算三个维度的得分:
-
成本得分(40% 权重):
- 预估运费越低,得分越高
- 归一化公式:
成本得分 = (最高运费 - 当前运费) / (最高运费 - 最低运费) × 100
-
时效得分(35% 权重):
- 平均妥投天数越少,得分越高
- 归一化公式:
时效得分 = (最长天数 - 当前天数) / (最长天数 - 最短天数) × 100
-
可靠性得分(25% 权重):
- 近 30 天妥投率越高,得分越高
- 直接使用妥投率:
可靠性得分 = 妥投率 × 100
综合评分公式:
综合评分 = 成本得分 × 0.4 + 时效得分 × 0.35 + 可靠性得分 × 0.25
技术实现:
public List<ChannelRecommendation> calculateScores(
List<LogisticsChannel> channels, ShipmentRequest request) {
// 1. 计算每个渠道的预估运费
Map<Long, BigDecimal> freightMap = channels.stream()
.collect(Collectors.toMap(
LogisticsChannel::getId,
ch -> calculateFreight(ch, request.getWeight())
));
// 2. 获取归一化参数
BigDecimal maxFreight = Collections.max(freightMap.values());
BigDecimal minFreight = Collections.min(freightMap.values());
// 3. 计算综合评分
return channels.stream().map(channel -> {
// 成本得分
BigDecimal freight = freightMap.get(channel.getId());
double costScore = maxFreight.subtract(freight)
.divide(maxFreight.subtract(minFreight), 4, RoundingMode.HALF_UP)
.multiply(BigDecimal.valueOf(100))
.doubleValue();
// 时效得分(假设最长 30 天,最短 3 天)
double timeScore = (30.0 - channel.getAvgDeliveryDays()) / 27.0 * 100;
// 可靠性得分
double reliabilityScore = channel.getDeliveryRate() * 100;
// 综合评分
double totalScore = costScore * 0.4 + timeScore * 0.35 + reliabilityScore * 0.25;
return new ChannelRecommendation(channel, totalScore, freight);
})
.sorted(Comparator.comparing(ChannelRecommendation::getScore).reversed())
.collect(Collectors.toList());
}
实施效果:
- 渠道推荐准确率 85%+,商家采纳率 78%
- 平均运费降低 12%(相比人工选择)
- 渠道选择时间从 5 分钟降到 10 秒
- 规则引擎响应时间 < 100ms(Redis 缓存命中率 95%)
4.2 统一物流商 API 适配层设计
问题背景:
对接 20+ 家物流商,每家的 API 协议、数据格式、认证方式都不同。如果在业务代码中直接调用,会导致代码耦合严重,难以维护。新增物流商需要修改大量业务代码。
技术方案:
采用适配器模式(Adapter Pattern)设计统一适配层,屏蔽物流商差异。
核心设计:
- 定义统一接口:
public interface LogisticsCarrierAdapter {
CreateWaybillResponse createWaybill(CreateWaybillRequest request);
List<TrackNode> queryTrack(String trackingNo);
boolean cancelWaybill(String trackingNo);
byte[] getLabel(String trackingNo, LabelFormat format);
}
- 每个物流商实现自己的适配器:
@Component("dhlAdapter")
public class DHLAdapter implements LogisticsCarrierAdapter {
@Override
public CreateWaybillResponse createWaybill(CreateWaybillRequest request) {
DHLCreateShipmentRequest dhlRequest = convertToDHLRequest(request);
DHLCreateShipmentResponse dhlResponse = dhlApiClient.createShipment(dhlRequest);
return convertToStandardResponse(dhlResponse);
}
}
- 业务代码通过工厂获取适配器:
@Service
public class WaybillService {
@Autowired
private Map<String, LogisticsCarrierAdapter> adapterMap;
public void createWaybill(Long channelId, CreateWaybillRequest request) {
LogisticsChannel channel = channelMapper.selectById(channelId);
String adapterName = channel.getCarrierCode() + "Adapter";
LogisticsCarrierAdapter adapter = adapterMap.get(adapterName);
CreateWaybillResponse response = adapter.createWaybill(request);
saveWaybill(response);
}
}
实施效果:
- 新增物流商只需实现适配器,无需修改业务代码
- 物流商 API 变更只影响对应适配器
- 从对接一个新物流商需要 5 天降到 2 天
4.3 运单创建幂等性设计
问题背景:
运单创建涉及调用物流商 API,可能因为网络超时、服务重启等原因导致重复调用。如果不做幂等控制,会产生多个物流单号,造成资源浪费和数据混乱。
技术方案:
设计了两层幂等保护:TMS 侧幂等 + 物流商侧幂等。
TMS 侧幂等(防止重复提交):
@Transactional(rollbackFor = Exception.class)
public CreateWaybillResponse createWaybill(CreateWaybillRequest request) {
String lockKey = "waybill:create:" + request.getOrderNo();
String lockValue = UUID.randomUUID().toString();
// 1. 尝试获取分布式锁
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, 60, TimeUnit.SECONDS);
if (!locked) {
throw new BusinessException("运单创建中,请勿重复提交");
}
try {
// 2. 查询是否已创建
LogisticsWaybill existing = waybillMapper.selectOne(
new LambdaQueryWrapper<LogisticsWaybill>()
.eq(LogisticsWaybill::getOrderNo, request.getOrderNo())
);
if (existing != null) {
return buildResponse(existing);
}
// 3. 创建运单
return doCreateWaybill(request);
} finally {
// 4. 释放锁
releaseLock(lockKey, lockValue);
}
}
物流商侧幂等(防止重复下单):
调用物流商 API 时传递幂等键(client_reference_id),物流商根据幂等键去重。
DHLCreateShipmentRequest dhlRequest = DHLCreateShipmentRequest.builder()
.clientReferenceId(waybillNo) // TMS 运单号作为幂等键
.shipperInfo(shipperInfo)
.consigneeInfo(consigneeInfo)
.build();
实施效果:
- 运单重复创建率从 0.5% 降到 0
- 避免了资源浪费和数据混乱
- 提升了系统稳定性
4.4 物流状态机设计
问题背景:
运单状态流转复杂,有 10 种状态。如果不加控制,可能出现非法状态转换,比如”已签收”回退到”运输中”。需要设计状态机控制状态流转。
技术方案:
定义状态转换规则,只允许合法的状态转换。
运单状态定义:
public enum WaybillStatus {
WAIT_CREATE(0, "待创建"),
CREATING(1, "创建中"),
CREATED(2, "已创建"),
PICKED_UP(3, "已揽收"),
IN_TRANSIT(4, "运输中"),
ARRIVED_DEST(5, "到达目的国"),
OUT_FOR_DELIVERY(6, "派送中"),
DELIVERY_FAILED(7, "派送失败"),
EXCEPTION(8, "异常"),
DELIVERED(9, "已签收"),
CANCELED(10, "已取消");
}
状态转换规则:
private static final Map<Integer, Set<Integer>> ALLOWED_TRANSITIONS = Map.of(
0, Set.of(1, 10), // 待创建 → 创建中/已取消
1, Set.of(2, 8, 10), // 创建中 → 已创建/异常/已取消
2, Set.of(3, 8, 10), // 已创建 → 已揽收/异常/已取消
3, Set.of(4, 8), // 已揽收 → 运输中/异常
4, Set.of(5, 8), // 运输中 → 到达目的国/异常
5, Set.of(6, 8), // 到达目的国 → 派送中/异常
6, Set.of(7, 9, 8), // 派送中 → 派送失败/已签收/异常
7, Set.of(6, 8), // 派送失败 → 派送中/异常
8, Set.of(4, 6, 10) // 异常 → 运输中/派送中/已取消
);
状态转换校验:
public void updateStatus(Long waybillId, Integer newStatus) {
LogisticsWaybill waybill = waybillMapper.selectById(waybillId);
Integer oldStatus = waybill.getStatus();
// 校验状态转换是否合法
Set<Integer> allowedNext = ALLOWED_TRANSITIONS.get(oldStatus);
if (allowedNext == null || !allowedNext.contains(newStatus)) {
log.error("非法状态转换: {} → {}", oldStatus, newStatus);
throw new BusinessException("非法状态转换");
}
// 更新状态
waybill.setStatus(newStatus);
waybillMapper.updateById(waybill);
}
实施效果:
- 避免了非法状态转换
- 状态流转清晰可控
- 异常状态可追溯
4.5 XXL-Job 分片任务与游标分页
问题背景:
需要定时拉取 100 万+ 运单的物流轨迹,单机处理耗时太长。需要分布式并行处理,提高效率。
技术方案:
使用 XXL-Job 分片任务 + 游标分页。
XXL-Job 分片配置:
在 XXL-Job 管理后台配置任务,路由策略选择”分片广播”,执行器部署 4 个节点。任务触发时,4 个节点同时执行,每个节点处理 1/4 的数据。
分片查询逻辑:
@XxlJob("pullLogisticsTrack")
public void pullTrack() {
int shardIndex = XxlJobHelper.getShardIndex(); // 当前分片索引 0-3
int shardTotal = XxlJobHelper.getShardTotal(); // 总分片数 4
Long lastId = 0L;
int pageSize = 500;
while (true) {
List<LogisticsWaybill> page = waybillMapper.selectTrackPullPage(
shardTotal, shardIndex, lastId, pageSize);
if (page.isEmpty()) break;
// 处理这一页数据
for (LogisticsWaybill waybill : page) {
pullAndSaveTrack(waybill);
}
// 更新游标
lastId = page.get(page.size() - 1).getId();
}
}
游标分页 SQL:
SELECT * FROM tms_waybill
WHERE status IN (3,4,5,6,7)
AND last_track_pull_time < DATE_SUB(NOW(), INTERVAL 30 MINUTE)
AND id > #{lastId}
AND MOD(id, #{shardTotal}) = #{shardIndex}
ORDER BY id ASC
LIMIT #{pageSize};
为什么用游标分页而不是 OFFSET 分页?
OFFSET 分页在深分页时性能很差。比如 LIMIT 1000000, 500,MySQL 需要扫描前 100 万行再跳过,非常慢。
游标分页只需要 WHERE id > lastId,利用主键索引,性能稳定。
实施效果:
- 100 万运单轨迹拉取时间从 2 小时降到 30 分钟
- 4 个节点并行处理,吞吐量提升 4 倍
- 游标分页避免了深分页性能问题
4.6 物流轨迹异常检测与预警
问题背景:
物流过程中可能出现超时、停滞、异常等问题,需要及时发现并预警,避免客户投诉。
技术方案:
定义三种异常检测规则,在轨迹拉取时实时检测。
规则 1: 超时未更新
- 运单创建后 72 小时仍无轨迹 → 判定为”揽收超时”
- 运输中 7 天无新轨迹节点 → 判定为”轨迹超时”
public void checkTimeout(LogisticsWaybill waybill) {
LocalDateTime now = LocalDateTime.now();
// 检查揽收超时
if (waybill.getStatus() == WaybillStatus.CREATED.getCode()) {
long hours = ChronoUnit.HOURS.between(waybill.getCreateTime(), now);
if (hours >= 72) {
createAlert(waybill, "揽收超时", "运单创建 72 小时仍未揽收");
}
}
// 检查轨迹超时
if (waybill.getStatus() >= 3 && waybill.getStatus() <= 7) {
long days = ChronoUnit.DAYS.between(waybill.getLastTrackTime(), now);
if (days >= 7) {
createAlert(waybill, "轨迹超时", "运输中 7 天无新轨迹");
}
}
}
规则 2: 轨迹停滞
连续 3 个轨迹节点位置相同且时间跨度超过 48 小时 → 判定为”轨迹停滞”
public void checkStagnation(LogisticsWaybill waybill) {
List<TrackNode> recentNodes = trackMapper.selectRecentNodes(
waybill.getTrackingNo(), 3);
if (recentNodes.size() < 3) return;
// 检查位置是否相同
String location = recentNodes.get(0).getNodeLocation();
boolean sameLocation = recentNodes.stream()
.allMatch(node -> location.equals(node.getNodeLocation()));
if (!sameLocation) return;
// 检查时间跨度
LocalDateTime firstTime = recentNodes.get(2).getNodeTime();
LocalDateTime lastTime = recentNodes.get(0).getNodeTime();
long hours = ChronoUnit.HOURS.between(firstTime, lastTime);
if (hours >= 48) {
createAlert(waybill, "轨迹停滞",
String.format("连续 3 个节点位置相同(%s),时间跨度 %d 小时", location, hours));
}
}
规则 3: 异常节点
轨迹描述包含”退回”、“拒收”、“丢失”、“损坏”等关键词 → 判定为”异常节点”
private static final List<String> EXCEPTION_KEYWORDS = List.of(
"退回", "拒收", "丢失", "损坏", "扣关", "清关失败", "地址错误"
);
public void checkExceptionNode(TrackNode node) {
String desc = node.getNodeDesc();
for (String keyword : EXCEPTION_KEYWORDS) {
if (desc.contains(keyword)) {
createAlert(node.getWaybillNo(), "异常节点",
String.format("轨迹包含异常关键词: %s", keyword));
break;
}
}
}
预警通知:
检测到异常后,通过钉钉机器人推送告警消息,同时创建异常工单。
实施效果:
- 异常发现时效从平均 3 天降到 2 小时
- 客户投诉率降低 40%
- 异常处理效率提升 60%
4.7 物流商 API 调用治理
问题背景:
调用物流商 API 时,可能遇到限流、超时、服务不可用等问题。如果不做治理,会影响 TMS 系统稳定性。
技术方案:
使用 Sentinel 实现限流、熔断、线程池隔离。
1. 限流控制:
物流商 API 有调用频率限制,比如 DHL 限制每秒 10 次。我们用 Sentinel 限流,超过阈值直接拒绝。
@SentinelResource(
value = "dhl_create_waybill",
blockHandler = "handleBlock"
)
public CreateWaybillResponse createWaybill(CreateWaybillRequest request) {
return dhlApiClient.createShipment(request);
}
public CreateWaybillResponse handleBlock(CreateWaybillRequest request, BlockException ex) {
log.warn("DHL API 限流,请稍后重试");
throw new BusinessException("物流商接口繁忙,请稍后重试");
}
Sentinel 配置:每秒最多 10 次调用,超过则触发限流。
2. 熔断降级:
物流商 API 故障时,自动熔断,避免大量请求堆积。
@SentinelResource(
value = "dhl_query_track",
fallback = "queryTrackFallback"
)
public List<TrackNode> queryTrack(String trackingNo) {
return dhlApiClient.queryTrack(trackingNo);
}
public List<TrackNode> queryTrackFallback(String trackingNo, Throwable ex) {
log.error("DHL 轨迹查询失败,返回缓存数据", ex);
return getCachedTrack(trackingNo);
}
Sentinel 配置:1 分钟内异常率超过 50%,熔断 10 秒。
3. 线程池隔离:
不同物流商使用独立线程池,避免相互影响。
@Configuration
public class ThreadPoolConfig {
@Bean("dhlExecutor")
public ThreadPoolExecutor dhlExecutor() {
return new ThreadPoolExecutor(
10, 20, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
@Bean("fedexExecutor")
public ThreadPoolExecutor fedexExecutor() {
return new ThreadPoolExecutor(
10, 20, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
实施效果:
- 避免了物流商限流导致的接口报错
- 单个物流商故障不影响其他物流商
- 系统可用性从 99.5% 提升到 99.9%
4.8 RocketMQ 可靠消息与本地消息表
问题背景:
运单创建成功后需要通知 OMS,如果直接发 MQ 消息,可能因为网络问题导致消息丢失。需要保证消息可靠性。
技术方案:
使用本地消息表 + 定时任务补偿。
本地消息表设计:
CREATE TABLE tms_event_outbox (
id BIGINT PRIMARY KEY,
tenant_id BIGINT NOT NULL,
event_type VARCHAR(64) NOT NULL COMMENT '事件类型',
biz_id BIGINT NOT NULL COMMENT '业务ID(运单ID)',
payload JSON NOT NULL COMMENT '消息体',
status TINYINT NOT NULL DEFAULT 0 COMMENT '0待发送 1已发送 2发送失败',
retry_count INT NOT NULL DEFAULT 0,
next_retry_time DATETIME NOT NULL,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
INDEX idx_status_retry (status, next_retry_time)
);
发送消息流程:
@Transactional(rollbackFor = Exception.class)
public void createWaybill(CreateWaybillRequest request) {
// 1. 创建运单
LogisticsWaybill waybill = buildWaybill(request);
waybillMapper.insert(waybill);
// 2. 写入本地消息表(在同一事务中)
EventOutbox outbox = EventOutbox.builder()
.eventType("WAYBILL_CREATED")
.bizId(waybill.getId())
.payload(buildPayload(waybill))
.status(0)
.nextRetryTime(LocalDateTime.now())
.build();
outboxMapper.insert(outbox);
// 事务提交后,消息一定已保存
}
定时任务补偿:
@Scheduled(fixedDelay = 10000) // 每 10 秒执行一次
public void sendPendingMessages() {
List<EventOutbox> pending = outboxMapper.selectList(
new LambdaQueryWrapper<EventOutbox>()
.eq(EventOutbox::getStatus, 0)
.le(EventOutbox::getNextRetryTime, LocalDateTime.now())
.last("LIMIT 100")
);
for (EventOutbox outbox : pending) {
try {
// 发送 MQ 消息
rocketMQTemplate.syncSend("waybill-topic", outbox.getPayload());
// 更新状态为已发送
outbox.setStatus(1);
outboxMapper.updateById(outbox);
} catch (Exception e) {
// 发送失败,更新重试次数和下次重试时间
outbox.setRetryCount(outbox.getRetryCount() + 1);
outbox.setNextRetryTime(calculateNextRetryTime(outbox.getRetryCount()));
outbox.setStatus(outbox.getRetryCount() >= 5 ? 2 : 0);
outboxMapper.updateById(outbox);
}
}
}
实施效果:
- 消息可靠性 100%,不会因为网络问题丢失
- 即使 MQ 暂时不可用,消息也会保存在本地,稍后重试
- 定时任务补偿机制保证最终一致性
4.9 Redis 缓存策略
问题背景:
物流规则查询频繁,轨迹查询 QPS 高,需要缓存减少数据库压力。
技术方案:
针对不同场景设计不同的缓存策略。
1. 物流规则缓存(读多写少):
物流渠道配置、价格规则变更频率低,适合长时间缓存。
public LogisticsChannel getChannel(Long channelId) {
String key = "channel:info:" + channelId;
// 1. 查询缓存
LogisticsChannel cached = redisTemplate.opsForValue().get(key);
if (cached != null) {
return cached;
}
// 2. 查询数据库
LogisticsChannel channel = channelMapper.selectById(channelId);
// 3. 写入缓存,TTL 24 小时
redisTemplate.opsForValue().set(key, channel, 24, TimeUnit.HOURS);
return channel;
}
更新策略:Cache Aside 模式,先更新数据库,再删除缓存。
@Transactional(rollbackFor = Exception.class)
public void updateChannel(LogisticsChannel channel) {
channelMapper.updateById(channel);
// 事务提交后删除缓存
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
String key = "channel:info:" + channel.getId();
redisTemplate.delete(key);
}
}
);
}
2. 热点轨迹缓存(读多写多):
热门运单的轨迹查询频繁,但轨迹更新也频繁。
public List<TrackNode> getTrack(String trackingNo) {
String key = "waybill:track:" + trackingNo;
// 1. 查询缓存
List<TrackNode> cached = redisTemplate.opsForValue().get(key);
if (cached != null) {
return cached;
}
// 2. 查询数据库
List<TrackNode> track = trackMapper.selectByTrackingNo(trackingNo);
// 3. 写入缓存,TTL 30 分钟(与轨迹拉取频率一致)
redisTemplate.opsForValue().set(key, track, 30, TimeUnit.MINUTES);
return track;
}
更新策略:轨迹拉取后主动删除缓存,下次查询时重新加载。
3. 布隆过滤器防止缓存穿透:
防止查询不存在的运单号导致缓存穿透。
@PostConstruct
public void initBloomFilter() {
// 初始化布隆过滤器,预计 1000 万运单,误判率 0.01%
bloomFilter = BloomFilter.create(
Funnels.stringFunnel(Charset.defaultCharset()),
10000000,
0.0001
);
// 加载已有运单号
List<String> trackingNos = waybillMapper.selectAllTrackingNos();
trackingNos.forEach(bloomFilter::put);
}
public LogisticsWaybill getWaybill(String trackingNo) {
// 1. 布隆过滤器判断
if (!bloomFilter.mightContain(trackingNo)) {
return null; // 一定不存在
}
// 2. 查询缓存和数据库
return getFromCacheOrDB(trackingNo);
}
实施效果:
- 缓存命中率 95%+
- 数据库 QPS 从 5000 降到 250
- 接口响应时间从 50ms 降到 5ms
4.10 EasyExcel 批量导入与自动对账
问题背景:
物流商每月提供账单文件(Excel),包含几十万笔运单的费用数据。需要批量导入并自动对账。
技术方案:
使用 EasyExcel 批量导入 + SQL 自动对账。
EasyExcel 批量导入:
@Data
public class LogisticsBillRow {
@ExcelProperty("物流单号")
private String trackingNo;
@ExcelProperty("计费重量(g)")
private Integer chargeWeight;
@ExcelProperty("应收运费(元)")
private BigDecimal billAmount;
}
public class LogisticsBillImportListener extends AnalysisEventListener<LogisticsBillRow> {
private static final int BATCH_SIZE = 1000;
private final List<LogisticsBillRow> buffer = new ArrayList<>(BATCH_SIZE);
private final LogisticsBillService billService;
private final Long tenantId;
private final String billBatchNo;
@Override
public void invoke(LogisticsBillRow row, AnalysisContext context) {
// 数据校验
if (StringUtils.isBlank(row.getTrackingNo())) {
throw new BusinessException("物流单号不能为空");
}
buffer.add(row);
// 达到批次大小,批量插入
if (buffer.size() >= BATCH_SIZE) {
billService.batchImport(tenantId, billBatchNo, buffer);
buffer.clear();
}
}
@Override
public void doAfterAllAnalysed(AnalysisContext context) {
// 处理剩余数据
if (!buffer.isEmpty()) {
billService.batchImport(tenantId, billBatchNo, buffer);
buffer.clear();
}
}
}
自动对账 SQL:
SELECT
w.waybill_no,
w.tracking_no,
w.freight_amount AS tms_amount,
b.bill_amount AS carrier_amount,
(b.bill_amount - w.freight_amount) AS diff_amount,
CASE
WHEN ABS(b.bill_amount - w.freight_amount) < 0.01 THEN '一致'
WHEN b.bill_amount > w.freight_amount THEN '物流商多收'
ELSE '物流商少收'
END AS diff_type
FROM tms_waybill w
INNER JOIN tms_logistics_bill b ON w.tracking_no = b.tracking_no
WHERE w.tenant_id = ? AND b.bill_batch_no = ?
HAVING ABS(diff_amount) >= 0.01;
差异分析:
public ReconciliationReport reconcile(Long tenantId, String billBatchNo) {
// 1. 执行对账 SQL
List<ReconciliationDetail> details = waybillMapper.reconcile(tenantId, billBatchNo);
// 2. 统计差异
int totalCount = details.size();
int matchCount = (int) details.stream()
.filter(d -> "一致".equals(d.getDiffType())).count();
BigDecimal totalDiff = details.stream()
.map(ReconciliationDetail::getDiffAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);
// 3. 生成报表
return ReconciliationReport.builder()
.totalCount(totalCount)
.matchCount(matchCount)
.diffCount(totalCount - matchCount)
.totalDiffAmount(totalDiff)
.details(details)
.build();
}
实施效果:
- 账单导入时间从 2 小时降到 10 分钟
- 对账准确率 100%
- 财务人工对账工作量减少 80%
数据库设计
5.1 核心表结构
运单表(tms_waybill):
CREATE TABLE tms_waybill (
id BIGINT PRIMARY KEY COMMENT '主键',
tenant_id BIGINT NOT NULL COMMENT '租户ID',
waybill_no VARCHAR(64) NOT NULL COMMENT '运单号',
order_no VARCHAR(64) NOT NULL COMMENT '订单号',
tracking_no VARCHAR(128) COMMENT '物流单号',
channel_id BIGINT NOT NULL COMMENT '物流渠道ID',
carrier_code VARCHAR(32) NOT NULL COMMENT '物流商编码',
status TINYINT NOT NULL COMMENT '状态 0待创建 2已创建 3已揽收 9已签收',
shipper_name VARCHAR(128) COMMENT '发件人姓名',
shipper_phone VARCHAR(32) COMMENT '发件人电话',
shipper_address VARCHAR(512) COMMENT '发件人地址',
consignee_name VARCHAR(128) COMMENT '收件人姓名',
consignee_phone VARCHAR(32) COMMENT '收件人电话',
consignee_address VARCHAR(512) COMMENT '收件人地址',
consignee_country VARCHAR(8) COMMENT '目的国',
estimated_weight INT COMMENT '预估重量(g)',
actual_weight INT COMMENT '实际重量(g)',
volumetric_weight INT COMMENT '材积重(g)',
charge_weight INT COMMENT '计费重量(g)',
freight_amount DECIMAL(10,2) COMMENT '运费金额',
label_url VARCHAR(512) COMMENT '面单URL',
last_track_time DATETIME COMMENT '最后轨迹时间',
last_track_pull_time DATETIME COMMENT '最后拉取时间',
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
UNIQUE KEY uk_waybill_no (waybill_no),
UNIQUE KEY uk_order_no (order_no),
KEY idx_tracking_no (tracking_no),
KEY idx_status (status),
KEY idx_create_time (create_time)
) COMMENT '运单表';
轨迹表(tms_track_202605):
CREATE TABLE tms_track_202605 (
id BIGINT PRIMARY KEY,
tenant_id BIGINT NOT NULL,
waybill_no VARCHAR(64) NOT NULL,
tracking_no VARCHAR(128) NOT NULL,
node_time DATETIME NOT NULL COMMENT '节点时间',
node_desc VARCHAR(512) NOT NULL COMMENT '节点描述',
node_location VARCHAR(256) COMMENT '节点位置',
node_status VARCHAR(32) COMMENT '节点状态码',
create_time DATETIME NOT NULL,
KEY idx_tracking_no (tracking_no),
KEY idx_node_time (node_time),
UNIQUE KEY uk_track (tracking_no, node_time, node_desc)
) COMMENT '轨迹表-按月分表';
物流渠道表(tms_logistics_channel):
CREATE TABLE tms_logistics_channel (
id BIGINT PRIMARY KEY,
channel_name VARCHAR(128) NOT NULL COMMENT '渠道名称',
carrier_code VARCHAR(32) NOT NULL COMMENT '物流商编码',
carrier_name VARCHAR(128) NOT NULL COMMENT '物流商名称',
dest_countries VARCHAR(512) COMMENT '目的国列表(逗号分隔)',
min_weight INT COMMENT '最小重量(g)',
max_weight INT COMMENT '最大重量(g)',
support_battery TINYINT COMMENT '是否支持带电',
support_liquid TINYINT COMMENT '是否支持液体',
avg_delivery_days INT COMMENT '平均妥投天数',
delivery_rate DECIMAL(5,4) COMMENT '妥投率',
status TINYINT NOT NULL COMMENT '状态 0禁用 1启用',
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
KEY idx_carrier (carrier_code),
KEY idx_status (status)
) COMMENT '物流渠道表';
价格规则表(tms_price_rule):
CREATE TABLE tms_price_rule (
id BIGINT PRIMARY KEY,
channel_id BIGINT NOT NULL,
dest_country VARCHAR(8) NOT NULL,
first_weight INT NOT NULL COMMENT '首重(g)',
first_price DECIMAL(10,2) NOT NULL COMMENT '首重价格',
续重_unit INT NOT NULL COMMENT '续重单位(g)',
续重_price DECIMAL(10,2) NOT NULL COMMENT '续重价格',
effective_date DATE NOT NULL COMMENT '生效日期',
expire_date DATE COMMENT '失效日期',
create_time DATETIME NOT NULL,
KEY idx_channel_country (channel_id, dest_country),
KEY idx_effective (effective_date, expire_date)
) COMMENT '价格规则表';
本地消息表(tms_event_outbox):
CREATE TABLE tms_event_outbox (
id BIGINT PRIMARY KEY,
tenant_id BIGINT NOT NULL,
event_type VARCHAR(64) NOT NULL,
biz_id BIGINT NOT NULL,
payload JSON NOT NULL,
status TINYINT NOT NULL DEFAULT 0 COMMENT '0待发送 1已发送 2失败',
retry_count INT NOT NULL DEFAULT 0,
next_retry_time DATETIME NOT NULL,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
KEY idx_status_retry (status, next_retry_time)
) COMMENT '本地消息表';
业务面试题
Q1: TMS 在供应链中的作用是什么?
A1: TMS 是连接 WMS 和物流商的桥梁,负责三个核心职责:
第一,统一物流管理。对接 20+ 家物流商,提供统一的运单创建、轨迹追踪、费用核算接口,让商家不用对接每个物流商。
第二,智能渠道推荐。根据目的国、重量、时效要求等,自动推荐最优物流渠道,帮商家降低运费、提升时效。
第三,物流可视化。实时追踪物流轨迹,异常检测与预警,让商家和客户随时了解包裹位置。
Q2: 为什么需要智能渠道推荐?
A2: 因为跨境电商有 100+ 个物流渠道,不同渠道的价格、时效、可靠性差异很大。
举个例子:发美国的 1kg 包裹,DHL 快线 3 天到但运费 80 元,燕文经济线 15 天到但运费 30 元,顺丰国际 7 天到运费 50 元。商家很难判断选哪个。
智能推荐系统会综合考虑成本、时效、可靠性,计算综合评分,推荐最优渠道。比如客户要求 7 天内到,系统会推荐顺丰国际;如果不着急,会推荐燕文经济线省钱。
这样商家不用花时间研究每个渠道,系统自动推荐,提高效率,降低成本。
Q3: 运单创建为什么需要幂等性?
A3: 因为运单创建涉及调用物流商 API,可能因为网络超时、服务重启等原因导致重复调用。
举个例子:商家点击”创建运单”,TMS 调用 DHL API,DHL 已经创建了运单并返回物流单号,但网络超时,TMS 没收到响应。商家以为失败了,再点一次,就会重复创建运单,产生两个物流单号。
这会导致两个问题:第一,浪费物流商资源,可能被收两次运费。第二,数据混乱,不知道哪个物流单号是真的。
所以我们设计了两层幂等:TMS 侧用分布式锁防止重复提交,物流商侧用幂等键(运单号)防止重复下单。
Q4: 为什么轨迹表要按月分表?
A4: 因为轨迹数据量非常大,单表存不下。
我们日均运单 8 万单,每单平均 10 个轨迹节点,一天就是 80 万条轨迹。一年就是 2.9 亿条,单表太大,查询性能会很差。
按月分表后,单表数据量控制在 2000 万以内,查询性能稳定。而且轨迹查询一般只查最近的,很少查几个月前的,按月分表符合业务特点。
分表策略是按轨迹时间分,不是按运单创建时间分。因为查询轨迹时,是按物流单号查,需要知道轨迹在哪个月的表里。我们在运单表记录 last_track_time,查询时根据这个时间确定分表。
Q5: 物流费用对账为什么会有差异?
A5: 对账差异主要有四个原因:
第一,重量差异。TMS 使用的是预估重量,物流商实际称重可能不一样。比如 TMS 预估 800g,物流商称重 850g,运费就不一样了。
第二,价格规则变更。物流商调整了价格,但 TMS 没及时更新。比如物流商 5 月 1 日涨价,但 TMS 还用旧价格计算,就会有差异。
第三,附加费用。物流商收取了偏远地区附加费、燃油附加费、关税等,但 TMS 没计算这些费用。
第四,数据错误。物流商账单数据错误,或者 TMS 计算错误。
所以对账不是简单的金额对比,还要分析差异原因,找出是哪个环节出了问题。
Q6: 为什么要用 XXL-Job 分片任务?
A6: 因为需要定时拉取 100 万+ 运单的物流轨迹,单机处理太慢。
如果单机处理,假设每个运单拉取轨迹需要 100ms,100 万运单就是 10 万秒,约 28 小时。这显然不行,轨迹更新太慢了。
用 XXL-Job 分片任务,可以部署 4 个执行器节点,每个节点处理 1/4 的数据,并行执行。这样 28 小时变成 7 小时,还是太慢。
所以我们还优化了拉取逻辑:只拉取”运输中”状态的运单,已签收的不拉;距离上次拉取超过 30 分钟的才拉,刚拉过的不拉。这样实际需要拉取的运单只有 20 万左右,4 个节点并行,30 分钟就能完成。
Q7: 物流轨迹异常检测有什么用?
A7: 异常检测可以提前发现物流问题,避免客户投诉。
举个例子:包裹在海关卡了 5 天,如果不检测,客户可能等了 10 天还没收到货,就会投诉。但如果我们检测到”轨迹停滞”异常,第 3 天就发现了,主动联系客户说明情况,客户就不会投诉了。
我们定义了三种异常:超时未更新(72 小时无轨迹)、轨迹停滞(连续 3 个节点位置相同超过 48 小时)、异常节点(包含”退回”、“拒收”等关键词)。
检测到异常后,系统自动创建工单,推送钉钉告警,运营人员及时处理。这样异常发现时效从平均 3 天降到 2 小时,客户投诉率降低 40%。
Q8: 为什么要用本地消息表?
A8: 因为直接发 MQ 消息可能丢失,本地消息表保证消息可靠性。
举个例子:运单创建成功,需要通知 OMS。如果直接发 MQ 消息,可能因为网络问题、MQ 服务不可用等原因导致消息发送失败。这样 OMS 就不知道运单创建了,订单状态不会更新。
用本地消息表后,在运单创建的事务中,同时写入本地消息表。事务提交后,消息一定已保存。然后定时任务扫描本地消息表,发送 MQ 消息。如果发送失败,会自动重试,直到成功。
这样保证了消息可靠性,即使 MQ 暂时不可用,消息也不会丢失。
Q9: 为什么要用适配器模式对接物流商?
A9: 因为每个物流商的 API 协议、数据格式、认证方式都不同,如果在业务代码中直接调用,会导致代码耦合严重,难以维护。
举个例子:DHL 用 REST API + JSON,FedEx 用 SOAP + XML,顺丰用 HTTP + 自定义格式。如果业务代码直接调用,就要写三套不同的代码,而且新增物流商需要修改业务代码。
用适配器模式后,定义统一接口 LogisticsCarrierAdapter,包含 createWaybill()、queryTrack() 等方法。每个物流商实现自己的适配器,负责协议转换、数据格式转换、业务逻辑适配。
业务代码只调用统一接口,不关心底层实现。新增物流商只需实现适配器,无需修改业务代码。从对接一个新物流商需要 5 天降到 2 天。
Q10: 物流费用怎么计算?
A10: 物流费用采用”首重+续重+材积重”计费模型。
第一步,计算材积重:材积重 = 长 × 宽 × 高 / 5000(单位:cm 和 g)。
第二步,确定计费重量:取实际重量和材积重的较大值。
第三步,根据价格规则计算运费:
- 如果计费重量 ≤ 首重:
运费 = 首重价格 - 如果计费重量 > 首重:
运费 = 首重价格 + CEIL((计费重量 - 首重) / 续重单位) × 续重价格
举个例子:包裹实际重量 800g,尺寸 30×20×15cm,材积重 = 30×20×15/5000 = 1800g。计费重量取较大值 1800g。
价格规则:首重 500g 价格 50 元,续重每 100g 价格 8 元。
运费 = 50 + CEIL((1800 - 500) / 100) × 8 = 50 + 13 × 8 = 154 元。
技术面试题
Q1: 如何保证运单创建的幂等性?
A1: 我们设计了两层幂等保护:
第一层,TMS 侧幂等。用 Redis SETNX 实现分布式锁,Key 是 waybill:create:{order_no},TTL 60 秒。如果 SETNX 返回 false,说明已经有线程在创建运单了,直接返回”创建中”。同时查询数据库,判断该订单号是否已经创建过运单,如果已创建,直接返回已有运单信息。
第二层,物流商侧幂等。调用物流商 API 时传递幂等键(client_reference_id),我们用 TMS 运单号作为幂等键。物流商根据幂等键去重,如果已经创建过,直接返回已有的物流单号。
这样即使网络超时导致重复调用,也不会产生多个物流单号。
Q2: 如何实现物流状态机?
A2: 我们定义了状态转换规则,只允许合法的状态转换。
首先,定义 10 种运单状态:待创建、创建中、已创建、已揽收、运输中、到达目的国、派送中、派送失败、异常、已签收、已取消。
然后,定义状态转换规则,用 Map 存储:Map<Integer, Set<Integer>> ALLOWED_TRANSITIONS。Key 是当前状态,Value 是允许转换到的状态集合。
比如”已签收”状态,不允许转换到任何其他状态,所以 Value 是空集合。“派送中”状态,允许转换到”派送失败”、“已签收”、“异常”,所以 Value 是 Set.of(7, 9, 8)。
更新状态时,先查询当前状态,再判断新状态是否在允许转换的集合中。如果不在,抛异常,记录日志并告警。
这样避免了非法状态转换,状态流转清晰可控。
Q3: XXL-Job 分片任务如何实现?
A3: XXL-Job 分片任务通过取模实现数据分片。
首先,在 XXL-Job 管理后台配置任务,路由策略选择”分片广播”,执行器部署 4 个节点。
然后,在任务代码中获取分片参数:int shardIndex = XxlJobHelper.getShardIndex(),int shardTotal = XxlJobHelper.getShardTotal()。比如 4 个节点,shardIndex 分别是 0、1、2、3,shardTotal 都是 4。
最后,在查询 SQL 中加上分片条件:WHERE MOD(id, #{shardTotal}) = #{shardIndex}。这样每个节点只查询自己负责的数据。
比如运单 ID 是 1001,MOD(1001, 4) = 1,就由 shardIndex=1 的节点处理。运单 ID 是 1002,MOD(1002, 4) = 2,就由 shardIndex=2 的节点处理。
这样 4 个节点并行处理,互不干扰,吞吐量提升 4 倍。
Q4: 为什么用游标分页而不是 OFFSET 分页?
A4: 因为 OFFSET 分页在深分页时性能很差。
OFFSET 分页的原理是:LIMIT 1000000, 500,MySQL 需要扫描前 100 万行,然后跳过,再返回 500 行。数据量越大,OFFSET 越大,性能越差。
游标分页的原理是:WHERE id > lastId ORDER BY id ASC LIMIT 500,利用主键索引,直接定位到 lastId 之后的数据,不需要扫描前面的数据。
举个例子:查询第 1000 页,每页 500 条。OFFSET 分页:LIMIT 499500, 500,需要扫描 49.95 万行。游标分页:WHERE id > 499500 LIMIT 500,直接定位,只扫描 500 行。
所以游标分页性能稳定,不受数据量和页码影响。我们用游标分页拉取轨迹,100 万运单,30 分钟完成。
Q5: 如何实现物流轨迹异常检测?
A5: 我们定义了三种异常检测规则:
第一,超时未更新。运单创建后 72 小时仍无轨迹,判定为”揽收超时”。运输中 7 天无新轨迹节点,判定为”轨迹超时”。实现方式是比较当前时间和最后轨迹时间的差值。
第二,轨迹停滞。连续 3 个轨迹节点位置相同且时间跨度超过 48 小时,判定为”轨迹停滞”。实现方式是查询最近 3 个轨迹节点,判断位置是否相同,计算时间跨度。
第三,异常节点。轨迹描述包含”退回”、“拒收”、“丢失”、“损坏”等关键词,判定为”异常节点”。实现方式是用关键词列表匹配轨迹描述。
检测到异常后,系统自动创建异常工单,推送钉钉告警,通知运营人员处理。同时通过 MQ 通知 OMS,OMS 可以主动联系客户说明情况。
Q6: 如何保证 RocketMQ 消息的可靠性?
A6: 我们用本地消息表保证消息可靠性。
第一步,在运单创建的事务中,同时写入本地消息表。消息表包含事件类型、业务 ID、消息体、状态等字段。
第二步,事务提交后,定时任务扫描本地消息表,查询状态为”待发送”且到达重试时间的消息。
第三步,发送 MQ 消息。如果发送成功,更新消息状态为”已发送”。如果发送失败,更新重试次数和下次重试时间。重试次数超过 5 次,标记为”失败”,人工介入处理。
这样即使 MQ 暂时不可用,消息也会保存在本地,稍后重试,保证消息不丢失。
Q7: Redis 缓存如何保证一致性?
A7: 我们采用 Cache Aside 模式:先更新数据库,再删除缓存。
具体流程是:
第一步,更新数据库。在事务中执行更新操作。
第二步,删除缓存。在事务提交后删除缓存。我们用 Spring 的 TransactionSynchronizationManager 注册回调,在事务提交后执行删除缓存的操作。
为什么要在事务提交后删除缓存?如果在事务提交前删除缓存,可能出现这种情况:线程 A 删除了缓存,线程 B 查询缓存未命中,从数据库加载旧数据到缓存,线程 A 事务提交,数据库更新。结果:缓存是旧数据,数据库是新数据,不一致了。
所以必须在事务提交后删除缓存。
为什么是删除缓存,而不是更新缓存?因为物流规则的计算逻辑复杂,如果更新缓存,需要重新计算所有字段。删除缓存后,下次查询时自动从数据库加载最新数据,计算逻辑在查询时统一处理,不容易出错。
Q8: 如何实现 Sentinel 限流和熔断?
A8: 我们用 Sentinel 对物流商 API 调用进行限流和熔断。
限流配置:每秒最多 10 次调用,超过则触发限流。用 @SentinelResource 注解标记方法,配置 blockHandler 处理限流逻辑。
熔断配置:1 分钟内异常率超过 50%,熔断 10 秒。用 @SentinelResource 注解标记方法,配置 fallback 处理熔断逻辑。
线程池隔离:不同物流商使用独立线程池,避免相互影响。每个物流商配置独立的 ThreadPoolExecutor,核心线程 10,最大线程 20,队列 100。
这样避免了物流商限流导致的接口报错,单个物流商故障不影响其他物流商,系统可用性从 99.5% 提升到 99.9%。
Q9: EasyExcel 如何实现批量导入?
A9: EasyExcel 用监听器模式实现批量导入。
第一步,定义实体类,用 @ExcelProperty 注解映射 Excel 列。
第二步,实现 AnalysisEventListener 监听器,重写 invoke() 方法处理每一行数据。
第三步,在 invoke() 方法中,将数据加入缓冲区。当缓冲区达到批次大小(比如 1000 条),批量插入数据库,清空缓冲区。
第四步,重写 doAfterAllAnalysed() 方法,处理剩余数据。
这样避免了一次性加载所有数据到内存,防止内存溢出。批量插入提高了数据库写入效率。
我们用 EasyExcel 导入物流商账单,几十万笔数据,10 分钟完成。
Q10: 如何实现自动对账?
A10: 我们用 SQL 关联查询实现自动对账。
第一步,将 TMS 运单表和物流商账单表按物流单号关联。
第二步,计算差异金额:diff_amount = 账单金额 - TMS 金额。
第三步,判断差异类型:差异金额绝对值 < 0.01 元判定为”一致”,差异金额 > 0 判定为”物流商多收”,差异金额 < 0 判定为”物流商少收”。
第四步,只返回有差异的记录:HAVING ABS(diff_amount) >= 0.01。
对账结果生成报表,包含总笔数、一致笔数、差异笔数、总差异金额、差异明细等。差异金额超过阈值的,自动创建工单,要求运营人员核实处理。
这样财务人工对账工作量减少 80%,对账准确率 100%。
简历描述
项目描述
项目:跨境电商 SaaS 供应链管理平台 - 物流管理模块(TMS)
时间:2024.06 - 2024.12
角色:核心开发
技术:Spring Boot 3.2、MyBatis-Plus、MySQL 8.0、Redis 7.0、RocketMQ 5.0、
XXL-Job 2.4、Sentinel 1.8、EasyExcel 3.3
这是个跨境电商的供应链 SaaS 平台,我负责物流管理模块。系统对接 20+ 家物流
承运商(DHL、FedEx、UPS、顺丰国际等),提供统一的运单创建、轨迹追踪、费用
核算接口。日均运单量 8 万+,轨迹节点 50 万+,峰值 QPS 5000+。
核心难点:
1. 多物流商 API 对接,协议和数据格式差异大
2. 智能渠道推荐,综合考虑成本、时效、可靠性
3. 大规模轨迹数据的定时拉取和异常检测
4. 物流费用自动核算和对账
5. 高并发场景下的幂等性和可靠性保证
核心亮点(简历版)
1. 智能规则引擎与渠道推荐算法
设计了两阶段推荐算法:过滤 + 评分。第一阶段根据目的国、重量范围、特殊属性等硬性条件过滤候选渠道。第二阶段计算综合评分,公式:综合评分 = 成本得分 × 40% + 时效得分 × 35% + 可靠性得分 × 25%。渠道推荐准确率 85%+,平均运费降低 12%,规则引擎响应时间 < 100ms。
2. 统一物流商 API 适配层设计
采用适配器模式对接 20+ 家物流商,定义统一接口 LogisticsCarrierAdapter,每个物流商实现自己的适配器,负责协议转换、数据格式转换、业务逻辑适配。新增物流商只需实现适配器,无需修改业务代码。从对接一个新物流商需要 5 天降到 2 天。
3. 运单创建幂等性与物流状态机
设计了两层幂等保护:TMS 侧用 Redis SETNX 分布式锁,物流商侧用幂等键(运单号)。定义 10 种运单状态和状态转换规则,用状态机控制状态流转,只允许合法的状态转换。运单重复创建率从 0.5% 降到 0。
4. XXL-Job 分片任务与游标分页
使用 XXL-Job 分片任务,4 个执行器节点并行处理。用游标分页(WHERE id > lastId)代替 OFFSET 分页,避免深分页性能问题。100 万运单轨迹拉取时间从 2 小时降到 30 分钟。
5. 物流轨迹异常检测与预警
定义三种异常检测规则:超时未更新、轨迹停滞、异常节点。检测到异常后,自动创建工单,推送钉钉告警。异常发现时效从平均 3 天降到 2 小时,客户投诉率降低 40%。
6. RocketMQ 可靠消息与本地消息表
使用本地消息表保证消息可靠性。在运单创建的事务中,同时写入本地消息表。定时任务扫描本地消息表,发送 MQ 消息,失败自动重试。消息可靠性 100%。
7. Redis 缓存策略与布隆过滤器
针对不同场景设计不同的缓存策略:物流规则缓存 TTL 24 小时,热点轨迹缓存 TTL 30 分钟。用布隆过滤器防止缓存穿透。缓存命中率 95%+,接口响应时间从 50ms 降到 5ms。
8. Sentinel 限流熔断与线程池隔离
使用 Sentinel 对物流商 API 调用进行限流和熔断。不同物流商使用独立线程池,避免相互影响。系统可用性从 99.5% 提升到 99.9%。
9. EasyExcel 批量导入与自动对账
使用 EasyExcel 批量导入物流商账单,监听器模式 + 批量插入,避免内存溢出。用 SQL 关联查询实现自动对账,生成差异报表。账单导入时间从 2 小时降到 10 分钟,财务人工对账工作量减少 80%。
关键代码示例
1. 智能渠道推荐
public List<ChannelRecommendation> recommendChannels(ShipmentRequest request) {
// 1. 过滤候选渠道
List<LogisticsChannel> channels = channelMapper.selectList(
new LambdaQueryWrapper<LogisticsChannel>()
.eq(LogisticsChannel::getStatus, 1)
.apply("FIND_IN_SET({0}, dest_countries)", request.getDestCountry())
.le(LogisticsChannel::getMinWeight, request.getWeight())
.ge(LogisticsChannel::getMaxWeight, request.getWeight())
);
// 2. 计算综合评分
return channels.stream().map(channel -> {
BigDecimal freight = calculateFreight(channel, request.getWeight());
double costScore = calculateCostScore(freight, channels);
double timeScore = (30.0 - channel.getAvgDeliveryDays()) / 27.0 * 100;
double reliabilityScore = channel.getDeliveryRate() * 100;
double totalScore = costScore * 0.4 + timeScore * 0.35 + reliabilityScore * 0.25;
return new ChannelRecommendation(channel, totalScore, freight);
})
.sorted(Comparator.comparing(ChannelRecommendation::getScore).reversed())
.limit(3)
.collect(Collectors.toList());
}
2. 统一物流商适配器
public interface LogisticsCarrierAdapter {
CreateWaybillResponse createWaybill(CreateWaybillRequest request);
List<TrackNode> queryTrack(String trackingNo);
boolean cancelWaybill(String trackingNo);
byte[] getLabel(String trackingNo, LabelFormat format);
}
@Component("dhlAdapter")
public class DHLAdapter implements LogisticsCarrierAdapter {
@Override
public CreateWaybillResponse createWaybill(CreateWaybillRequest request) {
DHLCreateShipmentRequest dhlRequest = convertToDHLRequest(request);
DHLCreateShipmentResponse dhlResponse = dhlApiClient.createShipment(dhlRequest);
return convertToStandardResponse(dhlResponse);
}
}
3. 运单创建幂等性
@Transactional(rollbackFor = Exception.class)
public CreateWaybillResponse createWaybill(CreateWaybillRequest request) {
String lockKey = "waybill:create:" + request.getOrderNo();
String lockValue = UUID.randomUUID().toString();
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, 60, TimeUnit.SECONDS);
if (!locked) {
throw new BusinessException("运单创建中,请勿重复提交");
}
try {
LogisticsWaybill existing = waybillMapper.selectOne(
new LambdaQueryWrapper<LogisticsWaybill>()
.eq(LogisticsWaybill::getOrderNo, request.getOrderNo())
);
if (existing != null) {
return buildResponse(existing);
}
return doCreateWaybill(request);
} finally {
releaseLock(lockKey, lockValue);
}
}
4. 物流状态机
private static final Map<Integer, Set<Integer>> ALLOWED_TRANSITIONS = Map.of(
0, Set.of(1, 10), // 待创建 → 创建中/已取消
1, Set.of(2, 8, 10), // 创建中 → 已创建/异常/已取消
2, Set.of(3, 8, 10), // 已创建 → 已揽收/异常/已取消
3, Set.of(4, 8), // 已揽收 → 运输中/异常
4, Set.of(5, 8), // 运输中 → 到达目的国/异常
5, Set.of(6, 8), // 到达目的国 → 派送中/异常
6, Set.of(7, 9, 8), // 派送中 → 派送失败/已签收/异常
7, Set.of(6, 8), // 派送失败 → 派送中/异常
8, Set.of(4, 6, 10) // 异常 → 运输中/派送中/已取消
);
public void updateStatus(Long waybillId, Integer newStatus) {
LogisticsWaybill waybill = waybillMapper.selectById(waybillId);
Integer oldStatus = waybill.getStatus();
Set<Integer> allowedNext = ALLOWED_TRANSITIONS.get(oldStatus);
if (allowedNext == null || !allowedNext.contains(newStatus)) {
throw new BusinessException("非法状态转换");
}
waybill.setStatus(newStatus);
waybillMapper.updateById(waybill);
}
5. XXL-Job 分片任务
@XxlJob("pullLogisticsTrack")
public void pullTrack() {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
Long lastId = 0L;
int pageSize = 500;
while (true) {
List<LogisticsWaybill> page = waybillMapper.selectList(
new LambdaQueryWrapper<LogisticsWaybill>()
.in(LogisticsWaybill::getStatus, 3, 4, 5, 6, 7)
.lt(LogisticsWaybill::getLastTrackPullTime,
LocalDateTime.now().minusMinutes(30))
.gt(LogisticsWaybill::getId, lastId)
.apply("MOD(id, {0}) = {1}", shardTotal, shardIndex)
.orderByAsc(LogisticsWaybill::getId)
.last("LIMIT " + pageSize)
);
if (page.isEmpty()) break;
for (LogisticsWaybill waybill : page) {
pullAndSaveTrack(waybill);
}
lastId = page.get(page.size() - 1).getId();
}
}
6. 轨迹异常检测
public void checkTimeout(LogisticsWaybill waybill) {
LocalDateTime now = LocalDateTime.now();
if (waybill.getStatus() == WaybillStatus.CREATED.getCode()) {
long hours = ChronoUnit.HOURS.between(waybill.getCreateTime(), now);
if (hours >= 72) {
createAlert(waybill, "揽收超时", "运单创建 72 小时仍未揽收");
}
}
if (waybill.getStatus() >= 3 && waybill.getStatus() <= 7) {
long days = ChronoUnit.DAYS.between(waybill.getLastTrackTime(), now);
if (days >= 7) {
createAlert(waybill, "轨迹超时", "运输中 7 天无新轨迹");
}
}
}
7. 本地消息表
@Transactional(rollbackFor = Exception.class)
public void createWaybill(CreateWaybillRequest request) {
LogisticsWaybill waybill = buildWaybill(request);
waybillMapper.insert(waybill);
EventOutbox outbox = EventOutbox.builder()
.eventType("WAYBILL_CREATED")
.bizId(waybill.getId())
.payload(buildPayload(waybill))
.status(0)
.nextRetryTime(LocalDateTime.now())
.build();
outboxMapper.insert(outbox);
}
@Scheduled(fixedDelay = 10000)
public void sendPendingMessages() {
List<EventOutbox> pending = outboxMapper.selectList(
new LambdaQueryWrapper<EventOutbox>()
.eq(EventOutbox::getStatus, 0)
.le(EventOutbox::getNextRetryTime, LocalDateTime.now())
.last("LIMIT 100")
);
for (EventOutbox outbox : pending) {
try {
rocketMQTemplate.syncSend("waybill-topic", outbox.getPayload());
outbox.setStatus(1);
outboxMapper.updateById(outbox);
} catch (Exception e) {
outbox.setRetryCount(outbox.getRetryCount() + 1);
outbox.setNextRetryTime(calculateNextRetryTime(outbox.getRetryCount()));
outbox.setStatus(outbox.getRetryCount() >= 5 ? 2 : 0);
outboxMapper.updateById(outbox);
}
}
}
8. EasyExcel 批量导入
public class LogisticsBillImportListener extends AnalysisEventListener<LogisticsBillRow> {
private static final int BATCH_SIZE = 1000;
private final List<LogisticsBillRow> buffer = new ArrayList<>(BATCH_SIZE);
@Override
public void invoke(LogisticsBillRow row, AnalysisContext context) {
buffer.add(row);
if (buffer.size() >= BATCH_SIZE) {
billService.batchImport(tenantId, billBatchNo, buffer);
buffer.clear();
}
}
@Override
public void doAfterAllAnalysed(AnalysisContext context) {
if (!buffer.isEmpty()) {
billService.batchImport(tenantId, billBatchNo, buffer);
buffer.clear();
}
}
}