Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windows下基于iocp的的异步文件IO #637

Open
holmes1412 opened this issue Nov 10, 2021 · 12 comments
Open

Windows下基于iocp的的异步文件IO #637

holmes1412 opened this issue Nov 10, 2021 · 12 comments
Labels
help wanted Extra attention is needed

Comments

@holmes1412
Copy link
Contributor

holmes1412 commented Nov 10, 2021

Workflow支持异步文件IO任务,具体实现目前在Linux下是由操作系统支持的异步IO系统,在非Linux的系统下是用多线程实现的。而Windows下目前也这个需求,所以欢迎熟悉iocp开发的小伙伴可以积极参与共建~

在此把原异步文件IO的流程大概梳理如下,以供参考:

  1. 用户层接口,我们以create_pread_task()为例子:
class WFTaskFactory                                                             
{
    static WFFileIOTask *create_pread_task(const std::string& pathname,                               
                                           void *buf,                              
                                           size_t count,                           
                                           off_t offset,                           
                                           fio_callback_t callback);
    ...
  1. Workflow内部都是行为派生,所以用户拿到的都是WFFileIOTask *类型的task,而内部会根据pread行为创建一个__WFFilepreadTask
WFFileIOTask *WFTaskFactory::create_pread_task(const std::string& pathname,                            
                                               void *buf,                          
                                               size_t count,                       
                                               off_t offset,                       
                                               fio_callback_t callback)            
{                                                                                  
    return new __WFFilepreadTask(pathname, buf, count, offset,                             
                                 WFGlobal::get_io_service(),                         
                                 std::move(callback));                          
}
  1. __WFFilepreadTask需要实现prepare(),供内部IOService调用,具体是做与异步文件相关的起始操作:
class __WFFilepreadTask : public WFFilepreadTask                          
{ 
protected:                                                                         
    virtual int prepare()
    {
        // 这里调用了IOSession层的prep_preadv(),不同系统实现不一样;
    }
  1. 在Linux和Windows中,WFFileTask和IORequest的定义都是一样的。不同点在于上面提到的IOService和IOSession。
    IOService是接管所有文件异步IO的服务,IOSession是一次IO请求的上下文,需要根据Windows下iocp的机制来具体实现。
    虽然Linux使用了系统的libaio,但大家要做的事情是类似:
class IOService                                                                    
{                                                                                  
public:   
    int request(IOSession *session); // 用于用户提交一个文件io任务

private:                                                                           
    int event_fd;  // 用于结合libaio机制的eventfd,多个IO事件也只用一个,希望在windows下也尽量少占用系统资源
                                                                
private:                                                                        
    struct list_head session_list; // 用链表管理了此时发出的多个任务                                    
                                                               
private:                                                                        
    static void *aio_finish(void *context); // 用于结合libaio机制的回调函数,有事件通知会回到这里

    ...
};
  1. 此IOService需要通过CommScheduler::io_bind()把自己的eventfd和回调绑定到通信器中,同理也需要io_unbind()。其他内部接口需要根据iocp的机制按需添加。目的是做到当系统有异步事件的时候,会通过注册到通信器的机制来告诉框架,框架调起当时的那片上下文的handle(),即可回到task的逻辑中:
void Communicator::handle_aio_result(struct poller_result *res) 
{
    ...
    session->handle(state, error);
  1. 如果希望默认使用此异步文件服务,可以参考现在的__FileIOService, 从IOService派生,并且在全局单例中提供接口供调用,这样也可以保证不用异步文件IO的用户不会创建相应资源:
class __CommManager
{
    IOService *get_io_service()
    {
        if (!fio_flag_)
            fio_service_ = new __FileIOService(&scheduler_);
        ...
    }
}

以上是整个异步文件IO的基本流程,希望在windows下的实现同时遵循Workflow一如既往的对资源的极度节制以及对高并发的严谨。如有了解iocp的小伙伴愿意尝试欢迎随时交流。

@holmes1412 holmes1412 added the help wanted Extra attention is needed label Nov 10, 2021
@Barenboim
Copy link
Contributor

这个help也太难了吧😓
完全实现kernel目录里IOService和IOSession两个类的接口语意就可以了,其它代码几乎不用改,除了把fd改成HANDLE。
现在有两个IOService的实现可以参考,分别是kernel目录下的IOService_linux.h和IOService_thread.h。其中IOService_thread.h用多线程模拟aio,主要用于macOS,Linux也可以用,但windows依然用不了。Windows下基于iocp可以实现。

@lainswork
Copy link

https://github.com/sogou/workflow/blob/master/docs/tutorial-09-http_file_server.md
文档中提到"我们正在研发一套文件管理,将来用户只需要传入文件名,对跨平台更友好"
为什么不将WFTaskFactory::create_pread_task中的第一个参数fd改为std::function来作为一个回调函数
这样create_pread_task中new WFFilepreadTask后调用 平台相关的文件接口打开文件,并将文件相关信息作为参数传给回调函数
用户可以在回调函数中判断如何修改自己的读取参数,还可以判断是否继续
根据返回值我们可以让 Task成为一个bad Task直接去结束自己
好处是用户不需要再自己去调用open CreateFile这类平台相关的函数来打开文件

@Barenboim
Copy link
Contributor

https://github.com/sogou/workflow/blob/master/docs/tutorial-09-http_file_server.md
文档中提到"我们正在研发一套文件管理,将来用户只需要传入文件名,对跨平台更友好"
为什么不将WFTaskFactory::create_pread_task中的第一个参数fd改为std::function来作为一个回调函数
这样create_pread_task中new WFFilepreadTask后调用 平台相关的文件接口打开文件,并将文件相关信息作为参数传给回调函数
用户可以在回调函数中判断如何修改自己的读取参数,还可以判断是否继续
根据返回值我们可以让 Task成为一个bad Task直接去结束自己
好处是用户不需要再自己去调用open CreateFile这类平台相关的函数来打开文件

文件名的接口已经写好了,忘了更新文档。


改std::function不现实。接口太难看了。文件名的话就可以跨平台了。

@lainswork
Copy link

嗯....传文件名字作为参数,那现在在读取前,仍然不知道文件的大小和相关信息,这代表create_pread_task前还是要先打开文件获取到文件大小再调用create_pread_task来读取。这难道不会导致create_pread_task使用文件名字作为参数没有意义嘛

@Barenboim
Copy link
Contributor

如果要读取整个文件,Linux下的做法是先用state函数,得到文件大小。
有用户问过这个问题:#632

@lainswork
Copy link

lainswork commented Nov 11, 2021

所以说我提出将一个回调作为参数,然后文件名也作为参数
这样我们在实现文件中通过平台相关api打开文件后,可以让用户处理接下来的操作,比如具体申请多大的内存
好处是我们将所有的与平台有关的内容都写到了cpp中,隐藏了真实的文件打开流程,方便用户编写跨平台代码。
这个需求其实是很现实的,需要考虑的是怎么优雅的实现,你可能更介意std::fun的不太好看,hahahah

@Barenboim
Copy link
Contributor

我们没有这种形式的接口。我们之前另一个想法是做一个WFFileSystem,用文件名创建任务的时候,由这个类来管理fd或HANDLE。你说的这些功能我更愿意用这个方式来解决。
不过现在考虑这些都太早了,毕竟还没有实现windows下的文件异步IO呢。IOSerivce的接口不是那么好实现,很多东西必须语义非常精确。

@lainswork
Copy link

能不能麻烦解释下IOService中的各个成员的作用
包括:
int init(unsigned int maxevents);中的maxevents参数
virtual void handle_stop(int error) { }
virtual void handle_unbound() = 0;
private:
void incref();
void decref();
private:
int event_fd;
int ref;

@lainswork
Copy link

能不能麻烦解释下IOService中的各个成员的作用 包括: int init(unsigned int maxevents);中的maxevents参数 virtual void handle_stop(int error) { } virtual void handle_unbound() = 0; private: void incref(); void decref(); private: int event_fd; int ref;

目前我已经理解了框架中的task如何提交到Service这种运作方式。
但是还不清楚框架源码里面的其他机制,比如单个IOSession被Service处理完成后task所在的series是如何确定它被执行完毕的
还有就是上面提到的IOService各个成员,目前对wf框架的理解还停留在“使用”阶段,我想理解其中的细节,以便可以实现wf的win平台异步文件部分。

@Barenboim
Copy link
Contributor

Barenboim commented Nov 14, 2021

IOService和series是两个层次的东西。最好不用混着一起看。如果你想实现iocp下的文件aio,只需要实现iocp版IOService和IOSession的接口就可以了。但这个真的挺难的。
使用上IOSerive被io_bind到Communicator,就可以通过IOService::request接口提交IOSession。而IOService使用结束后,需要调用Communicator::io_unbind。unbind过程又是异步的,unbind完成之后,IOService::handle_unbound会被调用,对象归还给外部。
已经绑定在Communicator上的IOService如果停止工作,IOService::handle_stop会被调用。停止工作有两种原因,一种是发生错误(stop的错误码为errno),另一种是Communicator::deinit()被调用(stop错误码为0)。handle_stop被调用时,IOService并未解绑,依然需要调用Communicator::io_unbind来解绑IOSerivce。
这部分代码逻辑太复杂了,几乎没有办法文字描述清楚。你有兴趣可以看看代码。IOService_thread的实现简单一些,当IOSession被请求是,实时创建一个线程进行IO。
另外,IOService和用于网络服务的CommService接口上是几乎一致的。IOService的请求来自本地调用,CommService的请求来自网络连接。

@lainswork
Copy link

IOService和series是两个层次的东西。最好不用混着一起看。如果你想实现iocp下的文件aio,只需要实现iocp版IOService和IOSession的接口就可以了。但这个真的挺难的。 使用上IOSerive被io_bind到Communitor,就可以通过IOService::request接口提及IOSession。而IOService使用结束后,需要调用Communicator::io_unbind。unbind过程又是异步的,unbind完成之后,IOService::handle_unbound会被调用,对象归还给外部。 已经绑定在Communicator上的IOService如果停止工作,IOService::handle_stop会被调用。停止工作有两种原因,一种是发生错误(stop的错误码为errno),另一种是Communiator::deinit()被调用(stop错误码为0)。handle_stop被调用时,IOService并未解绑,依然需要调用Communicator::io_unbind来解绑IOSerivce。 这部分代码逻辑太复杂了,几乎没有办法文字描述清楚。你有兴趣可以看看代码。IOService_thread的实现简单一些,当IOSession被请求是,实时创建一个线程进行IO。 另外,IOService和用于网络服务的CommService接口上是几乎一致的。IOService的请求来自本地调用,CommSession来自网络连接。

收到,阅读源码过程中最不好理解的就是这个部分,现在有了你的描述就好说了

@519984307
Copy link

519984307 commented Apr 7, 2024

class __WFFilepreadTask : public WFFilepreadTask
{
public:
    __WFFilepreadTask(const std::string& path, void *buf, size_t count,
                      off_t offset, IOService *service, fio_callback_t&& cb):
        WFFilepreadTask(-1, buf, count, offset, service, std::move(cb)),
        pathname(path)
    {
    }

protected:
    virtual int prepare()
    {

       HANDLE handle = CreateFile(
            this->pathname.c_str(),// 文件路径
            GENERIC_READ,// 打开文件以进行读取
            FILE_SHARE_READ,// 共享模式
            NULL,// 安全属性(可以为NULL)
            OPEN_EXISTING,// 打开现有文件
            FILE_ATTRIBUTE_NORMAL,// 文件属性
            NULL// 模板文件句柄(可以为NULL)
           );
       int fd = _open_osfhandle((intptr_t)handle, 0);
       this->args.fd =fd;

        if (this->args.fd < 0)
            return -1;

        return WFFilepreadTask::prepare();
    }

    virtual SubTask *done()
    {
        if (this->args.fd >= 0)
        {
              close(this->args.fd);
            this->args.fd = -1;
        }

        return WFFilepreadTask::done();
    }

protected:
    std::string pathname;
};

class __WFFilepwriteTask : public WFFilepwriteTask
{
public:
    __WFFilepwriteTask(const std::string& path, const void *buf, size_t count,
                       off_t offset, IOService *service, fio_callback_t&& cb):
        WFFilepwriteTask(-1, buf, count, offset, service, std::move(cb)),
        pathname(path)
    {
    }

protected:
    virtual int prepare()
    {
        HANDLE handle = CreateFile(
            this->pathname.c_str(),// 文件路径
            GENERIC_READ,// 打开文件以进行读取
            FILE_SHARE_READ,// 共享模式
            NULL,// 安全属性(可以为NULL)
            OPEN_EXISTING,// 打开现有文件
            FILE_ATTRIBUTE_NORMAL,// 文件属性
            NULL// 模板文件句柄(可以为NULL)
            );
        int fd = _open_osfhandle((intptr_t)handle, 0);
        this->args.fd =fd;
        if (this->args.fd < 0)
            return -1;

        return WFFilepwriteTask::prepare();
    }

    virtual SubTask *done()
    {
        if (this->args.fd >= 0)
        {
             close(this->args.fd);
            this->args.fd = -1;
        }

        return WFFilepwriteTask::done();
    }

protected:
    std::string pathname;
};

class __WFFilepreadvTask : public WFFilepreadvTask
{
public:
    __WFFilepreadvTask(const std::string& path, const struct iovec *iov,
                       int iovcnt, off_t offset, IOService *service,
                       fvio_callback_t&& cb) :
        WFFilepreadvTask(-1, iov, iovcnt, offset, service, std::move(cb)),
        pathname(path)
    {
    }

protected:
    virtual int prepare()
    {
        HANDLE handle = CreateFile(
            this->pathname.c_str(),// 文件路径
            GENERIC_READ,// 打开文件以进行读取
            FILE_SHARE_READ,// 共享模式
            NULL,// 安全属性(可以为NULL)
            OPEN_EXISTING,// 打开现有文件
            FILE_ATTRIBUTE_NORMAL,// 文件属性
            NULL// 模板文件句柄(可以为NULL)
            );
        int fd = _open_osfhandle((intptr_t)handle, 0);
        this->args.fd =fd;
        if (this->args.fd < 0)
            return -1;

        return WFFilepreadvTask::prepare();
    }

    virtual SubTask *done()
    {
        if (this->args.fd >= 0)
        {
           close(this->args.fd);
            this->args.fd = -1;
        }

        return WFFilepreadvTask::done();
    }

protected:
    std::string pathname;
};

class __WFFilepwritevTask : public WFFilepwritevTask
{
public:
    __WFFilepwritevTask(const std::string& path, const struct iovec *iov,
                        int iovcnt, off_t offset, IOService *service,
                        fvio_callback_t&& cb) :
        WFFilepwritevTask(-1, iov, iovcnt, offset, service, std::move(cb)),
        pathname(path)
    {
    }

protected:
    virtual int prepare()
    {
        HANDLE handle = CreateFile(
            this->pathname.c_str(),// 文件路径
            GENERIC_READ,// 打开文件以进行读取
            FILE_SHARE_READ,// 共享模式
            NULL,// 安全属性(可以为NULL)
            OPEN_EXISTING,// 打开现有文件
            FILE_ATTRIBUTE_NORMAL,// 文件属性
            NULL// 模板文件句柄(可以为NULL)
            );
        int fd = _open_osfhandle((intptr_t)handle, 0);
        this->args.fd =fd;
        if (this->args.fd < 0)
            return -1;

        return WFFilepwritevTask::prepare();
    }

protected:
    virtual SubTask *done()
    {
        if (this->args.fd >= 0)
        {
          close(this->args.fd);
            this->args.fd = -1;
        }

        return WFFilepwritevTask::done();
    }

protected:
    std::string pathname;
};

static int __writefile_io(IOCPData *iocp_data, int timeout)
{

	WriteContext *ctx = (WriteContext *)iocp_data->data.context;

	int ret = WriteFile(iocp_data->data.handle, ctx->entry, ctx->count, NULL,
						&iocp_data->overlap);
	if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
	{
		if (ret != 0 && timeout == 0)
			CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);

		return -1;
	}
	errno = WSAGetLastError();
	return 0; // 成功启动异步操作
}

static int __readfile_io(IOCPData *iocp_data, int timeout)
{

	ReadContext *ctx = (ReadContext *)iocp_data->data.context;
	int ret = ReadFile(iocp_data->data.handle, ctx->entry, ctx->msgsize, NULL,
					   &iocp_data->overlap);

	if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
	{
		if (ret != 0 && timeout == 0)
			CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);

		return -1;
	}
	errno = WSAGetLastError();
	return 0; // 成功启动异步操作
}

各位大神看看这个可以不

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

4 participants