diff --git a/pykafka/broker.py b/pykafka/broker.py
index 547852ca9..6896d0e6f 100644
--- a/pykafka/broker.py
+++ b/pykafka/broker.py
@@ -29,7 +29,7 @@
                        OffsetCommitRequest, OffsetCommitResponse,
                        OffsetFetchRequest, OffsetFetchResponse,
                        ProduceResponse)
-
+from .utils.compat import range, iteritems
 
 log = logging.getLogger(__name__)
 
@@ -274,7 +274,7 @@ def request_metadata(self, topics=None):
         :type topics: Iterable of int
         """
         max_retries = 3
-        for i in xrange(max_retries):
+        for i in range(max_retries):
             if i > 0:
                 log.debug("Retrying")
             time.sleep(i)
@@ -283,11 +283,11 @@ def request_metadata(self, topics=None):
 
             response = future.get(MetadataResponse)
             errored = False
-            for name, topic_metadata in response.topics.iteritems():
+            for name, topic_metadata in iteritems(response.topics):
                 if topic_metadata.err == LeaderNotAvailable.ERROR_CODE:
                     log.warning("Leader not available.")
                     errored = True
-                for pid, partition_metadata in topic_metadata.partitions.iteritems():
+                for pid, partition_metadata in iteritems(topic_metadata.partitions):
                     if partition_metadata.err == LeaderNotAvailable.ERROR_CODE:
                         log.warning("Leader not available.")
                         errored = True
diff --git a/pykafka/cluster.py b/pykafka/cluster.py
index 2ca8368b4..e56dc6f6f 100644
--- a/pykafka/cluster.py
+++ b/pykafka/cluster.py
@@ -29,7 +29,7 @@
                          UnknownTopicOrPartition)
 from .protocol import ConsumerMetadataRequest, ConsumerMetadataResponse
 from .topic import Topic
-
+from .utils.compat import iteritems, range
 
 log = logging.getLogger(__name__)
 
@@ -43,6 +43,10 @@ def __init__(self, cluster, *args, **kwargs):
 
     def __missing__(self, key):
         log.warning('Topic %s not found. Attempting to auto-create.', key)
+
+        if hasattr(key, 'encode'):
+            key = key.encode('utf-8')
+
         if self._create_topic(key):
             return self[key]
         else:
@@ -56,16 +60,19 @@ def _create_topic(self, topic_name):
         with settings and everything, we'll implement that. To expose just
         this now would be disingenuous, since it's features would be hobbled.
         """
+        if hasattr(topic_name, 'encode'):
+            topic_name = topic_name.encode('utf-8')
+
         if len(self._cluster.brokers) == 0:
             log.warning("No brokers found. This is probably because of "
                         "KAFKA-2154, which will be fixed in Kafka 0.8.3")
             raise KafkaException("Unable to retrieve metdata. Can't auto-create topic. See log for details.")
         # Auto-creating will take a moment, so we try 5 times.
-        for i in xrange(5):
+        for i in range(5):
             # Auto-creating is as simple as issuing a metadata request
             # solely for that topic. The update is just to be sure
             # our `Cluster` knows about it.
-            self._cluster.brokers[self._cluster.brokers.keys()[0]].request_metadata(topics=[topic_name])
+            self._cluster.brokers[list(self._cluster.brokers.keys())[0]].request_metadata(topics=[topic_name])
             self._cluster.update()
             if topic_name in self:
                 log.info('Topic %s successfully auto-created.', topic_name)
@@ -189,7 +196,7 @@ def _update_brokers(self, broker_metadata):
         # Add/update current brokers
         if len(broker_metadata) > 0:
             log.info('Discovered %d brokers', len(broker_metadata))
-            for id_, meta in broker_metadata.iteritems():
+            for id_, meta in iteritems(broker_metadata):
                 if id_ not in self._brokers:
                     log.debug('Discovered broker id %s: %s:%s', id_, meta.host, meta.port)
                     self._brokers[id_] = Broker.from_metadata(
@@ -225,7 +232,7 @@ def _update_topics(self, metadata):
         # Add/update partition information
         if len(metadata) > 0:
             log.info("Discovered %d topics", len(metadata))
-            for name, meta in metadata.iteritems():
+            for name, meta in iteritems(metadata):
                 if not self._should_exclude_topic(name):
                     if name not in self._topics:
                         self._topics[name] = Topic(self, meta)
@@ -237,7 +244,7 @@ def _should_exclude_topic(self, topic_name):
         """Should this topic be excluded from the list shown to the client?"""
         if not self._exclude_internal_topics:
             return False
-        return topic_name.startswith("__")
+        return topic_name.startswith(b"__")
 
     def get_offset_manager(self, consumer_group):
         """Get the broker designated as the offset manager for this consumer group.
@@ -254,7 +261,7 @@ def get_offset_manager(self, consumer_group):
         broker = self.brokers[random.choice(self.brokers.keys())]
 
         MAX_RETRIES = 5
-        for i in xrange(MAX_RETRIES):
+        for i in range(MAX_RETRIES):
             if i > 0:
                 log.debug("Retrying offset manager discovery")
                 time.sleep(i * 2)
diff --git a/pykafka/connection.py b/pykafka/connection.py
index 5bcee51f2..905bf60d7 100644
--- a/pykafka/connection.py
+++ b/pykafka/connection.py
@@ -24,6 +24,7 @@
 
 from .exceptions import SocketDisconnectedError
 from .utils.socket import recvall_into
+from .utils.compat import buffer
 
 log = logging.getLogger(__name__)
 
diff --git a/pykafka/handlers.py b/pykafka/handlers.py
index 1dae43ea4..d6e340e12 100644
--- a/pykafka/handlers.py
+++ b/pykafka/handlers.py
@@ -126,6 +126,7 @@ def worker():
                 task = self._requests.get()
                 try:
                     self.connection.request(task.request)
+
                     if task.future:
                         res = self.connection.response()
                         task.future.set_response(res)
diff --git a/pykafka/producer.py b/pykafka/producer.py
index fec37a26e..329837091 100644
--- a/pykafka/producer.py
+++ b/pykafka/producer.py
@@ -31,7 +31,7 @@
 )
 from .partitioners import random_partitioner
 from .protocol import Message, ProduceRequest
-
+from .utils.compat import string_types, get_bytes
 
 log = logging.getLogger(__name__)
 
@@ -190,14 +190,17 @@ def _partition_messages(self, messages):
         :param messages: Iterable of messages to publish.
         :returns: Generator of ((key, value), partition_id)
         """
-        partitions = self._topic.partitions.values()
+        partitions = list(self._topic.partitions.values())
+
         for message in messages:
-            if isinstance(message, basestring):
+            if isinstance(message, string_types):
                 key = None
                 value = message
             else:
                 key, value = message
-            value = str(value)
+
+            value = get_bytes(value)
+
             yield (key, value), self._partitioner(partitions, message).id
 
     def _produce(self, message_partition_tups, attempt):
diff --git a/pykafka/protocol.py b/pykafka/protocol.py
index 04b52d2c0..13b120022 100644
--- a/pykafka/protocol.py
+++ b/pykafka/protocol.py
@@ -63,6 +63,7 @@
 from .common import CompressionType, Message
 from .exceptions import ERROR_CODES, NoMessagesConsumedError
 from .utils import Serializable, compression, struct_helpers
+from .utils.compat import iteritems, itervalues, buffer, get_bytes
 
 log = logging.getLogger(__name__)
 
@@ -71,7 +72,7 @@
 class Request(Serializable):
     """Base class for all Requests. Handles writing header information"""
     HEADER_LEN = 21  # constant for all messages
-    CLIENT_ID = 'pykafka'
+    CLIENT_ID = b'pykafka'
 
     def _write_header(self, buff, api_version=0, correlation_id=0):
         """Write the header for an outgoing message.
@@ -191,12 +192,15 @@ def pack_into(self, buff, offset):
         if self.partition_key is None:
             fmt = '!BBii%ds' % len(self.value)
             args = (self.MAGIC, self.compression_type, -1,
-                    len(self.value), self.value)
+                    len(self.value), get_bytes(self.value))
         else:
             fmt = '!BBi%dsi%ds' % (len(self.partition_key), len(self.value))
             args = (self.MAGIC, self.compression_type,
-                    len(self.partition_key), self.partition_key,
-                    len(self.value), self.value)
+                    len(self.partition_key),
+                    get_bytes(self.partition_key),
+                    len(self.value),
+                    get_bytes(self.value))
+
         struct.pack_into(fmt, buff, offset + 4, *args)
         fmt_size = struct.calcsize(fmt)
         crc = crc32(buffer(buff[(offset + 4):(offset + 4 + fmt_size)]))
@@ -334,7 +338,16 @@ def __init__(self, topics=None):
 
         :param topics: Topics to query. Leave empty for all available topics.
""" - self.topics = topics or [] + self._topics = topics or [] + + @property + def topics(self): + if self._topics: + return [ + t.encode('utf-8') for t in self._topics if hasattr(t, 'encode') + ] + + return self._topics def __len__(self): """Length of the serialized message, in bytes""" @@ -454,11 +467,11 @@ def __init__(self, def __len__(self): """Length of the serialized message, in bytes""" size = self.HEADER_LEN + 2 + 4 + 4 # acks + timeout + len(topics) - for topic, parts in self.msets.iteritems(): + for topic, parts in iteritems(self.msets): # topic name size += 2 + len(topic) + 4 # topic name + len(parts) # partition + mset size + len(mset) - size += sum(4 + 4 + len(mset) for mset in parts.itervalues()) + size += sum(4 + 4 + len(mset) for mset in itervalues(parts)) return size @property @@ -471,8 +484,8 @@ def messages(self): """Iterable of all messages in the Request""" return itertools.chain.from_iterable( mset.messages - for topic, partitions in self.msets.iteritems() - for partition_id, mset in partitions.iteritems() + for topic, partitions in iteritems(self.msets) + for partition_id, mset in iteritems(partitions) ) def add_message(self, message, topic_name, partition_id): @@ -497,11 +510,13 @@ def get_bytes(self): struct.pack_into('!hii', output, offset, self.required_acks, self.timeout, len(self.msets)) offset += 10 - for topic_name, partitions in self.msets.iteritems(): + for topic_name, partitions in iteritems(self.msets): + topic_name = get_bytes(topic_name) fmt = '!h%dsi' % len(topic_name) - struct.pack_into(fmt, output, offset, len(topic_name), topic_name, len(partitions)) + struct.pack_into(fmt, output, offset, len(topic_name), + topic_name, len(partitions)) offset += struct.calcsize(fmt) - for partition_id, message_set in partitions.iteritems(): + for partition_id, message_set in iteritems(partitions): mset_len = len(message_set) struct.pack_into('!ii', output, offset, partition_id, mset_len) offset += 8 @@ -617,7 +632,7 @@ def __len__(self): """Length of the serialized message, in bytes""" # replica + max wait + min bytes + len(topics) size = self.HEADER_LEN + 4 + 4 + 4 + 4 - for topic, parts in self._reqs.iteritems(): + for topic, parts in iteritems(self._reqs): # topic name + len(parts) size += 2 + len(topic) + 4 # partition + fetch offset + max bytes => for each partition @@ -641,11 +656,15 @@ def get_bytes(self): struct.pack_into('!iiii', output, offset, -1, self.timeout, self.min_bytes, len(self._reqs)) offset += 16 - for topic_name, partitions in self._reqs.iteritems(): + for topic_name, partitions in iteritems(self._reqs): + topic_name = get_bytes(topic_name) fmt = '!h%dsi' % len(topic_name) - struct.pack_into(fmt, output, offset, len(topic_name), topic_name, len(partitions)) + struct.pack_into( + fmt, output, offset, len(topic_name), topic_name, + len(partitions) + ) offset += struct.calcsize(fmt) - for partition_id, (fetch_offset, max_bytes) in partitions.iteritems(): + for partition_id, (fetch_offset, max_bytes) in iteritems(partitions): struct.pack_into('!iqi', output, offset, partition_id, fetch_offset, max_bytes) offset += 16 @@ -749,7 +768,7 @@ def __len__(self): """Length of the serialized message, in bytes""" # Header + replicaId + len(topics) size = self.HEADER_LEN + 4 + 4 - for topic, parts in self._reqs.iteritems(): + for topic, parts in iteritems(self._reqs): # topic name + len(parts) size += 2 + len(topic) + 4 # partition + fetch offset + max bytes => for each partition @@ -772,12 +791,13 @@ def get_bytes(self): offset = self.HEADER_LEN 
         struct.pack_into('!ii', output, offset, -1, len(self._reqs))
         offset += 8
-        for topic_name, partitions in self._reqs.iteritems():
+        for topic_name, partitions in iteritems(self._reqs):
             fmt = '!h%dsi' % len(topic_name)
             struct.pack_into(fmt, output, offset, len(topic_name),
-                             topic_name, len(partitions))
+                             get_bytes(topic_name), len(partitions))
+
             offset += struct.calcsize(fmt)
-            for pnum, (offsets_before, max_offsets) in partitions.iteritems():
+            for pnum, (offsets_before, max_offsets) in iteritems(partitions):
                 struct.pack_into('!iqi', output, offset, pnum,
                                  offsets_before, max_offsets)
                 offset += 16
@@ -824,7 +844,7 @@ class ConsumerMetadataRequest(Request):
     """
     def __init__(self, consumer_group):
         """Create a new consumer metadata request"""
-        self.consumer_group = consumer_group
+        self.consumer_group = get_bytes(consumer_group)
 
     def __len__(self):
         """Length of the serialized message, in bytes"""
@@ -933,13 +953,13 @@ def __len__(self):
         size = self.HEADER_LEN + 2 + len(self.consumer_group)
         # + generation id + string size + consumer_id size + array length
         size += 4 + 2 + len(self.consumer_id) + 4
-        for topic, parts in self._reqs.iteritems():
+        for topic, parts in iteritems(self._reqs):
             # topic name + len(parts)
             size += 2 + len(topic) + 4
             # partition + offset + timestamp => for each partition
             size += (4 + 8 + 8) * len(parts)
             # metadata => for each partition
-            for partition, (_, _, metadata) in parts.iteritems():
+            for partition, (_, _, metadata) in iteritems(parts):
                 size += 2 + len(metadata)
         return size
@@ -958,19 +978,27 @@ def get_bytes(self):
         self._write_header(output, api_version=1)
         offset = self.HEADER_LEN
         fmt = '!h%dsih%dsi' % (len(self.consumer_group), len(self.consumer_id))
+
+        consumer_group = get_bytes(self.consumer_group)
+        consumer_id = get_bytes(self.consumer_id)
+
         struct.pack_into(fmt, output, offset,
-                         len(self.consumer_group), self.consumer_group,
+                         len(consumer_group),
+                         consumer_group,
                          self.consumer_group_generation_id,
-                         len(self.consumer_id), self.consumer_id,
+                         len(consumer_id), consumer_id,
                          len(self._reqs))
+
         offset += struct.calcsize(fmt)
-        for topic_name, partitions in self._reqs.iteritems():
+        for topic_name, partitions in iteritems(self._reqs):
+            topic_name = get_bytes(topic_name)
             fmt = '!h%dsi' % len(topic_name)
             struct.pack_into(fmt, output, offset, len(topic_name),
                              topic_name, len(partitions))
             offset += struct.calcsize(fmt)
-            for pnum, (poffset, timestamp, metadata) in partitions.iteritems():
+            for pnum, (poffset, timestamp, metadata) in iteritems(partitions):
                 fmt = '!iqq'
+                metadata = get_bytes(metadata)
                 struct.pack_into(fmt, output, offset, pnum, poffset, timestamp)
                 offset += struct.calcsize(fmt)
@@ -980,6 +1008,7 @@ def get_bytes(self):
                 if metalen != -1:
                     fmt += '%ds' % metalen
                     pack_args = [fmt, output, offset, metalen, metadata]
+
                 struct.pack_into(*pack_args)
                 offset += struct.calcsize(fmt)
         return output
@@ -1054,7 +1083,7 @@ def __len__(self):
         """Length of the serialized message, in bytes"""
         # Header + consumer group + len(topics)
         size = self.HEADER_LEN + 2 + len(self.consumer_group) + 4
-        for topic, parts in self._reqs.iteritems():
+        for topic, parts in iteritems(self._reqs):
             # topic name + len(parts)
             size += 2 + len(topic) + 4
             # partition => for each partition
@@ -1077,13 +1106,14 @@ def get_bytes(self):
         offset = self.HEADER_LEN
         fmt = '!h%dsi' % len(self.consumer_group)
         struct.pack_into(fmt, output, offset,
-                         len(self.consumer_group), self.consumer_group,
+                         len(self.consumer_group),
+                         get_bytes(self.consumer_group),
                          len(self._reqs))
         offset += struct.calcsize(fmt)
-        for topic_name, partitions in self._reqs.iteritems():
+        for topic_name, partitions in iteritems(self._reqs):
             fmt = '!h%dsi' % len(topic_name)
             struct.pack_into(fmt, output, offset, len(topic_name),
-                             topic_name, len(partitions))
+                             get_bytes(topic_name), len(partitions))
             offset += struct.calcsize(fmt)
             for pnum in partitions:
                 fmt = '!i'
diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py
index 769eb8889..e383fb553 100644
--- a/pykafka/simpleconsumer.py
+++ b/pykafka/simpleconsumer.py
@@ -239,7 +239,7 @@ def topic(self):
     def partitions(self):
         """A list of the partitions that this consumer consumes"""
         return {id_: partition.partition
-                for id_, partition in iteritems(self._partitions_by_id.iteritems)}
+                for id_, partition in iteritems(self._partitions_by_id)}
 
     @property
     def held_offsets(self):
diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py
index 0777dc1dc..0a0946374 100644
--- a/pykafka/test/kafka_instance.py
+++ b/pykafka/test/kafka_instance.py
@@ -29,6 +29,8 @@
 from testinstances import utils
 from testinstances.exceptions import ProcessNotStartingError
 from testinstances.managed_instance import ManagedInstance
+from pykafka.utils.compat import range
+
 
 
 log = logging.getLogger(__name__)
@@ -223,7 +225,7 @@ def _start_process(self):
 
         # Process is started when the port isn't free anymore
         all_ports = [zk_port] + broker_ports
-        for i in xrange(10):
+        for i in range(10):
             if all(not self._is_port_free(port) for port in all_ports):
                 log.info('Kafka cluster started.')
                 return  # hooray! success
@@ -253,8 +255,8 @@ def _start_brokers(self):
         self._broker_procs = []
         ports = self._port_generator(9092)
         used_ports = []
-        for i in xrange(self._num_instances):
-            port = ports.next()
+        for i in range(self._num_instances):
+            port = next(ports)
             used_ports.append(port)
             log.info('Starting Kafka on port %i.', port)
 
@@ -279,7 +281,7 @@ def _start_brokers(self):
         return used_ports
 
     def _start_zookeeper(self):
-        port = self._port_generator(2181).next()
+        port = next(self._port_generator(2181))
         log.info('Starting zookeeper on port %i.', port)
 
         conf = os.path.join(self._conf_dir, 'zk.properties')
diff --git a/pykafka/topic.py b/pykafka/topic.py
index 560763820..e02247209 100644
--- a/pykafka/topic.py
+++ b/pykafka/topic.py
@@ -26,6 +26,7 @@
 from .producer import Producer
 from .protocol import PartitionOffsetRequest
 from .simpleconsumer import SimpleConsumer
+from .utils.compat import iteritems
 
 log = logging.getLogger(__name__)
 
@@ -92,7 +93,7 @@ def fetch_offset_limits(self, offsets_before, max_offsets=1):
                 self.name, part.id, offsets_before, max_offsets
             ))
         output = {}
-        for broker, reqs in requests.iteritems():
+        for broker, reqs in iteritems(requests):
             res = broker.request_offset_limits(reqs)
             output.update(res.topics[self.name])
         return output
@@ -125,7 +126,7 @@ def update(self, metadata):
         brokers = self._cluster.brokers
         if len(p_metas) > 0:
             log.info("Adding %d partitions", len(p_metas))
-            for id_, meta in p_metas.iteritems():
+            for id_, meta in iteritems(p_metas):
                 if meta.id not in self._partitions:
                     log.debug('Adding partition %s/%s', self.name, meta.id)
                     self._partitions[meta.id] = Partition(
diff --git a/pykafka/utils/compat.py b/pykafka/utils/compat.py
index 288639fdc..912946e52 100644
--- a/pykafka/utils/compat.py
+++ b/pykafka/utils/compat.py
@@ -4,6 +4,18 @@
 
 PY3 = sys.version_info[0] >= 3
 
+
+def get_bytes(value):
+    if hasattr(value, 'encode'):
+        try:
+            value = value.encode('utf-8')
+        except UnicodeError:
+            # if we can't encode the value just pass it along
+            pass
+
+    return value
+
+
 if PY3:
     from threading import Semaphore
     from queue import Queue, Empty
@@ -15,6 +27,9 @@ def iteritems(d, **kw):
 
     def itervalues(d, **kw):
         return iter(d.values(**kw))
+
+    buffer = memoryview
+    string_types = str,
 else:
     range = xrange
     from threading import Condition, Lock
@@ -29,6 +44,9 @@ def iteritems(d, **kw):
     def itervalues(d, **kw):
         return iter(d.itervalues(**kw))
 
+    buffer = buffer
+    string_types = basestring,
+
 # -- begin unmodified backport of threading.Semaphore from Python 3.4 -- #
 class Semaphore:
     """This class implements semaphore objects.
diff --git a/pykafka/utils/compression.py b/pykafka/utils/compression.py
index a297a75f3..5d93d00cc 100644
--- a/pykafka/utils/compression.py
+++ b/pykafka/utils/compression.py
@@ -21,7 +21,7 @@
 import logging
 import struct
 
-from .compat import StringIO
+from .compat import StringIO, range, get_bytes, buffer
 
 try:
     import snappy
@@ -30,7 +30,7 @@
 log = logging.getLogger(__name__)
 
 # constants used in snappy xerial encoding/decoding
-_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
+_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'
 
 
@@ -79,15 +79,20 @@ def encode_snappy(buff, xerial_compatible=False, xerial_blocksize=32 * 1024):
     Adapted from kafka-python
     https://github.com/mumrah/kafka-python/pull/127/files
     """
+    buff = get_bytes(buff)
+
     if snappy is None:
         raise ImportError("Please install python-snappy")
     if xerial_compatible:
        def _chunker():
-            for i in xrange(0, len(buff), xerial_blocksize):
+            for i in range(0, len(buff), xerial_blocksize):
                yield buff[i:i + xerial_blocksize]
        out = StringIO()
-        header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
-                          in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
+        full_data = list(zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER))
+        header = b''.join(
+            [struct.pack('!' + fmt, dat) for fmt, dat in full_data
+        ])
+
        out.write(header)
        for chunk in _chunker():
            block = snappy.compress(chunk)
@@ -113,14 +118,14 @@ def decode_snappy(buff):
         raise ImportError("Please install python-snappy")
     if _detect_xerial_stream(buff):
         out = StringIO()
-        body = buffer(buff[16:])
+        body = bytes(buffer(buff[16:]))
         length = len(body)
         cursor = 0
         while cursor < length:
             block_size = struct.unpack_from('!i', body[cursor:])[0]
             cursor += 4
             end = cursor + block_size
             out.write(snappy.decompress(body[cursor:end]))
             cursor = end
         out.seek(0)
         return out.read()
diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py
index c6dbf7f23..b14ffdc09 100644
--- a/pykafka/utils/struct_helpers.py
+++ b/pykafka/utils/struct_helpers.py
@@ -19,6 +19,7 @@ __all__ = ["unpack_from"]
 
 import itertools
 import struct
+from .compat import range, get_bytes
 
 
 def unpack_from(fmt, buff, offset=0):
@@ -68,6 +69,7 @@
     """
     items = []
     array_fmt = None
+    buff = get_bytes(buff)
    for i, ch in enumerate(fmt):
        if array_fmt is not None:
            if ch == ']':
@@ -110,7 +112,7 @@
    :type count: int
    """
    output = []
-    for i in xrange(count):
+    for i in range(count):
        item, offset = _unpack(fmt, buff, offset)
        output.append(item)
    if len(fmt) == 1:
diff --git a/tests/conftest.py b/tests/conftest.py
index df19bd551..5c136b325 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,2 +1,2 @@
 import logging
-logging.basicConfig()
+logging.basicConfig(level=logging.DEBUG)
diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka/test_balancedconsumer.py
index 6387e681a..ec6f90874 100644
--- a/tests/pykafka/test_balancedconsumer.py
+++ b/tests/pykafka/test_balancedconsumer.py
@@ -59,8 +59,8 @@ def test_decide_partitions(self):
             # Set up partitions, cluster, etc
             num_participants = i + 1
             num_partitions = 100 - i
-            participants = ['test-debian:{p}'.format(p=p)
-                            for p in range(num_participants)]
+            participants = sorted(['test-debian:{p}'.format(p=p)
+                                   for p in range(num_participants)])
             cns, topic = buildMockConsumer(num_partitions=num_partitions,
                                            num_participants=num_participants)
 
@@ -77,7 +77,9 @@
             idx = participants.index(cns._consumer_id)
             parts_per_consumer = num_partitions / num_participants
             parts_per_consumer = math.floor(parts_per_consumer)
+
             num_parts = parts_per_consumer + (0 if (idx + 1 > remainder_ppc) else 1)
+
             self.assertEqual(len(partitions), int(num_parts))
 
             # Validate all partitions were assigned once and only once
diff --git a/tests/pykafka/test_producer.py b/tests/pykafka/test_producer.py
index f29f40ea4..a97489e97 100644
--- a/tests/pykafka/test_producer.py
+++ b/tests/pykafka/test_producer.py
@@ -20,6 +20,8 @@ def tearDownClass(cls):
         stop_cluster(cls.kafka)
 
     def test_produce(self):
+        consumer = None
+
         try:
             # unique bytes, just to be absolutely sure we're not fetching data
             # produced in a previous test
@@ -35,7 +37,8 @@
             message = consumer.consume()
             self.assertTrue(message.value == payload)
         finally:
-            consumer.stop()
+            if consumer:
+                consumer.stop()
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/pykafka/test_protocol.py b/tests/pykafka/test_protocol.py
index 03f5fa9b2..7cb8c5c0d 100644
--- a/tests/pykafka/test_protocol.py
+++ b/tests/pykafka/test_protocol.py
@@ -3,6 +3,7 @@
 from pykafka import exceptions
 from pykafka import protocol
 from pykafka.common import CompressionType
+from pykafka.utils.compat import buffer
 
 
 class TestMetadataAPI(unittest.TestCase):
@@ -18,30 +19,30 @@ def test_request(self):
 
     def test_response(self):
         cluster = protocol.MetadataResponse(
-            buffer('\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x00\x00\x04test\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00')
+            buffer(b'\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x00\x00\x04test\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00')
         )
-        self.assertEqual(cluster.brokers[0].host, 'localhost')
+        self.assertEqual(cluster.brokers[0].host, b'localhost')
         self.assertEqual(cluster.brokers[0].port, 9092)
-        self.assertEqual(cluster.topics['test'].partitions[0].leader,
+        self.assertEqual(cluster.topics[b'test'].partitions[0].leader,
                          cluster.brokers[0].id)
-        self.assertEqual(cluster.topics['test'].partitions[0].replicas,
+        self.assertEqual(cluster.topics[b'test'].partitions[0].replicas,
                          [cluster.brokers[0].id])
-        self.assertEqual(cluster.topics['test'].partitions[0].isr,
+        self.assertEqual(cluster.topics[b'test'].partitions[0].isr,
                          [cluster.brokers[0].id])
 
     def test_partition_error(self):
         # Response has a UnknownTopicOrPartition error for test/0
         response = protocol.MetadataResponse(
-            buffer('\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x00\x00\x04test\x00\x00\x00\x02\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00')
+            buffer(b'\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x00\x00\x04test\x00\x00\x00\x02\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00')
         )
-        self.assertEqual(response.topics['test'].partitions[0].err, 3)
+        self.assertEqual(response.topics[b'test'].partitions[0].err, 3)
 
     def test_topic_error(self):
         # Response has a UnknownTopicOrPartition error for test/0
         response = protocol.MetadataResponse(
-            buffer('\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x03\x00\x04test\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00')
+            buffer(b'\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x03\x00\x04test\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00')
         )
-        self.assertEqual(response.topics['test'].err, 3)
+        self.assertEqual(response.topics[b'test'].err, 3)
 
 
 class TestProduceAPI(unittest.TestCase):
@@ -78,17 +79,17 @@ def test_snappy_compression(self):
 
     def test_partition_error(self):
         # Response has a UnknownTopicOrPartition error for test/0
         response = protocol.ProduceResponse(
-            buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02')
+            buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02')
         )
-        self.assertEqual(response.topics['test'][0].err, 3)
+        self.assertEqual(response.topics[b'test'][0].err, 3)
 
     def test_response(self):
         response = protocol.ProduceResponse(
-            buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02')
+            buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02')
         )
         self.assertEqual(
             response.topics,
-            {'test': {0: protocol.ProducePartitionResponse(0, 2)}}
+            {b'test': {0: protocol.ProducePartitionResponse(0, 2)}}
         )
 
@@ -107,19 +108,19 @@ def test_request(self):
 
     def test_partition_error(self):
         # Response has a UnknownTopicOrPartition error for test/0
         response = protocol.FetchResponse(
-            buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006\xa3 ^B\x00\x00\x00\x00\x00\x12test_partition_key\x00\x00\x00\x16this is a test message')
+            buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006\xa3 ^B\x00\x00\x00\x00\x00\x12test_partition_key\x00\x00\x00\x16this is a test message')
         )
-        self.assertEqual(response.topics['test'][0].err, 3)
+        self.assertEqual(response.topics[b'test'][0].err, 3)
 
     def test_response(self):
         resp = protocol.FetchResponse(
-            buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006\xa3 ^B\x00\x00\x00\x00\x00\x12test_partition_key\x00\x00\x00\x16this is a test message')
+            buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006\xa3 ^B\x00\x00\x00\x00\x00\x12test_partition_key\x00\x00\x00\x16this is a test message')
         )
-        self.assertEqual(len(resp.topics['test'][0].messages), 1)
-        self.assertEqual(resp.topics['test'][0].max_offset, 2)
-        message = resp.topics['test'][0].messages[0]
-        self.assertEqual(message.value, 'this is a test message')
-        self.assertEqual(message.partition_key, 'test_partition_key')
+        self.assertEqual(len(resp.topics[b'test'][0].messages), 1)
+        self.assertEqual(resp.topics[b'test'][0].max_offset, 2)
+        message = resp.topics[b'test'][0].messages[0]
+        self.assertEqual(message.value, b'this is a test message')
+        self.assertEqual(message.partition_key, b'test_partition_key')
         self.assertEqual(message.compression_type, 0)
         self.assertEqual(message.offset, 1)
@@ -172,15 +173,15 @@ def test_request(self):
 
     def test_partition_error(self):
         # Response has a UnknownTopicOrPartition error for test/0
         response = protocol.OffsetResponse(
-            buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02')
+            buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02')
         )
-        self.assertEqual(response.topics['test'][0].err, 3)
+        self.assertEqual(response.topics[b'test'][0].err, 3)
 
     def test_response(self):
         resp = protocol.OffsetResponse(
-            buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02')
+            buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02')
         )
-        self.assertEqual(resp.topics['test'][0].offset, [2])
+        self.assertEqual(resp.topics[b'test'][0].offset, [2])
 
 class TestOffsetCommitFetchAPI(unittest.TestCase):
@@ -196,10 +197,10 @@ def test_consumer_metadata_request(self):
 
     def test_consumer_metadata_response(self):
         response = protocol.ConsumerMetadataResponse(
-            buffer('\x00\x00\x00\x00\x00\x00\x00\remmett-debian\x00\x00#\x84')
+            buffer(b'\x00\x00\x00\x00\x00\x00\x00\remmett-debian\x00\x00#\x84')
         )
         self.assertEqual(response.coordinator_id, 0)
-        self.assertEqual(response.coordinator_host, 'emmett-debian')
+        self.assertEqual(response.coordinator_host, b'emmett-debian')
         self.assertEqual(response.coordinator_port, 9092)
 
     def test_offset_commit_request(self):
@@ -214,9 +215,9 @@ def test_offset_commit_request(self):
 
     def test_offset_commit_response(self):
         response = protocol.OffsetCommitResponse(
-            buffer('\x00\x00\x00\x01\x00\x0cemmett.dummy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00')
+            buffer(b'\x00\x00\x00\x01\x00\x0cemmett.dummy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00')
         )
-        self.assertEqual(response.topics['emmett.dummy'][0].err, 0)
+        self.assertEqual(response.topics[b'emmett.dummy'][0].err, 0)
 
     def test_offset_fetch_request(self):
         preq = protocol.PartitionOffsetFetchRequest('testtopic', 0)
@@ -229,10 +230,10 @@ def test_offset_fetch_request(self):
 
     def test_offset_fetch_response(self):
         response = protocol.OffsetFetchResponse(
-            buffer('\x00\x00\x00\x01\x00\x0cemmett.dummy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00')
+            buffer(b'\x00\x00\x00\x01\x00\x0cemmett.dummy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00')
         )
-        self.assertEqual(response.topics['emmett.dummy'][0].metadata, '')
-        self.assertEqual(response.topics['emmett.dummy'][0].offset, 1)
+        self.assertEqual(response.topics[b'emmett.dummy'][0].metadata, b'')
+        self.assertEqual(response.topics[b'emmett.dummy'][0].offset, 1)
 
 
 if __name__ == '__main__':
diff --git a/tests/pykafka/utils/test_compression.py b/tests/pykafka/utils/test_compression.py
index 2144ed414..f0e2308f3 100644
--- a/tests/pykafka/utils/test_compression.py
+++ b/tests/pykafka/utils/test_compression.py
@@ -3,7 +3,7 @@
 
 class CompressionTests(unittest.TestCase):
     """Keeping these simple by verifying what goes in is what comes out."""
-    text = "The man in black fled across the desert, and the gunslinger followed."
+    text = b"The man in black fled across the desert, and the gunslinger followed."
 
     def test_gzip(self):
         encoded = compression.encode_gzip(self.text)
diff --git a/tests/pykafka/utils/test_struct_helpers.py b/tests/pykafka/utils/test_struct_helpers.py
index dee37b28e..c19760db3 100644
--- a/tests/pykafka/utils/test_struct_helpers.py
+++ b/tests/pykafka/utils/test_struct_helpers.py
@@ -12,11 +12,11 @@ def test_basic_unpack(self):
 
     def test_string_encoding(self):
         output = struct_helpers.unpack_from('S', b'\x00\x04test')
-        self.assertEqual(output, ('test',))
+        self.assertEqual(output, (b'test',))
 
     def test_bytearray_unpacking(self):
         output = struct_helpers.unpack_from('Y', b'\x00\x00\x00\x04test')
-        self.assertEqual(output, ('test',))
+        self.assertEqual(output, (b'test',))
 
     def test_array_unpacking(self):
         output = struct_helpers.unpack_from(
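As a quick illustration of the string-handling convention these changes adopt (a minimal sketch, not part of the patch itself; it assumes pykafka with the pykafka/utils/compat.py helpers above is importable):

    from pykafka.utils.compat import get_bytes, iteritems

    # get_bytes() encodes text to UTF-8 bytes and passes bytes through unchanged,
    # so protocol code can pack the result with struct on both Python 2 and 3.
    topic_name = get_bytes(u'test.topic')   # -> b'test.topic'
    payload = get_bytes(b'raw payload')     # already bytes, returned as-is

    # iteritems() hides the dict.iteritems()/dict.items() split between versions.
    reqs = {topic_name: {0: payload}}
    for topic, partitions in iteritems(reqs):
        for partition_id, value in iteritems(partitions):
            print(topic, partition_id, value)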