Skip to content

Commit

Permalink
socket: created connect, accept, recv, send, shutdown, bind,
Browse files Browse the repository at this point in the history
`listen`, `getsockname`, `setsockopt`, `getsockopt` and test and example
  • Loading branch information
YoSTEALTH committed Jul 13, 2024
1 parent ecadc17 commit af27d87
Show file tree
Hide file tree
Showing 5 changed files with 360 additions and 6 deletions.
51 changes: 48 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ Shakti will be providing developers with fast & powerful yet easy to use Python

* Mostly all events are planned to go through ``io_uring`` backend, this is a design choice.

This is when ``io_uring`` starts becoming fun to use!


*****NOTE*****
--------------

Work in progress... This project is in early ``planning`` state, so... its ok to play around with it but not for any type of serious development, yet.
Work in progress... This project is in early ``planning`` state, so... its ok to play around with it but not for any type of serious development, yet!


Requires
Expand Down Expand Up @@ -133,6 +131,53 @@ __
if __name__ == '__main__':
run(main())
Socket
______

.. code-block:: python
from shakti import SOL_SOCKET, SO_REUSEADDR, run, socket, bind, listen, accept, \
connect, recv, send, shutdown, close, sleep, setsockopt # task,
async def echo_server(host, port):
server_fd = await socket()
try:
print('Starting Server')
await setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, True)
await bind(server_fd, host, port)
await listen(server_fd, 1)
while client_fd := await accept(server_fd):
await client_handler(client_fd) # temp solution
# await task(client_handler(client_fd)) # TODO
break
finally:
await close(server_fd)
print('Closed Server')
async def client_handler(client_fd):
try:
print('server recv:', await recv(client_fd, 1024))
print('server sent:', await send(client_fd, b'hi from server'))
await shutdown(client_fd)
finally:
await close(client_fd)
async def echo_client(host, port):
await sleep(.001) # wait for `echo_server` to start up.
client_fd = await socket()
await connect(client_fd, host, port)
print('client sent:', await send(client_fd, b'hi from client'))
print('client recv:', await recv(client_fd, 1024))
await close(client_fd)
if __name__ == '__main__':
host = '127.0.0.1'
port = 12345
run(echo_server(host, port), echo_client(host, port))
.. _Liburing: https://github.com/YoSTEALTH/Liburing

Expand Down
42 changes: 42 additions & 0 deletions example/socket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from shakti import SOL_SOCKET, SO_REUSEADDR, run, socket, bind, listen, accept, \
connect, recv, send, shutdown, close, sleep, setsockopt # task,


async def echo_server(host, port):
server_fd = await socket()
try:
print('Starting Server')
await setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, True)
await bind(server_fd, host, port)
await listen(server_fd, 1)
while client_fd := await accept(server_fd):
await client_handler(client_fd) # temp solution
# await task(client_handler(client_fd)) # TODO
break
finally:
await close(server_fd)
print('Closed Server')


async def client_handler(client_fd):
try:
print('server recv:', await recv(client_fd, 1024))
print('server sent:', await send(client_fd, b'hi from server'))
await shutdown(client_fd)
finally:
await close(client_fd)


async def echo_client(host, port):
await sleep(.001) # wait for `echo_server` to start up.
client_fd = await socket()
await connect(client_fd, host, port)
print('client sent:', await send(client_fd, b'hi from client'))
print('client recv:', await recv(client_fd, 1024))
await close(client_fd)


if __name__ == '__main__':
host = '127.0.0.1'
port = 12345
run(echo_server(host, port), echo_client(host, port))
10 changes: 9 additions & 1 deletion src/shakti/io/socket.pxd
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
from libc.errno cimport ENFILE
from cpython.array cimport array
from liburing.lib.socket cimport *
from liburing.socket cimport io_uring_prep_socket, io_uring_prep_socket_direct_alloc
from liburing.lib.type cimport bool as bool_t
from liburing.socket cimport sockaddr, io_uring_prep_socket, io_uring_prep_socket_direct_alloc, \
io_uring_prep_shutdown, io_uring_prep_send, io_uring_prep_recv, \
io_uring_prep_accept, io_uring_prep_connect, \
io_uring_prep_setsockopt, io_uring_prep_getsockopt
from liburing.socket_extra cimport bind as _bind, getsockname as _getsockname, listen as _listen, \
getpeername as _getpeername, getaddrinfo as _getaddrinfo, isIP
from liburing.time cimport timespec, io_uring_prep_link_timeout
from liburing.error cimport raise_error
from ..event.entry cimport SQE

Expand Down
182 changes: 181 additions & 1 deletion src/shakti/io/socket.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ async def socket(int family=__AF_INET, int type=__SOCK_STREAM, int protocol=0, u
''' Create Socket
Example
>>> sock_fd = await socket(AF_INET, SOCK_STREAM)
>>> sock_fd = await socket() # default: AF_INET, SOCK_STREAM
... ...
Note
- Setting `direct=True` will return direct descriptor index.
Expand All @@ -21,3 +22,182 @@ async def socket(int family=__AF_INET, int type=__SOCK_STREAM, int protocol=0, u
raise_error(sqe.result, 'Either file table is full or register file not enabled!')
raise_error(sqe.result)


async def connect(int sockfd, str host, in_port_t port=80):
'''
Example
>>> sockfd = await socket()
>>> await connect(sockfd, 'domain.ext')
# or
>>> await connect(sockfd, '0.0.0.0', 12345)
# or
>>> await connect(sockfd, '/path')
...
>>> await close(sockfd)
'''
cdef: # get family
SQE sqe = SQE()
bytes _host = host.encode()
sockaddr addr
socklen_t size = sizeof(__sockaddr_storage)
__sockaddr sa

__getsockname(sockfd, &sa, &size)

if sa.sa_family == __AF_UNIX:
addr = sockaddr(sa.sa_family, _host, port)
io_uring_prep_connect(sqe, sockfd, addr)
await sqe
elif sa.sa_family in (__AF_INET, __AF_INET6):
if isIP(sa.sa_family, _host):
addr = sockaddr(sa.sa_family, _host, port)
io_uring_prep_connect(sqe, sockfd, addr)
await sqe
else:
for af_, sock_, proto, canon, addr in _getaddrinfo(_host, str(port).encode()):
try:
io_uring_prep_connect(sqe, sockfd, addr)
await sqe
except OSError:
continue
else:
break
else:
raise NotImplementedError


async def accept(int sockfd, int flags=0)-> int:
'''
Example
>>> client_fd = await accept(socket_fd)
'''
cdef SQE sqe = SQE()
io_uring_prep_accept(sqe, sockfd, None, flags)
await sqe
return sqe.result


async def recv(int sockfd, unsigned int bufsize, int flags=0):
'''
Example
>>> await recv(client_fd, 13)
b'received data'
'''
cdef:
SQE sqe = SQE()
memoryview buf = memoryview(bytearray(bufsize))
io_uring_prep_recv(sqe, sockfd, buf, bufsize, flags)
await sqe
return bytes(buf[:sqe.result])


async def send(int sockfd, const unsigned char[:] buf, int flags=0):
'''
Example
>>> await send(client_fd, b'send data')
10
'''
cdef:
SQE sqe = SQE()
size_t length = len(buf)
io_uring_prep_send(sqe, sockfd, buf, length, flags)
await sqe
return sqe.result


async def shutdown(int sockfd, int how=__SHUT_RDWR):
'''
How
SHUT_RD
SHUT_WR
SHUT_RDWR # (default)
'''
cdef SQE sqe = SQE()
io_uring_prep_shutdown(sqe, sockfd, how)
await sqe


async def bind(int sockfd, str host, in_port_t port)-> object:
'''
Example
>>> sockfd = await socket()
>>> addr = await bind(sock_fd, '0.0.0.0', 12345)
>>> await getsockname(sockfd, addr)
'0.0.0.0', 12345
# or
>>> addr = await bind(sock_fd, '0.0.0.0', 0) # random port
>>> await getsockname(sockfd, addr)
'0.0.0.0', 6744 # random port
>>> await close(sockfd)
'''
cdef: # get family
__sockaddr sa
socklen_t size = sizeof(__sockaddr_storage)

__getsockname(sockfd, &sa, &size)

cdef sockaddr addr = sockaddr(sa.sa_family, host.encode(), port)
_bind(sockfd, addr)
return addr


async def listen(int sockfd, int backlog)-> int:
return _listen(sockfd, backlog)


async def getsockname(int sockfd, sockaddr addr)-> tuple[str, int]:
cdef:
bytes ip
int port
ip, port = _getsockname(sockfd, addr)
return ip.decode(), port


async def setsockopt(int sockfd, int level, int optname, object optval):
'''
Example
>>> await setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, 1)
Warning
- This function is still flawed, needs more testing.
'''
cdef:
array val
str t = type(optval).__name__
# note: have to use `str` to check as `bool` type does not work well!

if t in ('int', 'bool'):
val = array('i', [optval])
elif t == 'str':
val = array('B', [optval.encode()])
elif t == 'bytes':
val = array('B', [optval])
else:
raise TypeError(f'`setsockopt` received `optval` type {t!r}, not supported')
cdef SQE sqe = SQE()
io_uring_prep_setsockopt(sqe, sockfd, level, optname, val)
await sqe


async def getsockopt(int sockfd, int level, int optname)-> int:
'''
Example
>>> await getsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR)
1
Warning
- This function is still flawed, needs more testing.
'''
cdef:
timespec ts = timespec(.001)
SQE sqe = SQE(2, False)
array optval = array('i', [0])
io_uring_prep_getsockopt(sqe, sockfd, level, optname, optval)
io_uring_prep_link_timeout(sqe[1], ts, 0)
# note: timeout is needed or else `getsockopt` will hang forever!
await sqe
return optval[0]
Loading

0 comments on commit af27d87

Please sign in to comment.