delphi-缓冲文件(用于更快的磁盘访问)

我正在处理大文件,并且直接写入磁盘速度很慢。 由于文件很大,因此无法将其加载到TMemoryStream中。

TFileStream没有缓冲,所以我想知道是否有一个可以提供缓冲流的自定义库,还是我应该仅依靠OS提供的缓冲。 操作系统缓冲可靠吗? 我的意思是,如果缓存已满,则可能会从缓存中清除旧文件(我的文件),以便为新文件腾出空间。

我的文件在GB范围内。 它包含数百万条记录。 不幸的是,记录不是固定大小的。 因此,我必须进行数百万次读取(4到500字节之间)。 阅读(和写作)是连续的。 我不会上下跳入文件(我认为这是缓冲的理想选择)。

最后,我必须将此类文件写回到磁盘(再次进行数百万次小写操作)。


对大卫·赫弗南(David Heffernan)表示赞赏!
David提供了很棒的一段代码,提供了对磁盘的缓冲访问。
您必须拥有的人BufferedFileStream! 是金。 并且不要忘记投票。
谢谢大卫。

   Speed tests:
     Input file: 317MB.SFF
     Delphi stream: 9.84sec
     David's stream: 2.05sec
     ______________________________________

   More tests:
     Input file: input2_700MB.txt
     Lines: 19 millions
     Compiler optimization: ON
     I/O check: On
     FastMM: release mode
     **HDD**   

     Reading: **linear** (ReadLine) (PS: multiply time with 10)      
      We see clear performance drop at 8KB. Recommended 16 or 32KB
        Time: 618 ms  Cache size: 64KB.
        Time: 622 ms  Cache size: 128KB.
        Time: 622 ms  Cache size: 24KB.
        Time: 622 ms  Cache size: 32KB.
        Time: 622 ms  Cache size: 64KB.
        Time: 624 ms  Cache size: 256KB.
        Time: 625 ms  Cache size: 18KB.
        Time: 626 ms  Cache size: 26KB.
        Time: 626 ms  Cache size: 1024KB.
        Time: 626 ms  Cache size: 16KB.
        Time: 628 ms  Cache size: 42KB.
        Time: 644 ms  Cache size: 8KB.      <--- no difference until 8K
        Time: 664 ms  Cache size: 4KB.
        Time: 705 ms  Cache size: 2KB.
        Time: 791 ms  Cache size: 1KB.
        Time: 795 ms  Cache size: 1KB.

      **SSD**
      We see a small improvement as we go towards higher buffers. Recommended 16 or 32KB
        Time: 610 ms  Cache size: 128KB.
        Time: 611 ms  Cache size: 256KB.
        Time: 614 ms  Cache size: 32KB.
        Time: 623 ms  Cache size: 16KB.
        Time: 625 ms  Cache size: 66KB.
        Time: 639 ms  Cache size: 8KB.       <--- definitively not good with 8K
        Time: 660 ms  Cache size: 4KB.
     ______

     Reading: **Random** (ReadInteger) (100000 reads)
     SSD
       Time: 064 ms. Cache size: 1KB.   Count: 100000.  RAM: 13.27 MB         <-- probably the best buffer size for ReadInteger is 4bytes!
       Time: 067 ms. Cache size: 2KB.   Count: 100000.  RAM: 13.27 MB
       Time: 080 ms. Cache size: 4KB.   Count: 100000.  RAM: 13.27 MB
       Time: 098 ms. Cache size: 8KB.   Count: 100000.  RAM: 13.27 MB
       Time: 140 ms. Cache size: 16KB.  Count: 100000.  RAM: 13.27 MB
       Time: 213 ms. Cache size: 32KB.  Count: 100000.  RAM: 13.27 MB
       Time: 360 ms. Cache size: 64KB.  Count: 100000.  RAM: 13.27 MB
       Conclusion: don't use it for "random" reading   
WeGoToMars asked 2020-07-31T02:09:26Z
5个解决方案
85 votes

Windows文件缓存非常有效,尤其是在使用Vista或更高版本的情况下。 ReadFile()是Windows ReadFile()和2990097026811167167746 API函数的松散包装,在许多情况下,唯一快速的是内存映射文件。

但是,在一种常见的情况下,ReadFile()成为性能瓶颈。 也就是说,在每次调用流读取或写入函数时,读取或写入少量数据。 例如,如果您一次读取一个整数数组,则一次调用ReadFile()会一次读取4个字节,因此会产生大量开销。

同样,内存映射文件是解决此瓶颈的一种极好的方法,但是另一种常用的方法是读取一个更大的缓冲区(据说很多千字节),然后解析将来在内存缓存中从中读取流,而不是进一步调用 ReadFile()。此方法仅对顺序访问有效。


从更新的问题中描述的使用模式来看,我认为您可能会发现以下类将为您提高性能:

unit BufferedFileStream;

interface

uses
  SysUtils, Math, Classes, Windows;

type
  TBaseCachedFileStream = class(TStream)
  private
    function QueryInterface(const IID: TGUID; out Obj): HResult; stdcall;
    function _AddRef: Integer; stdcall;
    function _Release: Integer; stdcall;
  protected
    FHandle: THandle;
    FOwnsHandle: Boolean;
    FCache: PByte;
    FCacheSize: Integer;
    FPosition: Int64;//the current position in the file (relative to the beginning of the file)
    FCacheStart: Int64;//the postion in the file of the start of the cache (relative to the beginning of the file)
    FCacheEnd: Int64;//the postion in the file of the end of the cache (relative to the beginning of the file)
    FFileName: string;
    FLastError: DWORD;
    procedure HandleError(const Msg: string);
    procedure RaiseSystemError(const Msg: string; LastError: DWORD); overload;
    procedure RaiseSystemError(const Msg: string); overload;
    procedure RaiseSystemErrorFmt(const Msg: string; const Args: array of const);
    function CreateHandle(FlagsAndAttributes: DWORD): THandle; virtual; abstract;
    function GetFileSize: Int64; virtual;
    procedure SetSize(NewSize: Longint); override;
    procedure SetSize(const NewSize: Int64); override;
    function FileRead(var Buffer; Count: Longword): Integer;
    function FileWrite(const Buffer; Count: Longword): Integer;
    function FileSeek(const Offset: Int64; Origin: TSeekOrigin): Int64;
  public
    constructor Create(const FileName: string); overload;
    constructor Create(const FileName: string; CacheSize: Integer); overload;
    constructor Create(const FileName: string; CacheSize: Integer; Handle: THandle); overload; virtual;
    destructor Destroy; override;
    property CacheSize: Integer read FCacheSize;
    function Read(var Buffer; Count: Longint): Longint; override;
    function Write(const Buffer; Count: Longint): Longint; override;
    function Seek(const Offset: Int64; Origin: TSeekOrigin): Int64; override;
  end;
  TBaseCachedFileStreamClass = class of TBaseCachedFileStream;

  IDisableStreamReadCache = interface
    ['{0B6D0004-88D1-42D5-BC0F-447911C0FC21}']
    procedure DisableStreamReadCache;
    procedure EnableStreamReadCache;
  end;

  TReadOnlyCachedFileStream = class(TBaseCachedFileStream, IDisableStreamReadCache)
  (* This class works by filling the cache each time a call to Read is made and
     FPosition is outside the existing cache.  By filling the cache we mean
     reading from the file into the temporary cache.  Calls to Read when
     FPosition is in the existing cache are then dealt with by filling the
     buffer with bytes from the cache.
  *)
  private
    FUseAlignedCache: Boolean;
    FViewStart: Int64;
    FViewLength: Int64;
    FDisableStreamReadCacheRefCount: Integer;
    procedure DisableStreamReadCache;
    procedure EnableStreamReadCache;
    procedure FlushCache;
  protected
    function CreateHandle(FlagsAndAttributes: DWORD): THandle; override;
    function GetFileSize: Int64; override;
  public
    constructor Create(const FileName: string; CacheSize: Integer; Handle: THandle); overload; override;
    property UseAlignedCache: Boolean read FUseAlignedCache write FUseAlignedCache;
    function Read(var Buffer; Count: Longint): Longint; override;
    procedure SetViewWindow(const ViewStart, ViewLength: Int64);
  end;

  TWriteCachedFileStream = class(TBaseCachedFileStream, IDisableStreamReadCache)
  (* This class works by caching calls to Write.  By this we mean temporarily
     storing the bytes to be written in the cache.  As each call to Write is
     processed the cache grows.  The cache is written to file when:
       1.  A call to Write is made when the cache is full.
       2.  A call to Write is made and FPosition is outside the cache (this
           must be as a result of a call to Seek).
       3.  The class is destroyed.

     Note that data can be read from these streams but the reading is not
     cached and in fact a read operation will flush the cache before
     attempting to read the data.
  *)
  private
    FFileSize: Int64;
    FReadStream: TReadOnlyCachedFileStream;
    FReadStreamCacheSize: Integer;
    FReadStreamUseAlignedCache: Boolean;
    procedure DisableStreamReadCache;
    procedure EnableStreamReadCache;
    procedure CreateReadStream;
    procedure FlushCache;
  protected
    function CreateHandle(FlagsAndAttributes: DWORD): THandle; override;
    function GetFileSize: Int64; override;
  public
    constructor Create(const FileName: string; CacheSize, ReadStreamCacheSize: Integer; ReadStreamUseAlignedCache: Boolean); overload;
    destructor Destroy; override;
    function Read(var Buffer; Count: Longint): Longint; override;
    function Write(const Buffer; Count: Longint): Longint; override;
  end;

implementation

function GetFileSizeEx(hFile: THandle; var FileSize: Int64): BOOL; stdcall; external kernel32;
function SetFilePointerEx(hFile: THandle; DistanceToMove: Int64; lpNewFilePointer: PInt64; dwMoveMethod: DWORD): BOOL; stdcall; external kernel32;

{ TBaseCachedFileStream }

constructor TBaseCachedFileStream.Create(const FileName: string);
begin
  Create(FileName, 0);
end;

constructor TBaseCachedFileStream.Create(const FileName: string; CacheSize: Integer);
begin
  Create(FileName, CacheSize, 0);
end;

constructor TBaseCachedFileStream.Create(const FileName: string; CacheSize: Integer; Handle: THandle);
const
  DefaultCacheSize = 16*1024;
  //16kb - this was chosen empirically - don't make it too large otherwise the progress report is 'jerky'
begin
  inherited Create;
  FFileName := FileName;
  FOwnsHandle := Handle=0;
  if FOwnsHandle then begin
    FHandle := CreateHandle(FILE_ATTRIBUTE_NORMAL);
  end else begin
    FHandle := Handle;
  end;
  FCacheSize := CacheSize;
  if FCacheSize<=0 then begin
    FCacheSize := DefaultCacheSize;
  end;
  GetMem(FCache, FCacheSize);
end;

destructor TBaseCachedFileStream.Destroy;
begin
  FreeMem(FCache);
  if FOwnsHandle and (FHandle<>0) then begin
    CloseHandle(FHandle);
  end;
  inherited;
end;

function TBaseCachedFileStream.QueryInterface(const IID: TGUID; out Obj): HResult;
begin
  if GetInterface(IID, Obj) then begin
    Result := S_OK;
  end else begin
    Result := E_NOINTERFACE;
  end;
end;

function TBaseCachedFileStream._AddRef: Integer;
begin
  Result := -1;
end;

function TBaseCachedFileStream._Release: Integer;
begin
  Result := -1;
end;

procedure TBaseCachedFileStream.HandleError(const Msg: string);
begin
  if FLastError<>0 then begin
    RaiseSystemError(Msg, FLastError);
  end;
end;

procedure TBaseCachedFileStream.RaiseSystemError(const Msg: string; LastError: DWORD);
begin
  raise EStreamError.Create(Trim(Msg+'  ')+SysErrorMessage(LastError));
end;

procedure TBaseCachedFileStream.RaiseSystemError(const Msg: string);
begin
  RaiseSystemError(Msg, GetLastError);
end;

procedure TBaseCachedFileStream.RaiseSystemErrorFmt(const Msg: string; const Args: array of const);
var
  LastError: DWORD;
begin
  LastError := GetLastError; // must call GetLastError before Format
  RaiseSystemError(Format(Msg, Args), LastError);
end;

function TBaseCachedFileStream.GetFileSize: Int64;
begin
  if not GetFileSizeEx(FHandle, Result) then begin
    RaiseSystemErrorFmt('GetFileSizeEx failed for %s.', [FFileName]);
  end;
end;

procedure TBaseCachedFileStream.SetSize(NewSize: Longint);
begin
  SetSize(Int64(NewSize));
end;

procedure TBaseCachedFileStream.SetSize(const NewSize: Int64);
begin
  Seek(NewSize, soBeginning);
  if not Windows.SetEndOfFile(FHandle) then begin
    RaiseSystemErrorFmt('SetEndOfFile for %s.', [FFileName]);
  end;
end;

function TBaseCachedFileStream.FileRead(var Buffer; Count: Longword): Integer;
begin
  if Windows.ReadFile(FHandle, Buffer, Count, LongWord(Result), nil) then begin
    FLastError := 0;
  end else begin
    FLastError := GetLastError;
    Result := -1;
  end;
end;

function TBaseCachedFileStream.FileWrite(const Buffer; Count: Longword): Integer;
begin
  if Windows.WriteFile(FHandle, Buffer, Count, LongWord(Result), nil) then begin
    FLastError := 0;
  end else begin
    FLastError := GetLastError;
    Result := -1;
  end;
end;

function TBaseCachedFileStream.FileSeek(const Offset: Int64; Origin: TSeekOrigin): Int64;
begin
  if not SetFilePointerEx(FHandle, Offset, @Result, ord(Origin)) then begin
    RaiseSystemErrorFmt('SetFilePointerEx failed for %s.', [FFileName]);
  end;
end;

function TBaseCachedFileStream.Read(var Buffer; Count: Integer): Longint;
begin
  raise EAssertionFailed.Create('Cannot read from this stream');
end;

function TBaseCachedFileStream.Write(const Buffer; Count: Integer): Longint;
begin
  raise EAssertionFailed.Create('Cannot write to this stream');
end;

function TBaseCachedFileStream.Seek(const Offset: Int64; Origin: TSeekOrigin): Int64;
//Set FPosition to the value specified - if this has implications for the
//cache then overriden Write and Read methods must deal with those.
begin
  case Origin of
  soBeginning:
    FPosition := Offset;
  soEnd:
    FPosition := GetFileSize+Offset;
  soCurrent:
    inc(FPosition, Offset);
  end;
  Result := FPosition;
end;

{ TReadOnlyCachedFileStream }

constructor TReadOnlyCachedFileStream.Create(const FileName: string; CacheSize: Integer; Handle: THandle);
begin
  inherited;
  SetViewWindow(0, inherited GetFileSize);
end;

function TReadOnlyCachedFileStream.CreateHandle(FlagsAndAttributes: DWORD): THandle;
begin
  Result := Windows.CreateFile(
    PChar(FFileName),
    GENERIC_READ,
    FILE_SHARE_READ,
    nil,
    OPEN_EXISTING,
    FlagsAndAttributes,
    0
  );
  if Result=INVALID_HANDLE_VALUE then begin
    RaiseSystemErrorFmt('Cannot open %s.', [FFileName]);
  end;
end;

procedure TReadOnlyCachedFileStream.DisableStreamReadCache;
begin
  inc(FDisableStreamReadCacheRefCount);
end;

procedure TReadOnlyCachedFileStream.EnableStreamReadCache;
begin
  dec(FDisableStreamReadCacheRefCount);
end;

procedure TReadOnlyCachedFileStream.FlushCache;
begin
  FCacheStart := 0;
  FCacheEnd := 0;
end;

function TReadOnlyCachedFileStream.GetFileSize: Int64;
begin
  Result := FViewLength;
end;

procedure TReadOnlyCachedFileStream.SetViewWindow(const ViewStart, ViewLength: Int64);
begin
  if ViewStart<0 then begin
    raise EAssertionFailed.Create('Invalid view window');
  end;
  if (ViewStart+ViewLength)>inherited GetFileSize then begin
    raise EAssertionFailed.Create('Invalid view window');
  end;
  FViewStart := ViewStart;
  FViewLength := ViewLength;
  FPosition := 0;
  FCacheStart := 0;
  FCacheEnd := 0;
end;

function TReadOnlyCachedFileStream.Read(var Buffer; Count: Longint): Longint;
var
  NumOfBytesToCopy, NumOfBytesLeft, NumOfBytesRead: Longint;
  CachePtr, BufferPtr: PByte;
begin
  if FDisableStreamReadCacheRefCount>0 then begin
    FileSeek(FPosition+FViewStart, soBeginning);
    Result := FileRead(Buffer, Count);
    if Result=-1 then begin
      Result := 0;//contract is to return number of bytes that were read
    end;
    inc(FPosition, Result);
  end else begin
    Result := 0;
    NumOfBytesLeft := Count;
    BufferPtr := @Buffer;
    while NumOfBytesLeft>0 do begin
      if (FPosition<FCacheStart) or (FPosition>=FCacheEnd) then begin
        //the current position is not available in the cache so we need to re-fill the cache
        FCacheStart := FPosition;
        if UseAlignedCache then begin
          FCacheStart := FCacheStart - (FCacheStart mod CacheSize);
        end;
        FileSeek(FCacheStart+FViewStart, soBeginning);
        NumOfBytesRead := FileRead(FCache^, CacheSize);
        if NumOfBytesRead=-1 then begin
          exit;
        end;
        Assert(NumOfBytesRead>=0);
        FCacheEnd := FCacheStart+NumOfBytesRead;
        if NumOfBytesRead=0 then begin
          FLastError := ERROR_HANDLE_EOF;//must be at the end of the file
          break;
        end;
      end;

      //read from cache to Buffer
      NumOfBytesToCopy := Min(FCacheEnd-FPosition, NumOfBytesLeft);
      CachePtr := FCache;
      inc(CachePtr, FPosition-FCacheStart);
      Move(CachePtr^, BufferPtr^, NumOfBytesToCopy);
      inc(Result, NumOfBytesToCopy);
      inc(FPosition, NumOfBytesToCopy);
      inc(BufferPtr, NumOfBytesToCopy);
      dec(NumOfBytesLeft, NumOfBytesToCopy);
    end;
  end;
end;

{ TWriteCachedFileStream }

constructor TWriteCachedFileStream.Create(const FileName: string; CacheSize, ReadStreamCacheSize: Integer; ReadStreamUseAlignedCache: Boolean);
begin
  inherited Create(FileName, CacheSize);
  FReadStreamCacheSize := ReadStreamCacheSize;
  FReadStreamUseAlignedCache := ReadStreamUseAlignedCache;
end;

destructor TWriteCachedFileStream.Destroy;
begin
  FlushCache;//make sure that the final calls to Write get recorded in the file
  FreeAndNil(FReadStream);
  inherited;
end;

function TWriteCachedFileStream.CreateHandle(FlagsAndAttributes: DWORD): THandle;
begin
  Result := Windows.CreateFile(
    PChar(FFileName),
    GENERIC_READ or GENERIC_WRITE,
    0,
    nil,
    CREATE_ALWAYS,
    FlagsAndAttributes,
    0
  );
  if Result=INVALID_HANDLE_VALUE then begin
    RaiseSystemErrorFmt('Cannot create %s.', [FFileName]);
  end;
end;

procedure TWriteCachedFileStream.DisableStreamReadCache;
begin
  CreateReadStream;
  FReadStream.DisableStreamReadCache;
end;

procedure TWriteCachedFileStream.EnableStreamReadCache;
begin
  Assert(Assigned(FReadStream));
  FReadStream.EnableStreamReadCache;
end;

function TWriteCachedFileStream.GetFileSize: Int64;
begin
  Result := FFileSize;
end;

procedure TWriteCachedFileStream.CreateReadStream;
begin
  if not Assigned(FReadStream) then begin
    FReadStream := TReadOnlyCachedFileStream.Create(FFileName, FReadStreamCacheSize, FHandle);
    FReadStream.UseAlignedCache := FReadStreamUseAlignedCache;
  end;
end;

procedure TWriteCachedFileStream.FlushCache;
var
  NumOfBytesToWrite: Longint;
begin
  if Assigned(FCache) then begin
    NumOfBytesToWrite := FCacheEnd-FCacheStart;
    if NumOfBytesToWrite>0 then begin
      FileSeek(FCacheStart, soBeginning);
      if FileWrite(FCache^, NumOfBytesToWrite)<>NumOfBytesToWrite then begin
        RaiseSystemErrorFmt('FileWrite failed for %s.', [FFileName]);
      end;
      if Assigned(FReadStream) then begin
        FReadStream.FlushCache;
      end;
    end;
    FCacheStart := FPosition;
    FCacheEnd := FPosition;
  end;
end;

function TWriteCachedFileStream.Read(var Buffer; Count: Integer): Longint;
begin
  FlushCache;
  CreateReadStream;
  Assert(FReadStream.FViewStart=0);
  if FReadStream.FViewLength<>FFileSize then begin
    FReadStream.SetViewWindow(0, FFileSize);
  end;
  FReadStream.Position := FPosition;
  Result := FReadStream.Read(Buffer, Count);
  inc(FPosition, Result);
end;

function TWriteCachedFileStream.Write(const Buffer; Count: Longint): Longint;
var
  NumOfBytesToCopy, NumOfBytesLeft: Longint;
  CachePtr, BufferPtr: PByte;
begin
  Result := 0;
  NumOfBytesLeft := Count;
  BufferPtr := @Buffer;
  while NumOfBytesLeft>0 do begin
    if ((FPosition<FCacheStart) or (FPosition>FCacheEnd))//the current position is outside the cache
    or (FPosition-FCacheStart=FCacheSize)//the cache is full
    then begin
      FlushCache;
      Assert(FCacheStart=FPosition);
    end;

    //write from Buffer to the cache
    NumOfBytesToCopy := Min(FCacheSize-(FPosition-FCacheStart), NumOfBytesLeft);
    CachePtr := FCache;
    inc(CachePtr, FPosition-FCacheStart);
    Move(BufferPtr^, CachePtr^, NumOfBytesToCopy);
    inc(Result, NumOfBytesToCopy);
    inc(FPosition, NumOfBytesToCopy);
    FCacheEnd := Max(FCacheEnd, FPosition);
    inc(BufferPtr, NumOfBytesToCopy);
    dec(NumOfBytesLeft, NumOfBytesToCopy);
  end;
  FFileSize := Max(FFileSize, FPosition);
end;

end.
David Heffernan answered 2020-07-31T02:09:50Z
13 votes

GpHugeFile类在内部使用GpHugeFile函数,该函数始终使用缓冲区来管理文件,除非您指定GpHugeFile标志(请注意,您不能使用TFileStream直接指定此标志)。 对于更多信息,您可以检查这些链接

  • CreateFile函数
  • GpHugeFile

您也可以试试Primoz Gabrijelcic的GpHugeFile单元的一部分GpHugeFile

RRUZ answered 2020-07-31T02:10:20Z
7 votes

如果您有这种代码

而Stream.Position <Stream.Size做

您可以通过将FileStream.Size缓存到一个变量来优化它,它将加快速度。 Stream.Size使用三个虚拟函数调用来找出实际大小

干杯

APZ28 answered 2020-07-31T02:10:53Z
7 votes

为了大家的兴趣:Embarcadero在最新的Delphi 10.1 Berlin版本中添加了TBufferedFileStream(请参阅文档)。

不幸的是,由于我还没有购买此更新,所以我不能说它如何与此处提供的解决方案竞争。 我也知道这个问题是在Delphi 7上提出的,但是我确信对Delphi自己的实现的引用将来会很有用。

René Hoffmann answered 2020-07-31T02:11:17Z
0 votes

因此,TFileStream要减慢速度,它会从磁盘读取所有内容。而且TMemoryStream不能足够大。 (如果你这么说)

那么,为什么不使用将最多100MB的一个块加载到TMemoryStream中的TFileStream进行处理呢? 这可以通过一个简单的准备器来完成,该准备器仅查看数据中的大小标头,但这将恢复您的问题。

让您的代码意识到其大文件可能会发生错误并完全避免这种情况,这不是一件坏事:允许它处理TMemoryStream中的(不完整)块,这在必要时也可以查看线程增强功能(hdd访问不是瓶颈)。

Barry Staes answered 2020-07-31T02:11:47Z
translate from https://stackoverflow.com:/questions/5639531/buffered-files-for-faster-disk-access