一 BatchBuilder 批量处理使用指南
📚 什么是批量处理?
想象一下你每天要寄很多封信:
❌ 不好的方式(逐条处理):
- 收到第1封信 → 立即跑去邮局寄 → 回来
- 收到第2封信 → 立即跑去邮局寄 → 回来
- 收到第3封信 → 立即跑去邮局寄 → 回来
- ... 这样来回跑100次,累死了!
✅ 好的方式(批量处理):
- 收到信先放在一个盒子里
- 等盒子满了(比如100封),或者等了一段时间(比如5分钟),再一次性去邮局寄
- 这样只需要跑几次,效率大大提升!
BatchBuilder 就是帮你做这件事的工具:它会把零散的数据收集起来,等到合适的时机(数量够了或时间到了),再一次性处理。
🎯 为什么需要批量处理?
1. 提高效率
- 网络请求有开销:每次发送数据都要建立连接、发送请求头等
- 批量发送可以减少请求次数,提高吞吐量
2. 减少延迟
- 如果数据量很小,立即发送可能不值得
- 等待一小段时间,收集更多数据一起发送,整体延迟反而更小
3. 保护系统
- 如果数据产生速度太快,逐条处理可能压垮系统
- 批量处理可以控制处理速度,保护下游服务
🔧 BatchBuilder 核心参数
创建 BatchBuilder 时需要设置4个参数:
batch := util.NewBatchBuilder( maxBatchSize, // 单个批次最大大小 maxBatchCount, // 最大批次数量(缓冲队列长度) linger, // 延迟时间(等待多久后强制处理) onBatch, // 批次处理回调函数 )
参数详解
参数 | 说明 | 类比 |
maxBatchSize | 单个批次最多包含多少数据 | 盒子最多能装100封信 |
maxBatchCount | 最多可以同时存在多少个批次 | 最多可以准备10个盒子 |
linger | 批次等待多久后必须处理(即使没满) | 即使盒子没满,5分钟后也要去寄 |
onBatch | 当批次准备好时,执行的处理函数 | 去邮局寄信的动作 |
触发处理的三种情况
- 批次满了:当批次大小达到
maxBatchSize时,立即处理
- 时间到了:即使批次没满,等待
linger时间后也会处理
- 队列积压:当有多个批次时,会优先处理最老的批次
💡 实际使用案例
案例1:批量写入时序数据库(TSDB Writer)
场景:监控系统每秒产生大量指标数据,需要写入时序数据库。如果每条数据都单独发送,会产生大量网络请求,效率很低。
解决方案:使用 BatchBuilder 批量收集指标,达到一定数量或时间后,一次性发送到数据库。
package tsdb import ( "time" "icode.baidu.com/baidu/monitor-components/bcm-collector/internal/util" pb "icode.baidu.com/baidu/monitor-components/bcm-collector/internal/proto/v2" ) type WriterImpl struct { logger *zap.Logger endpoint string client *http.Client batch *util.BatchBuilder[[]*pb.ResourceMetricsProto] } // 创建 Writer,配置批量处理参数 func NewWriter( logger *zap.Logger, endpoint string, timeout time.Duration, batchSize int, // 每个批次最多1000个指标 batchCount int, // 最多缓存10个批次 batchLinger time.Duration, // 最多等待5秒 ) *WriterImpl { w := &WriterImpl{ logger: logger, endpoint: endpoint, client: &http.Client{Timeout: timeout}, } // 创建 BatchBuilder,设置批量处理回调函数 w.batch = util.NewBatchBuilder( batchSize, // 1000 batchCount, // 10 batchLinger, // 5秒 w.export, // 当批次准备好时,调用 export 函数 ) return w } // 启动批量处理器 func (w *WriterImpl) Start() { w.batch.Start() } // 停止批量处理器(会等待所有批次处理完成) func (w *WriterImpl) Stop() { w.batch.Stop() } // 写入数据(会被批量收集,不会立即发送) func (w *WriterImpl) Write(resourceMetrics []*pb.ResourceMetricsProto) { // 计算这批数据的指标数量 metricCount := 0 for _, rm := range resourceMetrics { metricCount += len(rm.Metrics) } // 添加到批次中 if !w.batch.Append(util.ContextualValue[[]*pb.ResourceMetricsProto]{ Value: resourceMetrics, Size: metricCount, // 告诉 BatchBuilder 这批数据的大小 }) { w.logger.Warn("fail to append metrics to batch") } } // 批次处理函数:当批次准备好时,会被自动调用 func (w *WriterImpl) export(resourceMetrics [][]*pb.ResourceMetricsProto) { // 将多个批次的数据合并 req := &prompb.WriteRequest{} for _, rms := range resourceMetrics { for _, rm := range rms { req.Timeseries = appendTimeseries(req.Timeseries, rm) } } // 一次性发送到数据库 // ... HTTP 请求代码 ... }
工作流程:
- 调用
Write()写入数据 → 数据被添加到批次中
- 当批次达到1000个指标,或等待5秒后 → 自动调用
export()函数
export()将所有数据合并,一次性发送到数据库
效果:
- 原来:每秒1000条数据 = 1000次网络请求
- 现在:每秒1000条数据 = 1-2次网络请求(批量发送)
- 性能提升:500-1000倍!
📝 完整使用示例
下面是一个最简单的使用示例,帮助你快速上手:
package main import ( "fmt" "time" "icode.baidu.com/baidu/monitor-components/bcm-collector/internal/util" ) func main() { // 1. 创建 BatchBuilder // 参数说明: // - maxBatchSize: 10 (每个批次最多10条数据) // - maxBatchCount: 5 (最多缓存5个批次) // - linger: 3秒 (最多等待3秒) // - onBatch: 处理函数 (批次准备好时调用) batch := util.NewBatchBuilder( 10, 5, 3*time.Second, func(items []string) { // 当批次准备好时,这个函数会被调用 fmt.Printf("处理批次,共 %d 条数据: %v\\n", len(items), items) }, ) // 2. 启动批量处理器 batch.Start() defer batch.Stop() // 程序退出时停止 // 3. 添加数据 for i := 0; i < 25; i++ { data := fmt.Sprintf("数据-%d", i) batch.Append(util.ContextualValue[string]{ Value: data, Size: 1, // 每条数据大小为1 }) fmt.Printf("添加: %s\\n", data) time.Sleep(100 * time.Millisecond) // 模拟数据产生的间隔 } // 4. 等待所有批次处理完成 time.Sleep(5 * time.Second) }
运行结果:
添加: 数据-0 添加: 数据-1 ... 添加: 数据-9 处理批次,共 10 条数据: [数据-0 数据-1 ... 数据-9] ← 批次满了,立即处理 添加: 数据-10 ... 添加: 数据-19 处理批次,共 10 条数据: [数据-10 数据-11 ... 数据-19] ← 批次满了,立即处理 添加: 数据-20 ... (等待3秒后) 处理批次,共 5 条数据: [数据-20 数据-21 ... 数据-24] ← 时间到了,处理剩余数据
⚙️ 参数调优建议
如何选择合适的参数?
1. maxBatchSize(批次大小)
- 太小:批次处理频繁,效率提升不明显
- 太大:等待时间长,内存占用大
- 建议:根据数据大小和处理速度,选择 100-10000 之间的值
2. maxBatchCount(批次数量)
- 太小:容易拒绝新数据(背压)
- 太大:内存占用大
- 建议:设置为 5-20,根据数据产生速度调整
3. linger(延迟时间)
- 太小:批次可能总是没满就处理,效率低
- 太大:数据延迟高
- 建议:根据业务需求,选择 1-10 秒
实际配置示例
// 高频小数据场景(如日志) batch := util.NewBatchBuilder( 1000, // 批次大小:1000条 10, // 批次数量:10个 2 * time.Second, // 延迟:2秒 onBatch, ) // 低频大数据场景(如文件上传) batch := util.NewBatchBuilder( 10, // 批次大小:10个文件 3, // 批次数量:3个 30 * time.Second, // 延迟:30秒 onBatch, ) // 实时性要求高的场景 batch := util.NewBatchBuilder( 100, // 批次大小:100条 5, // 批次数量:5个 500 * time.Millisecond, // 延迟:500毫秒 onBatch, )
⚠️ 注意事项
1. 必须调用 Start() 和 Stop()
batch := util.NewBatchBuilder(...) batch.Start() // 必须启动,否则定时处理不会工作 defer batch.Stop() // 必须停止,否则可能丢失数据
2. Append() 可能返回 false
当批次队列满了(达到
maxBatchCount)时,Append() 会返回 false,表示数据被拒绝。这时需要:- 记录日志
- 可能需要调整
maxBatchCount参数
- 或者实现重试机制
3. onBatch 函数应该是异步的
虽然 BatchBuilder 会在独立的 goroutine 中调用
onBatch,但如果 onBatch 执行时间很长,可能会阻塞后续批次的处理。建议:- 在
onBatch内部再启动 goroutine(如果处理很慢)
- 或者使用异步队列
4. 优雅关闭
Stop() 会等待所有批次处理完成,确保数据不丢失。在程序退出时一定要调用。📚 总结
BatchBuilder 是一个强大的批量处理工具,适用于:
- ✅ 网络请求批量发送
- ✅ 数据库批量写入
- ✅ 消息队列批量消费
- ✅ 日志批量收集
- ✅ 任何需要批量处理的场景
记住三个要点:
- 批量收集:数据先收集,不立即处理
- 自动触发:批次满了或时间到了,自动处理
- 提高效率:减少处理次数,提高吞吐量
二 原理讲解
核心数据结构
BatchBuilder 内部使用以下数据结构:
type BatchBuilder[V any] struct { maxBatchSize int // 单个批次最大大小 maxBatchCount int // 最大批次数量(缓冲队列长度) lingerMilli int64 // 延迟时间(毫秒) onBatch func([]V) // 批次处理回调函数 batches []*Batch[V] // 批次队列 lock sync.Mutex // 保护共享状态的互斥锁 full chan bool // 批次满时的通知通道 wg *sync.WaitGroup // 等待所有处理完成 cancel context.CancelFunc // 取消后台 goroutine } type Batch[V any] struct { values []V // 批次中的数据 timeout int64 // 批次超时时间(Unix 毫秒时间戳) size int // 批次当前大小 }
工作流程
1. 初始化阶段
batch := util.NewBatchBuilder(1000, 10, 5*time.Second, onBatch) batch.Start()
- 创建 BatchBuilder 实例,设置参数
- 调用
Start()启动后台 goroutine,负责定时检查和触发批次处理
2. 数据添加阶段(Append)
batch.Append(util.ContextualValue[V]{Value: data, Size: 1})
执行流程:
- 获取锁:
bb.lock.Lock()- 保护共享状态
- 查找或创建批次:
- 如果批次队列为空,或最后一个批次已满,创建新批次
- 新批次的超时时间 = 当前时间 +
lingerMilli
- 添加数据:
- 将数据添加到最后一个批次
- 更新批次大小(如果
Size > 0使用自定义大小,否则 +1)
- 检查是否需要立即处理:
- 如果批次队列长度 > 1(有多个批次在等待)
- 或者当前批次已满(
size >= maxBatchSize) - 则立即取出第一个批次进行处理
- 释放锁:
bb.lock.Unlock()
- 异步处理:如果取出了批次,在独立的 goroutine 中调用
onBatch()
关键代码片段:
func (bb *BatchBuilder[V]) append(cv ContextualValue[V]) ([]V, bool) { bb.lock.Lock() defer bb.lock.Unlock() // 获取或创建最后一个批次 var last *Batch[V] if len(bb.batches) > 0 { last = bb.batches[len(bb.batches)-1] } if last == nil || last.size >= bb.maxBatchSize { // 检查队列是否已满 if len(bb.batches) >= bb.maxBatchCount { return nil, false // 队列满,拒绝新数据 } // 创建新批次 last = &Batch[V]{timeout: time.Now().UnixMilli() + bb.lingerMilli} bb.batches = append(bb.batches, last) } // 添加数据到批次 last.values = append(last.values, cv.Value) last.size += cv.Size // 检查是否需要立即处理 if len(bb.batches) > 1 || last.size >= bb.maxBatchSize { batch := bb.batches[0] bb.batches[0] = nil // 防止内存泄漏 bb.batches = bb.batches[1:] return batch.values, true } return nil, true }
3. 定时检查阶段(runOnce)
后台 goroutine 持续运行,定期检查批次状态:
func (bb *BatchBuilder[V]) runOnce(ctx context.Context) bool { now := time.Now().UnixMilli() timeout := now + bb.lingerMilli isBatchFull := false // 检查第一个批次的状态 bb.lock.Lock() if len(bb.batches) > 0 { timeout = bb.batches[0].timeout // 使用第一个批次的超时时间 isBatchFull = bb.batches[0].size >= bb.maxBatchSize } bb.lock.Unlock() // 如果还没到超时时间且批次未满,等待 if timeout > now && !isBatchFull { select { case <-bb.full: // 批次满时的通知 case <-time.After(time.Duration(timeout-now) * time.Millisecond): // 等待到超时 case <-ctx.Done(): // 收到停止信号 // 强制处理所有批次 for bb.processBatch(true) { } return false } } // 处理批次 bb.processBatch(false) return true }
等待机制:
- 等待批次满:通过
bb.fullchannel 接收通知(当批次满时,Append会发送信号)
- 等待超时:计算到第一个批次超时的时间,使用
time.After等待
- 等待停止:监听
ctx.Done(),收到停止信号时强制处理所有批次
4. 批次处理阶段(processBatch)
func (bb *BatchBuilder[V]) processBatch(force bool) bool { batch, ok := bb.popBatch(force) if !ok { return false } // 在独立 goroutine 中异步处理 bb.wg.Add(1) go func() { defer bb.wg.Done() bb.onBatch(batch.values) }() return true }
处理条件(popBatch):
force = true:强制处理(停止时)
force = false且满足以下任一条件:- 批次大小 >=
maxBatchSize(批次已满) - 当前时间 >= 批次超时时间(时间到了)
5. 停止阶段(Stop)
func (bb *BatchBuilder[V]) Stop() { bb.cancel() // 取消后台 goroutine bb.wg.Wait() // 等待所有处理完成 }
- 取消 context,停止后台 goroutine
- 等待所有
onBatch处理完成(通过WaitGroup)
关键设计点
1. 线程安全
- 使用
sync.Mutex保护共享状态(batches队列)
- 所有对
batches的访问都在锁保护下进行
2. 异步处理
onBatch回调在独立的 goroutine 中执行
- 不会阻塞
Append操作,提高吞吐量
3. 内存管理
- 从队列中取出批次后,立即将对应位置设为
nil,防止内存泄漏
- 使用切片操作
bb.batches[1:]移除已处理的批次
4. 优雅停止
- 使用
context.Context控制 goroutine 生命周期
- 停止时强制处理所有剩余批次,不丢失数据
- 使用
WaitGroup确保所有处理完成后再返回
5. 批次队列机制
- 支持多个批次同时存在(最多
maxBatchCount个)
- 总是处理最老的批次(FIFO 队列)
- 如果队列中有多个批次,立即处理第一个,避免积压
时序图示例
时间轴:0ms -----> 1000ms -----> 2000ms -----> 3000ms -----> 5000ms Append(data1) ──┐ ├─> 创建批次1,timeout=5000ms Append(data2) ──┤ 批次1: [data1, data2], size=2 │ Append(...) │ 批次1持续增长... │ Append(data1000)┼─> 批次1已满 (size=1000) │ 立即处理批次1 │ 创建批次2,timeout=5000ms │ │ 后台goroutine检查:批次2未满,等待超时 │ │ 5000ms到达 ──> 处理批次2(即使未满)
性能优化点
- 最小化锁持有时间:只在必要时加锁,快速释放
- 批量处理:减少函数调用和网络请求次数
- 异步执行:
onBatch不阻塞主流程
- 智能等待:精确计算等待时间,避免无效轮询
附录
package util import ( "context" "sync" "time" ) type BatchBuilder[V any] struct { maxBatchSize int maxBatchCount int lingerMilli int64 onBatch func([]V) batches []*Batch[V] lock sync.Mutex full chan bool wg *sync.WaitGroup cancel context.CancelFunc } type Batch[V any] struct { values []V timeout int64 size int } type ContextualValue[V any] struct { Value V Size int } func NewBatchBuilder[V any](maxBatchSize int, maxBatchCount int, linger time.Duration, onBatch func([]V)) *BatchBuilder[V] { return &BatchBuilder[V]{ maxBatchSize: maxBatchSize, maxBatchCount: maxBatchCount, lingerMilli: linger.Milliseconds(), onBatch: onBatch, full: make(chan bool, 1), wg: &sync.WaitGroup{}, } } func (bb *BatchBuilder[V]) BatchSize() int { return bb.maxBatchSize } func (bb *BatchBuilder[V]) Start() { ctx, cancel := context.WithCancel(context.Background()) bb.cancel = cancel bb.wg.Add(1) go func(ctx context.Context) { defer bb.wg.Done() for { if !bb.runOnce(ctx) { return } } }(ctx) } func (bb *BatchBuilder[V]) Stop() { bb.cancel() bb.wg.Wait() } func (bb *BatchBuilder[V]) runOnce(ctx context.Context) bool { now := time.Now().UnixMilli() timeout := now + bb.lingerMilli isBatchFull := false bb.lock.Lock() if len(bb.batches) > 0 { timeout = bb.batches[0].timeout isBatchFull = bb.batches[0].size >= bb.maxBatchSize } bb.lock.Unlock() if timeout > now && !isBatchFull { select { case <-bb.full: case <-time.After(time.Duration(timeout-now) * time.Millisecond): case <-ctx.Done(): // Force to process all batches before return. for bb.processBatch(true) { } return false } } bb.processBatch(false) return true } func (bb *BatchBuilder[V]) Append(cv ContextualValue[V]) bool { batch, ok := bb.append(cv) if ok && len(batch) > 0 { bb.wg.Add(1) go func() { defer bb.wg.Done() bb.onBatch(batch) }() } return ok } func (bb *BatchBuilder[V]) append(cv ContextualValue[V]) ([]V, bool) { bb.lock.Lock() defer bb.lock.Unlock() var last *Batch[V] if len(bb.batches) > 0 { last = bb.batches[len(bb.batches)-1] } if last == nil || last.size >= bb.maxBatchSize { if len(bb.batches) >= bb.maxBatchCount { return nil, false } last = &Batch[V]{timeout: time.Now().UnixMilli() + bb.lingerMilli} bb.batches = append(bb.batches, last) } last.values = append(last.values, cv.Value) if cv.Size <= 0 { last.size += 1 } else { last.size += cv.Size } if len(bb.batches) > 1 || last.size >= bb.maxBatchSize { batch := bb.batches[0] bb.batches[0] = nil // Get rid of memory leak. bb.batches = bb.batches[1:] return batch.values, true } return nil, true } func (bb *BatchBuilder[V]) processBatch(force bool) bool { batch, ok := bb.popBatch(force) if !ok { return false } // Need to wait all onBatch() routines to finish before stopping. bb.wg.Add(1) go func() { defer bb.wg.Done() bb.onBatch(batch.values) }() return true } func (bb *BatchBuilder[V]) popBatch(force bool) (*Batch[V], bool) { bb.lock.Lock() defer bb.lock.Unlock() if len(bb.batches) > 0 { first := bb.batches[0] if force || first.size >= bb.maxBatchSize || first.timeout <= time.Now().UnixMilli() { bb.batches[0] = nil // Get rid of memory leak. bb.batches = bb.batches[1:] return first, true } } return nil, false }
