diff --git a/pykafka/protocol/message.py b/pykafka/protocol/message.py
index 8b4724e42..262953cc4 100644
--- a/pykafka/protocol/message.py
+++ b/pykafka/protocol/message.py
@@ -1,13 +1,13 @@
 # - coding: utf-8 -
-from datetime import datetime
 import struct
+from datetime import datetime
 from pkg_resources import parse_version
 from six import integer_types
 from zlib import crc32
 
 from ..common import CompressionType, Message
 from ..exceptions import MessageSetDecodeFailure
-from ..utils import Serializable, struct_helpers, compression
+from ..utils import Serializable, struct_helpers, compression, msg_protocol_version
 from ..utils.compat import buffer
@@ -171,6 +171,92 @@ def set_timestamp(self, ts):
         raise RuntimeError()
 
 
+class Record(Message):
+    """
+    Specification::
+
+        Record =>
+          Length => varint
+          Attributes => int8
+          TimestampDelta => varint
+          OffsetDelta => varint
+          KeyLen => varint
+          Key => data
+          ValueLen => varint
+          Value => data
+          Headers => [Header]
+    """
+    __slots__ = Message.__slots__ + ["headers"]
+
+    def __init__(self,
+                 value,
+                 partition_key=None,
+                 compression_type=CompressionType.NONE,
+                 offset=-1,
+                 partition_id=-1,
+                 produce_attempt=0,
+                 protocol_version=2,
+                 timestamp=None,
+                 delivery_report_q=None,
+                 headers=None):
+        super(Record, self).__init__(value, partition_key=partition_key,
+                                     compression_type=compression_type,
+                                     offset=offset, partition_id=partition_id,
+                                     produce_attempt=produce_attempt,
+                                     protocol_version=protocol_version,
+                                     timestamp=timestamp,
+                                     delivery_report_q=delivery_report_q)
+        # normalize to a list so __len__ and pack_into can iterate
+        self.headers = headers or []
+
+    def timestamp_delta(self, base_timestamp=0):
+        return self.timestamp - base_timestamp
+
+    def offset_delta(self, base_offset=0):
+        return self.offset - base_offset
+
+    def __len__(self):
+        size = 1  # the attributes byte; the leading Length varint is excluded
+        size += struct_helpers.get_varint_size(self.timestamp_delta())
+        size += struct_helpers.get_varint_size(self.offset_delta())
+        # NB null keys and values are treated as empty, since these unsigned
+        # varints cannot encode a -1 "null" length
+        key = self.partition_key or b''
+        value = self.value or b''
+        size += struct_helpers.get_varint_size(len(key)) + len(key)
+        size += struct_helpers.get_varint_size(len(value)) + len(value)
+        size += 4  # int32 header count
+        for hkey, hval in self.headers:
+            size += struct_helpers.get_varint_size(len(hkey)) + len(hkey)
+            size += struct_helpers.get_varint_size(len(hval)) + len(hval)
+        return size
+
+    @classmethod
+    def decode(cls, buff, base_timestamp=0, base_offset=0, partition_id=-1,
+               compression_type=CompressionType.NONE):
+        (length, attr, timestamp_delta,
+         offset_delta, partition_key, value,
+         headers) = struct_helpers.unpack_from('VBVVGG [GG]', buff, 0)
+        return cls(value,
+                   partition_key=partition_key,
+                   compression_type=compression_type,
+                   offset=base_offset + offset_delta,
+                   protocol_version=1,  # XXX
+                   timestamp=base_timestamp + timestamp_delta,
+                   partition_id=partition_id,
+                   headers=headers)
+
+    def pack_into(self, buff, offset, base_timestamp=0, base_offset=0):
+        key = self.partition_key or b''
+        value = self.value or b''
+        fmt = '!VBVVV%dsV%dsi' % (len(key), len(value))
+        args = (len(self), 0, self.timestamp_delta(base_timestamp),
+                self.offset_delta(base_offset), len(key), key,
+                len(value), value, len(self.headers))
+        offset += struct_helpers.pack_into(fmt, buff, offset, *args)
+        for hkey, hval in self.headers:
+            fmt = '!V%dsV%ds' % (len(hkey), len(hval))
+            args = (len(hkey), hkey, len(hval), hval)
+            offset += struct_helpers.pack_into(fmt, buff, offset, *args)
+        return offset
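+
+# Illustrative sketch of the intended round trip (not exercised by the tests
+# in this patch; `headers` is assumed to be a flat list of (bytes, bytes)
+# pairs, and the default base offset/timestamp of 0 is used on both sides):
+#
+#   rec = Record(b'value', partition_key=b'key', offset=3, timestamp=50,
+#                headers=[(b'trace', b'abc123')])
+#   buff = bytearray(struct_helpers.get_varint_size(len(rec)) + len(rec))
+#   rec.pack_into(buff, 0)
+#   same = Record.decode(buff)
+#
+# With non-zero bases the deltas shrink, so len(rec) -- computed against
+# base 0 -- can overstate the packed size; RecordBatch relies on this only
+# ever being an over-estimate when sizing buffers.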
 
 
 class MessageSet(Serializable):
     """Representation of a set of messages in Kafka
@@ -225,6 +311,9 @@ def messages(self):
         self._compressed = None
         return self._messages
 
+    def add_message(self, message):
+        # appending via the property invalidates the compressed cache
+        self.messages.append(message)
+
     def _get_compressed(self):
         """Get a compressed representation of all current messages.
@@ -297,3 +386,147 @@ def pack_into(self, buff, offset):
             offset += 12
             message.pack_into(buff, offset)
             offset += mlen
+
+
+class RecordBatch(MessageSet):
+    """Representation of a Kafka RecordBatch
+
+    Specification::
+
+        RecordBatch =>
+          FirstOffset => int64
+          Length => int32
+          PartitionLeaderEpoch => int32
+          Magic => int8
+          CRC => int32
+          Attributes => int16
+          LastOffsetDelta => int32
+          FirstTimestamp => int64
+          MaxTimestamp => int64
+          ProducerId => int64
+          ProducerEpoch => int16
+          FirstSequence => int32
+          Records => [Record]
+    """
+    def __init__(self, messages=None, compression_type=CompressionType.NONE,
+                 broker_version='0.9.0', first_offset=-1, last_offset_delta=-1,
+                 first_timestamp=-1, max_timestamp=-1, protocol_version=None):
+        super(RecordBatch, self).__init__(messages=messages,
+                                          broker_version=broker_version,
+                                          compression_type=compression_type)
+        if protocol_version is not None:
+            self.protocol_version = protocol_version
+        else:
+            self.protocol_version = msg_protocol_version(broker_version)
+        self.first_offset = first_offset
+        self.last_offset_delta = last_offset_delta
+        self.first_timestamp = first_timestamp
+        self.max_timestamp = max_timestamp
+
+    def __len__(self):
+        # XXX does this logic still apply to RecordBatch?
+        if self.compression_type == CompressionType.NONE:
+            messages = self._messages
+        else:
+            if self._compressed is None:
+                self._compressed = self._get_compressed()
+            messages = [self._compressed]
+        # fixed-size header fields, including the trailing int32 record count
+        length = 8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4
+        # each Record is preceded by its varint-encoded length; NB len(m)
+        # computes delta sizes against base 0, which can over-estimate the
+        # size actually packed against this batch's bases
+        length += sum(len(m) + struct_helpers.get_varint_size(len(m))
+                      for m in messages)
+        return length
+
+    @property
+    def records(self):
+        self._compressed = None
+        return self._messages
+
+    def add_message(self, message):
+        self._messages.append(message)
+        # -1 is the "unset" sentinel for all of these bookkeeping fields
+        if self.first_offset == -1 or message.offset < self.first_offset:
+            self.first_offset = message.offset
+        msg_offset_delta = message.offset - self.first_offset
+        if msg_offset_delta > self.last_offset_delta:
+            self.last_offset_delta = msg_offset_delta
+        if self.first_timestamp == -1 or message.timestamp < self.first_timestamp:
+            self.first_timestamp = message.timestamp
+        if message.timestamp > self.max_timestamp:
+            self.max_timestamp = message.timestamp
+
+    def _get_compressed(self):
+        assert self.compression_type != CompressionType.NONE
+        tmp_mset = RecordBatch(messages=self._messages,
+                               first_offset=self.first_offset,
+                               first_timestamp=self.first_timestamp)
+        # size only the packed Record array, not the 61-byte batch header
+        uncompressed = bytearray(
+            sum(len(m) + struct_helpers.get_varint_size(len(m))
+                for m in self._messages))
+        tmp_mset.pack_record_array_into(uncompressed, 0)
+        if self.compression_type == CompressionType.GZIP:
+            compressed = compression.encode_gzip(buffer(uncompressed))
+        elif self.compression_type == CompressionType.SNAPPY:
+            compressed = compression.encode_snappy(buffer(uncompressed))
+        elif self.compression_type == CompressionType.LZ4:
+            if parse_version(self._broker_version) >= parse_version('0.10.0'):
+                compressed = compression.encode_lz4(buffer(uncompressed))
+            else:
+                compressed = compression.encode_lz4_old_kafka(buffer(uncompressed))
+        else:
+            raise TypeError("Unknown compression: %s" % self.compression_type)
+        protocol_version = max((m.protocol_version for m in self._messages))
+        return Record(compressed, compression_type=self.compression_type,
+                      protocol_version=protocol_version)
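+
+    # Byte accounting for the fixed-size batch header, as packed by
+    # pack_into below (one letter per struct format character):
+    #
+    #   FirstOffset(q=8) Length(i=4) PartitionLeaderEpoch(i=4) Magic(B=1)
+    #   CRC(i=4) Attributes(h=2) LastOffsetDelta(i=4) FirstTimestamp(q=8)
+    #   MaxTimestamp(q=8) ProducerId(q=8) ProducerEpoch(h=2)
+    #   FirstSequence(i=4) RecordCount(i=4)
+    #
+    # for a total of 61 bytes -- the constant summed in __len__ above.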
+    @classmethod
+    def decode(cls, buff, partition_id=-1):
+        offset = 0
+        fmt = '!qiiBihiqqqhii'
+        (first_offset, _, _, protocol_version, _, attr, last_offset_delta,
+         first_timestamp, max_timestamp, _, _, _,
+         records_count) = struct_helpers.unpack_from(fmt, buff, offset)
+        offset += struct.calcsize(fmt)
+
+        messages = []
+        while offset < len(buff):
+            # each Record is prefixed by its varint-encoded length
+            len_size, size = struct_helpers.unpack_varint_from(buff, offset)
+            message = Record.decode(buff[offset:offset + len_size + size],
+                                    base_timestamp=first_timestamp,
+                                    base_offset=first_offset,
+                                    partition_id=partition_id)
+            messages.append(message)
+            offset += len_size + size
+
+        return cls(messages=messages, first_offset=first_offset,
+                   protocol_version=protocol_version,
+                   # compression lives in the low 3 bits of the attributes
+                   compression_type=attr & 0x07,
+                   last_offset_delta=last_offset_delta,
+                   first_timestamp=first_timestamp, max_timestamp=max_timestamp)
+
+    def pack_into(self, buff, offset):
+        if self.compression_type == CompressionType.NONE:
+            records = self._messages
+        else:
+            if self._compressed is None:
+                self._compressed = self._get_compressed()
+            records = [self._compressed]
+        attr = self.compression_type
+        fmt = '!qiiB'
+        # NB the Length field excludes the FirstOffset and Length fields
+        # themselves
+        args = (self.first_offset, len(self) - 12, -1, self.protocol_version)
+        struct.pack_into(fmt, buff, offset, *args)
+        offset += struct.calcsize(fmt)
+
+        crc_offset = offset
+        fmt = '!hiqqqhii'
+        args = (attr, self.last_offset_delta, self.first_timestamp, self.max_timestamp,
+                # NB these -1s are for currently unsupported fields introduced in 0.11
+                -1, -1, -1, len(records))
+        # skip 4 bytes to leave room for the CRC, which is computed last
+        struct.pack_into(fmt, buff, offset + 4, *args)
+        offset += struct.calcsize(fmt) + 4
+
+        # TODO replace this with call to pack_record_array_into
+        for record in records:
+            offset = record.pack_into(buff, offset,
+                                      base_timestamp=self.first_timestamp,
+                                      base_offset=self.first_offset)
+        end_offset = offset
+
+        # XXX the v2 format specifies CRC-32C (Castagnoli) here; zlib's
+        # CRC-32 will not match what brokers compute
+        data = buffer(buff[(crc_offset + 4):end_offset])
+        crc = crc32(data) & 0xffffffff
+        struct.pack_into('!I', buff, crc_offset, crc)
+
+    def pack_record_array_into(self, buff, offset):
+        """Pack only the array of Records, ignoring headers"""
+        for record in self._messages:
+            offset = record.pack_into(buff, offset,
+                                      base_timestamp=self.first_timestamp,
+                                      base_offset=self.first_offset)
+        return offset
diff --git a/pykafka/protocol/produce.py b/pykafka/protocol/produce.py
index 3ea49715f..30d845ea0 100644
--- a/pykafka/protocol/produce.py
+++ b/pykafka/protocol/produce.py
@@ -75,7 +75,7 @@ def add_message(self, message, topic_name, partition_id):
         :param topic_name: the name of the topic to publish to
         :param partition_id: the partition to publish to
         """
-        self.msets[topic_name][partition_id].messages.append(message)
+        self.msets[topic_name][partition_id].add_message(message)
         self._message_count += 1
 
     def get_bytes(self):
diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py
index bc7d63d3b..00407e1ba 100644
--- a/pykafka/utils/struct_helpers.py
+++ b/pykafka/utils/struct_helpers.py
@@ -18,6 +18,8 @@
 """
 __all__ = ["unpack_from"]
 import itertools
+import re
 import struct
 
 from .compat import range
@@ -55,6 +57,9 @@ def unpack_from(fmt, buff, offset=0):
     return output
 
 
+BYTES_PREFIXES = {'S': '!h', 'Y': '!i', 'G': 'V'}
+
+
 def _unpack(fmt, buff, offset, count=1):
     """Recursive call for unpacking
@@ -84,11 +89,15 @@ def _unpack(fmt, buff, offset, count=1):
             array_fmt += ch
         elif ch == '[':
             array_fmt = ''  # starts building string for array unpack
+        elif ch == 'V':
+            len_, unpacked = unpack_varint_from(buff, offset)
+            items.append(unpacked)
+            offset += len_
         else:
-            if ch in 'SY':
-                len_fmt = '!h' if ch == 'S' else '!i'
-                len_ = struct.unpack_from(len_fmt, buff, offset)[0]
-                offset += struct.calcsize(len_fmt)
+            if ch in 'SYG':
+                len_fmt = BYTES_PREFIXES[ch]
+                len_ = unpack_from(len_fmt, buff, offset)[0]
+                offset += calcsize(len_fmt, len_)
             if len_ == -1:
                 items.append(None)
                 continue
@@ -117,3 +126,85 @@ def _unpack_array(fmt, buff, offset, count):
     if len(fmt) == 1:
         output = list(itertools.chain.from_iterable(output))
     return output, offset
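+
+
+# The helpers below implement unsigned base-128 varints: seven payload bits
+# per byte, least-significant group first, with the high bit marking
+# continuation. Worked example (mirrors test_varint_encode): 300 is
+# 0b10_0101100; the low seven bits (0101100) with the continuation bit set
+# give 0xac, and the remaining bits (10) give 0x02, so 300 encodes to
+# b'\xac\x02'. NB Kafka's v2 record format uses the zigzag-encoded *signed*
+# variant for its varint fields; that mapping is assumed to happen elsewhere.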
+def unpack_varint_from(buff, offset):
+    """Unpack a varint from buff, returning (bytes consumed, value)"""
+    size = 0
+    shift = 0
+    result = 0
+    while True:
+        size += 1
+        i = ord(buff[offset:offset + 1])
+        offset += 1
+        result |= (i & 0x7f) << shift
+        shift += 7
+        if not (i & 0x80):
+            break
+    return size, result
+
+
+# matches format characters that consume no argument (everything outside
+# struct's argument-taking set; 'x' pad bytes also take no argument)
+NOARG_STRUCT_FMTS = re.compile(r'[^cbB\?hHiIlLqQfdspP]')
+
+
+def _split_struct_fmt(fmt, args):
+    """Split fmt into (sub-format, args) chunks at each 'V' character"""
+    if 'V' in fmt:
+        for i, fmt_part in enumerate([p for p in re.split('(V)', fmt) if p]):
+            if fmt_part != "V":
+                # count only the format characters that consume an argument
+                args_only_fmt = re.sub(NOARG_STRUCT_FMTS, '', fmt_part)
+                part_args = [args.pop(0) for _ in range(len(args_only_fmt))]
+                # re-apply network byte order to parts after the first
+                prefixed = "!" + fmt_part if fmt.startswith("!") and i != 0 else fmt_part
+                yield prefixed, part_args
+            else:
+                yield fmt_part, [args.pop(0)]
+    else:
+        yield fmt, args
+
+
+def pack_into(fmt, buff, offset, *args):
+    """Pack args into buff, supporting the 'V' (varint) format character.
+
+    Returns the number of bytes written.
+    """
+    if 'V' in fmt:
+        args = list(args)
+        size = 0
+        for part_fmt, part_args in _split_struct_fmt(fmt, args):
+            if part_fmt != "V":
+                struct.pack_into(part_fmt, buff, offset, *part_args)
+                fmtsize = struct.calcsize(part_fmt)
+            else:
+                fmtsize = pack_varint_into(buff, offset, *part_args)
+            offset += fmtsize
+            size += fmtsize
+        return size
+    else:
+        struct.pack_into(fmt, buff, offset, *args)
+        return struct.calcsize(fmt)
+
+
+def calcsize(fmt, *args):
+    """Like struct.calcsize, but varint ('V') sizes depend on their values"""
+    if 'V' in fmt:
+        size = 0
+        args = list(args)
+        for part_fmt, part_args in _split_struct_fmt(fmt, args):
+            if part_fmt != "V":
+                size += struct.calcsize(part_fmt)
+            else:
+                for arg in part_args:
+                    size += get_varint_size(arg)
+        return size
+    else:
+        return struct.calcsize(fmt)
+
+
+def get_varint_size(val):
+    """Return the number of bytes needed to varint-encode val"""
+    # one byte per seven bits of payload
+    size = 1
+    while val >= 0x80:
+        val >>= 7
+        size += 1
+    return size
+
+
+def pack_varint_into(buff, offset, val):
+    """Pack val into buff as a varint, returning the number of bytes written"""
+    size = 0
+    while True:
+        towrite = val & 0x7f
+        val >>= 7
+        size += 1
+        if val:
+            # set the continuation bit; struct's 'B' takes an int, not a chr
+            struct.pack_into('B', buff, offset, towrite | 0x80)
+            offset += 1
+        else:
+            struct.pack_into('B', buff, offset, towrite)
+            offset += 1
+            break
+    return size
diff --git a/tests/pykafka/utils/test_struct_helpers.py b/tests/pykafka/utils/test_struct_helpers.py
index 95a46f805..1d84ec629 100644
--- a/tests/pykafka/utils/test_struct_helpers.py
+++ b/tests/pykafka/utils/test_struct_helpers.py
@@ -27,5 +27,32 @@ def test_array_unpacking(self):
         # A 1-length tuple with a 4-length tuple as the element
         self.assertEqual(output, [1, 2, 3, 4])
 
+    def test_varint_encode(self):
+        buff = bytearray(2)
+        val = 300
+        expected = b'\xac\x02'
+        size = struct_helpers.pack_into("V", buff, 0, val)
+        self.assertEqual(size, 2)
+        self.assertEqual(buff, expected)
+
+    def test_varint_simple(self):
+        buff = bytearray(4)
+        offset = 0
+        val = 69
+        fmt = 'V'
+        struct_helpers.pack_into(fmt, buff, offset, val)
+        output = struct_helpers.unpack_from(fmt, buff)
+        self.assertEqual(output, (val,))
+
+    def test_varint_advanced(self):
+        buff = bytearray(20)
+        offset = 0
+        vals = (68, 69, 420, 30001, 3987533)
+        fmt = '!qViVV'
+        struct_helpers.pack_into(fmt, buff, offset, *vals)
+        output = struct_helpers.unpack_from(fmt, buff)
+        self.assertEqual(output, vals)
+
 
 if __name__ == '__main__':
     unittest2.main()
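
A minimal round trip through the new struct_helpers varint support (a sketch
against this branch, not part of the patch; the format string and values are
arbitrary):

    from pykafka.utils import struct_helpers

    buff = bytearray(16)
    written = struct_helpers.pack_into('!hV', buff, 0, 7, 300)
    assert written == 4          # int16 (2 bytes) + varint 300 (2 bytes)
    assert struct_helpers.unpack_from('!hV', buff) == (7, 300)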