Skip to content

Commit

Permalink
Merge pull request #40 from wildfoundry/poll
Browse files Browse the repository at this point in the history
Poll
  • Loading branch information
willmcgugan authored Apr 5, 2018
2 parents 6d16b28 + 891e6e0 commit 5093017
Show file tree
Hide file tree
Showing 14 changed files with 319 additions and 162 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ compliance/reports
.python-version
reports
_*
.pytest_cache
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Changed
- Lomond now uses Poll or KQueue depending on platform, rather than select
- Fail fast on invalid utf-8

## [0.1.13] - 2018-01-29

### Added
Expand Down
5 changes: 2 additions & 3 deletions compliance/runtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def run_tests():
def run_ws(url):
"""Run a websocket until close."""
ws = WebSocket(url)
for event in ws:
for event in ws.connect(ping_rate=0):
try:
if event.name == 'text':
ws.send_text(event.text)
Expand All @@ -64,8 +64,7 @@ def run_test(test_no):
def run_test_cases(case_tuples):
"""Run a number of test cases from their 'case tuple'"""
for case_tuple in case_tuples:
print("\n[{}]".format(case_tuple))
qs = urlencode({'agent': USER_AGENT})
qs = urlencode({'agent': USER_AGENT, 'case': case_tuple})
url = server + '/runCase?{}'.format(qs)
run_ws(url)
update_report()
Expand Down
2 changes: 1 addition & 1 deletion lomond/_version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from __future__ import unicode_literals

__version__ = "0.1.13"
__version__ = "0.1.14"
34 changes: 32 additions & 2 deletions lomond/frame_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
import logging
import struct

from six import text_type

from . import errors
from .frame import Frame
from .parser import Parser
from .parser import ParseError, Parser
from .utf8validator import Utf8Validator


log = logging.getLogger('lomond')
Expand All @@ -25,6 +28,8 @@ class FrameParser(Parser):
def __init__(self, frame_class=Frame, parse_headers=True):
self._frame_class = frame_class
self._parse_headers = parse_headers
self._is_text = False
self._utf8_validator = Utf8Validator()
super(FrameParser, self).__init__()

def parse(self):
Expand Down Expand Up @@ -65,6 +70,31 @@ def parse(self):
rsv2=rsv2,
rsv3=rsv3
)

if frame.is_text:
self._is_text = True

if payload_length:
frame.payload = yield self.read(payload_length)
_is_text_continuation = (
frame.is_continuation and self._is_text
)
if frame.is_text or _is_text_continuation:
frame.payload = yield self.read_utf8(
payload_length,
self._utf8_validator
)
else:
frame.payload = yield self.read(payload_length)

if frame.fin and (frame.is_text or frame.is_continuation):
_, ends_on_codepoint, _, _ = (
self._utf8_validator.validate(b'')
)
self._utf8_validator.reset()
self._is_text = False
if not ends_on_codepoint:
raise errors.CriticalProtocolError(
'invalid utf-8; does not end on codepoint'
)

yield frame
2 changes: 1 addition & 1 deletion lomond/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def from_payload(cls, payload):
text = payload.decode('utf-8')
except UnicodeDecodeError as error:
raise errors.CriticalProtocolError(
'payload contains invalid utf-8 ({})',
'payload contains invalid utf-8; {}',
error
)
return cls(text)
Expand Down
72 changes: 51 additions & 21 deletions lomond/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,45 @@
from __future__ import unicode_literals


class ParseError(Exception):
    """Stream failed to parse.

    Raised by an awaitable's ``validate`` method when newly received
    data is unacceptable (for example invalid utf-8 in a text payload).

    """


class _Awaitable(object):
"""An operation that effectively suspends the coroutine."""
# Analogous to Python3 asyncio concept

def validate(self, chunk):
"""Raise any ParseErrors"""


class _ReadBytes(_Awaitable):
    """Reads a fixed number of bytes."""
    # Number of bytes still required before the read is complete.
    __slots__ = ['remaining']

    def __init__(self, count):
        """Await exactly ``count`` bytes."""
        self.remaining = count


class _ReadUtf8(_ReadBytes):
    """Reads a fixed number of bytes, validates utf-8."""
    # Validator object that is fed each chunk as it arrives.
    __slots__ = ['utf8_validator']

    def __init__(self, count, utf8_validator):
        """Await ``count`` bytes, checking them with ``utf8_validator``."""
        # Delegate the byte countdown to the parent rather than
        # duplicating its initialisation here.
        super(_ReadUtf8, self).__init__(count)
        self.utf8_validator = utf8_validator

    def validate(self, data):
        """Feed ``data`` to the validator.

        Raises ParseError if the validator reports the data as invalid.

        """
        # The validator returns a 4-tuple; only the first item (the
        # validity flag) is needed here.
        valid, _, _, _ = self.utf8_validator.validate(data)
        if not valid:
            raise ParseError('invalid utf8')


class _ReadUntil(_Awaitable):
    """Read until a separator."""
    # The separator that terminates the read.
    __slots__ = ['sep']

    def __init__(self, sep):
        """Await data up to the separator ``sep``."""
        self.sep = sep


class Parser(object):
"""
Coroutine based stream parser.
Expand Down Expand Up @@ -37,23 +76,8 @@ def __init__(self):
self._closed = False
self.reset()

class _Awaitable(object):
"""An operation that effectively suspends the coroutine."""
# Analogous to Python3 asyncio concept

class _ReadBytes(_Awaitable):
"""Reads a fixed number of bytes."""
__slots__ = ['remaining']
def __init__(self, count):
self.remaining = count

class _ReadUntil(_Awaitable):
"""Read until a separator."""
__slots__ = ['sep']
def __init__(self, sep):
self.sep = sep

read = _ReadBytes
read_utf8 = _ReadUtf8
read_until = _ReadUntil

def __del__(self):
Expand Down Expand Up @@ -81,27 +105,34 @@ def feed(self, data):
pos = 0
while pos < len(data):
# Awaiting a read of a fixed number of bytes
if isinstance(self._awaiting, self.read):
if isinstance(self._awaiting, _ReadBytes):
# This many bytes left to read
remaining = self._awaiting.remaining
# Bite off remaining bytes
chunk = data[pos:pos + remaining]
chunk_size = len(chunk)
pos += chunk_size
try:
# Validate new data
self._awaiting.validate(chunk)
except ParseError as error:
# Raises an exception in parse()
self._gen.throw(error)
# Add to buffer
self._buffer.append(chunk)
remaining -= chunk_size
if remaining:
# Await more bytes
self._awaiting = self.read(remaining)
self._awaiting.remaining = remaining
else:
# Got all the bytes we need in buffer
send_bytes = b''.join(self._buffer)
del self._buffer[:]
# Send to coroutine, get new 'awaitable'
self._awaiting = self._gen.send(send_bytes)

# Awaiting a read until a terminator
elif isinstance(self._awaiting, self.read_until):
elif isinstance(self._awaiting, _ReadUntil):
# Reading to separator
chunk = data[pos:]
self._until += chunk
Expand All @@ -122,11 +153,10 @@ def feed(self, data):
# Send bytes to coroutine, get new 'awaitable'
self._awaiting = self._gen.send(send_bytes)
# Yield any non-awaitables...
while not isinstance(self._awaiting, self._Awaitable):
while not isinstance(self._awaiting, _Awaitable):
yield self._awaiting
self._awaiting = next(self._gen)


def parse(self):
"""
A generator to parse incoming stream.
Expand Down
Loading

0 comments on commit 5093017

Please sign in to comment.