diskqueue #
github.com/nsqio/go-diskqueue
是 NSQ(一个实时分布式消息平台)官方团队开发的一个 Go 语言库,主要用于实现基于磁盘的持久化队列。它的核心作用是为消息系统提供可靠的消息存储和异步处理能力,尤其适合需要高吞吐、持久化和故障恢复的场景。
https://github.com/nsqio/go-diskqueue?tab=readme-ov-file
nsq #
NSQ是一个实时分布式消息传递平台,旨在大规模运行,每天处理数十亿条消息。
它支持分布式和去中心化拓扑,消除单点故障,实现容错和高可用性,并提供可靠的消息传递保证。请参阅功能和保障。
NSQ操作简单,易于配置和部署(所有参数均可在命令行中指定,且编译后的二进制文件无运行时依赖)。为了实现最大的灵活性,它与数据格式无关(消息可以是 JSON、MsgPack、Protocol Buffers 或其他任何格式)。官方的 Go 和 Python 库开箱即用(以及许多其他客户端库),如果您有兴趣构建自己的客户端,可以参考协议规范。
案例: #
实现队列缓冲,当有大量的数据需要缓存,并且需要保证前后顺序一致,就能用到这个
// Helper for serialization (using gob as an example)
func serializeDetectionResult(data *file2.DetectionResult) ([]byte, error) {
var buf bytes.Buffer
encoder := gob.NewEncoder(&buf)
if err := encoder.Encode(data); err != nil {
return nil, fmt.Errorf("failed to serialize DetectionResult: %w", err)
}
return buf.Bytes(), nil
}
func deserializeDetectionResult(data []byte) (*file2.DetectionResult, error) {
var result file2.DetectionResult
decoder := gob.NewDecoder(bytes.NewReader(data))
if err := decoder.Decode(&result); err != nil {
return nil, fmt.Errorf("failed to deserialize DetectionResult: %w", err)
}
return &result, nil
}
func log(lvl diskqueue.LogLevel, f string, args ...interface{}) {
api.Log.Infof(f, args...)
}
// 处理并转发检测结果,带有数据库缓冲功能,防止yolo数据积压
// inputData 数据传入通道
// outputData 数据传入出通道
func RelayDetWithDiskQueueBuffer(ctx context.Context, wg *sync.WaitGroup, inputData chan *file2.DetectionResult) (outputData chan *file2.DetectionResult, errData chan error) {
// 1. Setup DiskQueue
outputData = make(chan *file2.DetectionResult, 10)
errData = make(chan error, 3) // 增加容量以防快速连续错误
// 使用临时目录确保唯一性,并在结束时清理
tempDir, err := ioutil.TempDir(filepath.Join(filepath.Dir(api.GetCasePath()), "data"), "diskqueue_relay_*")
if err != nil {
close(outputData)
errData <- fmt.Errorf("failed to create temp dir for diskqueue: %w", err)
close(errData)
return
}
api.Log.Infof("DiskQueue data will be stored in: %s", tempDir)
// diskqueue 参数可以根据需要调整
// name, dataPath, maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimeout,logf
dq := diskqueue.New(
"detection_results_queue", // 队列名称
tempDir, // 数据路径
1024*1024*100, // 每个文件最大100MB
0, // 最小消息尺寸 (0 for default)
1024*1024*5, // 最大消息尺寸 5MB (根据你的 DetectionResult 大小调整)
100, // 每写入 N 条消息后执行一次 fsync (大致)
2*time.Second, // 每隔多久强制 fsync 一次
log, // 使用你的日志Infof (diskqueue 需要一个 func(string, ...interface{}))
)
// --- Goroutine 1: Writer (inputData -> diskqueue) ---
wg.Add(1)
go func() {
defer func() {
close(outputData)
//关闭diskqueue
if err := dq.Close(); err != nil { // 关闭 diskqueue,这将清空所有内部channel并fsync
api.Log.Errorf("Failed to close diskqueue: %v", err)
select {
case errData <- fmt.Errorf("failed to close diskqueue: %w", err):
default:
}
}
//删除目录
if err := os.RemoveAll(tempDir); err != nil { // 清理磁盘队列文件
api.Log.Errorf("Failed to remove diskqueue temp dir %s: %v", tempDir, err)
select {
case errData <- fmt.Errorf("failed to remove diskqueue temp dir %s: %w", tempDir, err):
default:
}
}
close(errData)
wg.Done()
}()
diskQueueReadChan := dq.ReadChan()
for {
currentDepth := dq.Depth()
select {
case input, ok := <-inputData:
if !ok {
api.Log.Info("Writer: Input channel closed. Exiting.")
inputData = nil // 防止再次选中此 case
continue // Input channel closed, nothing more to write
}
// 序列化并存入 diskqueue
serializedData, serErr := serializeDetectionResult(input)
if serErr != nil {
api.Log.Errorf("Writer: Failed to serialize input data: %v", serErr)
select {
case errData <- fmt.Errorf("serialization error: %w", serErr):
default:
}
return // Skip this item, try next
}
putErr := dq.Put(serializedData)
if putErr != nil {
api.Log.Errorf("Writer: Failed to put data into diskqueue: %v", putErr)
select {
case errData <- fmt.Errorf("diskqueue put error: %w", putErr):
default:
}
return
}
case <-ctx.Done():
api.Log.Info("Writer: Context cancelled. Exiting.")
return
default:
//如果有容量了
if len(outputData) < cap(outputData)/2 {
if currentDepth > 0 {
serializedData, ok := <-diskQueueReadChan
if ok {
deserialized, err := deserializeDetectionResult(serializedData)
if err != nil {
api.Log.Errorf("Failed to deserialize data from diskqueue: %v", err)
select {
case errData <- fmt.Errorf("deserialization error from diskqueue: %w", err):
default:
}
return
}
outputData <- deserialized
} else {
return
}
}
} else {
time.Sleep(10 * time.Millisecond)
}
}
// 终止条件:输入关闭,磁盘队列读取通道关闭(意味着队列空且dq已关闭或正在关闭),并且没有待发送项
if inputData == nil && currentDepth == 0 {
api.Log.Info("All data processed. Exiting relay goroutine normally.")
return
}
}
}()
return outputData, errData
}