1. 为什么要把数据放在 Kafka 和 Redis 分开存储?
先说结论:
这个项目不是“随便用了两个中间件”,而是根据两类数据的业务特征,做了明确分工:
Kafka:承接更偏“区块级、批量、可回放、可重消费”的数据Redis PubSub:承接更偏“低延迟、实时、轻量、直接推送”的数据
这背后的核心不是技术偏好,而是链上监控场景里两种完全不同的数据形态。
1.1 Kafka 和 Redis 的存储特性差异
先从中间件本身说。
Kafka 更适合什么
Kafka 的典型特性是:
- 持久化存储,消息会落盘
- 支持消费位点管理,可以重放、补消费
- 天然适合高吞吐批量数据
- 分区内有顺序
- 很适合做“主消息总线”
所以在区块链场景里,Kafka 很适合放:
- 区块级批量解析结果
- 需要回溯、补数的数据
- 需要多个消费组分别消费的数据
- 比较大的结构化批量消息
Redis PubSub 更适合什么
Redis PubSub 的典型特性是:
- 延迟低
- 发布订阅简单
- 不强调消息持久化和重放
- 更适合做“实时广播型消息”
- 更像“当前有一条新事件,赶紧推给在线消费者”
所以它更适合放:
- 刚刚发生的实时 swap 事件
- 更轻量的单交易事件
- 对延迟敏感、对历史回放没那么强依赖的数据
1.2 为什么这个项目必须分开存
因为它处理的是两种不同层次的数据。
Kafka 里放的是“区块级批量数据”
Kafka 路径里解码后的数据结构是 BlockTxData:
type BlockTxData struct {
Chain string `json:"Chain"`
BlockNumber int64 `json:"BlockNumber"`
Swaps []SwapInfo `json:"Swaps"`
TxBalances []TxBalance `json:"TxBalances"`
}这里可以看到,Kafka 一条消息不是“一笔 swap”,而是:
- 一条链
- 一个区块
- 区块里的多笔
Swaps - 对应交易的余额变化
TxBalances
这类数据天然有几个特征:
- 数据量更大
- 更适合批量传输
- 很适合做持久化留存
- 如果消费者挂了,需要重放补消费
- 后续可能还会做离线分析或别的下游消费
所以这种“区块级批量数据”放 Kafka 很合理。
其中 TxBalance 结构是:
type TxBalance struct {
TxHash string `json:"tx_hash"`
BalanceChanges map[string]string `json:"balance_changes"`
}它是按 txHash -> token 余额变化 来补充交易语义的。
Redis 里放的是“单笔交易级实时 swap 数据”
Redis 路径里解码后的结构是 SwapData,里面包的是 SwapMessage:
type SwapData struct {
Message *SwapMessage `json:"message"`
Version int `json:"version"`
}
type SwapMessage struct {
Chain string `json:"chain"`
Block int64 `json:"block"`
TxIndex int `json:"tx_index"`
TxHash string `json:"tx_hash"`
Swaps []SwapInfo `json:"swaps"`
BalanceChanges map[string]string `json:"balance_changes"`
Protocol string `json:"protocol"`
UnixTime int64 `json:"unix_time"`
}这说明 Redis 传的不是整块批量数据,而是更细粒度的“单个交易级 swap 事件”。
它的业务特征是:
- 粒度更细
- 到达更实时
- 更适合直接推监控系统
- 对延迟比对回放更敏感
所以这个项目把 Redis 放在“实时快路径”上,是合理的。
1.3 两边具体存的“数据内容”是什么
你面试时最好直接这样区分:
Kafka 侧存的内容
Kafka 中的消息更像“区块解析产物”,包含:
ChainBlockNumber- 区块中所有
Swaps - 每笔交易的
TxBalances
也就是:一个区块里发生了哪些 swap,这些 swap 对应交易的净余额变化是什么
适合做:
- 批量消费
- 补数重放
- 多个下游消费组复用
Redis 侧存的内容
Redis 中的消息更像“单笔交易的实时交换事件”,包含:
- 交易所属链、区块、索引、哈希
- 当前交易中的
Swaps - 当前交易的
BalanceChanges ProtocolUnixTime
也就是:这笔交易刚刚发生了,它里面有哪些 swap 操作,净余额变化是什么
适合做:
- 低延迟监控
- 迅速触发告警或跟单逻辑
- 实时性优先的业务流
1.4 两边共用的业务基础数据结构:SwapInfo
不管来自 Kafka 还是 Redis,最终都会落到统一的 SwapInfo 结构上,这是整个统一流水线的关键。
type SwapInfo struct {
Pair string `json:"pair"`
Token string `json:"token"`
BaseToken string `json:"base_token"`
TokenType string `json:"token_type"`
Chain string `json:"c"`
Op string `json:"op"`
Price string `json:"p"`
BasePrice string `json:"bp"`
Volume string `json:"v"`
BaseTokenAmount string `json:"base_token_amount"`
TxHash string `json:"tx"`
Timestamp int64 `json:"t"`
Signer string `json:"s"`
Rank int `json:"r"`
Attacked int `json:"attacked"`
Dex string `json:"dex"`
TokenSymbol string `json:"ts"`
TokenDecimals int `json:"tdec"`
BaseTokenSymbol string `json:"bts"`
BaseTokenDecimals int `json:"btdec"`
Block int64 `json:"block"`
Index int `json:"index"`
LogIndex int `json:"log_index"`
Protocol string `json:"protocol"`
}所以你可以理解成:
Kafka和Redis的外层包装不同- 但它们的业务核心载荷都在努力对齐到
SwapInfo + BalanceChanges
这就是“统一流水线”能成立的前提。
2. 这个流水线具体如何实现的?如何扩展新链、新消息类型?
这里建议你脑子里记住一句话:
源头不同,入口不同;中间抽象统一;下游处理复用
2.1 整体流水线长什么样
以 swap_records_monitor.go 为例,整体流程是:
- Kafka 消费区块级数据
- Redis 订阅实时 swap 数据
- 各自做解码和去重
- 输出到同一个
ChannelSink - 由
MergeChannelProducer把消息写入mergeChan ConsumeChanMsgFactory从mergeChan取消息,再执行统一 Processor- 后续进入规则检查、告警推送等
代码上非常清楚:
channelSink := processor_pipline.NewChannelSink(10000)
mergeChan := make(chan *chan_msg.ChanMsg, 10000)
solBlockFactory := kafka.NewConsumeKafkaFactoryWithSink("sol_block_monitor", ...)
solBlockFactory.RegisterProcessor(processors.NewDecodeBlockTxProcessor())
solBlockFactory.RegisterProcessor(processors.NewFilterBlockTxProcessor())
redisFactory := redispubsub.NewConsumeRedisPubSubFactory("sol_swap_monitor", ...)
redisFactory.RegisterProcessor(processors.NewDecodeSwapTxProcessor())
redisFactory.RegisterProcessor(processors.NewFilterSwapTxProcessor())
mergeProducer := producers.NewMergeChannelProducer(channelSink, "merge_channel_producer")
mergeFactory := chan_msg.NewConsumeChanMsgFactory("merge_processor", mergeChan, factoryOptions...)
mergeFactory.RegisterProcessor(processors.NewMergeSwapRecordsProcessor(walletConfigLoader))
mergeFactory.RegisterProcessor(processors.NewSwapRecordsMonitorRulesCheckProcessor(walletConfigLoader, notificationLimiter))2.2 统一流水线的抽象核心:Processor
中间层的核心是 Processor 接口:
type Processor interface {
ProcessorName() string
ProcessMessage(message interface{}) (messagesForKafka *util.KafkaMessageGroup, nextProcessorData interface{}, err error)
}它的好处是:
- 每一段逻辑都被封成独立处理器
- 前一个处理器的输出就是下一个处理器的输入
- 新增处理步骤不需要重写整个主流程
- 不同来源都能复用同一套“中间业务处理器”
2.3 Kafka 和 Redis 如何汇入同一个通道
这一层很关键。
Kafka 消费后写入 ChannelSink
Kafka 工厂支持配置 MessageSink,这里设置成 channelSink:
func WithSink(sink processor_pipline.MessageSink) ConsumeKafkaWithSinkOption {
return func(factory *ConsumeKafkaFactoryWithSink) {
factory.messageSink = sink
}
}它消费完之后,把处理结果投到 sink:
case messageGroup := <-c.consumerGroupHandler.writeMessage:
if err := c.messageSink.Send(messageGroup); err != nil {
logs.Errorf("send %s message error: %v", messageGroup.Topic, err)
}Redis 订阅后也写入同一个 ChannelSink
Redis 工厂逻辑也一样:
case messageGroup := <-c.consumerHandler.writeMessage:
if err := c.messageSink.Send(messageGroup); err != nil {
logs.Errorf("send message error: %v", err)
}ChannelSink 的作用
ChannelSink 其实是一个统一的中转器。它把消息切成单条,写进 Go channel:
type ChannelSink struct {
ch chan *util.KafkaMessageGroup
}
func (c *ChannelSink) Send(messageGroup *util.KafkaMessageGroup) error {
for i := range messageGroup.Messages {
c.ch <- &util.KafkaMessageGroup{
Topic: messageGroup.Topic,
Keys: messageGroup.Keys,
Messages: []interface{}{messageGroup.Messages[i]},
}
}
return nil
}这一步非常重要,因为它把“不同来源、不同批量大小”的消息,统一变成一个个消息单元,再交给后续合并流程。
2.4 如何扩展新链
扩展新链,分两种情况。
情况 A:新链的数据格式与现有一致
这是最容易扩的。
如果新链最后也能产出:
BlockTxData- 或
SwapData / SwapMessage - 或者至少能转成
SwapInfo + BalanceChanges
那基本只需要:
- 增加这条链的采集/生产逻辑
- 把 topic / channel 配到配置里
- 使用同样的 Decode / Filter / Merge Processor
- 在
chain manager等地方补这条链的 token level、平台币等元信息
因为中间处理链并不强依赖“是哪条链”,而是依赖“是否满足统一业务结构”。
情况 B:新链消息格式完全不同
那就要加一层适配。
典型做法是新增:
DecodeXxxProcessor- 负责把新链消息转成统一中间结构
只要你能把它转换到后续处理器认识的结构,比如 SwapMessage 或 SingleSwapResult,后面的流水线就还能复用。
2.5 如何扩展新消息类型
也是同样的思路。
比如以后你要新增一种消息:
- mint 事件
- transfer 事件
- 清算事件
- 某种策略信号事件
有两条路:
路线 1:接入到现有 swap 主链路
如果这类消息最终也能抽象成“某钱包对某 token 的净流入/净流出”,那就可以复用当前 MergeSwapRecordsProcessor 之后的部分。
路线 2:新建一条并行 Processor 流水线
如果语义完全不同,就应该:
- 新建解码 Processor
- 新建过滤 Processor
- 新建合并/归一化 Processor
- 最终产出新的标准业务对象
- 下游接新的规则检查器或事件处理器
所以这个架构的扩展点,不是“在 main.go 里狂加 if-else”,而是“在某个层次新增 Processor”。
3. 怎么把复杂链上 swap 变成统一可判断的数据?具体有哪些步骤?
这部分是整个流水线最核心的“语义提纯”过程。
你可以把它理解成:
原始链上交易 -> 业务上可判断的统一事件
3.1 原始 swap 为什么复杂
链上实际交易往往不是“一个钱包买了一个币”这么简单,可能包含:
- 一笔交易里多次 swap
- token/baseToken 双边变化
- 中间经过多个池子
- 合约内部路由
- 同一交易里多个 token 变化
- 需要结合余额变化才能知道“真实净流入/净流出”
- 需要区分 buy/sell、协议、价格、base token 价格等
所以如果直接让规则引擎面对原始 swap,规则会非常复杂,性能和可维护性都会出问题。
3.2 第一步:解码成统一原始结构
无论来自 Kafka 还是 Redis,先解码成结构体。
Kafka 解码成 BlockTxData:
func (p *DecodeBlockTxProcessor) ProcessMessage(message interface{}) (*util.KafkaMessageGroup, interface{}, error) {
consumerMsg, ok := message.(*sarama.ConsumerMessage)
blockTxData := new(BlockTxData)
err := json.Unmarshal(consumerMsg.Value, blockTxData)
// ...
p.normalizeBlockTxData(blockTxData)
return nil, blockTxData, nil
}Redis 解码成 SwapData:
func (p *DecodeSwapTxProcessor) ProcessMessage(message interface{}) (messagesForKafka *util.KafkaMessageGroup, nextProcessorData interface{}, err error) {
var swapData SwapData
if err := json.Unmarshal(msgBytes, &swapData); err != nil {
return nil, nil, err
}
return nil, &swapData, nil
}3.3 第二步:先做幂等去重
这是为了确保同一笔交易不会被多次处理。
- Kafka 区块流里按
txHash批量抢处理权 - Redis 单消息里按
txHash单条抢处理权
这样保证进入核心业务链的是“已获得处理权的交易”。
3.4 第三步:对 Kafka 的区块级数据按交易拆分
Kafka 一条是一个区块,所以要把块里的 swap 再拆成“按 txHash 分组”的 SwapMessage。
txSwapMap := make(map[string][]SwapInfo)
for _, swap := range filteredSwaps {
txSwapMap[swap.TxHash] = append(txSwapMap[swap.TxHash], swap)
}
for txHash, swaps := range txSwapMap {
swapMessage := &SwapMessage{
Chain: blockTxData.Chain,
Block: blockTxData.BlockNumber,
TxHash: txHash,
Swaps: swaps,
}
if balanceChanges, ok := filteredTxBalances[txHash]; ok {
swapMessage.BalanceChanges = balanceChanges.BalanceChanges
}
result.Messages = append(result.Messages, swapMessage)
}也就是说,Kafka 最终也会被转成和 Redis 类似的“单交易级”消息。
3.5 第四步:统一汇入通道后,再做交易级合并
进入 MergeSwapRecordsProcessor 后,系统开始把一笔交易中复杂的多个 swap 操作,转换成“业务可判断结果”。
这里主要做了几件事。
4.1 先过滤出监控钱包
如果这笔交易里根本没有监控对象,直接跳过。
for _, swap := range swapMessage.Swaps {
s := tools.CheckAndLowerString(swap.Signer)
if p.userConfigs.WalletInMonitor(s) {
walletsInMonitor[s] = true
hasMonitoredWallet = true
}
}
if !hasMonitoredWallet {
return nil
}4.2 读取 BalanceChanges,识别真实净变化
单看 swap 记录不一定能知道最终钱包真实是净流入还是净流出,所以它会把 BalanceChanges 解析出来。
balanceChanges := make(map[string]*big.Float)
for tokenAddr, balanceStr := range swapMessage.BalanceChanges {
balance, ok := new(big.Float).SetString(balanceStr)
if !ok {
continue
}
if balance.Cmp(big.NewFloat(0)) != 0 {
balanceChanges[tokenAddr] = balance
}
}
if len(balanceChanges) == 0 {
return nil
}4.3 按 maker + token 聚合流量
它不是把每个 swap 当成一个独立事件,而是按:
- 某个 maker
- 在这笔交易里
- 对某个 token
汇总成一条净流量。
flowMap := make(map[string]*transaction_summary.MakerTokenFlow)
baseTokenSymbol := contract.GetChainDefaultPlatformTokenSymbol(swapMessage.Chain)
baseTokenPrice := big.NewFloat(0)
tokenPriceMap := make(map[string]*big.Float)后面会根据 buy/sell 去更新:
NetFlowInFlowOutFlowNetValue
4.4 结合价格,计算价值
金额判断不能只看 token 数量,所以它还会收集 price/basePrice,计算出价值。
func (p *MergeSwapRecordsProcessor) calculateFlowValues(flowMap map[string]*transaction_summary.MakerTokenFlow, tokenPriceMap map[string]*big.Float) {
for _, flow := range flowMap {
price, priceExists := tokenPriceMap[flow.Token]
if !priceExists {
price = big.NewFloat(0)
}
flow.NetValue.Mul(flow.NetFlow, price)
flow.InValue.Mul(flow.InFlow, price)
flow.OutValue.Mul(flow.OutFlow, price)
}
}4.5 最终生成 SingleSwapResult
这是整个转换最关键的一步:把复杂交易降维成统一业务对象。
func (p *MergeSwapRecordsProcessor) splitToSingleSwapResult(msr *transaction_summary.MergedSwapResult) []*transaction_summary.SingleSwapResult {
var results []*transaction_summary.SingleSwapResult
for i := range msr.MakerSummaries {
for j := range msr.MakerSummaries[i].NetInflow {
results = append(results, &transaction_summary.SingleSwapResult{
TxHash: msr.TxHash,
Block: msr.Block,
Chain: msr.Chain,
Maker: msr.MakerSummaries[i].Maker,
Netflow: msr.MakerSummaries[i].NetInflow[j],
Op: util.OperationBuy,
Protocol: msr.TokenProtocol[msr.MakerSummaries[i].NetInflow[j].Token],
})
}
// ...
}
return results
}最后规则引擎面对的就不是原始链上路径,而是这种统一对象:
- 哪条链
- 哪个钱包
- 哪个 token
- 净流入还是净流出
- 数量、价值、价格
- 协议
- 时间
- 交易哈希
这就非常适合做规则判断了。
3.6 你可以怎么总结这一段
面试里你可以说:
“这个项目没有直接拿原始链上 swap 去跑规则,而是先做交易级语义归一化。做法是:先解码,再按 txHash 去重和聚合,然后结合 BalanceChanges 识别真实净变化,再按 maker + token 汇总净流量,最后生成统一的 SingleSwapResult。后面的规则引擎只面对这种标准业务对象,而不需要理解复杂路由和多跳 swap 细节。”
4. 多源消息的一致性和顺序怎么保证?
这是最容易被问到、也是最容易答虚的地方。
你要先接受一个事实:
这个项目追求的是业务一致性和幂等,不是严格全局顺序一致性。
4.1 先说顺序:它没有做“全局严格有序”
从代码上看,这条链路有几个特征:
- Kafka 多分区消费
- Redis PubSub 实时推送
- 两个源异步汇入同一个 channel
ChannelSink会拆消息单条送入- 后续
mergeChan也是并发运行的
所以这套系统没有实现“全局严格顺序”:
- 不保证 Kafka 与 Redis 之间的绝对先后
- 不保证跨分区严格全局有序
- 不保证所有源最终都按链上绝对时间顺序到达
这一点你一定不要在面试里吹成“完全有序”。
4.2 它真正保证的是什么
它真正做的是下面几层保证:
保证 1:同一笔交易尽量只处理一次
这是最核心的业务一致性。
依靠:
- 本地 LRU
- Redis
SETNX - 批量
Pipeline - 入口级去重
这样做的结果是:
- 即使 Kafka 和 Redis 都看到了同一笔交易
- 也尽量只有一个入口能“抢到处理权”
保证 2:Kafka 源内部有分区顺序
Kafka 本身是分区内有序的。
所以如果同一类消息被稳定地发到同一分区,源内顺序是相对可控的。
但因为代码里没有在这个链路里显式构建“全局排序器”,所以只能说:
Kafka 分区内顺序可用全局顺序不保证
保证 3:Redis 源走的是“实时优先”
Redis 在这里更像低延迟补充源,不是强一致主账本。
所以它不承担“绝对顺序来源”的职责,而承担:
- 快速把实时事件推到监控链路
- 尽早触发规则判断
保证 4:规则设计本身在规避强顺序依赖
这个非常重要。
项目里很多关键规则并不是依赖“严格事件先后”,而是依赖:
- 当前交易的净结果
- 时间窗口统计
- 去重后的聚合结果
比如 group_all_buy 规则不是要求“第 1、2、3 个钱包必须按严格顺序到达”,而是看:
- 在一个时间窗口里
- 某 token 是否满足钱包数或成交量阈值
这就是一种典型的“用时间窗口容忍弱有序”的业务设计。
4.3 多源一致性是如何实现的
你可以把它概括成三层。
第一层:结构一致性
不管来源是 Kafka 还是 Redis,都会尽量归到:
SwapMessage- 再归到
SingleSwapResult
也就是说,多源先统一数据模型。
第二层:事务幂等一致性
入口先按 txHash 抢处理权。
所以同一交易即使多源重复到达,也尽量只会有一条进入核心处理链。
第三层:业务语义一致性
规则引擎不依赖“消息来自哪里”,只依赖:
MakerTokenNetflowOpProtocolUnixTime
这相当于把“来源差异”屏蔽在上游了。
4.4 面试里怎么回答“顺序怎么保证”
建议你这样答,非常稳:
“这套系统没有追求跨 Kafka 和 Redis 的严格全局顺序,而是优先保证交易粒度的幂等和业务结果一致。Kafka 负责可回放和批量主链路,Redis 负责低延迟实时路径。进入主链路前会按 txHash 做原子去重,后续再通过交易级聚合和时间窗口规则来降低顺序抖动带来的影响。所以它更像一个‘弱有序、强幂等、重结果一致性’的设计。”
这个回答很像大厂喜欢听的风格,因为它承认 trade-off,而不是硬吹。
总结
“这个项目之所以同时用 Kafka 和 Redis,是因为两类链上数据的业务特征不一样。Kafka 承接的是区块级、批量、可回放的数据,更适合作为主消息总线;Redis 承接的是单交易级、低延迟的实时 swap 事件,更适合实时监控。为了避免下游分别适配两套来源,系统中间抽象了一套统一 Processor 流水线:先分别解码和去重,再汇入同一个 ChannelSink,之后通过 MergeSwapRecordsProcessor 把复杂链上 swap 归一化成统一的 SingleSwapResult,也就是某个钱包对某个 token 的净流入或净流出,包含数量、价值、协议、时间等信息。后续规则引擎只需要面对这种标准对象,不用关心消息原始来源。至于一致性,这套系统不是追求全局强顺序,而是通过 Kafka 分区顺序、入口级幂等去重、交易级聚合和时间窗口规则,来保证业务结果的一致性。”
我建议你选 MergeSwapRecordsProcessor 作为你重点学习、并写到简历上的 Processor。
原因很简单:它同时满足 4 个条件。
足够核心:它处在多数据源统一流水线的中间关键位置,不是边角模块。业务价值强:它直接把复杂链上交易转换成后续规则引擎能消费的统一数据。技术点够多:涉及数据归一化、聚合、缓存、过滤、模型抽象。对校招生友好:你可以讲成“负责链上交易聚合与标准化处理”,既有技术含量,又不会夸到容易被追穿。
如果你是校招,我建议你在简历和面试里把它包装成:
负责链上多源 swap 数据聚合与标准化 Processor 的开发,完成监控钱包过滤、净流入/净流出计算、价格补全及统一事件模型输出,为后续规则引擎与跟单链路提供稳定输入。
下面我把这个 Processor 详细拆开。
一、为什么这个 Processor 值得你认领
MergeSwapRecordsProcessor 的本质不是“做格式转换”,而是做一件非常有业务价值的事:
把原始链上交易语义,提炼成可用于监控/跟单判断的标准化事件
在 Web3 监控系统里,原始数据往往很脏、很复杂:
- 一笔交易里可能有多次 swap
- 有主 token 和 base token 双边变化
- 同一交易里可能经过多个池子
- 单看 swap 明细,不一定知道用户真实是净买入还是净卖出
- 不同数据源格式不同,但后续规则判断希望输入统一
所以这个 Processor 的业务意义是:
- 屏蔽链上原始数据复杂性
- 让下游规则引擎不再面对底层细节
- 提供统一的“钱包对某 token 的净行为结果”
这就是一个非常典型、很适合校招生讲的“中间层数据抽象”项目经验。
二、这个 Processor 在整个业务链路里的位置
它处在这条链路中间:
Kafka/Redis -> Decode -> Filter -> MergeSwapRecordsProcessor -> RulesCheckProcessor -> 告警/跟单
前面两步已经完成了:
- 从 Kafka 或 Redis 拿到原始消息
- 解码成结构体
- 通过
txHash去重,确保同一交易尽量不重复进入主链路
它要做的是第三步:
把一笔交易中的多个 swap 行为,合并成统一的 SingleSwapResult
也就是给下游提供标准业务对象。
三、它处理的输入和输出是什么
这是你面试必须讲清楚的第一件事。
输入是什么
它接收的是 SwapMessage,也就是“单笔交易级”的 swap 消息。
type SwapMessage struct {
Chain string `json:"chain"`
Block int64 `json:"block"`
TxIndex int `json:"tx_index"`
TxHash string `json:"tx_hash"`
Swaps []SwapInfo `json:"swaps"`
BalanceChanges map[string]string `json:"balance_changes"`
Protocol string `json:"protocol"`
UnixTime int64 `json:"unix_time"`
}这个输入已经比原始消息干净很多了,但仍然很复杂,因为:
Swaps里可能有多条 swapBalanceChanges记录的是 token 的净余额变化- 还要结合
Signer、Price、Volume等字段理解业务语义
输出是什么
它最后输出的是 SingleSwapResult。
type SingleSwapResult struct {
TxHash string `json:"tx_hash"`
Chain string `json:"chain"`
Block int64 `json:"block"`
Maker string `json:"maker"`
Op string `json:"op"`
Netflow *TokenFlow `json:"net_inflow"`
BaseToken string `json:"base_token"`
BaseTokenPrice *big.Float `json:"base_token_price"`
Protocol string `json:"protocol"`
UnixTime int64 `json:"unix_time"`
}这个对象的业务含义非常直观:
- 哪条链
- 哪笔交易
- 哪个 maker
- 对哪个 token
- 是 buy 还是 sell
- 净流入/净流出多少
- 对应价值和协议是什么
下游规则引擎看到这个对象,就可以直接做:
- 链过滤
- 时间段过滤
- 成交额过滤
- 协议过滤
- 钱包组全买规则
- 跟单信号生成
四、详细业务流程
这部分是你最适合讲的重点。
第一步:先判断这笔交易里是否有监控钱包
Processor 不会傻乎乎处理所有交易,而是先检查这笔交易里的 signer 是否在监控列表里。
walletsInMonitor := make(map[string]bool)
hasMonitoredWallet := false
for _, swap := range swapMessage.Swaps {
s := tools.CheckAndLowerString(swap.Signer)
if p.userConfigs.WalletInMonitor(s) {
walletsInMonitor[s] = true
hasMonitoredWallet = true
}
}
if !hasMonitoredWallet {
return nil
}业务意义
这一步其实是很典型的“前置过滤”设计:
- 如果交易和业务完全无关,就不要做后面的重计算
- 减少 CPU 和内存开销
- 提高流水线吞吐
面试里怎么讲
“我负责的这个 Processor 会先基于内存里的监控钱包索引做前置过滤,只处理和用户策略相关的钱包交易,避免无关链上交易进入后续规则链路。”
第二步:解析 BalanceChanges,识别真实净变化
这是这个 Processor 最有技术含量的地方之一。
balanceChanges := make(map[string]*big.Float)
for tokenAddr, balanceStr := range swapMessage.BalanceChanges {
balance, ok := new(big.Float).SetString(balanceStr)
if !ok {
continue
}
if balance.Cmp(big.NewFloat(0)) != 0 {
balanceChanges[tokenAddr] = balance
}
}
if len(balanceChanges) == 0 {
return nil
}为什么要看 BalanceChanges
因为链上交易明细可能包含多个 swap 操作,但最终用户对某个 token 是净买入还是净卖出,不能只看单条 swap。
举个例子:
- 一笔交易里先换成中间币
- 再从中间币换成目标币
- 如果只看 swap 过程,你会误以为用户操作了多个 token
- 但从用户视角,真正有意义的是最终对哪些 token 产生了净流入/净流出
所以 BalanceChanges 是在帮助你从“过程视角”切到“结果视角”。
面试里怎么讲
“为了避免直接依赖链上多跳 swap 过程带来的噪声,这个 Processor 会结合 BalanceChanges 识别最终净变化,只保留对业务判断有意义的 token 流向结果。”
第三步:按 maker + token 聚合交易流量
这一步是真正把原始明细抽象成业务对象。
flowMap := make(map[string]*transaction_summary.MakerTokenFlow)
baseTokenSymbol := contract.GetChainDefaultPlatformTokenSymbol(swapMessage.Chain)
baseTokenPrice := big.NewFloat(0)
tokenPriceMap := make(map[string]*big.Float)后面它会遍历每个 swap,根据买卖方向更新:
NetFlowInFlowOutFlowSwapCount
比如对于主 token:
if walletsInMonitor[swap.Signer] && tokenExists && chain_manager.GetTokenLevel(swapMessage.Chain, swap.Token) >= chain_manager.TokenLevelPopular {
tokenFlowKey := fmt.Sprintf("%s_%s", swap.Signer, swap.Token)
tokenFlow := p.getOrCreateFlow(flowMap, tokenFlowKey, swap.Signer, swap.Token, swap.TokenSymbol, swap.Chain, swapMessage.TxHash, swapMessage.Block)
switch swap.Op {
case "buy":
tokenFlow.NetFlow.Add(tokenFlow.NetFlow, volume)
tokenFlow.InFlow.Add(tokenFlow.InFlow, volume)
case "sell":
tokenFlow.NetFlow.Sub(tokenFlow.NetFlow, volume)
tokenFlow.OutFlow.Add(tokenFlow.OutFlow, volume)
}
tokenFlow.SwapCount++
}业务意义
这一层的抽象非常关键:
从:
- 一笔交易里多个 swap 过程
变成:
- 某个 maker 对某个 token 的净结果
这时候规则引擎就不必理解复杂链上路径,而只需要判断:
- 这个钱包是不是净买了某个 token
- 买了多少
- 价值有多大
- 属于哪个协议
面试里怎么讲
“我做的核心抽象是把链上多条 swap 明细按 maker + token 聚合成净流量对象,这样后续规则引擎只需要判断用户对某个 token 的最终净行为,而不需要理解底层多跳 swap 细节。”
第四步:补齐价格和价值信息
规则系统很多时候不能只看数量,还要看交易价值,所以这里还做了 token 价格和平台币价格处理。
它会缓存平台币价格,避免重复计算:
func (p *MergeSwapRecordsProcessor) getPlatformTokenPrice(chain, token string) *big.Float {
cacheKey := fmt.Sprintf("%s_%s", strings.ToUpper(chain), strings.ToUpper(token))
if cachedValue, ok := p.platformTokenPriceCache.Load(cacheKey); ok {
cache := cachedValue.(*PlatformTokenPriceCache)
if time.Since(cache.UpdateTime) < 30*time.Second {
return new(big.Float).Set(cache.Price)
}
}
return big.NewFloat(0)
}然后统一计算价值:
func (p *MergeSwapRecordsProcessor) calculateFlowValues(flowMap map[string]*transaction_summary.MakerTokenFlow, tokenPriceMap map[string]*big.Float) {
for _, flow := range flowMap {
price, priceExists := tokenPriceMap[flow.Token]
if !priceExists {
price = big.NewFloat(0)
}
flow.NetValue.Mul(flow.NetFlow, price)
flow.InValue.Mul(flow.InFlow, price)
flow.OutValue.Mul(flow.OutFlow, price)
}
}业务意义
后面很多规则都依赖交易价值:
- 交易额大于某阈值
- 钱包组总买入额
- 平均买入额
- token 市值关联判断
所以这个 Processor 不只是做“数量聚合”,而是在做“可用于策略判断的语义补全”。
面试里怎么讲
“除了净数量,我还会在 Processor 内补齐 token 价格和价值维度,因为后续规则很多是基于金额判断而不是基于 token 数量判断。”
第五步:生成统一的 SingleSwapResult
最后一步就是把中间聚合结果变成标准业务事件。
func (p *MergeSwapRecordsProcessor) splitToSingleSwapResult(msr *transaction_summary.MergedSwapResult) []*transaction_summary.SingleSwapResult {
var results []*transaction_summary.SingleSwapResult
for i := range msr.MakerSummaries {
for j := range msr.MakerSummaries[i].NetInflow {
results = append(results, &transaction_summary.SingleSwapResult{
TxHash: msr.TxHash,
Block: msr.Block,
Chain: msr.Chain,
Maker: msr.MakerSummaries[i].Maker,
Netflow: msr.MakerSummaries[i].NetInflow[j],
Op: util.OperationBuy,
UnixTime: msr.UnixTime,
Protocol: msr.TokenProtocol[msr.MakerSummaries[i].NetInflow[j].Token],
})
}
for j := range msr.MakerSummaries[i].NetOutflow {
results = append(results, &transaction_summary.SingleSwapResult{
TxHash: msr.TxHash,
Block: msr.Block,
Chain: msr.Chain,
Maker: msr.MakerSummaries[i].Maker,
Netflow: msr.MakerSummaries[i].NetOutflow[j],
Op: util.OperationSell,
Protocol: msr.TokenProtocol[msr.MakerSummaries[i].NetOutflow[j].Token],
UnixTime: msr.UnixTime,
})
}
}
return results
}业务意义
这一步非常适合拿来讲“系统分层”:
- 上游关注采集和去重
- 当前 Processor 关注语义归一化
- 下游规则引擎关注策略判断
每一层职责清晰。
五、这个 Processor 最值得讲的技术重点
你在简历和面试里,不要把它讲成“做了一堆字段处理”,而要讲成下面 4 个重点。
技术重点 1:多源数据统一建模
Kafka 和 Redis 的外层结构不同,但这个 Processor 处理的已经是统一交易对象 SwapMessage,说明整体流水线是“先统一数据模型,再处理业务”。
你可以讲:
通过统一中间结构屏蔽不同消息源差异,降低下游规则链路耦合。
技术重点 2:前置过滤降低计算成本
它先判断交易是否涉及监控钱包,再决定是否进入重计算。
你可以讲:
通过监控钱包索引做前置过滤,减少无关交易进入后续聚合与规则链路。
技术重点 3:从链上过程数据提炼业务结果
它不是简单转发 swap,而是结合 BalanceChanges、价格、协议信息,提炼出“某钱包对某 token 的净结果”。
你可以讲:
将复杂链上交易从过程视角转为结果视角,输出统一的净流入/净流出事件。
技术重点 4:缓存与轻量优化
它做了平台币价格缓存,避免每次重复计算。
你可以讲:
对平台币价格做短 TTL 缓存,降低重复价格推导开销。
六、为什么这个点适合写到简历上
因为它非常符合校招后端的“中间层能力证明”。
你写这个 Processor,有几个天然优势:
- 不像“自动交易策略”那样容易被问得太深
- 不像“完整规则引擎”那样容易被怀疑参与度不够
- 它业务上足够关键,但技术上也在你能掌控的范围内
- 很容易衔接到后续规则引擎、缓存幂等、告警跟单闭环
所以这个点非常适合写成“我负责的一个核心模块”。
七、适合写到简历上的版本
我给你几版,你选你喜欢的。
版本 1:最稳妥
负责链上多源 swap 数据聚合与标准化 Processor 开发,结合监控钱包过滤、余额变化解析与价格补全,将复杂交易明细统一抽象为可供规则引擎消费的净流入/净流出事件模型。
版本 2:偏技术一些
参与监控中心核心 Processor 开发,基于 Kafka/Redis 统一消息模型,对链上多跳 swap 交易做交易级聚合、净流量计算与价值补全,输出标准化 SingleSwapResult 供告警与跟单策略复用。
版本 3:偏业务结果一些
负责链上交易聚合 Processor 开发,将多源链上 swap 数据转换为钱包维度的标准事件,支撑后续监控告警、钱包组跟单和策略规则判断。
八、面试里怎么介绍这个 Processor
你可以直接背下面这段:
“我在这个项目里比较适合展开讲的是交易聚合 Processor。它位于多源数据统一流水线中间,前面已经完成了解码和去重,我负责的部分是把一笔链上交易里的多条 swap 明细,结合余额变化和价格信息,聚合成标准化的业务事件。具体会先过滤是否涉及监控钱包,再解析 BalanceChanges 判断真实净变化,然后按 maker + token 聚合净流量,补齐价格和价值信息,最后输出统一的 SingleSwapResult 给规则引擎。这样下游就不需要理解底层复杂交易路径,只需要针对统一模型做策略判断。”
九、面试官可能怎么追问
你最好提前准备这 5 个问题。
1. 为什么不能直接把原始 swap 给规则引擎?
答:
因为原始 swap 是过程数据,一笔交易里可能有多跳路径和多个 token 变化。规则引擎更关心的是用户最终对某个 token 的净行为,所以需要先做语义聚合。
2. 为什么要结合 BalanceChanges?
答:
单看 swap 明细可能误判真实结果,BalanceChanges 能帮助识别最终净流入/净流出,减少中间路径带来的噪声。
3. 为什么按 maker + token 聚合?
答:
因为后续规则本质上是围绕“某钱包是否买入/卖出某 token”来判断的,这个粒度最适合策略系统消费。
4. 这个 Processor 的性能优化点是什么?
答:
前置钱包过滤、价格缓存、只处理有净变化的 token,都是为了减少无效计算。
5. 这个 Processor 和下游规则引擎的边界是什么?
答:
这个 Processor 负责“语义归一化”,规则引擎负责“策略判断”。前者把复杂链上数据转换成标准事件,后者基于标准事件执行业务规则。
下面我给你这两个:
逐行业务版源码讲解围绕这个 Processor 的面试问答稿
都只围绕 MergeSwapRecordsProcessor,这样你能真正吃透一个点。
1. 逐行业务版源码讲解
我不会机械逐行翻译代码,而是按真实执行顺序,把关键代码块和业务含义串起来。你读完后,基本就能自己复述这段逻辑了。
1. 模块职责和整体目标
这个 Processor 的目标是:
把一笔交易中的多个 swap 明细,聚合成“某个监控钱包对某个 token 的净买入/净卖出结果”
对应代码入口:
func (p *MergeSwapRecordsProcessor) ProcessMessage(message interface{}) (messagesForKafka *util.KafkaMessageGroup, nextProcessorData interface{}, err error) {
chanMsg, ok := message.(*chan_msg.ChanMsg)
if !ok {
return nil, nil, fmt.Errorf("message is not *chan_msg.ChanMsg type")
}
var messageGroup util.KafkaMessageGroup
valueStr, ok := chanMsg.Value.(string)
if !ok {
return nil, nil, fmt.Errorf("chanMsg.Volume is not string type")
}
err = json.Unmarshal([]byte(valueStr), &messageGroup)
if err != nil {
return nil, nil, fmt.Errorf("failed to unmarshal KafkaMessageGroup: %w", err)
}
if len(messageGroup.Messages) == 0 {
return nil, nil, nil
}
msg := messageGroup.Messages[0]
msgBytes, err := json.Marshal(msg)
if err != nil {
return nil, nil, err
}
var swapMessage SwapMessage
err = json.Unmarshal(msgBytes, &swapMessage)
if err != nil {
return nil, nil, err
}
if swapMessage.Swaps == nil || len(swapMessage.Swaps) == 0 {
return nil, nil, nil
}
swapMessage.UnixTime = swapMessage.Swaps[0].Timestamp
mergedResult := p.processSwapMessage(&swapMessage)
if mergedResult == nil {
return nil, nil, nil
}
singleSwapResults := p.splitToSingleSwapResult(mergedResult)
if len(singleSwapResults) == 0 {
return nil, nil, nil
}
return nil, singleSwapResults, nil
}业务上这段在做什么
这段可以理解成 4 步:
- 从统一通道里拿到消息
- 把消息还原成
SwapMessage - 调用核心聚合逻辑
processSwapMessage - 把聚合结果拆成标准事件
SingleSwapResult
你要怎么理解
这里体现了这个 Processor 的定位:
- 它不关心消息来自 Kafka 还是 Redis
- 它只关心“现在我拿到的是一笔交易级 swap 消息”
- 它输出的是下游规则引擎能直接消费的统一结构
所以你面试时可以说:
“这个 Processor 是整个统一流水线里的语义归一化层,上游已经做了解码和去重,我负责把交易级 swap 明细转成标准化业务事件。”
2. 初始化:依赖用户配置和价格缓存
构造函数:
func NewMergeSwapRecordsProcessor(userConfigs *user_configs.UserConfigsLoader) *MergeSwapRecordsProcessor {
return &MergeSwapRecordsProcessor{
userConfigs: userConfigs,
cacheTTL: 400 * time.Millisecond,
}
}结构体:
type MergeSwapRecordsProcessor struct {
userConfigs *user_configs.UserConfigsLoader
platformTokenPriceCache sync.Map
cacheTTL time.Duration
}业务含义
这个 Processor 依赖两类信息:
userConfigs:知道哪些钱包正在被监控platformTokenPriceCache:缓存平台币价格,减少重复计算
为什么需要这两个
userConfigs
因为它不是处理全量链上交易,而是只处理和监控业务有关的钱包。
platformTokenPriceCache
因为很多金额计算要用平台币价格,比如 SOL、ETH、BNB 的价格。如果每笔都重新推导,开销会比较大。
3. 第一步核心逻辑:判断交易是否与监控业务相关
核心代码:
walletsInMonitor := make(map[string]bool)
hasMonitoredWallet := false
for _, swap := range swapMessage.Swaps {
s := tools.CheckAndLowerString(swap.Signer)
if p.userConfigs.WalletInMonitor(s) {
walletsInMonitor[s] = true
hasMonitoredWallet = true
}
}
if !hasMonitoredWallet {
return nil
}业务解释
不是每笔链上交易都值得进入后续规则处理。
系统只关心“被用户配置过的钱包”。
所以它会遍历这笔交易里的每条 swap,看 Signer 是否在监控列表里。
如果没有任何监控钱包参与:
- 直接丢掉
- 不做后面的净流量计算
- 不做价格补全
- 不进入规则引擎
这个点的技术重点
这是一个非常典型的前置过滤优化。
作用:
- 降低无效计算
- 减少后续规则链路压力
- 保持实时系统吞吐
面试怎么讲
“这个 Processor 第一件事不是聚合,而是基于内存中的监控钱包索引做前置过滤,只处理和用户策略相关的钱包交易,避免无关数据进入重计算链路。”
4. 第二步核心逻辑:解析余额变化,识别真实净结果
代码:
balanceChanges := make(map[string]*big.Float)
for tokenAddr, balanceStr := range swapMessage.BalanceChanges {
balance, ok := new(big.Float).SetString(balanceStr)
if !ok {
continue
}
if balance.Cmp(big.NewFloat(0)) != 0 {
balanceChanges[tokenAddr] = balance
}
}
if len(balanceChanges) == 0 {
return nil
}业务解释
链上交易里可能有很多中间动作,但监控系统真正关心的是:
最终哪些 token 对这个钱包产生了净变化
比如一笔多跳 swap:
- 先
A -> B - 再
B -> C
从过程看,用户碰了 A、B、C
但从最终结果看,用户关心的其实是:
- A 流出了多少
- C 流入了多少
- B 只是中间态
BalanceChanges 就是在帮助系统聚焦“最终结果”。
为什么不能只看 swap 明细
因为只看过程会带来很多噪声:
- 中间 token 会误触发规则
- 多跳路由会让一笔交易看起来像很多事件
- 你可能误把同一交易算成多个独立买入行为
所以这里的思路是:
从过程日志抽取最终状态变化
面试怎么讲
“链上 swap 明细本身是过程数据,容易包含路由和中间币噪声,所以 Processor 会结合 BalanceChanges 识别最终净变化,只保留对业务判断有意义的 token 流向结果。”
5. 第三步核心逻辑:建立按 maker + token 的聚合表
初始化:
flowMap := make(map[string]*transaction_summary.MakerTokenFlow)
baseTokenSymbol := contract.GetChainDefaultPlatformTokenSymbol(swapMessage.Chain)
baseTokenPrice := big.NewFloat(0)
tokenPriceMap := make(map[string]*big.Float)业务解释
这一步的目标是把一笔交易中散落的 swap 明细,整理成一个聚合容器。
聚合维度是:
makertoken
即:
某个钱包,在这笔交易里,对某个 token 的净行为
这比“逐条 swap 判断”更适合规则系统。
6. 第四步核心逻辑:收集价格信息
代码片段:
if chain_manager.GetTokenLevel(swapMessage.Chain, swap.BaseToken) == chain_manager.TokenLevelPlatform && baseTokenPrice.Cmp(big.NewFloat(0)) == 0 {
if priceFromSwap, priceOk := new(big.Float).SetString(swap.BasePrice); priceOk {
baseTokenPrice = priceFromSwap
baseTokenSymbol = swap.BaseTokenSymbol
p.setPlatformTokenPrice(swapMessage.Chain, baseTokenSymbol, baseTokenPrice)
}
}
if tokenPrice, ok := new(big.Float).SetString(swap.Price); ok {
tokenPriceMap[swap.Token] = tokenPrice
}
if bTokenPrice, ok := new(big.Float).SetString(swap.BasePrice); ok {
tokenPriceMap[swap.BaseToken] = bTokenPrice
}业务解释
规则不只依赖数量,还依赖价值。
比如“交易额大于 1000 美元”这种规则,必须先算出价值。
所以这里做两件事:
- 从 swap 里提取 token price / base token price
- 识别平台币价格,并写入短期缓存
为什么缓存平台币价格
因为一批交易里经常会反复用到同一个平台币价格,比如:
SOLETHBNB
做缓存后:
- 减少重复解析
- 降低短时间内重复计算开销
面试怎么讲
“为了支撑后续按交易额、均价等维度做规则判断,这个 Processor 会在聚合阶段顺手补齐 token 和 base token 的价格信息,并对平台币价格做短 TTL 缓存。”
7. 第五步核心逻辑:聚合 token 主流和 base token 对流
这一块是最值得细读的。
对主 token 的聚合
if walletsInMonitor[swap.Signer] && tokenExists && chain_manager.GetTokenLevel(swapMessage.Chain, swap.Token) >= chain_manager.TokenLevelPopular {
tokenFlowKey := fmt.Sprintf("%s_%s", swap.Signer, swap.Token)
tokenFlow := p.getOrCreateFlow(flowMap, tokenFlowKey, swap.Signer, swap.Token, swap.TokenSymbol, swap.Chain, swapMessage.TxHash, swapMessage.Block)
switch swap.Op {
case "buy":
tokenFlow.NetFlow.Add(tokenFlow.NetFlow, volume)
tokenFlow.InFlow.Add(tokenFlow.InFlow, volume)
case "sell":
tokenFlow.NetFlow.Sub(tokenFlow.NetFlow, volume)
tokenFlow.OutFlow.Add(tokenFlow.OutFlow, volume)
}
tokenFlow.SwapCount++
}对 base token 的聚合
if walletsInMonitor[swap.Signer] && baseTokenExists && chain_manager.GetTokenLevel(swapMessage.Chain, swap.BaseToken) >= chain_manager.TokenLevelPopular {
baseTokenFlowKey := fmt.Sprintf("%s_%s", swap.Signer, swap.BaseToken)
baseTokenFlow := p.getOrCreateFlow(flowMap, baseTokenFlowKey, swap.Signer, swap.BaseToken, swap.BaseTokenSymbol, swap.Chain, swapMessage.TxHash, swapMessage.Block)
switch swap.Op {
case "buy":
baseTokenFlow.NetFlow.Sub(baseTokenFlow.NetFlow, baseAmount)
baseTokenFlow.OutFlow.Add(baseTokenFlow.OutFlow, baseAmount)
case "sell":
baseTokenFlow.NetFlow.Add(baseTokenFlow.NetFlow, baseAmount)
baseTokenFlow.InFlow.Add(baseTokenFlow.InFlow, baseAmount)
}
baseTokenFlow.SwapCount++
}业务解释
这里实际上在做“资金流”抽象。
如果是 buy
表示:
- 主 token 流入
- base token 流出
如果是 sell
表示:
- 主 token 流出
- base token 流入
这样做的结果是,一笔交易最终会被归纳成:
- 这个钱包净买入了哪些 token
- 净卖出了哪些 token
- 金额各是多少
这就是非常清晰的“业务结果”。
为什么要把主 token 和 base token 都建模
因为完整交易语义需要两边都知道:
- 买入了什么
- 花掉了什么
否则只能看到“买了某 token”,却不知道付出了多少对价。
面试怎么讲
“Processor 会同时聚合主 token 和 base token 的流向,保证一笔交易既能表达买入结果,也能表达对应的资金对价,从而支撑成交额、均价等后续策略判断。”
8. 第六步核心逻辑:把数量变成价值
代码:
func (p *MergeSwapRecordsProcessor) calculateFlowValues(flowMap map[string]*transaction_summary.MakerTokenFlow, tokenPriceMap map[string]*big.Float) {
for _, flow := range flowMap {
price, priceExists := tokenPriceMap[flow.Token]
if !priceExists {
price = big.NewFloat(0)
}
flow.NetValue.Mul(flow.NetFlow, price)
flow.InValue.Mul(flow.InFlow, price)
flow.OutValue.Mul(flow.OutFlow, price)
}
}业务解释
数量本身不够,系统还要知道价值:
NetFlow是净数量NetValue是净价值InValue/OutValue分别是流入流出价值
这样后面规则才能判断:
- 是否超过阈值
- 钱包组总量是否够大
- 平均成交额是否满足条件
这一步很像把“日志型数据”升级成“分析型数据”。
9. 第七步核心逻辑:先形成 MergedSwapResult
代码:
result := &transaction_summary.MergedSwapResult{
TxHash: swapMessage.TxHash,
Chain: swapMessage.Chain,
Block: swapMessage.Block,
MakerSummaries: p.generateMakerSummaries(makerFlows, swapMessage.TxHash, swapMessage.Chain, swapMessage.Block, baseTokenPrice, tokenPriceMap, baseTokenSymbol),
TotalMakers: len(uniqueMakers),
TotalSwaps: len(swapMessage.Swaps),
UniqueTokens: len(uniqueTokens),
BaseToken: baseTokenSymbol,
TokenProtocol: p.getTokenProtocolMap(swapMessage),
UnixTime: swapMessage.UnixTime,
}业务解释
它先构建的是一个“交易级总结果”:
- 这笔交易里有哪些 maker
- 每个 maker 的净流入/净流出是什么
- 涉及多少 token
- 用的哪个协议
这是中间结果,主要用于下一步拆分。
10. 第八步核心逻辑:拆成 SingleSwapResult
代码:
func (p *MergeSwapRecordsProcessor) splitToSingleSwapResult(msr *transaction_summary.MergedSwapResult) []*transaction_summary.SingleSwapResult {
var results []*transaction_summary.SingleSwapResult
for i := range msr.MakerSummaries {
for j := range msr.MakerSummaries[i].NetInflow {
results = append(results, &transaction_summary.SingleSwapResult{
TxHash: msr.TxHash,
Block: msr.Block,
Chain: msr.Chain,
Maker: msr.MakerSummaries[i].Maker,
Netflow: msr.MakerSummaries[i].NetInflow[j],
Op: util.OperationBuy,
UnixTime: msr.UnixTime,
Protocol: msr.TokenProtocol[msr.MakerSummaries[i].NetInflow[j].Token],
})
}
// sell 同理
}
return results
}业务解释
这一步把中间总结果拆成“规则最喜欢吃的结构”。
比如一笔交易可能最后拆成:
- 钱包 A 净买入 token X
- 钱包 A 净卖出 token Y
- 钱包 B 净买入 token Z
这就是非常适合下游做判断的粒度。
为什么要拆到这么细
因为后面的规则就是基于这种最小业务单元工作的:
- 一个钱包
- 一个 token
- 一种操作方向
- 一份价值信息
这比让规则引擎面对整笔复杂交易要清晰很多。
11. 这段代码你应该提炼出的核心设计思想
你真正该学的不是具体变量名,而是这 4 个设计思想:
-
前置过滤
只处理有监控钱包的交易,减少无效计算。 -
过程转结果
不直接使用原始 swap,而是结合余额变化提炼真实净结果。 -
统一抽象
把复杂交易归一化成SingleSwapResult,让下游规则复用。 -
轻量优化
价格缓存、只保留净变化 token、按业务粒度聚合。
3. 围绕这个 Processor 的面试问答稿
下面这部分你可以直接背,尤其适合校招。
Q1:你在这个项目里做了什么比较有技术含量的模块?
答:
我参与了监控中心里交易聚合 Processor 的开发。这个 Processor 位于多源流水线的中间层,上游已经完成 Kafka/Redis 消息解码和去重,我负责把一笔链上交易里的多个 swap 明细,结合余额变化和价格信息,聚合成标准化的 SingleSwapResult,供后续规则引擎和跟单链路使用。
Q2:为什么不能直接把原始 swap 明细交给规则引擎?
答:
因为原始 swap 是过程数据,一笔交易里可能有多跳路由、中间 token 和多个 swap 事件。规则系统更关心的是用户最终对某个 token 的净行为,比如净买入了多少、价值有多大、属于哪个协议。所以中间需要一个归一化层,把复杂过程转换成统一结果。
Q3:这个 Processor 的输入和输出分别是什么?
答:
输入是交易级的 SwapMessage,里面包含这笔交易的 Swaps、BalanceChanges、链、区块号、协议、时间等信息。输出是标准化的 SingleSwapResult,也就是某个 maker 对某个 token 的净买入或净卖出结果,包含数量、价值、价格、协议和时间,适合后续规则判断。
Q4:为什么要先判断钱包是否在监控列表里?
答:
这是一个典型的前置过滤优化。链上全量交易很多,如果这笔交易根本不涉及用户关注的钱包,就没必要继续做净流量计算、价格补全和规则检查。这样可以明显降低无效计算,提升流水线吞吐。
Q5:为什么要结合 BalanceChanges?
答:
因为单看 swap 过程,可能会把中间币或路由步骤误当成用户真实行为。BalanceChanges 反映的是交易结束后 token 的净变化,所以更适合判断钱包最终是净买入还是净卖出哪些 token。它能帮助系统从过程视角切换到结果视角。
Q6:你们为什么按 maker + token 聚合?
答:
因为后续规则本质上是围绕“某钱包是否对某 token 发生了有效交易行为”来判断的,比如交易额过滤、协议过滤、钱包组全买等。按 maker + token 聚合后,下游就能直接基于这个粒度做判断,而不需要理解底层多跳 swap 过程。
Q7:这个 Processor 里有哪些性能优化点?
答:
主要有 3 个:
- 前置过滤,只处理监控钱包相关交易。
- 只处理
BalanceChanges非零的 token,减少无效聚合。 - 对平台币价格做短 TTL 缓存,降低重复价格推导开销。
Q8:这个 Processor 和规则引擎的边界怎么划分?
答:
Processor 负责“语义归一化”,把复杂链上交易转换成标准业务对象;规则引擎负责“策略判断”,比如链过滤、金额阈值、协议过滤、钱包组全买等。这样分层之后,规则系统不需要关心底层链上解析细节。
Q9:如果要扩展新链,这个 Processor 需要怎么改?
答:
如果新链的数据最终也能转成统一的 SwapMessage 或至少转成 SwapInfo + BalanceChanges,那这个 Processor 基本可以直接复用。主要改动通常在上游解码和链元信息管理层,比如平台币、token level、价格字段适配。
Q10:如果一笔交易里有多个 token 变化,最终会怎么输出?
答:
它会先形成交易级的 MergedSwapResult,里面按 maker 汇总净流入和净流出,然后再拆成多个 SingleSwapResult。也就是说,同一笔交易最终可能对应多个标准事件,每个事件只表达一个钱包对一个 token 的净行为。
Q11:你觉得这个 Processor 最体现什么后端能力?
答:
我觉得主要体现了三点:
- 数据抽象能力,把复杂链上过程数据提炼成统一业务对象。
- 中间层解耦能力,让不同数据源复用同一套后续规则链路。
- 性能和实时性意识,通过前置过滤和轻量缓存降低无效计算。
Q12:如果面试官问“你真的做过这个模块吗”,你怎么证明?
答:
我会从输入输出和业务目标讲起,再解释为什么要结合 BalanceChanges、为什么按 maker + token 聚合、最后怎么生成 SingleSwapResult。因为这个模块的关键不是 API 调用,而是把交易过程转成业务结果,这一套逻辑能讲顺,基本就能体现我是真理解过这段代码的。
总结
“我比较适合展开讲的是交易聚合 Processor。它负责把 Kafka/Redis 汇入的交易级 swap 消息,先过滤出监控钱包相关交易,再结合 BalanceChanges 识别真实净变化,按 maker + token 聚合净流量,并补齐价格和价值信息,最后输出标准化的 SingleSwapResult 给规则引擎。这个模块的核心价值是把复杂链上过程数据,转换成可被监控和跟单策略直接消费的统一业务事件。”