也谈IO完成端口

众所周知,合理使用线程能显著提高程序的运行效率,如果在多处理器的机器上为每个线程指定一个处理器,则可以同时执行多个操作,吞吐量能够大大提高。看着相当理想,但如果线程中有IO操作,情况就变得不一样了。相对于CPU的运算速度,IO操作(比如从文件或者网络中读取数据或者向文件或者网络中写入数据)的速度是相当慢的,因此如果线程以同步的方式(阻塞)进行IO操作,在操作没有完成之前,它就挂在那儿闲着了,干不了别的,对性能有一定的影响。为了不让线程闲下来,则必须采用异步(非阻塞)的方式,下面看下异步操作的一般流程。

异步操作的流程:  线程向设备发出一个异步IO请求,IO请求被传给设备驱动程序(加入设备驱动程序的队列中),紧接着设备驱动程序会完成实际的IO操作,此时线程并没有挂起继续干它的活儿,等到设备驱动程序完成了IO请求的处理时,它会将相应的状态告知应用程序(数据已发送,数据已收到,或发生了错误)。

 

可见,异步IO操作实际上包括两大步骤:

第一,把异步IO请求加入设备驱动程序队列。首先,在 CreateFile 的 dwFlagsAndAttributes 参数中指定 FILE_FLAG_OVERLAPPED 标志来打开设备;然后,使用 ReadFileWriteFile 函数将IO请求加入设备驱动程序的队列中。

第二,接收设备驱动程序IO请求完成的通知。Windows提供了4种不同的方法来接收IO请求已经完成的通知,如下表所示。本文将重点分析IO完成端口技术,后面会写篇文章分析其它的三种技术。

技术
摘要
触发设备内核对象
1. 当向一个设备发出多个IO请求的时候,该方法没用;
2. 允许一个线程发出请求,另一个线程对结果进行处理
触发事件内核对象
1. 允许向一个设备同时发出多个IO请求;
2. 允许一个线程发出请求,另一个线程对结果进行处理
使用可提醒IO
1. 允许向一个设备同时发出多个IO请求;
2. 发出IO请求的线程必须对结果进行处理
使用IO完成端口
1. 允许向一个设备同时发出多个IO请求;
2. 允许一个线程发出请求,另一个线程对结果进行处理

一. IO端口相关的API

1. CreateIoCompletionPort,该函数主要用于创建一个IO完成端口

WINBASEAPI __out HANDLE WINAPI CreateIoCompletionPort(
    __in HANDLE FileHandle,     // 文件句柄或INVALID_HANDLE_VALUE
    __in_opt HANDLE ExistingCompletionPort, // 是已经存在的完成端口。如果为NULL,则为新建一个IO完成端口IOCP。
    __in ULONG_PTR CompletionKey, // 完成键
    __in DWORD NumberOfConcurrentThreads // 表示有多少个线程在访问这个消息队列,如果设置为0,也就是允许并发执行的线程数量等于主机CPU的数量
);       

// 关于返回值:
// 返回一个IOCP的句柄。若为NULL则创建失败,不为NULL则创建成功。

举例:

// 下面这段代码的功能是:创建一个IO完成端口的同时将一个文件与之关联起来,所有发到该文件的IO请求在完成的时候都有一个值为1的完成键,
// 同时表明端口最多允许两个线程并发执行
HANDLE  hFile = CreateFile(...);
HANDLE  hCompletionPort = CreateIoCompletionPort(hFile, NULL, 1, 2);

2. GetQueuedCompletionStatus,将调用线程切换到睡眠状态(进入等待线程队列),直到指定的完成端口的IO完成队列中出现一项,或者等待超时,就会被唤醒

BOOL GetQueuedCompletionStatus(
    HANDLE CompletionPort,         // 表示线程希望对哪个完成端口进行监视
    LPDWORD lpNumberOfBytes,  // 一次完成后的I/O操作所传送数据的字节数
    PULONG_PTR lpCompletionKey, // 当文件I/O操作完成后,用于存放与之关联的CK
    LPOVERLAPPED *lpOverlapped, // 为调用IOCP机制所引用的OVERLAPPED结构
    DWORD dwMilliseconds);  // 指定调用者等待的时间

// 关于返回值
// 调用成功,则返回非零数值,且相关数据存于lpNumberOfBytes、lpCompletionKey、
// lpoverlapped变量中。
// 失败则返回零值。

举例:

DWORD dwNumBytes;
ULONG_PTR CompletionKey;
OVERLAPPED* pOverlapped;

BOOL bOK = GetQueuedCompletionStatus(hIOCP, &dwNumBytes, &CompletionKey, 
    &pOverlapped, 1000);

DWORD dwError = GetLastError();

if (bOK)
{
    // 处理一个成功的IO完成请求
}
else
{
    if (dwError == WAIT_TIMEOUT)
    {
        // 等待IO完成请求的时候超时了
    }
    else
    {
        // 调用GetQueuedCompletionStatus失败了,dwError包含了失败的原因
    }
}

3. GetQueuedCompletionStatusEx,与GetQueuedCompletionStatus类似,但是它可以同时取得多个IO请求的结果,不必让许多线程等待完成端口,可以避免由此产生的上下文切换所带来的开销

BOOL GetQueueCompletionStatusEx(
    HANDLE hCompletionPort,  // 表明线程希望对哪个完成端口进行监视,当本函数被调用的时候,
                             // 它会取出指定的完成端口的IO完成队列中存在的各项(IO请求完成的时候,IO完成队列中有内容),
                             // 将它们的信息复制到pCompletionPortEntries数组参数中
    LPOVERLAPPED_ENTRY pCompletionPortEntries,  // OVERLAPED_ENRY的定义见下文,本数组的内容
    ULONG ulCount,  // 表明最多可以复制多少项到数组中
    PULONG pulNumEnriesRemoved,  // 接收IO完成队列中被移除的IO请求的确切数量
    DWORD dwMilliseconds,  // 超时时间
    BOOL bAlertable  // FALSE: 该函数一直等待一个已经完成的IO请求被添加到端口,知道超出指定的等待时间为止;
                     // TRUE: 队列中没有已完成的IO请求的时候,线程将进入可提醒状态
);
typedef struct _OVERLAPPED_ENTRY{
    ULONG_PTR lpCompletionKey; // 完成键
    LPOVERLAPPED lpOverlapped; // OVERLAPPED结构地址
    ULONG_PTR Internal;   // 没有明确含义
    DWORD dwNumberOfBytesTransferd; // 已传输的字节数
}OVERLAPPED_ENTRY, *LPOVERLAPPED_ENTRY;

4. PostQueuedCompletionStatus,用来将一个已经完成的IO通知追加到IO完成队列中

BOOL PostQueuedCompletionStatus(
    HANDLE hCompletionPort,    // 将该项添加到哪个IO完成端口的队列中
    DWORD dwNumBytes,    // 下面这三个参数的内容都是返回给调用了
                         // GetQueuedCompletionStatus 的线程
    ULONG_PTR CompletionKey,
    OVERLAPPED* pOverlapped
);

二. IO完成端口的内部运作

创建一个IO完成端口的时候,系统内核会创建5个不同的数据结构:第一个是设备列表,调用CreateCompletionPort的时候如果指定了设备,则会将设备和完成键添加到设备列表中;第二个是IO完成队列,当设备的一个异步IO请求完成的时候,系统会检查设备是否与一个IO完成端口相关联,如果设备与一个IO完成端口相关联,那么系统会将该项已完成的IO请求追加到IO完成端口的IO完成队列的末尾;第三个是等待线程队列,线程调用GetQueuedCompletionStatus的时候,调用线程的线程标识符会被添加到这个等待线程队列中。当端口的IO完成队列中出现一项的时候,该完成端口会唤醒等待线程队列中的一个线程;第四个是已释放线程队列,当完成端口唤醒一个线程的时候,会将该线程的标识符保存在已释放线程列表中;第五个是已暂停线程列表,当一个已经释放的线程调用任何函数将该线程切换到等待状态,那么完成端口会检测到这一情况,将线程的标识符移入已暂停线程列表中。

 

三. 例子

下面给出了一个使用IO完成端口技术实现的文件复制程序(Windows核心编程中例子程序的精简版),已给出详细注释

// 使用IO完成端口对文件进行复制
#define BUFFERSIZE (64 * 1024)
#define CK_READ  1
#define CK_WRITE 2
#define MAX_PENDING_IO_REQS   4           // The maximum # of I/Os
// Each I/O Request needs an OVERLAPPED structure and a data buffer
class CIOReq : public OVERLAPPED {
public:
    CIOReq() {
        Internal = InternalHigh = 0;   
        Offset = OffsetHigh = 0;   
        hEvent = NULL;
        m_nBuffSize = 0;
        m_pvData = NULL;
    }

    ~CIOReq() {
        if (m_pvData != NULL)
            VirtualFree(m_pvData, 0, MEM_RELEASE);
    }

    BOOL AllocBuffer(SIZE_T nBuffSize) {
        m_nBuffSize = nBuffSize;
        m_pvData = VirtualAlloc(NULL, m_nBuffSize, MEM_COMMIT, PAGE_READWRITE);
        return(m_pvData != NULL);
    }

    BOOL Read(HANDLE hDevice, PLARGE_INTEGER pliOffset = NULL) {
        if (pliOffset != NULL) {
            Offset     = pliOffset->LowPart;
            OffsetHigh = pliOffset->HighPart;
        }
        return(::ReadFile(hDevice, m_pvData, m_nBuffSize, NULL, this));
    }

    BOOL Write(HANDLE hDevice, PLARGE_INTEGER pliOffset = NULL) {
        if (pliOffset != NULL) {
            Offset     = pliOffset->LowPart;
            OffsetHigh = pliOffset->HighPart;
        }
        return(::WriteFile(hDevice, m_pvData, m_nBuffSize, NULL, this));
    }

private:
    SIZE_T m_nBuffSize;
    PVOID  m_pvData;
};

int main(int argc, char *argv[])
{
    BOOL bOk = FALSE;  // 刚开始假设文件拷贝失败

    PCTSTR pszFileSrc = _T("E:\\test.dat");
    PCTSTR pszFileDst = _T("E:\\test2.dat");
    
    LARGE_INTEGER liFileSizeSrc = { 0 }, liFileSizeDst;

    try{
        {
            // 获取源文件的大小
            HANDLE hFileSrc = CreateFile(pszFileSrc, GENERIC_READ, 
                FILE_SHARE_READ, NULL, OPEN_EXISTING, 
                FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED, NULL);
            if (hFileSrc == INVALID_HANDLE_VALUE)    goto leave;
            
            GetFileSizeEx(hFileSrc, &liFileSizeSrc);
            
            // 目的文件的大小取整到64KB的整数倍
            liFileSizeDst.QuadPart = ( liFileSizeSrc.QuadPart / BUFFERSIZE ) 
               * BUFFERSIZE + (liFileSizeSrc.QuadPart % BUFFERSIZE > 0 ? 
                      BUFFERSIZE : 0);

            
            // 设置目标文件的大小
            HANDLE hFileDst = CreateFile(pszFileDst, GENERIC_WRITE, 
                0, NULL, CREATE_ALWAYS,
                FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED, hFileSrc);
            if (hFileDst == INVALID_HANDLE_VALUE)    goto leave;

            SetFilePointerEx(hFileDst, liFileSizeDst, NULL, FILE_BEGIN);
            SetEndOfFile(hFileDst);

            // 创建IO完成端口(第一步)
            HANDLE hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
                         0, 0);
            if (hComPort == NULL)        goto leave;
            
            // 将设备与端口关联(第二步)
            CreateIoCompletionPort(hFileSrc, hComPort, CK_READ, 0);
            CreateIoCompletionPort(hFileDst, hComPort, CK_WRITE, 0);
            
            CIOReq ior[MAX_PENDING_IO_REQS];
            LARGE_INTEGER liNextReadOffset = { 0 };
            int nReadsInProgress  = 0;
            int nWritesInProgress = 0;
            
            // 为了向源文件发出读取请求,这里往IO完成端口里添加了4个CK_WRITE来模拟完成通知
            for (int nIOReq = 0; nIOReq < _countof(ior); nIOReq++) 
            {
                // 每个IO请求都需要一个缓冲区
                ior[nIOReq].AllocBuffer(BUFFERSIZE);
                nWritesInProgress++;   

                // 模拟IO完成通知,但是把已传输字节数都设置为0
                PostQueuedCompletionStatus(hComPort, 0, CK_WRITE, &ior[nIOReq]);   
            }
            
             BOOL bResult = FALSE;
             
            while ((nReadsInProgress > 0) || (nWritesInProgress > 0)) 
            {
                // Suspend the thread until an I/O completes
                ULONG_PTR CompletionKey;
                DWORD dwNumBytes;
                CIOReq* pior;
                
                // 使本线程进入休眠状态,直到有IO请求到来为止
                GetQueuedCompletionStatus(hComPort, &dwNumBytes, &CompletionKey, 
                        (OVERLAPPED**) &pior, INFINITE);


                switch (CompletionKey) 
                {
                case CK_READ:  // 完成了读,往目的文件写内容
                    nReadsInProgress--;
                    bResult = pior->Write(hFileDst); 
                    nWritesInProgress++;
                    break;

                case CK_WRITE: // 完成了写,从源文件读内容
                    nWritesInProgress--;
                    if (liNextReadOffset.QuadPart < liFileSizeDst.QuadPart) 
                    {
                        // 从源文件从读内容
                        bResult = pior->Read(hFileSrc, &liNextReadOffset);
                        nReadsInProgress++;
                        liNextReadOffset.QuadPart += BUFFERSIZE; 
                    }
                    break;
                }
            }
            bOk = TRUE;
            CloseHandle(hFileDst);
            CloseHandle(hFileSrc);
            CloseHandle(hComPort);
        }
leave:;
    }// try
    catch(...)
    {
        ;
    }
    
    if (bOk) 
    {
        // 修复目标文件的大小,使之与源文件的大小相同
        // 方法:不指定 FILE_FLAG_NO_BUFFERING 标志,使得文件操作不在扇区边界上进行
        //        可以将目标文件的大小缩减为源文件的大小
        HANDLE hFileDst = CreateFile(pszFileDst, GENERIC_WRITE, 
            0, NULL, OPEN_EXISTING, 0, NULL);
        if (hFileDst != INVALID_HANDLE_VALUE) 
        {

            SetFilePointerEx(hFileDst, liFileSizeSrc, NULL, FILE_BEGIN);
            SetEndOfFile(hFileDst);
        }
    }

    return bOk;
}

 




One Comment

  • Comments are closed.