Started porting everything to py3
sontek committed Aug 19, 2015
1 parent dd6997e commit 868656d
Showing 23 changed files with 198 additions and 92 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -8,3 +8,6 @@ dist/
.*.swp
.*.un~
*.bak
.coverage
coverage.xml
*#*#*
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -0,0 +1 @@
include *.txt
16 changes: 8 additions & 8 deletions pykafka/__init__.py
@@ -1,11 +1,11 @@
from broker import Broker
from simpleconsumer import SimpleConsumer
from cluster import Cluster
from partition import Partition
from producer import Producer
from topic import Topic
from client import KafkaClient
from balancedconsumer import BalancedConsumer
from .broker import Broker
from .simpleconsumer import SimpleConsumer
from .cluster import Cluster
from .partition import Partition
from .producer import Producer
from .topic import Topic
from .client import KafkaClient
from .balancedconsumer import BalancedConsumer

__version__ = '1.1.1'

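Note: the leading dots are required because Python 3 removed implicit relative imports (PEP 328); a bare `from broker import Broker` is always treated as an absolute import there. A minimal sketch of the failure mode (run from inside the package):

    from broker import Broker    # py2 only: implicitly resolves to pykafka/broker.py
    from .broker import Broker   # explicit relative import; valid on py2 and py3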
6 changes: 3 additions & 3 deletions pykafka/balancedconsumer.py
@@ -303,10 +303,10 @@ def _decide_partitions(self, participants):
# Freeze and sort partitions so we always have the same results
p_to_str = lambda p: '-'.join([p.topic.name, str(p.leader.id), str(p.id)])
all_parts = self._topic.partitions.values()
all_parts.sort(key=p_to_str)
all_parts = sorted(all_parts, key=p_to_str)

# get start point, # of partitions, and remainder
participants.sort() # just make sure it's sorted.
participants = sorted(participants) # just make sure it's sorted.
idx = participants.index(self._consumer_id)
parts_per_consumer = math.floor(len(all_parts) / len(participants))
remainder_ppc = len(all_parts) % len(participants)
@@ -343,7 +343,7 @@ def _get_participants(self):
participants.append(id_)
except NoNodeException:
pass # disappeared between ``get_children`` and ``get``
participants.sort()
participants = sorted(participants)
return participants

def _set_watches(self):
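These two changes are not just stylistic: on Python 3, dict.values() returns a view object with no .sort() method, so sorting in place would fail. sorted() builds a new list on both versions. A quick illustration:

    parts = {'t-0': 2, 't-1': 1}.values()
    # parts.sort()          # AttributeError on py3: 'dict_values' has no 'sort'
    parts = sorted(parts)   # works on py2 and py3: returns a new sorted list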
6 changes: 3 additions & 3 deletions pykafka/client.py
@@ -18,9 +18,9 @@
"""

__all__ = ["KafkaClient"]
import handlers
from .handlers import ThreadingHandler
import logging
from cluster import Cluster
from .cluster import Cluster

try:
import rd_kafka
@@ -71,7 +71,7 @@ def __init__(self,
self._source_address = source_address
self._socket_timeout_ms = socket_timeout_ms
self._offsets_channel_socket_timeout_ms = offsets_channel_socket_timeout_ms
self._handler = None if use_greenlets else handlers.ThreadingHandler()
self._handler = None if use_greenlets else ThreadingHandler()
self._use_rdkafka = rd_kafka and not ignore_rdkafka
if self._use_rdkafka:
log.info('Using rd_kafka extensions.')
9 changes: 4 additions & 5 deletions pykafka/handlers.py
@@ -19,8 +19,7 @@
__all__ = ["ResponseFuture", "Handler", "ThreadingHandler", "RequestHandler"]
import atexit
import threading
import Queue

from .utils.compat import Queue, Empty
from collections import namedtuple


@@ -68,8 +67,8 @@ def spawn(self, target, *args, **kwargs):

class ThreadingHandler(Handler):
"""A handler. that uses a :class:`threading.Thread` to perform its work"""
QueueEmptyError = Queue.Empty
Queue = Queue.Queue
QueueEmptyError = Empty
Queue = Queue
Event = threading.Event
Lock = threading.Lock

@@ -130,7 +129,7 @@ def worker():
if task.future:
res = self.connection.response()
task.future.set_response(res)
except Exception, e:
except Exception as e:
if task.future:
task.future.set_error(e)
finally:
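Two py3 blockers are fixed in this file: the Queue module was renamed to queue (PEP 3108), and `except Exception, e` is a SyntaxError on py3, which accepts only the `as` form. The rename is centralized in pykafka/utils/compat.py (see below); an equivalent try/except shim, shown for illustration only, would be:

    try:
        from queue import Queue, Empty   # py3 name (PEP 3108)
    except ImportError:
        from Queue import Queue, Empty   # py2 name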
35 changes: 18 additions & 17 deletions pykafka/simpleconsumer.py
@@ -23,10 +23,9 @@
import time
import threading
from collections import defaultdict
from Queue import Queue, Empty

from .common import OffsetType
from .utils.compat import Semaphore
from .utils.compat import Semaphore, Queue, Empty, iteritems, itervalues
from .exceptions import (OffsetOutOfRangeError, UnknownTopicOrPartition,
OffsetMetadataTooLarge, OffsetsLoadInProgress,
NotCoordinatorForConsumer, SocketDisconnectedError,
@@ -162,12 +161,12 @@ def __init__(self,
else:
self._partitions = {topic.partitions[k]:
OwnedPartition(p, self._messages_arrived)
for k, p in topic.partitions.iteritems()}
for k, p in iteritems(topic.partitions)}
self._partitions_by_id = {p.partition.id: p
for p in self._partitions.itervalues()}
for p in itervalues(self._partitions)}
# Organize partitions by leader for efficient queries
self._partitions_by_leader = defaultdict(list)
for p in self._partitions.itervalues():
for p in itervalues(self._partitions):
self._partitions_by_leader[p.partition.leader].append(p)
self.partition_cycle = itertools.cycle(self._partitions.values())

@@ -240,13 +239,13 @@ def topic(self):
def partitions(self):
"""A list of the partitions that this consumer consumes"""
return {id_: partition.partition
for id_, partition in self._partitions_by_id.iteritems()}
for id_, partition in iteritems(self._partitions_by_id)}

@property
def held_offsets(self):
"""Return a map from partition id to held offset for each partition"""
return {p.partition.id: p.last_offset_consumed
for p in self._partitions_by_id.itervalues()}
for p in itervalues(self._partitions_by_id)}

def __del__(self):
"""Stop consumption and workers when object is deleted"""
@@ -357,13 +356,15 @@ def commit_offsets(self):
log.error("Error committing offsets for topic %s (errors: %s)",
self._topic.name,
{ERROR_CODES[err]: [op.partition.id for op, _ in parts]
for err, parts in parts_by_error.iteritems()})
for err, parts in iteritems(parts_by_error)})

# retry only the partitions that errored
if 0 in parts_by_error:
parts_by_error.pop(0)
errored_partitions = [op for code, err_group in parts_by_error.iteritems()
for op, res in err_group]
errored_partitions = [
op for code, err_group in iteritems(parts_by_error)
for op, res in err_group
]
reqs = [p.build_offset_commit_request() for p in errored_partitions]

def fetch_offsets(self):
@@ -408,7 +409,7 @@ def _handle_success(parts):
log.error("Error fetching offsets for topic %s (errors: %s)",
self._topic.name,
{ERROR_CODES[err]: [op.partition.id for op, _ in parts]
for err, parts in parts_by_error.iteritems()})
for err, parts in iteritems(parts_by_error)})

time.sleep(i * (self._offsets_channel_backoff_ms / 1000))

@@ -488,7 +489,7 @@ def _handle_success(parts):
for i in xrange(self._offsets_reset_max_retries):
# group partitions by leader
by_leader = defaultdict(list)
for partition, offset in owned_partition_offsets.iteritems():
for partition, offset in iteritems(owned_partition_offsets):
# acquire lock for each partition to stop fetching during offset
# reset
if partition.fetch_lock.acquire(True):
@@ -498,7 +499,7 @@ def _handle_success(parts):
by_leader[partition.partition.leader].append((partition, offset))

# get valid offset ranges for each partition
for broker, offsets in by_leader.iteritems():
for broker, offsets in iteritems(by_leader):
reqs = [owned_partition.build_offset_request(offset)
for owned_partition, offset in offsets]
response = broker.request_offset_limits(reqs)
@@ -517,11 +518,11 @@ def _handle_success(parts):
log.error("Error resetting offsets for topic %s (errors: %s)",
self._topic.name,
{ERROR_CODES[err]: [op.partition.id for op, _ in parts]
for err, parts in parts_by_error.iteritems()})
for err, parts in iteritems(parts_by_error)})

time.sleep(i * (self._offsets_channel_backoff_ms / 1000))

for errcode, owned_partitions in parts_by_error.iteritems():
for errcode, owned_partitions in iteritems(parts_by_error):
if errcode != 0:
for owned_partition in owned_partitions:
owned_partition.fetch_lock.release()
@@ -554,7 +555,7 @@ def _handle_success(parts):
owned_partition.partition.id,
owned_partition.message_count)

for broker, owned_partitions in self._partitions_by_leader.iteritems():
for broker, owned_partitions in iteritems(self._partitions_by_leader):
partition_reqs = {}
for owned_partition in owned_partitions:
# attempt to acquire lock, just pass if we can't
@@ -571,7 +572,7 @@ def _handle_success(parts):
if partition_reqs:
try:
response = broker.fetch_messages(
[a for a in partition_reqs.itervalues() if a],
[a for a in itervalues(partition_reqs) if a],
timeout=self._fetch_wait_max_ms,
min_bytes=self._fetch_min_bytes
)
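Python 3 dicts have no iteritems()/itervalues(); items() and values() return view objects instead. The iteritems()/itervalues() helpers imported here (defined in pykafka/utils/compat.py later in this commit) keep lazy iteration on both versions. A hedged usage sketch:

    from pykafka.utils.compat import iteritems, itervalues

    offsets = {0: 42, 1: 7}
    for partition_id, offset in iteritems(offsets):
        # d.iteritems() on py2, iter(d.items()) on py3 -- no list is built
        print(partition_id, offset)
    total = sum(itervalues(offsets))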
26 changes: 17 additions & 9 deletions pykafka/test/kafka_instance.py
@@ -128,12 +128,14 @@ class KafkaInstance(ManagedInstance):
def __init__(self,
num_instances=1,
kafka_version='0.8.2.1',
scala_version='2.10',
bin_dir='/tmp/kafka-bin',
name='kafka',
use_gevent=False):
"""Start kafkainstace with given settings"""
self._num_instances = num_instances
self._kafka_version = kafka_version
self._scala_version = scala_version
self._bin_dir = bin_dir
self._processes = []
self.zookeeper = None
@@ -174,9 +176,15 @@ def _download_kafka(self):
log.info('Downloading Kafka.')
curr_dir = os.getcwd()
os.chdir(self._bin_dir)
url = 'http://mirror.reverse.net/pub/apache/kafka/{version}/kafka_2.10-{version}.tgz'.format(version=self._kafka_version)
url_fmt = 'http://mirror.reverse.net/pub/apache/kafka/{kafka_version}/kafka_{scala_version}-{kafka_version}.tgz'
url = url_fmt.format(
scala_version=self._scala_version,
kafka_version=self._kafka_version
)
p1 = subprocess.Popen(['curl', '-vs', url], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['tar', 'xvz', '-C', self._bin_dir, '--strip-components', '1'], stdin=p1.stdout, stdout=subprocess.PIPE)
p2 = subprocess.Popen(['tar', 'xvz', '-C', self._bin_dir,
'--strip-components', '1'],
stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
output, err = p2.communicate()
os.chdir(curr_dir)
@@ -189,7 +197,7 @@ def _is_port_free(self, port):
s = socket.create_connection(('localhost', port))
s.close()
return False
except IOError, err:
except IOError as err:
return err.errno == errno.ECONNREFUSED

def _port_generator(self, start):
@@ -341,18 +349,18 @@ def produce_messages(self, topic_name, messages):
def _catch_sigint(signum, frame):
global _exiting
_exiting = True
print 'SIGINT received.'
print('SIGINT received.')
signal.signal(signal.SIGINT, _catch_sigint)

cluster = KafkaInstance(num_instances=args.num_brokers,
kafka_version=args.kafka_version,
bin_dir=args.download_dir)
print 'Cluster started.'
print 'Brokers: {brokers}'.format(brokers=cluster.brokers)
print 'Zookeeper: {zk}'.format(zk=cluster.zookeeper)
print 'Waiting for SIGINT to exit.'
print('Cluster started.')
print('Brokers: {brokers}'.format(brokers=cluster.brokers))
print('Zookeeper: {zk}'.format(zk=cluster.zookeeper))
print('Waiting for SIGINT to exit.')
while True:
if _exiting:
print 'Exiting.'
print('Exiting.')
sys.exit(0)
time.sleep(1)
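The print statement is a SyntaxError on Python 3, where print() is a plain builtin function. The single-argument calls above also parse as valid py2 code (the parentheses just group the expression); multi-argument calls diverge unless `from __future__ import print_function` is in effect, which this commit does not add. Illustration:

    # print 'Cluster started.'   # py2 statement; SyntaxError on py3
    print('Cluster started.')    # valid on both versions
    print('a', 'b')              # py3: a b -- py2 without the future import: ('a', 'b')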
10 changes: 10 additions & 0 deletions pykafka/test/utils.py
@@ -2,6 +2,16 @@

from pykafka.test.kafka_instance import KafkaInstance, KafkaConnection

try:
import unittest2 as unittest
except ImportError:
import unittest

try:
from unittest import mock
except ImportError:
import mock


def get_cluster():
"""Gets a Kafka cluster for testing, using one already running is possible.
19 changes: 18 additions & 1 deletion pykafka/utils/compat.py
@@ -4,13 +4,30 @@

PY3 = sys.version_info[0] >= 3


if PY3:
from threading import Semaphore
from queue import Queue, Empty
from io import BytesIO as StringIO
range = range

def iteritems(d, **kw):
return iter(d.items(**kw))

def itervalues(d, **kw):
return iter(d.values(**kw))
else:
range = xrange
from threading import Condition, Lock
# could use monotonic.monotonic() backport as well here...
from time import time as _time
from Queue import Queue, Empty
from StringIO import StringIO

def iteritems(d, **kw):
return iter(d.iteritems(**kw))

def itervalues(d, **kw):
return iter(d.itervalues(**kw))

# -- begin unmodified backport of threading.Semaphore from Python 3.4 -- #
class Semaphore:
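Everything py2/py3-divergent that pykafka needs is funneled through this one module. Note that on py3 the StringIO name is actually io.BytesIO, so callers are expected to write bytes to it. A hedged sketch of intended usage:

    from pykafka.utils.compat import range, StringIO

    buf = StringIO()        # io.BytesIO on py3, StringIO.StringIO on py2
    buf.write(b'payload')   # bytes on both versions
    for i in range(1000):   # xrange on py2: no intermediate list is built
        pass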
2 changes: 1 addition & 1 deletion pykafka/utils/compression.py
@@ -21,7 +21,7 @@
import logging
import struct

from cStringIO import StringIO
from .compat import StringIO

try:
import snappy
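cStringIO does not exist on Python 3; the compat alias resolves to io.BytesIO there, which is the appropriate type anyway because compression codecs consume and produce bytes. A minimal, self-contained sketch of the gzip path (standard library only, not pykafka's exact code):

    import gzip
    from io import BytesIO

    buf = BytesIO()
    with gzip.GzipFile(fileobj=buf, mode='wb') as f:
        f.write(b'kafka message payload')   # bytes in
    compressed = buf.getvalue()             # compressed bytes out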
29 changes: 29 additions & 0 deletions setup.cfg
@@ -2,3 +2,32 @@
logging-clear-handlers = 1
verbosity = 2
detailed-errors = 1

[pytest]
norecursedirs = build docs/_build *.egg .tox *.venv requirements/
addopts =
# Shows a line for every test
# You probably want to turn this off if you use pytest-sugar.
# Or you can keep it and run `py.test -q`.
--verbose

# Shorter tracebacks are sometimes easier to read
# --tb=short

# Turn on --capture to have brief, less noisy output.
# You will only see output if the test fails.
# Use --capture no (same as -s) if you want to see it all or have problems
# debugging.
# --capture=fd
# --capture=no

# Show extra test summary info as specified by chars (f)ailed, (E)error, (s)skipped, (x)failed, (X)passed.
-rfEsxX

# FIXME: This is commented out for now while doing TDD
# Measure code coverage
# --cov=pykafka --cov-report=xml --cov-report=term-missing

# Previous versions included the following, but it's a bad idea because it
# hard-codes the value and makes it hard to change from the command-line
# tests/