Commit eac55b9: More work towards getting py34 compliant
sontek committed Aug 20, 2015
1 parent 868656d
Showing 18 changed files with 179 additions and 100 deletions.
8 changes: 4 additions & 4 deletions pykafka/broker.py
@@ -29,7 +29,7 @@
OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse,
ProduceResponse)

from .utils.compat import range, iteritems

log = logging.getLogger(__name__)

@@ -274,7 +274,7 @@ def request_metadata(self, topics=None):
:type topics: Iterable of int
"""
max_retries = 3
for i in xrange(max_retries):
for i in range(max_retries):
if i > 0:
log.debug("Retrying")
time.sleep(i)
@@ -283,11 +283,11 @@
response = future.get(MetadataResponse)

errored = False
for name, topic_metadata in response.topics.iteritems():
for name, topic_metadata in iteritems(response.topics):
if topic_metadata.err == LeaderNotAvailable.ERROR_CODE:
log.warning("Leader not available.")
errored = True
for pid, partition_metadata in topic_metadata.partitions.iteritems():
for pid, partition_metadata in iteritems(topic_metadata.partitions):
if partition_metadata.err == LeaderNotAvailable.ERROR_CODE:
log.warning("Leader not available.")
errored = True
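
The recurring fix in this commit is to route every py2/py3 difference through pykafka.utils.compat instead of scattering version checks across call sites. The compat module itself is not shown in this diff; below is a plausible sketch of the shims it exports, with the names taken from the imports above and the implementations assumed:

```python
# Plausible sketch of pykafka/utils/compat.py; the real module may differ.
import sys

PY3 = sys.version_info[0] >= 3

if PY3:
    range = range  # py3's range is already lazy, like py2's xrange

    def iteritems(d):
        return iter(d.items())  # items() is a cheap view on py3

    def itervalues(d):
        return iter(d.values())
else:
    range = xrange  # noqa: F821 -- py2 builtin

    def iteritems(d):
        return d.iteritems()  # avoids materializing a list on py2

    def itervalues(d):
        return d.itervalues()
```

With shims like these, call sites such as the broker.py hunks above read identically on both interpreters.
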
21 changes: 14 additions & 7 deletions pykafka/cluster.py
@@ -29,7 +29,7 @@
UnknownTopicOrPartition)
from .protocol import ConsumerMetadataRequest, ConsumerMetadataResponse
from .topic import Topic

from .utils.compat import iteritems, range

log = logging.getLogger(__name__)

@@ -43,6 +43,10 @@ def __init__(self, cluster, *args, **kwargs):

def __missing__(self, key):
log.warning('Topic %s not found. Attempting to auto-create.', key)

if hasattr(key, 'encode'):
key = key.encode('utf-8')

if self._create_topic(key):
return self[key]
else:
@@ -56,16 +60,19 @@ def _create_topic(self, topic_name):
with settings and everything, we'll implement that. To expose just
this now would be disingenuous, since its features would be hobbled.
"""
if hasattr(topic_name, 'encode'):
topic_name = topic_name.encode('utf-8')

if len(self._cluster.brokers) == 0:
log.warning("No brokers found. This is probably because of "
"KAFKA-2154, which will be fixed in Kafka 0.8.3")
raise KafkaException("Unable to retrieve metadata. Can't auto-create topic. See log for details.")
# Auto-creating will take a moment, so we try 5 times.
for i in xrange(5):
for i in range(5):
# Auto-creating is as simple as issuing a metadata request
# solely for that topic. The update is just to be sure
# our `Cluster` knows about it.
self._cluster.brokers[self._cluster.brokers.keys()[0]].request_metadata(topics=[topic_name])
self._cluster.brokers[list(self._cluster.brokers.keys())[0]].request_metadata(topics=[topic_name])
self._cluster.update()
if topic_name in self:
log.info('Topic %s successfully auto-created.', topic_name)
@@ -189,7 +196,7 @@ def _update_brokers(self, broker_metadata):
# Add/update current brokers
if len(broker_metadata) > 0:
log.info('Discovered %d brokers', len(broker_metadata))
for id_, meta in broker_metadata.iteritems():
for id_, meta in iteritems(broker_metadata):
if id_ not in self._brokers:
log.debug('Discovered broker id %s: %s:%s', id_, meta.host, meta.port)
self._brokers[id_] = Broker.from_metadata(
@@ -225,7 +232,7 @@ def _update_topics(self, metadata):
# Add/update partition information
if len(metadata) > 0:
log.info("Discovered %d topics", len(metadata))
for name, meta in metadata.iteritems():
for name, meta in iteritems(metadata):
if not self._should_exclude_topic(name):
if name not in self._topics:
self._topics[name] = Topic(self, meta)
@@ -237,7 +244,7 @@ def _should_exclude_topic(self, topic_name):
"""Should this topic be excluded from the list shown to the client?"""
if not self._exclude_internal_topics:
return False
return topic_name.startswith("__")
return topic_name.startswith(b"__")

def get_offset_manager(self, consumer_group):
"""Get the broker designated as the offset manager for this consumer group.
@@ -254,7 +261,7 @@ def get_offset_manager(self, consumer_group):
broker = self.brokers[random.choice(self.brokers.keys())]
MAX_RETRIES = 5

for i in xrange(MAX_RETRIES):
for i in range(MAX_RETRIES):
if i > 0:
log.debug("Retrying offset manager discovery")
time.sleep(i * 2)
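
The list(...) wrapper in _create_topic is needed because py3 dict methods return views: iterable and sized, but not subscriptable. A standalone illustration, with a hypothetical broker mapping:

```python
import random

brokers = {0: 'kafka1:9092', 1: 'kafka2:9092'}  # hypothetical id -> address map

keys = brokers.keys()        # py2: list; py3: dict_keys view
try:
    first_id = keys[0]       # raises TypeError on py3
except TypeError:
    first_id = list(keys)[0]

any_id = random.choice(list(brokers))  # list(d) lists the keys on both 2 and 3
print(first_id, any_id)
```

By the same rule, the unchanged random.choice(self.brokers.keys()) context line in get_offset_manager above would still raise TypeError on py3. Relatedly, once topic names travel as bytes, prefix checks need bytes literals too, hence the startswith(b"__") change in _should_exclude_topic.
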
1 change: 1 addition & 0 deletions pykafka/connection.py
@@ -24,6 +24,7 @@

from .exceptions import SocketDisconnectedError
from .utils.socket import recvall_into
from .utils.compat import buffer

log = logging.getLogger(__name__)

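
connection.py now imports buffer from the compat module because py3 removed the buffer builtin; memoryview is its closest replacement. A plausible shim, together with the kind of checksum call protocol.py makes over it (an assumed sketch, not the actual pykafka source):

```python
import sys
import zlib

if sys.version_info[0] >= 3:
    buffer = memoryview  # read-only view over bytes-like objects
else:
    buffer = buffer  # noqa: F821 -- py2 builtin

# Mirrors protocol.py's crc32(buffer(...)) call: view a bytearray slice,
# then checksum it. Masking with 0xffffffff yields the same unsigned value
# on py2 (where zlib.crc32 is signed) and py3 (where it is unsigned).
buff = bytearray(b'\x00\x01\x02\x03\x04')
crc = zlib.crc32(buffer(buff[1:])) & 0xffffffff
print(hex(crc))
```
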
1 change: 1 addition & 0 deletions pykafka/handlers.py
@@ -126,6 +126,7 @@ def worker():
task = self._requests.get()
try:
self.connection.request(task.request)

if task.future:
res = self.connection.response()
task.future.set_response(res)
11 changes: 7 additions & 4 deletions pykafka/producer.py
@@ -31,7 +31,7 @@
)
from .partitioners import random_partitioner
from .protocol import Message, ProduceRequest

from .utils.compat import string_types, get_bytes

log = logging.getLogger(__name__)

@@ -190,14 +190,17 @@ def _partition_messages(self, messages):
:param messages: Iterable of messages to publish.
:returns: Generator of ((key, value), partition_id)
"""
partitions = self._topic.partitions.values()
partitions = list(self._topic.partitions.values())

for message in messages:
if isinstance(message, basestring):
if isinstance(message, string_types):
key = None
value = message
else:
key, value = message
value = str(value)

value = get_bytes(value)

yield (key, value), self._partitioner(partitions, message).id

def _produce(self, message_partition_tups, attempt):
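
The producer change swaps the py2-only basestring check and the lossy str(value) coercion for compat helpers: string_types for the isinstance test, get_bytes to put the payload on the wire as bytes. A self-contained sketch of that normalization; the helper definitions here are assumptions, and pykafka's real ones may differ:

```python
import sys

# Assumed equivalents of the compat helpers imported above.
string_types = (str,) if sys.version_info[0] >= 3 else (str, unicode)  # noqa: F821

def get_bytes(value):
    # Pass bytes through untouched, UTF-8-encode text.
    return value if isinstance(value, bytes) else value.encode('utf-8')

def normalize(message):
    # A message is either a bare value or a (key, value) pair,
    # matching the two branches in _partition_messages above.
    if isinstance(message, string_types):
        key, value = None, message
    else:
        key, value = message
    return key, get_bytes(value)

print(normalize('hello'))            # (None, b'hello')
print(normalize(('k1', 'payload')))  # ('k1', b'payload')
```

Note that, as in the hunk above, only the value is converted; the key is passed along as given.
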
92 changes: 61 additions & 31 deletions pykafka/protocol.py
@@ -63,6 +63,7 @@
from .common import CompressionType, Message
from .exceptions import ERROR_CODES, NoMessagesConsumedError
from .utils import Serializable, compression, struct_helpers
from .utils.compat import iteritems, itervalues, buffer, get_bytes


log = logging.getLogger(__name__)
@@ -71,7 +72,7 @@
class Request(Serializable):
"""Base class for all Requests. Handles writing header information"""
HEADER_LEN = 21 # constant for all messages
CLIENT_ID = 'pykafka'
CLIENT_ID = b'pykafka'

def _write_header(self, buff, api_version=0, correlation_id=0):
"""Write the header for an outgoing message.
@@ -191,12 +192,15 @@ def pack_into(self, buff, offset):
if self.partition_key is None:
fmt = '!BBii%ds' % len(self.value)
args = (self.MAGIC, self.compression_type, -1,
len(self.value), self.value)
len(self.value), get_bytes(self.value))
else:
fmt = '!BBi%dsi%ds' % (len(self.partition_key), len(self.value))
args = (self.MAGIC, self.compression_type,
len(self.partition_key), self.partition_key,
len(self.value), self.value)
len(self.partition_key),
get_bytes(self.partition_key),
len(self.value),
get_bytes(self.value))

struct.pack_into(fmt, buff, offset + 4, *args)
fmt_size = struct.calcsize(fmt)
crc = crc32(buffer(buff[(offset + 4):(offset + 4 + fmt_size)]))
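
One subtlety in the pack_into hunk above: the format widths are still computed from len(self.value) and len(self.partition_key), while the packed arguments are the get_bytes(...) results. The two lengths agree only when the inputs are ASCII or already bytes; for non-ASCII text the UTF-8 encoding is longer, and struct's '%ds' silently truncates. The safe ordering, shown as a standalone sketch, is to encode first and size the format from the encoded bytes:

```python
import struct

def pack_framed(value):
    # Encode first, then derive the struct width from the bytes length.
    data = value if isinstance(value, bytes) else value.encode('utf-8')
    fmt = '!i%ds' % len(data)  # 4-byte length prefix + payload
    out = bytearray(struct.calcsize(fmt))
    struct.pack_into(fmt, out, 0, len(data), data)
    return bytes(out)

# 'héllo' is 5 characters but 6 UTF-8 bytes; sizing the format from the
# str instead of the encoded bytes would drop the final byte.
assert pack_framed('héllo') == pack_framed('héllo'.encode('utf-8'))
```
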
@@ -334,7 +338,16 @@ def __init__(self, topics=None):
:param topics: Topics to query. Leave empty for all available topics.
"""
self.topics = topics or []
self._topics = topics or []

@property
def topics(self):
if self._topics:
return [
t.encode('utf-8') for t in self._topics if hasattr(t, 'encode')
]

return self._topics

def __len__(self):
"""Length of the serialized message, in bytes"""
@@ -454,11 +467,11 @@ def __init__(self,
def __len__(self):
"""Length of the serialized message, in bytes"""
size = self.HEADER_LEN + 2 + 4 + 4 # acks + timeout + len(topics)
for topic, parts in self.msets.iteritems():
for topic, parts in iteritems(self.msets):
# topic name
size += 2 + len(topic) + 4 # topic name + len(parts)
# partition + mset size + len(mset)
size += sum(4 + 4 + len(mset) for mset in parts.itervalues())
size += sum(4 + 4 + len(mset) for mset in itervalues(parts))
return size

@property
@@ -471,8 +484,8 @@ def messages(self):
"""Iterable of all messages in the Request"""
return itertools.chain.from_iterable(
mset.messages
for topic, partitions in self.msets.iteritems()
for partition_id, mset in partitions.iteritems()
for topic, partitions in iteritems(self.msets)
for partition_id, mset in iteritems(partitions)
)

def add_message(self, message, topic_name, partition_id):
@@ -497,11 +510,13 @@ def get_bytes(self):
struct.pack_into('!hii', output, offset,
self.required_acks, self.timeout, len(self.msets))
offset += 10
for topic_name, partitions in self.msets.iteritems():
for topic_name, partitions in iteritems(self.msets):
topic_name = get_bytes(topic_name)
fmt = '!h%dsi' % len(topic_name)
struct.pack_into(fmt, output, offset, len(topic_name), topic_name, len(partitions))
struct.pack_into(fmt, output, offset, len(topic_name),
topic_name, len(partitions))
offset += struct.calcsize(fmt)
for partition_id, message_set in partitions.iteritems():
for partition_id, message_set in iteritems(partitions):
mset_len = len(message_set)
struct.pack_into('!ii', output, offset, partition_id, mset_len)
offset += 8
@@ -617,7 +632,7 @@ def __len__(self):
"""Length of the serialized message, in bytes"""
# replica + max wait + min bytes + len(topics)
size = self.HEADER_LEN + 4 + 4 + 4 + 4
for topic, parts in self._reqs.iteritems():
for topic, parts in iteritems(self._reqs):
# topic name + len(parts)
size += 2 + len(topic) + 4
# partition + fetch offset + max bytes => for each partition
@@ -641,11 +656,15 @@ def get_bytes(self):
struct.pack_into('!iiii', output, offset,
-1, self.timeout, self.min_bytes, len(self._reqs))
offset += 16
for topic_name, partitions in self._reqs.iteritems():
for topic_name, partitions in iteritems(self._reqs):
topic_name = get_bytes(topic_name)
fmt = '!h%dsi' % len(topic_name)
struct.pack_into(fmt, output, offset, len(topic_name), topic_name, len(partitions))
struct.pack_into(
fmt, output, offset, len(topic_name), topic_name,
len(partitions)
)
offset += struct.calcsize(fmt)
for partition_id, (fetch_offset, max_bytes) in partitions.iteritems():
for partition_id, (fetch_offset, max_bytes) in iteritems(partitions):
struct.pack_into('!iqi', output, offset,
partition_id, fetch_offset, max_bytes)
offset += 16
@@ -749,7 +768,7 @@ def __len__(self):
"""Length of the serialized message, in bytes"""
# Header + replicaId + len(topics)
size = self.HEADER_LEN + 4 + 4
for topic, parts in self._reqs.iteritems():
for topic, parts in iteritems(self._reqs):
# topic name + len(parts)
size += 2 + len(topic) + 4
# partition + fetch offset + max bytes => for each partition
@@ -772,12 +791,13 @@ def get_bytes(self):
offset = self.HEADER_LEN
struct.pack_into('!ii', output, offset, -1, len(self._reqs))
offset += 8
for topic_name, partitions in self._reqs.iteritems():
for topic_name, partitions in iteritems(self._reqs):
fmt = '!h%dsi' % len(topic_name)
struct.pack_into(fmt, output, offset, len(topic_name),
topic_name, len(partitions))
get_bytes(topic_name), len(partitions))

offset += struct.calcsize(fmt)
for pnum, (offsets_before, max_offsets) in partitions.iteritems():
for pnum, (offsets_before, max_offsets) in iteritems(partitions):
struct.pack_into('!iqi', output, offset,
pnum, offsets_before, max_offsets)
offset += 16
@@ -824,7 +844,7 @@ class ConsumerMetadataRequest(Request):
"""
def __init__(self, consumer_group):
"""Create a new consumer metadata request"""
self.consumer_group = consumer_group
self.consumer_group = get_bytes(consumer_group)

def __len__(self):
"""Length of the serialized message, in bytes"""
@@ -933,13 +953,13 @@ def __len__(self):
size = self.HEADER_LEN + 2 + len(self.consumer_group)
# + generation id + string size + consumer_id size + array length
size += 4 + 2 + len(self.consumer_id) + 4
for topic, parts in self._reqs.iteritems():
for topic, parts in iteritems(self._reqs):
# topic name + len(parts)
size += 2 + len(topic) + 4
# partition + offset + timestamp => for each partition
size += (4 + 8 + 8) * len(parts)
# metadata => for each partition
for partition, (_, _, metadata) in parts.iteritems():
for partition, (_, _, metadata) in iteritems(parts):
size += 2 + len(metadata)
return size

@@ -958,19 +978,27 @@ def get_bytes(self):
self._write_header(output, api_version=1)
offset = self.HEADER_LEN
fmt = '!h%dsih%dsi' % (len(self.consumer_group), len(self.consumer_id))

consumer_group = get_bytes(self.consumer_group)
consumer_id = get_bytes(self.consumer_id)

struct.pack_into(fmt, output, offset,
len(self.consumer_group), self.consumer_group,
len(consumer_group),
consumer_group,
self.consumer_group_generation_id,
len(self.consumer_id), self.consumer_id,
len(consumer_id), consumer_id,
len(self._reqs))

offset += struct.calcsize(fmt)
for topic_name, partitions in self._reqs.iteritems():
for topic_name, partitions in iteritems(self._reqs):
topic_name = get_bytes(topic_name)
fmt = '!h%dsi' % len(topic_name)
struct.pack_into(fmt, output, offset, len(topic_name),
topic_name, len(partitions))
offset += struct.calcsize(fmt)
for pnum, (poffset, timestamp, metadata) in partitions.iteritems():
for pnum, (poffset, timestamp, metadata) in iteritems(partitions):
fmt = '!iqq'
metadata = get_bytes(metadata)
struct.pack_into(fmt, output, offset,
pnum, poffset, timestamp)
offset += struct.calcsize(fmt)
Expand All @@ -980,6 +1008,7 @@ def get_bytes(self):
if metalen != -1:
fmt += '%ds' % metalen
pack_args = [fmt, output, offset, metalen, metadata]

struct.pack_into(*pack_args)
offset += struct.calcsize(fmt)
return output
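
The metadata handling above is the protocol's nullable string: a length of -1 with no payload means null, so the format string only grows a '%ds' when there is data. A standalone sketch of writing both cases (write_nullable_string is a hypothetical helper):

```python
import struct

def write_nullable_string(out, offset, s):
    # Kafka-style nullable string: length -1 and no payload means null.
    if s is None:
        struct.pack_into('!h', out, offset, -1)
        return offset + 2
    fmt = '!h%ds' % len(s)
    struct.pack_into(fmt, out, offset, len(s), s)
    return offset + struct.calcsize(fmt)

buf = bytearray(16)
end = write_nullable_string(buf, 0, None)     # writes \xff\xff
end = write_nullable_string(buf, end, b'ok')  # writes \x00\x02ok
print(bytes(buf[:end]))  # b'\xff\xff\x00\x02ok'
```
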
@@ -1054,7 +1083,7 @@ def __len__(self):
"""Length of the serialized message, in bytes"""
# Header + consumer group + len(topics)
size = self.HEADER_LEN + 2 + len(self.consumer_group) + 4
for topic, parts in self._reqs.iteritems():
for topic, parts in iteritems(self._reqs):
# topic name + len(parts)
size += 2 + len(topic) + 4
# partition => for each partition
@@ -1077,13 +1106,14 @@ def get_bytes(self):
offset = self.HEADER_LEN
fmt = '!h%dsi' % len(self.consumer_group)
struct.pack_into(fmt, output, offset,
len(self.consumer_group), self.consumer_group,
len(self.consumer_group),
get_bytes(self.consumer_group),
len(self._reqs))
offset += struct.calcsize(fmt)
for topic_name, partitions in self._reqs.iteritems():
for topic_name, partitions in iteritems(self._reqs):
fmt = '!h%dsi' % len(topic_name)
struct.pack_into(fmt, output, offset, len(topic_name),
topic_name, len(partitions))
get_bytes(topic_name), len(partitions))
offset += struct.calcsize(fmt)
for pnum in partitions:
fmt = '!i'
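
All of the get_bytes serializers in this file build on the same length-prefixed layout: a signed 16-bit length followed by that many bytes ('!h%ds'), nested inside 32-bit counts for arrays of topics and partitions. For reference, a minimal pack/unpack round trip of that layout (a standalone sketch, not pykafka's actual reader):

```python
import struct

def write_short_string(out, offset, s):
    fmt = '!h%ds' % len(s)
    struct.pack_into(fmt, out, offset, len(s), s)
    return offset + struct.calcsize(fmt)

def read_short_string(buf, offset):
    (length,) = struct.unpack_from('!h', buf, offset)
    offset += 2
    value = bytes(buf[offset:offset + length])
    return value, offset + length

buf = bytearray(32)
end = write_short_string(buf, 0, b'pykafka')
value, consumed = read_short_string(buf, 0)
assert value == b'pykafka' and consumed == end == 2 + len(b'pykafka')
```
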