Go | 批量数据处理器
〰️

Go | 批量数据处理器

AI custom autofill
BatchBuilder efficiently processes data in batches to enhance performance.
Tags
go
CS
Published
November 7, 2025

一 BatchBuilder 批量处理使用指南

📚 什么是批量处理?

想象一下你每天要寄很多封信:
❌ 不好的方式(逐条处理):
  • 收到第1封信 → 立即跑去邮局寄 → 回来
  • 收到第2封信 → 立即跑去邮局寄 → 回来
  • 收到第3封信 → 立即跑去邮局寄 → 回来
  • ... 这样来回跑100次,累死了!
✅ 好的方式(批量处理):
  • 收到信先放在一个盒子里
  • 等盒子满了(比如100封),或者等了一段时间(比如5分钟),再一次性去邮局寄
  • 这样只需要跑几次,效率大大提升!
BatchBuilder 就是帮你做这件事的工具:它会把零散的数据收集起来,等到合适的时机(数量够了或时间到了),再一次性处理。

🎯 为什么需要批量处理?

1. 提高效率

  • 网络请求有开销:每次发送数据都要建立连接、发送请求头等
  • 批量发送可以减少请求次数,提高吞吐量

2. 减少延迟

  • 如果数据量很小,立即发送可能不值得
  • 等待一小段时间,收集更多数据一起发送,整体延迟反而更小

3. 保护系统

  • 如果数据产生速度太快,逐条处理可能压垮系统
  • 批量处理可以控制处理速度,保护下游服务

🔧 BatchBuilder 核心参数

创建 BatchBuilder 时需要设置4个参数:
batch := util.NewBatchBuilder( maxBatchSize, // 单个批次最大大小 maxBatchCount, // 最大批次数量(缓冲队列长度) linger, // 延迟时间(等待多久后强制处理) onBatch, // 批次处理回调函数 )

参数详解

参数
说明
类比
maxBatchSize
单个批次最多包含多少数据
盒子最多能装100封信
maxBatchCount
最多可以同时存在多少个批次
最多可以准备10个盒子
linger
批次等待多久后必须处理(即使没满)
即使盒子没满,5分钟后也要去寄
onBatch
当批次准备好时,执行的处理函数
去邮局寄信的动作

触发处理的三种情况

  1. 批次满了:当批次大小达到 maxBatchSize 时,立即处理
  1. 时间到了:即使批次没满,等待 linger 时间后也会处理
  1. 队列积压:当有多个批次时,会优先处理最老的批次

💡 实际使用案例

案例1:批量写入时序数据库(TSDB Writer)

场景:监控系统每秒产生大量指标数据,需要写入时序数据库。如果每条数据都单独发送,会产生大量网络请求,效率很低。
解决方案:使用 BatchBuilder 批量收集指标,达到一定数量或时间后,一次性发送到数据库。
package tsdb import ( "time" "icode.baidu.com/baidu/monitor-components/bcm-collector/internal/util" pb "icode.baidu.com/baidu/monitor-components/bcm-collector/internal/proto/v2" ) type WriterImpl struct { logger *zap.Logger endpoint string client *http.Client batch *util.BatchBuilder[[]*pb.ResourceMetricsProto] } // 创建 Writer,配置批量处理参数 func NewWriter( logger *zap.Logger, endpoint string, timeout time.Duration, batchSize int, // 每个批次最多1000个指标 batchCount int, // 最多缓存10个批次 batchLinger time.Duration, // 最多等待5秒 ) *WriterImpl { w := &WriterImpl{ logger: logger, endpoint: endpoint, client: &http.Client{Timeout: timeout}, } // 创建 BatchBuilder,设置批量处理回调函数 w.batch = util.NewBatchBuilder( batchSize, // 1000 batchCount, // 10 batchLinger, // 5秒 w.export, // 当批次准备好时,调用 export 函数 ) return w } // 启动批量处理器 func (w *WriterImpl) Start() { w.batch.Start() } // 停止批量处理器(会等待所有批次处理完成) func (w *WriterImpl) Stop() { w.batch.Stop() } // 写入数据(会被批量收集,不会立即发送) func (w *WriterImpl) Write(resourceMetrics []*pb.ResourceMetricsProto) { // 计算这批数据的指标数量 metricCount := 0 for _, rm := range resourceMetrics { metricCount += len(rm.Metrics) } // 添加到批次中 if !w.batch.Append(util.ContextualValue[[]*pb.ResourceMetricsProto]{ Value: resourceMetrics, Size: metricCount, // 告诉 BatchBuilder 这批数据的大小 }) { w.logger.Warn("fail to append metrics to batch") } } // 批次处理函数:当批次准备好时,会被自动调用 func (w *WriterImpl) export(resourceMetrics [][]*pb.ResourceMetricsProto) { // 将多个批次的数据合并 req := &prompb.WriteRequest{} for _, rms := range resourceMetrics { for _, rm := range rms { req.Timeseries = appendTimeseries(req.Timeseries, rm) } } // 一次性发送到数据库 // ... HTTP 请求代码 ... }
工作流程
  1. 调用 Write() 写入数据 → 数据被添加到批次中
  1. 当批次达到1000个指标,或等待5秒后 → 自动调用 export() 函数
  1. export() 将所有数据合并,一次性发送到数据库
效果
  • 原来:每秒1000条数据 = 1000次网络请求
  • 现在:每秒1000条数据 = 1-2次网络请求(批量发送)
  • 性能提升:500-1000倍!


📝 完整使用示例

下面是一个最简单的使用示例,帮助你快速上手:
package main import ( "fmt" "time" "icode.baidu.com/baidu/monitor-components/bcm-collector/internal/util" ) func main() { // 1. 创建 BatchBuilder // 参数说明: // - maxBatchSize: 10 (每个批次最多10条数据) // - maxBatchCount: 5 (最多缓存5个批次) // - linger: 3秒 (最多等待3秒) // - onBatch: 处理函数 (批次准备好时调用) batch := util.NewBatchBuilder( 10, 5, 3*time.Second, func(items []string) { // 当批次准备好时,这个函数会被调用 fmt.Printf("处理批次,共 %d 条数据: %v\\n", len(items), items) }, ) // 2. 启动批量处理器 batch.Start() defer batch.Stop() // 程序退出时停止 // 3. 添加数据 for i := 0; i < 25; i++ { data := fmt.Sprintf("数据-%d", i) batch.Append(util.ContextualValue[string]{ Value: data, Size: 1, // 每条数据大小为1 }) fmt.Printf("添加: %s\\n", data) time.Sleep(100 * time.Millisecond) // 模拟数据产生的间隔 } // 4. 等待所有批次处理完成 time.Sleep(5 * time.Second) }
运行结果
添加: 数据-0 添加: 数据-1 ... 添加: 数据-9 处理批次,共 10 条数据: [数据-0 数据-1 ... 数据-9] ← 批次满了,立即处理 添加: 数据-10 ... 添加: 数据-19 处理批次,共 10 条数据: [数据-10 数据-11 ... 数据-19] ← 批次满了,立即处理 添加: 数据-20 ... (等待3秒后) 处理批次,共 5 条数据: [数据-20 数据-21 ... 数据-24] ← 时间到了,处理剩余数据

⚙️ 参数调优建议

如何选择合适的参数?

1. maxBatchSize(批次大小)

  • 太小:批次处理频繁,效率提升不明显
  • 太大:等待时间长,内存占用大
  • 建议:根据数据大小和处理速度,选择 100-10000 之间的值

2. maxBatchCount(批次数量)

  • 太小:容易拒绝新数据(背压)
  • 太大:内存占用大
  • 建议:设置为 5-20,根据数据产生速度调整

3. linger(延迟时间)

  • 太小:批次可能总是没满就处理,效率低
  • 太大:数据延迟高
  • 建议:根据业务需求,选择 1-10 秒

实际配置示例

// 高频小数据场景(如日志) batch := util.NewBatchBuilder( 1000, // 批次大小:1000条 10, // 批次数量:10个 2 * time.Second, // 延迟:2秒 onBatch, ) // 低频大数据场景(如文件上传) batch := util.NewBatchBuilder( 10, // 批次大小:10个文件 3, // 批次数量:3个 30 * time.Second, // 延迟:30秒 onBatch, ) // 实时性要求高的场景 batch := util.NewBatchBuilder( 100, // 批次大小:100条 5, // 批次数量:5个 500 * time.Millisecond, // 延迟:500毫秒 onBatch, )

⚠️ 注意事项

1. 必须调用 Start() 和 Stop()

batch := util.NewBatchBuilder(...) batch.Start() // 必须启动,否则定时处理不会工作 defer batch.Stop() // 必须停止,否则可能丢失数据

2. Append() 可能返回 false

当批次队列满了(达到 maxBatchCount)时,Append() 会返回 false,表示数据被拒绝。这时需要:
  • 记录日志
  • 可能需要调整 maxBatchCount 参数
  • 或者实现重试机制

3. onBatch 函数应该是异步的

虽然 BatchBuilder 会在独立的 goroutine 中调用 onBatch,但如果 onBatch 执行时间很长,可能会阻塞后续批次的处理。建议:
  • onBatch 内部再启动 goroutine(如果处理很慢)
  • 或者使用异步队列

4. 优雅关闭

Stop() 会等待所有批次处理完成,确保数据不丢失。在程序退出时一定要调用。

📚 总结

BatchBuilder 是一个强大的批量处理工具,适用于:
  • ✅ 网络请求批量发送
  • ✅ 数据库批量写入
  • ✅ 消息队列批量消费
  • ✅ 日志批量收集
  • ✅ 任何需要批量处理的场景
记住三个要点
  1. 批量收集:数据先收集,不立即处理
  1. 自动触发:批次满了或时间到了,自动处理
  1. 提高效率:减少处理次数,提高吞吐量
 

二 原理讲解

核心数据结构

BatchBuilder 内部使用以下数据结构:
type BatchBuilder[V any] struct { maxBatchSize int // 单个批次最大大小 maxBatchCount int // 最大批次数量(缓冲队列长度) lingerMilli int64 // 延迟时间(毫秒) onBatch func([]V) // 批次处理回调函数 batches []*Batch[V] // 批次队列 lock sync.Mutex // 保护共享状态的互斥锁 full chan bool // 批次满时的通知通道 wg *sync.WaitGroup // 等待所有处理完成 cancel context.CancelFunc // 取消后台 goroutine } type Batch[V any] struct { values []V // 批次中的数据 timeout int64 // 批次超时时间(Unix 毫秒时间戳) size int // 批次当前大小 }

工作流程

1. 初始化阶段

batch := util.NewBatchBuilder(1000, 10, 5*time.Second, onBatch) batch.Start()
  • 创建 BatchBuilder 实例,设置参数
  • 调用 Start() 启动后台 goroutine,负责定时检查和触发批次处理

2. 数据添加阶段(Append)

batch.Append(util.ContextualValue[V]{Value: data, Size: 1})
执行流程:
  1. 获取锁bb.lock.Lock() - 保护共享状态
  1. 查找或创建批次
      • 如果批次队列为空,或最后一个批次已满,创建新批次
      • 新批次的超时时间 = 当前时间 + lingerMilli
  1. 添加数据
      • 将数据添加到最后一个批次
      • 更新批次大小(如果 Size > 0 使用自定义大小,否则 +1)
  1. 检查是否需要立即处理
      • 如果批次队列长度 > 1(有多个批次在等待)
      • 或者当前批次已满(size >= maxBatchSize
      • 则立即取出第一个批次进行处理
  1. 释放锁bb.lock.Unlock()
  1. 异步处理:如果取出了批次,在独立的 goroutine 中调用 onBatch()
关键代码片段:
func (bb *BatchBuilder[V]) append(cv ContextualValue[V]) ([]V, bool) { bb.lock.Lock() defer bb.lock.Unlock() // 获取或创建最后一个批次 var last *Batch[V] if len(bb.batches) > 0 { last = bb.batches[len(bb.batches)-1] } if last == nil || last.size >= bb.maxBatchSize { // 检查队列是否已满 if len(bb.batches) >= bb.maxBatchCount { return nil, false // 队列满,拒绝新数据 } // 创建新批次 last = &Batch[V]{timeout: time.Now().UnixMilli() + bb.lingerMilli} bb.batches = append(bb.batches, last) } // 添加数据到批次 last.values = append(last.values, cv.Value) last.size += cv.Size // 检查是否需要立即处理 if len(bb.batches) > 1 || last.size >= bb.maxBatchSize { batch := bb.batches[0] bb.batches[0] = nil // 防止内存泄漏 bb.batches = bb.batches[1:] return batch.values, true } return nil, true }

3. 定时检查阶段(runOnce)

后台 goroutine 持续运行,定期检查批次状态:
func (bb *BatchBuilder[V]) runOnce(ctx context.Context) bool { now := time.Now().UnixMilli() timeout := now + bb.lingerMilli isBatchFull := false // 检查第一个批次的状态 bb.lock.Lock() if len(bb.batches) > 0 { timeout = bb.batches[0].timeout // 使用第一个批次的超时时间 isBatchFull = bb.batches[0].size >= bb.maxBatchSize } bb.lock.Unlock() // 如果还没到超时时间且批次未满,等待 if timeout > now && !isBatchFull { select { case <-bb.full: // 批次满时的通知 case <-time.After(time.Duration(timeout-now) * time.Millisecond): // 等待到超时 case <-ctx.Done(): // 收到停止信号 // 强制处理所有批次 for bb.processBatch(true) { } return false } } // 处理批次 bb.processBatch(false) return true }
等待机制:
  • 等待批次满:通过 bb.full channel 接收通知(当批次满时,Append 会发送信号)
  • 等待超时:计算到第一个批次超时的时间,使用 time.After 等待
  • 等待停止:监听 ctx.Done(),收到停止信号时强制处理所有批次

4. 批次处理阶段(processBatch)

func (bb *BatchBuilder[V]) processBatch(force bool) bool { batch, ok := bb.popBatch(force) if !ok { return false } // 在独立 goroutine 中异步处理 bb.wg.Add(1) go func() { defer bb.wg.Done() bb.onBatch(batch.values) }() return true }
处理条件(popBatch):
  • force = true:强制处理(停止时)
  • force = false 且满足以下任一条件:
    • 批次大小 >= maxBatchSize(批次已满)
    • 当前时间 >= 批次超时时间(时间到了)

5. 停止阶段(Stop)

func (bb *BatchBuilder[V]) Stop() { bb.cancel() // 取消后台 goroutine bb.wg.Wait() // 等待所有处理完成 }
  • 取消 context,停止后台 goroutine
  • 等待所有 onBatch 处理完成(通过 WaitGroup

关键设计点

1. 线程安全

  • 使用 sync.Mutex 保护共享状态(batches 队列)
  • 所有对 batches 的访问都在锁保护下进行

2. 异步处理

  • onBatch 回调在独立的 goroutine 中执行
  • 不会阻塞 Append 操作,提高吞吐量

3. 内存管理

  • 从队列中取出批次后,立即将对应位置设为 nil,防止内存泄漏
  • 使用切片操作 bb.batches[1:] 移除已处理的批次

4. 优雅停止

  • 使用 context.Context 控制 goroutine 生命周期
  • 停止时强制处理所有剩余批次,不丢失数据
  • 使用 WaitGroup 确保所有处理完成后再返回

5. 批次队列机制

  • 支持多个批次同时存在(最多 maxBatchCount 个)
  • 总是处理最老的批次(FIFO 队列)
  • 如果队列中有多个批次,立即处理第一个,避免积压

时序图示例

时间轴:0ms -----> 1000ms -----> 2000ms -----> 3000ms -----> 5000ms Append(data1) ──┐ ├─> 创建批次1,timeout=5000ms Append(data2) ──┤ 批次1: [data1, data2], size=2 │ Append(...) │ 批次1持续增长... │ Append(data1000)┼─> 批次1已满 (size=1000) │ 立即处理批次1 │ 创建批次2,timeout=5000ms │ │ 后台goroutine检查:批次2未满,等待超时 │ │ 5000ms到达 ──> 处理批次2(即使未满)

性能优化点

  1. 最小化锁持有时间:只在必要时加锁,快速释放
  1. 批量处理:减少函数调用和网络请求次数
  1. 异步执行onBatch 不阻塞主流程
  1. 智能等待:精确计算等待时间,避免无效轮询

附录

package util import ( "context" "sync" "time" ) type BatchBuilder[V any] struct { maxBatchSize int maxBatchCount int lingerMilli int64 onBatch func([]V) batches []*Batch[V] lock sync.Mutex full chan bool wg *sync.WaitGroup cancel context.CancelFunc } type Batch[V any] struct { values []V timeout int64 size int } type ContextualValue[V any] struct { Value V Size int } func NewBatchBuilder[V any](maxBatchSize int, maxBatchCount int, linger time.Duration, onBatch func([]V)) *BatchBuilder[V] { return &BatchBuilder[V]{ maxBatchSize: maxBatchSize, maxBatchCount: maxBatchCount, lingerMilli: linger.Milliseconds(), onBatch: onBatch, full: make(chan bool, 1), wg: &sync.WaitGroup{}, } } func (bb *BatchBuilder[V]) BatchSize() int { return bb.maxBatchSize } func (bb *BatchBuilder[V]) Start() { ctx, cancel := context.WithCancel(context.Background()) bb.cancel = cancel bb.wg.Add(1) go func(ctx context.Context) { defer bb.wg.Done() for { if !bb.runOnce(ctx) { return } } }(ctx) } func (bb *BatchBuilder[V]) Stop() { bb.cancel() bb.wg.Wait() } func (bb *BatchBuilder[V]) runOnce(ctx context.Context) bool { now := time.Now().UnixMilli() timeout := now + bb.lingerMilli isBatchFull := false bb.lock.Lock() if len(bb.batches) > 0 { timeout = bb.batches[0].timeout isBatchFull = bb.batches[0].size >= bb.maxBatchSize } bb.lock.Unlock() if timeout > now && !isBatchFull { select { case <-bb.full: case <-time.After(time.Duration(timeout-now) * time.Millisecond): case <-ctx.Done(): // Force to process all batches before return. for bb.processBatch(true) { } return false } } bb.processBatch(false) return true } func (bb *BatchBuilder[V]) Append(cv ContextualValue[V]) bool { batch, ok := bb.append(cv) if ok && len(batch) > 0 { bb.wg.Add(1) go func() { defer bb.wg.Done() bb.onBatch(batch) }() } return ok } func (bb *BatchBuilder[V]) append(cv ContextualValue[V]) ([]V, bool) { bb.lock.Lock() defer bb.lock.Unlock() var last *Batch[V] if len(bb.batches) > 0 { last = bb.batches[len(bb.batches)-1] } if last == nil || last.size >= bb.maxBatchSize { if len(bb.batches) >= bb.maxBatchCount { return nil, false } last = &Batch[V]{timeout: time.Now().UnixMilli() + bb.lingerMilli} bb.batches = append(bb.batches, last) } last.values = append(last.values, cv.Value) if cv.Size <= 0 { last.size += 1 } else { last.size += cv.Size } if len(bb.batches) > 1 || last.size >= bb.maxBatchSize { batch := bb.batches[0] bb.batches[0] = nil // Get rid of memory leak. bb.batches = bb.batches[1:] return batch.values, true } return nil, true } func (bb *BatchBuilder[V]) processBatch(force bool) bool { batch, ok := bb.popBatch(force) if !ok { return false } // Need to wait all onBatch() routines to finish before stopping. bb.wg.Add(1) go func() { defer bb.wg.Done() bb.onBatch(batch.values) }() return true } func (bb *BatchBuilder[V]) popBatch(force bool) (*Batch[V], bool) { bb.lock.Lock() defer bb.lock.Unlock() if len(bb.batches) > 0 { first := bb.batches[0] if force || first.size >= bb.maxBatchSize || first.timeout <= time.Now().UnixMilli() { bb.batches[0] = nil // Get rid of memory leak. bb.batches = bb.batches[1:] return first, true } } return nil, false }