奇怪的io.Copy

介绍 #

遇到一个从挂载目录copy文件慢的问题,从挂载目录直接鼠标拖动文件到其他地方很快,但使用函数进行copy却很慢。

这四种copy方式有什么区别?

io.Copy #

io.Copy 是标准库中在 Reader 与 Writer 之间复制数据的简便方式。当走通用复制路径时,它使用内部默认的缓冲区(32KB,见下文 copyBuffer 中的 size := 32 * 1024)进行读取和写入。由于该缓冲区较小,复制大文件时需要更多次 I/O 操作,从而带来较大的性能开销。

// CopyFile1 copies the file at src to dst using io.Copy and returns the
// number of bytes written.
//
// dst is created if missing and truncated if present. When either file
// cannot be opened the byte count is -1. An error from closing dst is
// reported when the copy itself succeeded, because a failed close can mean
// the data never reached the destination.
func CopyFile1(src string, dst string) (written int64, err error) {
	srcFile, err := os.Open(src)
	if err != nil {
		return -1, err
	}
	defer srcFile.Close()

	dstFile, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		return -1, err
	}
	defer func() {
		// Keep the first error: a copy failure is more informative than the
		// close failure that usually follows it.
		if cerr := dstFile.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	return io.Copy(dstFile, srcFile)
}

点开io.Copy,发现里面调用copyBuffer,与下面要说的io.CopyBuffer函数入口一致。

// Copy copies from src to dst by delegating to copyBuffer with a nil buffer.
// With a nil buffer, copyBuffer allocates its own staging buffer if it
// reaches its generic read/write loop.
func Copy(dst Writer, src Reader) (written int64, err error) {
	return copyBuffer(dst, src, nil)
}

io.CopyBuffer #

io.CopyBuffer 允许调用方传入一个缓冲区用于复制,但在本例中,这个缓冲区并不会决定底层每次读取的大小(原因见下文的源码分析)。

// CopyFile2 copies the file at src to dst using io.CopyBuffer with a 1 MiB
// buffer and returns the number of bytes written.
//
// dst is created if missing and truncated if present. When either file
// cannot be opened the byte count is -1. An error from closing dst is
// reported when the copy itself succeeded.
//
// Note: copyBuffer only uses the supplied buffer when neither of its
// WriterTo/ReaderFrom fast paths applies. With *os.File on both sides a fast
// path is taken, so the 1 MiB buffer is not what sizes the underlying reads.
func CopyFile2(src string, dst string) (written int64, err error) {
	srcFile, err := os.Open(src)
	if err != nil {
		return -1, err
	}
	defer srcFile.Close()

	dstFile, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		return -1, err
	}
	defer func() {
		// Keep the first error; only surface the close failure on an
		// otherwise successful copy.
		if cerr := dstFile.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	buffer := make([]byte, 1*1024*1024)
	return io.CopyBuffer(dstFile, srcFile, buffer)
}

我们进入io.CopyBuffer函数,发现与io.Copy函数入口一致。

// CopyBuffer copies from src to dst through copyBuffer, the same helper
// Copy uses, but lets the caller supply the staging buffer.
//
// A nil buf is passed through unchanged (copyBuffer then allocates one if
// needed). A non-nil buf of length zero is rejected with a panic.
func CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
	if buf != nil && len(buf) == 0 {
		panic("empty buffer in CopyBuffer")
	}
	return copyBuffer(dst, src, buf)
}

进入 copyBuffer 函数:由于 os.OpenFile 返回的 *os.File 实现了 ReaderFrom 接口(即 ReadFrom 方法),代码会在 rt.ReadFrom(src) 处直接返回,因此 //-----// 分隔线以下使用 buf 的代码不会执行。

// copyBuffer is the shared implementation behind Copy and CopyBuffer.
//
// It tries two fast paths before touching buf: src.WriteTo(dst) when src
// implements WriterTo, then dst.ReadFrom(src) when dst implements
// ReaderFrom. Only when neither applies does it run the read/write loop,
// allocating a 32 KiB buffer (smaller for a LimitedReader with fewer bytes
// remaining) if buf is nil.
//
// It returns the number of bytes written and the first error encountered;
// EOF from src ends the copy and is not reported as an error.
func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
	// If the reader has a WriteTo method, use it to do the copy.
	// Avoids an allocation and a copy.
	if wt, ok := src.(WriterTo); ok {
		return wt.WriteTo(dst)
	}
	// Similarly, if the writer has a ReadFrom method, use it to do the copy.
	if rt, ok := dst.(ReaderFrom); ok {    
		return rt.ReadFrom(src)   // *os.File (as returned by os.OpenFile) implements this method, so the code below never runs
	}
    //-----------------------------------------------------------//
	// Generic path: buf is only consulted from here on.
	if buf == nil {
		size := 32 * 1024
		// Do not allocate more than a LimitedReader can still deliver,
		// but always allocate at least one byte.
		if l, ok := src.(*LimitedReader); ok && int64(size) > l.N {
			if l.N < 1 {
				size = 1
			} else {
				size = int(l.N)
			}
		}
		buf = make([]byte, size)
	}
	for {
		nr, er := src.Read(buf)
		// Write whatever was read before looking at the read error.
		if nr > 0 {
			nw, ew := dst.Write(buf[0:nr])
			// A negative count, or more bytes written than were supplied,
			// is an invalid result from the writer.
			if nw < 0 || nr < nw {
				nw = 0
				if ew == nil {
					ew = errInvalidWrite
				}
			}
			written += int64(nw)
			if ew != nil {
				err = ew
				break
			}
			// Fewer bytes written than read, without an error from Write.
			if nr != nw {
				err = ErrShortWrite
				break
			}
		}
		if er != nil {
			// EOF is the normal end of the copy, not a failure.
			if er != EOF {
				err = er
			}
			break
		}
	}
	return written, err
}

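我们进入 ReadFrom 方法,发现当 f.readFrom 没有处理这次复制(handled 为 false)时,会通过 genericReadFrom 再次调用 io.Copy;而 io.Copy 传给 copyBuffer 的 buf 是 nil,于是使用默认的 32KB 缓冲区(onlyWriter 只暴露 Write 方法,避免再次进入 ReadFrom)。因此,我们传给 io.CopyBuffer 的缓冲区在此情况下无效。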

// ReadFrom copies from r into f and returns the number of bytes copied.
//
// After validating f for writing it first tries f.readFrom. If that reports
// the copy as not handled, the work is delegated to genericReadFrom and its
// result is returned unwrapped; otherwise the error from f.readFrom is
// wrapped as a "write" error.
// NOTE(review): f.readFrom is not shown here; presumably a platform-specific
// fast path — confirm.
func (f *File) ReadFrom(r io.Reader) (n int64, err error) {
	if err := f.checkValid("write"); err != nil {
		return 0, err
	}
	n, handled, e := f.readFrom(r)
	if !handled {
		return genericReadFrom(f, r) // without wrapping
	}
	return n, f.wrapErr("write", e)
}
// genericReadFrom is the fallback used by ReadFrom: it runs io.Copy from r
// into f wrapped in onlyWriter.
//
// io.Copy passes a nil buffer to copyBuffer, so a buffer the original caller
// handed to io.CopyBuffer plays no part on this path.
// NOTE(review): onlyWriter is not shown here; presumably it exposes only
// Write so that io.Copy does not re-enter ReadFrom — confirm.
func genericReadFrom(f *File, r io.Reader) (int64, error) {
	return io.Copy(onlyWriter{f}, r)
}

如果目标的 io.Writer 实现了 ReadFrom 方法,那么 io.CopyBuffer 会优先调用该方法,而不是执行普通的缓冲区复制逻辑。因此,缓冲区的作用可能会被忽略,取决于 ReadFrom 方法的实现。

番外:如何不让它实现ReadFrom 方法

// MyWriter wraps an *os.File and exposes nothing but Write.
//
// The file is held in a named field rather than embedded, so none of its
// other methods (ReadFrom in particular) are promoted onto MyWriter.
type MyWriter struct {
	dst *os.File
}

// Write forwards p to the wrapped file and returns that file's result
// unchanged.
func (mw *MyWriter) Write(p []byte) (n int, err error) {
	n, err = mw.dst.Write(p)
	return n, err
}
// Test_copy copies a file through MyWriter with a 1 MiB buffer passed to
// io.CopyBuffer. MyWriter has no ReadFrom method, so the dst.(ReaderFrom)
// fast path in copyBuffer does not apply to it.
//
// Every step is checked so that a missing source file, an uncreatable
// destination or a failed copy fails the test instead of passing silently.
// NOTE(review): on toolchains where *os.File also implements io.WriterTo,
// copyBuffer takes the src.(WriterTo) path first and the buffer is still
// bypassed — confirm against the Go version in use.
func Test_copy(t *testing.T) {
	srcFile, err := os.Open("E:\\视频取证测试\\20240910192520\\文件导出列表.csv")
	if err != nil {
		t.Fatalf("open source: %v", err)
	}
	defer srcFile.Close()

	dstFile, err := os.Create("E:\\视频取证测试\\文件导出列表.csv")
	if err != nil {
		t.Fatalf("create destination: %v", err)
	}
	defer dstFile.Close()

	myWriter := &MyWriter{dst: dstFile}
	buffer := make([]byte, 1*1024*1024)
	if _, err := io.CopyBuffer(myWriter, srcFile, buffer); err != nil {
		t.Fatalf("copy: %v", err)
	}
}

NewReaderSize #

// CopyFile3 copies the file at src to dst by reading through a
// bufio.Reader sized at 1 MiB into a 1 MiB scratch buffer, and returns the
// number of bytes written.
//
// dst is created if missing and truncated if present. On any open, read or
// write failure the byte count is -1. Bytes returned together with a read
// error are written before the error is acted on, and an error from closing
// dst is reported when the copy itself succeeded.
func CopyFile3(src string, dst string) (written int64, err error) {
	srcFile, err := os.Open(src)
	if err != nil {
		return -1, err
	}
	defer srcFile.Close()

	dstFile, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		return -1, err
	}
	defer func() {
		// Keep the first error; only surface the close failure on an
		// otherwise successful copy.
		if cerr := dstFile.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	buffer := make([]byte, 1*1024*1024)
	var totalBytes int64
	reader := bufio.NewReaderSize(srcFile, 1*1024*1024)
	for {
		n, rerr := reader.Read(buffer)
		// A Read may return data and an error together; consume the data
		// first so nothing that was read is dropped.
		if n > 0 {
			w, werr := dstFile.Write(buffer[:n])
			totalBytes += int64(w)
			if werr != nil {
				return -1, werr
			}
		}
		if rerr == io.EOF {
			break
		}
		if rerr != nil {
			return -1, rerr
		}
	}

	return totalBytes, nil
}

bufio.NewReaderSize(srcFile, 1*1024*1024) 为文件读取创建了一个 1MB 的缓冲区:

// NewReaderSize returns a *Reader over rd whose buffer holds at least size
// bytes.
//
// If rd is already a *Reader with a buffer of at least size bytes it is
// returned as is, without wrapping. A size below minReadBufferSize is raised
// to minReadBufferSize.
func NewReaderSize(rd io.Reader, size int) *Reader {
	// Is it already a Reader?
	b, ok := rd.(*Reader)
	if ok && len(b.buf) >= size {
		return b
	}
	if size < minReadBufferSize {
		size = minReadBufferSize
	}
	r := new(Reader)
	r.reset(make([]byte, size), rd)
	return r
}
// reset overwrites *b with a fresh Reader that uses buf as its buffer and r
// as its underlying source. lastByte and lastRuneSize are set to -1; every
// other field takes its zero value.
func (b *Reader) reset(buf []byte, r io.Reader) {
	*b = Reader{
		buf:          buf,
		rd:           r,
		lastByte:     -1,
		lastRuneSize: -1,
	}
}

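在 Read 函数中,当内部缓冲区为空且 len(p) < len(b.buf) 时,会先把数据读入 b.buf(即刚才设置的缓冲区),再拷贝给调用方。需要注意:CopyFile3 传入的 p 同样是 1MB,满足 len(p) >= len(b.buf),实际走的是 "Large read" 分支,直接读入 p 而绕过了 b.buf;不过无论走哪条分支,每次向底层请求的大小都是 1MB。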

// Read copies data into p and returns the number of bytes copied.
//
// When the internal buffer is empty it issues at most one read on the
// underlying reader: straight into p if p is at least as large as the
// internal buffer, otherwise into the internal buffer, from which as much as
// fits is then copied to p. When the buffer already holds data, only that
// data is returned and the underlying reader is not touched.
func (b *Reader) Read(p []byte) (n int, err error) {
	n = len(p)
	if n == 0 {
		// Zero-length request: report a pending error only when nothing
		// is buffered.
		if b.Buffered() > 0 {
			return 0, nil
		}
		return 0, b.readErr()
	}
	// b.r == b.w means the internal buffer holds no unread data.
	if b.r == b.w {
		if b.err != nil {
			return 0, b.readErr()
		}
		if len(p) >= len(b.buf) {
			// Large read, empty buffer.
			// Read directly into p to avoid copy.
			n, b.err = b.rd.Read(p)
			if n < 0 {
				panic(errNegativeRead)
			}
			if n > 0 {
				b.lastByte = int(p[n-1])
				b.lastRuneSize = -1
			}
			return n, b.readErr()
		}
		// One read.
		// Do not use b.fill, which will loop.
		b.r = 0
		b.w = 0
		n, b.err = b.rd.Read(b.buf)// the read fills the buffer configured at construction
		if n < 0 {
			panic(errNegativeRead)
		}
		if n == 0 {
			return 0, b.readErr()
		}
		b.w += n
	}

	// copy as much as we can
	// Note: if the slice panics here, it is probably because
	// the underlying reader returned a bad count. See issue 49795.
	n = copy(p, b.buf[b.r:b.w])
	b.r += n
	b.lastByte = int(b.buf[b.r-1])
	b.lastRuneSize = -1
	return n, nil
}

最终在底层调用 syscall.Read(fd.Sysfd, buf),一次最多请求 1MB 的数据(实际读取量还受 maxRW 上限和文件剩余长度的限制)。

// Read reads into b and returns the number of bytes read.
//
// It first validates f for reading, then delegates to f.read and passes
// that result through f.wrapErr as a "read" error.
func (f *File) Read(b []byte) (n int, err error) {
	if err := f.checkValid("read"); err != nil {
		return 0, err
	}
	n, e := f.read(b)
	return n, f.wrapErr("read", e)
}

// read forwards the request to the file's poll descriptor (f.pfd) with the
// caller's slice unchanged, so the slice length is the size requested from
// the layer below.
func (f *File) read(b []byte) (n int, err error) {
	n, err = f.pfd.Read(b)
	// Keep f reachable until the descriptor read has returned.
	runtime.KeepAlive(f)
	return n, err
}

// Read reads from the descriptor into buf and returns the byte count and
// any error.
//
// The read lock is held for the whole call and a request larger than maxRW
// is truncated to maxRW. File-like descriptors are read with syscall.Read
// (readConsole for consoles) while holding fd.l; all other descriptors issue
// a WSARecv through execIO.
func (fd *FD) Read(buf []byte) (int, error) {
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()

	// Never ask for more than maxRW bytes in a single call.
	if len(buf) > maxRW {
		buf = buf[:maxRW]
	}

	var n int
	var err error
	if fd.isFile {
		fd.l.Lock()
		defer fd.l.Unlock()
		switch fd.kind {
		case kindConsole:
			n, err = fd.readConsole(buf)
		default:
			// The request size handed to the OS is len(buf).
			n, err = syscall.Read(fd.Sysfd, buf)
			if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
				// Close uses CancelIoEx to interrupt concurrent I/O for pipes.
				// If the fd is a pipe and the Read was interrupted by CancelIoEx,
				// we assume it is interrupted by Close.
				err = ErrFileClosing
			}
		}
		// On failure, report zero bytes read.
		if err != nil {
			n = 0
		}
	} else {
		o := &fd.rop
		o.InitBuf(buf)
		n, err = execIO(o, func(o *operation) error {
			return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
		})
		if race.Enabled {
			race.Acquire(unsafe.Pointer(&ioSync))
		}
	}
	// NOTE(review): eofError is not shown here; presumably it turns a
	// zero-byte read into EOF where appropriate. It is skipped for
	// zero-length requests.
	if len(buf) != 0 {
		err = fd.eofError(n, err)
	}
	return n, err
}

os.File.Read #

// CopyFile4 copies the file at src to dst by calling Read on the source
// file directly with a 1 MiB buffer, and returns the number of bytes
// written.
//
// dst is created if missing and truncated if present. On any open, read or
// write failure the byte count is -1. Bytes returned together with a read
// error are written before the error is acted on, and an error from closing
// dst is reported when the copy itself succeeded.
func CopyFile4(src string, dst string) (written int64, err error) {
	srcFile, err := os.Open(src)
	if err != nil {
		return -1, err
	}
	defer srcFile.Close()

	dstFile, err := os.OpenFile(dst, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		return -1, err
	}
	defer func() {
		// Keep the first error; only surface the close failure on an
		// otherwise successful copy.
		if cerr := dstFile.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	buffer := make([]byte, 1*1024*1024)
	var totalBytes int64

	for {
		n, rerr := srcFile.Read(buffer)
		// A Read may return data and an error together; consume the data
		// first so nothing that was read is dropped.
		if n > 0 {
			w, werr := dstFile.Write(buffer[:n])
			totalBytes += int64(w)
			if werr != nil {
				return -1, werr
			}
		}
		if rerr == io.EOF {
			break
		}
		if rerr != nil {
			return -1, rerr
		}
	}

	return totalBytes, nil
}
// Read reads into b and returns the number of bytes read.
//
// It first validates f for reading, then delegates to f.read and passes
// that result through f.wrapErr as a "read" error.
func (f *File) Read(b []byte) (n int, err error) {
	if err := f.checkValid("read"); err != nil {
		return 0, err
	}
	n, e := f.read(b)
	return n, f.wrapErr("read", e)
}
// read forwards the request to the file's poll descriptor (f.pfd) with the
// caller's slice unchanged, so the slice length is the size requested from
// the layer below.
func (f *File) read(b []byte) (n int, err error) {
	n, err = f.pfd.Read(b)
	// Keep f reachable until the descriptor read has returned.
	runtime.KeepAlive(f)
	return n, err
}
// Read reads from the descriptor into buf and returns the byte count and
// any error.
//
// The read lock is held for the whole call and a request larger than maxRW
// is truncated to maxRW. File-like descriptors are read with syscall.Read
// (readConsole for consoles) while holding fd.l; all other descriptors issue
// a WSARecv through execIO.
func (fd *FD) Read(buf []byte) (int, error) {
    if err := fd.readLock(); err != nil {
       return 0, err
    }
    defer fd.readUnlock()

    // Never ask for more than maxRW bytes in a single call.
    if len(buf) > maxRW {
       buf = buf[:maxRW]
    }

    var n int
    var err error
    if fd.isFile {
       fd.l.Lock()
       defer fd.l.Unlock()
       switch fd.kind {
       case kindConsole:
          n, err = fd.readConsole(buf)
       default:
          // The request size handed to the OS is len(buf).
          n, err = syscall.Read(fd.Sysfd, buf)
          if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
             // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
             // If the fd is a pipe and the Read was interrupted by CancelIoEx,
             // we assume it is interrupted by Close.
             err = ErrFileClosing
          }
       }
       // On failure, report zero bytes read.
       if err != nil {
          n = 0
       }
    } else {
       o := &fd.rop
       o.InitBuf(buf)
       n, err = execIO(o, func(o *operation) error {
          return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
       })
       if race.Enabled {
          race.Acquire(unsafe.Pointer(&ioSync))
       }
    }
    // NOTE(review): eofError is not shown here; presumably it turns a
    // zero-byte read into EOF where appropriate. It is skipped for
    // zero-length requests.
    if len(buf) != 0 {
       err = fd.eofError(n, err)
    }
    return n, err
}