From 570a387dab3db4860b1625634ddda378ebffa292 Mon Sep 17 00:00:00 2001 From: philoinovsky <1129410550@qq.com> Date: Sun, 11 Jul 2021 09:45:44 +0000 Subject: [PATCH] replace make_function with lambda --- include/boost/python/eventloop.hpp | 48 +++++++++--- src/eventloop.cpp | 122 +++++++++-------------------- 2 files changed, 78 insertions(+), 92 deletions(-) diff --git a/include/boost/python/eventloop.hpp b/include/boost/python/eventloop.hpp index 58b9e7bf3..a9c813ff5 100644 --- a/include/boost/python/eventloop.hpp +++ b/include/boost/python/eventloop.hpp @@ -37,8 +37,7 @@ class event_loop // TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback. inline void call_soon(object f) { - _strand.post([f, loop=this] {f();}); - return; + _strand.post([f]{f();}); } // TODO: implement this @@ -57,22 +56,22 @@ class event_loop inline void add_reader(int fd, object f) { - _add_reader_or_writer(fd, f, fd * 2); + _async_wait_fd(fd, f, _read_key(fd)); } inline void remove_reader(int fd) { - _remove_reader_or_writer(fd * 2); + _descriptor_map.erase(_read_key(fd)); } inline void add_writer(int fd, object f) { - _add_reader_or_writer(fd, f, fd * 2 + 1); + _async_wait_fd(fd, f, _write_key(fd)); } inline void remove_writer(int fd) { - _remove_reader_or_writer(fd * 2 + 1); + _descriptor_map.erase(_write_key(fd)); } @@ -119,7 +118,8 @@ class event_loop object _pymod_ssl = object(); object _pymod_socket = import("socket"); object _pymod_traceback = import("traceback"); - object _pymod_logger = import("asyncio.log").attr("logger"); + object _py_wrap_future = import("asyncio").attr("wrap_future"); + object _py_logger = import("asyncio.log").attr("logger"); object _pymod_concurrent_future = import("concurrent").attr("futures"); object _exception_handler = object(); boost::asio::io_context::strand _strand; @@ -127,8 +127,38 @@ class event_loop std::unordered_map> _descriptor_map; std::chrono::steady_clock::time_point _created_time; - void _add_reader_or_writer(int fd, object f, int key); - void _remove_reader_or_writer(int key); + inline int _read_key(int fd) + { + return fd * 2; + } + + inline int _write_key(int fd) + { + return fd * 2 + 1; + } + + template + void _async_wait_fd(int fd, F f, int key) + { + // add descriptor + if (_descriptor_map.find(key) == _descriptor_map.end()) + { + _descriptor_map.emplace(key, + std::move(std::make_unique(_strand.context(), fd)) + ); + } + + _descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read, + boost::asio::bind_executor(_strand, [this, key, f] (const boost::system::error_code& ec) + { + _descriptor_map.erase(key); + f(); + })); + return; + } + + static void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr); + static void _sock_accept(event_loop& loop, object fut, object sock); }; }}} diff --git a/src/eventloop.cpp b/src/eventloop.cpp index 913506b46..3bfeb9aa2 100644 --- a/src/eventloop.cpp +++ b/src/eventloop.cpp @@ -27,7 +27,9 @@ bool _hasattr(object o, const char* name) return PyObject_HasAttrString(o.ptr(), name); } -void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr) +} + +void event_loop::_sock_connect_cb(object pymod_socket, object fut, object sock, object addr) { try { @@ -61,11 +63,10 @@ void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr) } } -void _sock_accept(event_loop& loop, object fut, object sock) +void event_loop::_sock_accept(event_loop& loop, object fut, object sock) { int fd = extract(sock.attr("fileno")()); - object conn; - object address; + object conn, address; try { object ret = sock.attr("accept")(); @@ -80,9 +81,7 @@ void _sock_accept(event_loop& loop, object fut, object sock) || PyErr_ExceptionMatches(PyExc_InterruptedError)) { PyErr_Clear(); - loop.add_reader(fd, make_function(bind( - _sock_accept, boost::ref(loop), fut, sock), - default_call_policies(), boost::mpl::vector())); + loop._async_wait_fd(fd, bind(_sock_accept, boost::ref(loop), fut, sock), loop._write_key(fd)); } else if (PyErr_ExceptionMatches(PyExc_SystemExit) || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) @@ -94,43 +93,6 @@ void _sock_accept(event_loop& loop, object fut, object sock) PyErr_Clear(); fut.attr("set_exception")(std::current_exception()); } - } -} - -} - -void event_loop::_add_reader_or_writer(int fd, object f, int key) -{ - // add descriptor - if (_descriptor_map.find(key) == _descriptor_map.end()) - { - _descriptor_map.emplace(key, - std::move(std::make_unique(_strand.context(), fd)) - ); - } - - _descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read, - boost::asio::bind_executor(_strand, [key, f, loop=this] (const boost::system::error_code& ec) - { - // move descriptor - auto iter = loop->_descriptor_map.find(key); - if (iter != loop->_descriptor_map.end()) - { - iter->second->release(); - loop->_descriptor_map.erase(iter); - } - loop->call_soon(f); - })); - return; -} - -void event_loop::_remove_reader_or_writer(int key) -{ - auto iter = _descriptor_map.find(key); - if (iter != _descriptor_map.end()) - { - iter->second->release(); - _descriptor_map.erase(iter); } } @@ -155,14 +117,14 @@ object event_loop::sock_recv(object sock, size_t nbytes) { int fd = extract(sock.attr("fileno")()); int fd_dup = dup(fd); - object py_fut = _pymod_concurrent_future.attr("Future")(); - add_reader(fd_dup, make_function( - [py_fut, nbytes, fd=fd_dup] (object obj) { + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _async_wait_fd(fd_dup, + [py_fut, nbytes, fd=fd_dup] { std::vector buffer(nbytes); read(fd, buffer.data(), nbytes); py_fut.attr("set_result")(object(handle<>(PyBytes_FromStringAndSize(buffer.data(), nbytes)))); }, - default_call_policies(), boost::mpl::vector())); + _read_key(fd)); return py_fut; } @@ -171,14 +133,14 @@ object event_loop::sock_recv_into(object sock, object buffer) int fd = extract(sock.attr("fileno")()); int fd_dup = dup(fd); ssize_t nbytes = len(buffer); - object py_fut = _pymod_concurrent_future.attr("Future")(); - add_reader(fd_dup, make_function( - [py_fut, nbytes, fd=fd_dup] (object obj) { + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _async_wait_fd(fd_dup, + [py_fut, nbytes, fd=fd_dup] { std::vector buffer(nbytes); ssize_t nbytes_read = read(fd, buffer.data(), nbytes); py_fut.attr("set_result")(nbytes_read); - }, - default_call_policies(), boost::mpl::vector())); + }, + _read_key(fd)); return py_fut; } @@ -188,13 +150,13 @@ object event_loop::sock_sendall(object sock, object data) int fd_dup = dup(fd); char const* py_str = extract(data.attr("decode")()); ssize_t py_str_len = len(data); - object py_fut = _pymod_concurrent_future.attr("Future")(); - add_writer(fd_dup, make_function( - [py_fut, fd, py_str, py_str_len] (object obj) { + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _async_wait_fd(fd_dup, + [py_fut, fd, py_str, py_str_len] { write(fd, py_str, py_str_len); py_fut.attr("set_result")(object()); - }, - default_call_policies(), boost::mpl::vector())); + }, + _write_key(fd)); return py_fut; } @@ -205,12 +167,12 @@ object event_loop::sock_connect(object sock, object address) { // TODO: _ensure_resolve } - object fut = _pymod_concurrent_future.attr("Future")(); + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); int fd = extract(sock.attr("fileno")()); try { sock.attr("connect")(address); - fut.attr("set_result")(object()); + py_fut.attr("set_result")(object()); } catch (const error_already_set& e) { @@ -218,9 +180,7 @@ object event_loop::sock_connect(object sock, object address) || PyErr_ExceptionMatches(PyExc_InterruptedError)) { PyErr_Clear(); - add_writer(dup(fd), make_function(bind( - _sock_connect_cb, _pymod_socket, fut, sock, address), - default_call_policies(), boost::mpl::vector())); + _async_wait_fd(dup(fd), bind(_sock_connect_cb, _pymod_socket, py_fut, sock, address), _write_key(fd)); } else if (PyErr_ExceptionMatches(PyExc_SystemExit) || PyErr_ExceptionMatches(PyExc_KeyboardInterrupt)) @@ -230,17 +190,17 @@ object event_loop::sock_connect(object sock, object address) else { PyErr_Clear(); - fut.attr("set_exception")(std::current_exception()); + py_fut.attr("set_exception")(std::current_exception()); } } - return fut; + return py_fut; } object event_loop::sock_accept(object sock) { - object fut = _pymod_concurrent_future.attr("Future")(); - _sock_accept(*this, fut, sock); - return fut; + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _sock_accept(*this, py_fut, sock); + return py_fut; } // TODO: implement this @@ -262,27 +222,23 @@ object event_loop::start_tls(object transport, object protocol, object sslcontex object event_loop::getaddrinfo(object host, int port, int family, int type, int proto, int flags) { - object py_fut = _pymod_concurrent_future.attr("Future")(); - call_soon(make_function( - [this, py_fut, host, port, family, type, proto, flags] (object obj) { + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _strand.post( + [this, py_fut, host, port, family, type, proto, flags] { object res = _pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags); py_fut.attr("set_result")(res); - }, - default_call_policies(), - boost::mpl::vector())); + }); return py_fut; } object event_loop::getnameinfo(object sockaddr, int flags) { - object py_fut = _pymod_concurrent_future.attr("Future")(); - call_soon(make_function( - [this, py_fut, sockaddr, flags] (object obj) { + object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")()); + _strand.post( + [this, py_fut, sockaddr, flags] { object res = _pymod_socket.attr("getnameinfo")(sockaddr, flags); py_fut.attr("set_result")(res); - }, - default_call_policies(), - boost::mpl::vector())); + }); return py_fut; } @@ -345,7 +301,7 @@ void event_loop::default_exception_handler(object context) dict kwargs; args.append(str("\n").join(log_lines)); kwargs["exc_info"] = exc_info; - _pymod_logger.attr("error")(tuple(args), **kwargs); + _py_logger.attr("error")(tuple(args), **kwargs); } void event_loop::call_exception_handler(object context) @@ -370,7 +326,7 @@ void event_loop::call_exception_handler(object context) dict kwargs; args.append(str("Exception in default exception handler")); kwargs["exc_info"] = true; - _pymod_logger.attr("error")(tuple(args), **kwargs); + _py_logger.attr("error")(tuple(args), **kwargs); } } } @@ -416,7 +372,7 @@ void event_loop::call_exception_handler(object context) boost::python::dict kwargs; args.append(str("Exception in default exception handler")); kwargs["exc_info"] = true; - _pymod_logger.attr("error")(tuple(args), **kwargs); + _py_logger.attr("error")(tuple(args), **kwargs); } } }