diskqueue

diskqueue #

github.com/nsqio/go-diskqueue 是 NSQ(一个实时分布式消息平台)官方团队开发的一个 Go 语言库,主要用于实现基于磁盘的持久化队列。它的核心作用是为消息系统提供可靠的消息存储和异步处理能力,尤其适合需要高吞吐、持久化和故障恢复的场景。

https://github.com/nsqio/go-diskqueue?tab=readme-ov-file

nsq #

https://github.com/nsqio

NSQ是一个实时分布式消息传递平台,旨在大规模运行,每天处理数十亿条消息。

它支持分布式去中心化拓扑,消除单点故障,实现容错和高可用性,并提供可靠的消息传递保证。请参阅功能和保障

NSQ操作简单,易于配置和部署(所有参数均可在命令行中指定,且编译后的二进制文件无运行时依赖)。为了实现最大的灵活性,它与数据格式无关(消息可以是 JSON、MsgPack、Protocol Buffers 或其他任何格式)。官方的 Go 和 Python 库开箱即用(以及许多其他客户端库),如果您有兴趣构建自己的客户端,可以参考协议规范

案例: #

实现队列缓冲,当有大量的数据需要缓存,并且需要保证前后顺序一致,就能用到这个

// Helper for serialization (using gob as an example)
func serializeDetectionResult(data *file2.DetectionResult) ([]byte, error) {
	var buf bytes.Buffer
	encoder := gob.NewEncoder(&buf)
	if err := encoder.Encode(data); err != nil {
		return nil, fmt.Errorf("failed to serialize DetectionResult: %w", err)
	}
	return buf.Bytes(), nil
}

func deserializeDetectionResult(data []byte) (*file2.DetectionResult, error) {
	var result file2.DetectionResult
	decoder := gob.NewDecoder(bytes.NewReader(data))
	if err := decoder.Decode(&result); err != nil {
		return nil, fmt.Errorf("failed to deserialize DetectionResult: %w", err)
	}
	return &result, nil
}

func log(lvl diskqueue.LogLevel, f string, args ...interface{}) {
	api.Log.Infof(f, args...)
}

// 处理并转发检测结果,带有数据库缓冲功能,防止yolo数据积压
// inputData  数据传入通道
// outputData 数据传入出通道
func RelayDetWithDiskQueueBuffer(ctx context.Context, wg *sync.WaitGroup, inputData chan *file2.DetectionResult) (outputData chan *file2.DetectionResult, errData chan error) {
	// 1. Setup DiskQueue
	outputData = make(chan *file2.DetectionResult, 10)
	errData = make(chan error, 3) // 增加容量以防快速连续错误
	// 使用临时目录确保唯一性,并在结束时清理
	tempDir, err := ioutil.TempDir(filepath.Join(filepath.Dir(api.GetCasePath()), "data"), "diskqueue_relay_*")
	if err != nil {
		close(outputData)
		errData <- fmt.Errorf("failed to create temp dir for diskqueue: %w", err)
		close(errData)
		return
	}
	api.Log.Infof("DiskQueue data will be stored in: %s", tempDir)

	// diskqueue 参数可以根据需要调整
	// name, dataPath, maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimeout,logf
	dq := diskqueue.New(
		"detection_results_queue", // 队列名称
		tempDir,                   // 数据路径
		1024*1024*100,             // 每个文件最大100MB
		0,                         // 最小消息尺寸 (0 for default)
		1024*1024*5,               // 最大消息尺寸 5MB (根据你的 DetectionResult 大小调整)
		100,                       // 每写入 N 条消息后执行一次 fsync (大致)
		2*time.Second,             // 每隔多久强制 fsync 一次
		log,                       // 使用你的日志Infof (diskqueue 需要一个 func(string, ...interface{}))
	)

	// --- Goroutine 1: Writer (inputData -> diskqueue) ---
	wg.Add(1)
	go func() {
		defer func() {
			close(outputData)
			//关闭diskqueue
			if err := dq.Close(); err != nil { // 关闭 diskqueue,这将清空所有内部channel并fsync
				api.Log.Errorf("Failed to close diskqueue: %v", err)
				select {
				case errData <- fmt.Errorf("failed to close diskqueue: %w", err):
				default:
				}
			}
			//删除目录
			if err := os.RemoveAll(tempDir); err != nil { // 清理磁盘队列文件
				api.Log.Errorf("Failed to remove diskqueue temp dir %s: %v", tempDir, err)
				select {
				case errData <- fmt.Errorf("failed to remove diskqueue temp dir %s: %w", tempDir, err):
				default:
				}
			}
			close(errData)
			wg.Done()
		}()

		diskQueueReadChan := dq.ReadChan()

		for {
			currentDepth := dq.Depth()
			select {
			case input, ok := <-inputData:
				if !ok {
					api.Log.Info("Writer: Input channel closed. Exiting.")
					inputData = nil // 防止再次选中此 case
					continue        // Input channel closed, nothing more to write
				}
				// 序列化并存入 diskqueue
				serializedData, serErr := serializeDetectionResult(input)
				if serErr != nil {
					api.Log.Errorf("Writer: Failed to serialize input data: %v", serErr)
					select {
					case errData <- fmt.Errorf("serialization error: %w", serErr):
					default:
					}
					return // Skip this item, try next
				}

				putErr := dq.Put(serializedData)
				if putErr != nil {
					api.Log.Errorf("Writer: Failed to put data into diskqueue: %v", putErr)
					select {
					case errData <- fmt.Errorf("diskqueue put error: %w", putErr):
					default:
					}
					return
				}
			case <-ctx.Done():
				api.Log.Info("Writer: Context cancelled. Exiting.")
				return
			default:
				//如果有容量了
				if len(outputData) < cap(outputData)/2 {
					if currentDepth > 0 {
						serializedData, ok := <-diskQueueReadChan
						if ok {
							deserialized, err := deserializeDetectionResult(serializedData)
							if err != nil {
								api.Log.Errorf("Failed to deserialize data from diskqueue: %v", err)
								select {
								case errData <- fmt.Errorf("deserialization error from diskqueue: %w", err):
								default:
								}
								return
							}
							outputData <- deserialized
						} else {
							return
						}
					}
				} else {
					time.Sleep(10 * time.Millisecond)
				}
			}

			// 终止条件:输入关闭,磁盘队列读取通道关闭(意味着队列空且dq已关闭或正在关闭),并且没有待发送项
			if inputData == nil && currentDepth == 0 {
				api.Log.Info("All data processed. Exiting relay goroutine normally.")
				return
			}
		}
	}()

	return outputData, errData
}