Skip to content

Commit

Permalink
replace make_function with lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
philoinovsky committed Jul 11, 2021
1 parent 914a18d commit 570a387
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 92 deletions.
48 changes: 39 additions & 9 deletions include/boost/python/eventloop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ class event_loop
// TODO: An instance of asyncio.Handle is returned, which can be used later to cancel the callback.
inline void call_soon(object f)
{
_strand.post([f, loop=this] {f();});
return;
_strand.post([f]{f();});
}

// TODO: implement this
Expand All @@ -57,22 +56,22 @@ class event_loop

inline void add_reader(int fd, object f)
{
_add_reader_or_writer(fd, f, fd * 2);
_async_wait_fd(fd, f, _read_key(fd));
}

inline void remove_reader(int fd)
{
_remove_reader_or_writer(fd * 2);
_descriptor_map.erase(_read_key(fd));
}

inline void add_writer(int fd, object f)
{
_add_reader_or_writer(fd, f, fd * 2 + 1);
_async_wait_fd(fd, f, _write_key(fd));
}

inline void remove_writer(int fd)
{
_remove_reader_or_writer(fd * 2 + 1);
_descriptor_map.erase(_write_key(fd));
}


Expand Down Expand Up @@ -119,16 +118,47 @@ class event_loop
object _pymod_ssl = object();
object _pymod_socket = import("socket");
object _pymod_traceback = import("traceback");
object _pymod_logger = import("asyncio.log").attr("logger");
object _py_wrap_future = import("asyncio").attr("wrap_future");
object _py_logger = import("asyncio.log").attr("logger");
object _pymod_concurrent_future = import("concurrent").attr("futures");
object _exception_handler = object();
boost::asio::io_context::strand _strand;
// read: key = fd * 2 + 0, write: key = fd * 2 + 1
std::unordered_map<int, std::unique_ptr<boost::asio::posix::stream_descriptor>> _descriptor_map;
std::chrono::steady_clock::time_point _created_time;

void _add_reader_or_writer(int fd, object f, int key);
void _remove_reader_or_writer(int key);
inline int _read_key(int fd)
{
return fd * 2;
}

inline int _write_key(int fd)
{
return fd * 2 + 1;
}

template<typename F>
void _async_wait_fd(int fd, F f, int key)
{
// add descriptor
if (_descriptor_map.find(key) == _descriptor_map.end())
{
_descriptor_map.emplace(key,
std::move(std::make_unique<boost::asio::posix::stream_descriptor>(_strand.context(), fd))
);
}

_descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read,
boost::asio::bind_executor(_strand, [this, key, f] (const boost::system::error_code& ec)
{
_descriptor_map.erase(key);
f();
}));
return;
}

static void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr);
static void _sock_accept(event_loop& loop, object fut, object sock);
};

}}}
Expand Down
122 changes: 39 additions & 83 deletions src/eventloop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ bool _hasattr(object o, const char* name)
return PyObject_HasAttrString(o.ptr(), name);
}

void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr)
}

void event_loop::_sock_connect_cb(object pymod_socket, object fut, object sock, object addr)
{
try
{
Expand Down Expand Up @@ -61,11 +63,10 @@ void _sock_connect_cb(object pymod_socket, object fut, object sock, object addr)
}
}

void _sock_accept(event_loop& loop, object fut, object sock)
void event_loop::_sock_accept(event_loop& loop, object fut, object sock)
{
int fd = extract<int>(sock.attr("fileno")());
object conn;
object address;
object conn, address;
try
{
object ret = sock.attr("accept")();
Expand All @@ -80,9 +81,7 @@ void _sock_accept(event_loop& loop, object fut, object sock)
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
{
PyErr_Clear();
loop.add_reader(fd, make_function(bind(
_sock_accept, boost::ref(loop), fut, sock),
default_call_policies(), boost::mpl::vector<void, object>()));
loop._async_wait_fd(fd, bind(_sock_accept, boost::ref(loop), fut, sock), loop._write_key(fd));
}
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
Expand All @@ -94,43 +93,6 @@ void _sock_accept(event_loop& loop, object fut, object sock)
PyErr_Clear();
fut.attr("set_exception")(std::current_exception());
}
}
}

}

void event_loop::_add_reader_or_writer(int fd, object f, int key)
{
// add descriptor
if (_descriptor_map.find(key) == _descriptor_map.end())
{
_descriptor_map.emplace(key,
std::move(std::make_unique<boost::asio::posix::stream_descriptor>(_strand.context(), fd))
);
}

_descriptor_map.find(key)->second->async_wait(boost::asio::posix::descriptor::wait_type::wait_read,
boost::asio::bind_executor(_strand, [key, f, loop=this] (const boost::system::error_code& ec)
{
// move descriptor
auto iter = loop->_descriptor_map.find(key);
if (iter != loop->_descriptor_map.end())
{
iter->second->release();
loop->_descriptor_map.erase(iter);
}
loop->call_soon(f);
}));
return;
}

void event_loop::_remove_reader_or_writer(int key)
{
auto iter = _descriptor_map.find(key);
if (iter != _descriptor_map.end())
{
iter->second->release();
_descriptor_map.erase(iter);
}
}

Expand All @@ -155,14 +117,14 @@ object event_loop::sock_recv(object sock, size_t nbytes)
{
int fd = extract<int>(sock.attr("fileno")());
int fd_dup = dup(fd);
object py_fut = _pymod_concurrent_future.attr("Future")();
add_reader(fd_dup, make_function(
[py_fut, nbytes, fd=fd_dup] (object obj) {
object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")());
_async_wait_fd(fd_dup,
[py_fut, nbytes, fd=fd_dup] {
std::vector<char> buffer(nbytes);
read(fd, buffer.data(), nbytes);
py_fut.attr("set_result")(object(handle<>(PyBytes_FromStringAndSize(buffer.data(), nbytes))));
},
default_call_policies(), boost::mpl::vector<void, object>()));
_read_key(fd));
return py_fut;
}

Expand All @@ -171,14 +133,14 @@ object event_loop::sock_recv_into(object sock, object buffer)
int fd = extract<int>(sock.attr("fileno")());
int fd_dup = dup(fd);
ssize_t nbytes = len(buffer);
object py_fut = _pymod_concurrent_future.attr("Future")();
add_reader(fd_dup, make_function(
[py_fut, nbytes, fd=fd_dup] (object obj) {
object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")());
_async_wait_fd(fd_dup,
[py_fut, nbytes, fd=fd_dup] {
std::vector<char> buffer(nbytes);
ssize_t nbytes_read = read(fd, buffer.data(), nbytes);
py_fut.attr("set_result")(nbytes_read);
},
default_call_policies(), boost::mpl::vector<void, object>()));
},
_read_key(fd));
return py_fut;
}

Expand All @@ -188,13 +150,13 @@ object event_loop::sock_sendall(object sock, object data)
int fd_dup = dup(fd);
char const* py_str = extract<char const*>(data.attr("decode")());
ssize_t py_str_len = len(data);
object py_fut = _pymod_concurrent_future.attr("Future")();
add_writer(fd_dup, make_function(
[py_fut, fd, py_str, py_str_len] (object obj) {
object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")());
_async_wait_fd(fd_dup,
[py_fut, fd, py_str, py_str_len] {
write(fd, py_str, py_str_len);
py_fut.attr("set_result")(object());
},
default_call_policies(), boost::mpl::vector<void, object>()));
},
_write_key(fd));
return py_fut;
}

Expand All @@ -205,22 +167,20 @@ object event_loop::sock_connect(object sock, object address)
{
// TODO: _ensure_resolve
}
object fut = _pymod_concurrent_future.attr("Future")();
object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")());
int fd = extract<int>(sock.attr("fileno")());
try
{
sock.attr("connect")(address);
fut.attr("set_result")(object());
py_fut.attr("set_result")(object());
}
catch (const error_already_set& e)
{
if (PyErr_ExceptionMatches(PyExc_BlockingIOError)
|| PyErr_ExceptionMatches(PyExc_InterruptedError))
{
PyErr_Clear();
add_writer(dup(fd), make_function(bind(
_sock_connect_cb, _pymod_socket, fut, sock, address),
default_call_policies(), boost::mpl::vector<void, object>()));
_async_wait_fd(dup(fd), bind(_sock_connect_cb, _pymod_socket, py_fut, sock, address), _write_key(fd));
}
else if (PyErr_ExceptionMatches(PyExc_SystemExit)
|| PyErr_ExceptionMatches(PyExc_KeyboardInterrupt))
Expand All @@ -230,17 +190,17 @@ object event_loop::sock_connect(object sock, object address)
else
{
PyErr_Clear();
fut.attr("set_exception")(std::current_exception());
py_fut.attr("set_exception")(std::current_exception());
}
}
return fut;
return py_fut;
}

object event_loop::sock_accept(object sock)
{
object fut = _pymod_concurrent_future.attr("Future")();
_sock_accept(*this, fut, sock);
return fut;
object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")());
_sock_accept(*this, py_fut, sock);
return py_fut;
}

// TODO: implement this
Expand All @@ -262,27 +222,23 @@ object event_loop::start_tls(object transport, object protocol, object sslcontex

object event_loop::getaddrinfo(object host, int port, int family, int type, int proto, int flags)
{
object py_fut = _pymod_concurrent_future.attr("Future")();
call_soon(make_function(
[this, py_fut, host, port, family, type, proto, flags] (object obj) {
object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")());
_strand.post(
[this, py_fut, host, port, family, type, proto, flags] {
object res = _pymod_socket.attr("getaddrinfo")(host, port, family, type, proto, flags);
py_fut.attr("set_result")(res);
},
default_call_policies(),
boost::mpl::vector<void, object>()));
});
return py_fut;
}

object event_loop::getnameinfo(object sockaddr, int flags)
{
object py_fut = _pymod_concurrent_future.attr("Future")();
call_soon(make_function(
[this, py_fut, sockaddr, flags] (object obj) {
object py_fut = _py_wrap_future(_pymod_concurrent_future.attr("Future")());
_strand.post(
[this, py_fut, sockaddr, flags] {
object res = _pymod_socket.attr("getnameinfo")(sockaddr, flags);
py_fut.attr("set_result")(res);
},
default_call_policies(),
boost::mpl::vector<void, object>()));
});
return py_fut;
}

Expand Down Expand Up @@ -345,7 +301,7 @@ void event_loop::default_exception_handler(object context)
dict kwargs;
args.append(str("\n").join(log_lines));
kwargs["exc_info"] = exc_info;
_pymod_logger.attr("error")(tuple(args), **kwargs);
_py_logger.attr("error")(tuple(args), **kwargs);
}

void event_loop::call_exception_handler(object context)
Expand All @@ -370,7 +326,7 @@ void event_loop::call_exception_handler(object context)
dict kwargs;
args.append(str("Exception in default exception handler"));
kwargs["exc_info"] = true;
_pymod_logger.attr("error")(tuple(args), **kwargs);
_py_logger.attr("error")(tuple(args), **kwargs);
}
}
}
Expand Down Expand Up @@ -416,7 +372,7 @@ void event_loop::call_exception_handler(object context)
boost::python::dict kwargs;
args.append(str("Exception in default exception handler"));
kwargs["exc_info"] = true;
_pymod_logger.attr("error")(tuple(args), **kwargs);
_py_logger.attr("error")(tuple(args), **kwargs);
}
}
}
Expand Down

0 comments on commit 570a387

Please sign in to comment.