也谈IO完成端口
众所周知,合理使用线程能显著提高程序的运行效率,如果在多处理器的机器上为每个线程指定一个处理器,则可以同时执行多个操作,吞吐量能够大大提高。看着相当理想,但如果线程中有IO操作,情况就变得不一样了。相对于CPU的运算速度,IO操作(比如从文件或者网络中读取数据或者向文件或者网络中写入数据)的速度是相当慢的,因此如果线程以同步的方式(阻塞)进行IO操作,在操作没有完成之前,它就挂在那儿闲着了,干不了别的,对性能有一定的影响。为了不让线程闲下来,则必须采用异步(非阻塞)的方式,下面看下异步操作的一般流程。
异步操作的流程: 线程向设备发出一个异步IO请求,IO请求被传给设备驱动程序(加入设备驱动程序的队列中),紧接着设备驱动程序会完成实际的IO操作,此时线程并没有挂起继续干它的活儿,等到设备驱动程序完成了IO请求的处理时,它会将相应的状态告知应用程序(数据已发送,数据已收到,或发生了错误)。
可见,异步IO操作实际上包括两大步骤:
第一,把异步IO请求加入设备驱动程序队列。首先,在 CreateFile 的 dwFlagsAndAttributes 参数中指定 FILE_FLAG_OVERLAPPED 标志来打开设备;然后,使用 ReadFile 和 WriteFile 函数将IO请求加入设备驱动程序的队列中。
第二,接收设备驱动程序IO请求完成的通知。Windows提供了4种不同的方法来接收IO请求已经完成的通知,如下表所示。本文将重点分析IO完成端口技术,后面会写篇文章分析其它的三种技术。
技术
|
摘要
|
触发设备内核对象
|
1. 当向一个设备发出多个IO请求的时候,该方法没用;
2. 允许一个线程发出请求,另一个线程对结果进行处理
|
触发事件内核对象
|
1. 允许向一个设备同时发出多个IO请求;
2. 允许一个线程发出请求,另一个线程对结果进行处理
|
使用可提醒IO
|
1. 允许向一个设备同时发出多个IO请求;
2. 发出IO请求的线程必须对结果进行处理
|
使用IO完成端口
|
1. 允许向一个设备同时发出多个IO请求;
2. 允许一个线程发出请求,另一个线程对结果进行处理
|
一. IO端口相关的API
1. CreateIoCompletionPort,该函数主要用于创建一个IO完成端口
WINBASEAPI __out HANDLE WINAPI CreateIoCompletionPort( __in HANDLE FileHandle, // 文件句柄或INVALID_HANDLE_VALUE __in_opt HANDLE ExistingCompletionPort, // 是已经存在的完成端口。如果为NULL,则为新建一个IO完成端口IOCP。 __in ULONG_PTR CompletionKey, // 完成键 __in DWORD NumberOfConcurrentThreads // 表示有多少个线程在访问这个消息队列,如果设置为0,也就是允许并发执行的线程数量等于主机CPU的数量 ); // 关于返回值: // 返回一个IOCP的句柄。若为NULL则创建失败,不为NULL则创建成功。
举例:
// 下面这段代码的功能是:创建一个IO完成端口的同时将一个文件与之关联起来,所有发到该文件的IO请求在完成的时候都有一个值为1的完成键, // 同时表明端口最多允许两个线程并发执行 HANDLE hFile = CreateFile(...); HANDLE hCompletionPort = CreateIoCompletionPort(hFile, NULL, 1, 2);
2. GetQueuedCompletionStatus,将调用线程切换到睡眠状态(进入等待线程队列),直到指定的完成端口的IO完成队列中出现一项,或者等待超时,就会被唤醒
BOOL GetQueuedCompletionStatus( HANDLE CompletionPort, // 表示线程希望对哪个完成端口进行监视 LPDWORD lpNumberOfBytes, // 一次完成后的I/O操作所传送数据的字节数 PULONG_PTR lpCompletionKey, // 当文件I/O操作完成后,用于存放与之关联的CK LPOVERLAPPED *lpOverlapped, // 为调用IOCP机制所引用的OVERLAPPED结构 DWORD dwMilliseconds); // 指定调用者等待的时间 // 关于返回值 // 调用成功,则返回非零数值,且相关数据存于lpNumberOfBytes、lpCompletionKey、 // lpoverlapped变量中。 // 失败则返回零值。
举例:
DWORD dwNumBytes; ULONG_PTR CompletionKey; OVERLAPPED* pOverlapped; BOOL bOK = GetQueuedCompletionStatus(hIOCP, &dwNumBytes, &CompletionKey, &pOverlapped, 1000); DWORD dwError = GetLastError(); if (bOK) { // 处理一个成功的IO完成请求 } else { if (dwError == WAIT_TIMEOUT) { // 等待IO完成请求的时候超时了 } else { // 调用GetQueuedCompletionStatus失败了,dwError包含了失败的原因 } }
3. GetQueuedCompletionStatusEx,与GetQueuedCompletionStatus类似,但是它可以同时取得多个IO请求的结果,不必让许多线程等待完成端口,可以避免由此产生的上下文切换所带来的开销
BOOL GetQueueCompletionStatusEx( HANDLE hCompletionPort, // 表明线程希望对哪个完成端口进行监视,当本函数被调用的时候, // 它会取出指定的完成端口的IO完成队列中存在的各项(IO请求完成的时候,IO完成队列中有内容), // 将它们的信息复制到pCompletionPortEntries数组参数中 LPOVERLAPPED_ENTRY pCompletionPortEntries, // OVERLAPED_ENRY的定义见下文,本数组的内容 ULONG ulCount, // 表明最多可以复制多少项到数组中 PULONG pulNumEnriesRemoved, // 接收IO完成队列中被移除的IO请求的确切数量 DWORD dwMilliseconds, // 超时时间 BOOL bAlertable // FALSE: 该函数一直等待一个已经完成的IO请求被添加到端口,知道超出指定的等待时间为止; // TRUE: 队列中没有已完成的IO请求的时候,线程将进入可提醒状态 );
typedef struct _OVERLAPPED_ENTRY{ ULONG_PTR lpCompletionKey; // 完成键 LPOVERLAPPED lpOverlapped; // OVERLAPPED结构地址 ULONG_PTR Internal; // 没有明确含义 DWORD dwNumberOfBytesTransferd; // 已传输的字节数 }OVERLAPPED_ENTRY, *LPOVERLAPPED_ENTRY;
4. PostQueuedCompletionStatus,用来将一个已经完成的IO通知追加到IO完成队列中
BOOL PostQueuedCompletionStatus( HANDLE hCompletionPort, // 将该项添加到哪个IO完成端口的队列中 DWORD dwNumBytes, // 下面这三个参数的内容都是返回给调用了 // GetQueuedCompletionStatus 的线程 ULONG_PTR CompletionKey, OVERLAPPED* pOverlapped );
二. IO完成端口的内部运作
创建一个IO完成端口的时候,系统内核会创建5个不同的数据结构:第一个是设备列表,调用CreateCompletionPort的时候如果指定了设备,则会将设备和完成键添加到设备列表中;第二个是IO完成队列,当设备的一个异步IO请求完成的时候,系统会检查设备是否与一个IO完成端口相关联,如果设备与一个IO完成端口相关联,那么系统会将该项已完成的IO请求追加到IO完成端口的IO完成队列的末尾;第三个是等待线程队列,线程调用GetQueuedCompletionStatus的时候,调用线程的线程标识符会被添加到这个等待线程队列中。当端口的IO完成队列中出现一项的时候,该完成端口会唤醒等待线程队列中的一个线程;第四个是已释放线程队列,当完成端口唤醒一个线程的时候,会将该线程的标识符保存在已释放线程列表中;第五个是已暂停线程列表,当一个已经释放的线程调用任何函数将该线程切换到等待状态,那么完成端口会检测到这一情况,将线程的标识符移入已暂停线程列表中。
三. 例子
下面给出了一个使用IO完成端口技术实现的文件复制程序(Windows核心编程中例子程序的精简版),已给出详细注释
// 使用IO完成端口对文件进行复制 #define BUFFERSIZE (64 * 1024) #define CK_READ 1 #define CK_WRITE 2 #define MAX_PENDING_IO_REQS 4 // The maximum # of I/Os // Each I/O Request needs an OVERLAPPED structure and a data buffer class CIOReq : public OVERLAPPED { public: CIOReq() { Internal = InternalHigh = 0; Offset = OffsetHigh = 0; hEvent = NULL; m_nBuffSize = 0; m_pvData = NULL; } ~CIOReq() { if (m_pvData != NULL) VirtualFree(m_pvData, 0, MEM_RELEASE); } BOOL AllocBuffer(SIZE_T nBuffSize) { m_nBuffSize = nBuffSize; m_pvData = VirtualAlloc(NULL, m_nBuffSize, MEM_COMMIT, PAGE_READWRITE); return(m_pvData != NULL); } BOOL Read(HANDLE hDevice, PLARGE_INTEGER pliOffset = NULL) { if (pliOffset != NULL) { Offset = pliOffset->LowPart; OffsetHigh = pliOffset->HighPart; } return(::ReadFile(hDevice, m_pvData, m_nBuffSize, NULL, this)); } BOOL Write(HANDLE hDevice, PLARGE_INTEGER pliOffset = NULL) { if (pliOffset != NULL) { Offset = pliOffset->LowPart; OffsetHigh = pliOffset->HighPart; } return(::WriteFile(hDevice, m_pvData, m_nBuffSize, NULL, this)); } private: SIZE_T m_nBuffSize; PVOID m_pvData; }; int main(int argc, char *argv[]) { BOOL bOk = FALSE; // 刚开始假设文件拷贝失败 PCTSTR pszFileSrc = _T("E:\\test.dat"); PCTSTR pszFileDst = _T("E:\\test2.dat"); LARGE_INTEGER liFileSizeSrc = { 0 }, liFileSizeDst; try{ { // 获取源文件的大小 HANDLE hFileSrc = CreateFile(pszFileSrc, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED, NULL); if (hFileSrc == INVALID_HANDLE_VALUE) goto leave; GetFileSizeEx(hFileSrc, &liFileSizeSrc); // 目的文件的大小取整到64KB的整数倍 liFileSizeDst.QuadPart = ( liFileSizeSrc.QuadPart / BUFFERSIZE ) * BUFFERSIZE + (liFileSizeSrc.QuadPart % BUFFERSIZE > 0 ? BUFFERSIZE : 0); // 设置目标文件的大小 HANDLE hFileDst = CreateFile(pszFileDst, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED, hFileSrc); if (hFileDst == INVALID_HANDLE_VALUE) goto leave; SetFilePointerEx(hFileDst, liFileSizeDst, NULL, FILE_BEGIN); SetEndOfFile(hFileDst); // 创建IO完成端口(第一步) HANDLE hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (hComPort == NULL) goto leave; // 将设备与端口关联(第二步) CreateIoCompletionPort(hFileSrc, hComPort, CK_READ, 0); CreateIoCompletionPort(hFileDst, hComPort, CK_WRITE, 0); CIOReq ior[MAX_PENDING_IO_REQS]; LARGE_INTEGER liNextReadOffset = { 0 }; int nReadsInProgress = 0; int nWritesInProgress = 0; // 为了向源文件发出读取请求,这里往IO完成端口里添加了4个CK_WRITE来模拟完成通知 for (int nIOReq = 0; nIOReq < _countof(ior); nIOReq++) { // 每个IO请求都需要一个缓冲区 ior[nIOReq].AllocBuffer(BUFFERSIZE); nWritesInProgress++; // 模拟IO完成通知,但是把已传输字节数都设置为0 PostQueuedCompletionStatus(hComPort, 0, CK_WRITE, &ior[nIOReq]); } BOOL bResult = FALSE; while ((nReadsInProgress > 0) || (nWritesInProgress > 0)) { // Suspend the thread until an I/O completes ULONG_PTR CompletionKey; DWORD dwNumBytes; CIOReq* pior; // 使本线程进入休眠状态,直到有IO请求到来为止 GetQueuedCompletionStatus(hComPort, &dwNumBytes, &CompletionKey, (OVERLAPPED**) &pior, INFINITE); switch (CompletionKey) { case CK_READ: // 完成了读,往目的文件写内容 nReadsInProgress--; bResult = pior->Write(hFileDst); nWritesInProgress++; break; case CK_WRITE: // 完成了写,从源文件读内容 nWritesInProgress--; if (liNextReadOffset.QuadPart < liFileSizeDst.QuadPart) { // 从源文件从读内容 bResult = pior->Read(hFileSrc, &liNextReadOffset); nReadsInProgress++; liNextReadOffset.QuadPart += BUFFERSIZE; } break; } } bOk = TRUE; CloseHandle(hFileDst); CloseHandle(hFileSrc); CloseHandle(hComPort); } leave:; }// try catch(...) { ; } if (bOk) { // 修复目标文件的大小,使之与源文件的大小相同 // 方法:不指定 FILE_FLAG_NO_BUFFERING 标志,使得文件操作不在扇区边界上进行 // 可以将目标文件的大小缩减为源文件的大小 HANDLE hFileDst = CreateFile(pszFileDst, GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, NULL); if (hFileDst != INVALID_HANDLE_VALUE) { SetFilePointerEx(hFileDst, liFileSizeSrc, NULL, FILE_BEGIN); SetEndOfFile(hFileDst); } } return bOk; }
One Comment
Comments are closed.
cool