From aaa33ce81002b38b9079866820c75eb832bbb73b Mon Sep 17 00:00:00 2001 From: sunpeng Date: Fri, 28 Jul 2023 11:11:47 +0800 Subject: [PATCH] enh: update native tmq api --- taos/cinterface.py | 129 ++++++++++++++++++++++++++++++++------------- taos/tmq.py | 80 +++++++++++++++++++++++++--- tests/test_tmq.py | 59 +++++++++++++++++++-- 3 files changed, 221 insertions(+), 47 deletions(-) diff --git a/taos/cinterface.py b/taos/cinterface.py index 0d797c92..393edcb8 100644 --- a/taos/cinterface.py +++ b/taos/cinterface.py @@ -1036,10 +1036,10 @@ def taos_stmt_affected_rows(stmt): def taos_schemaless_insert( - connection: c_void_p, - lines: Union[List[str], Tuple[str]], - protocol: SmlProtocol, - precision: SmlPrecision, + connection: c_void_p, + lines: Union[List[str], Tuple[str]], + protocol: SmlProtocol, + precision: SmlPrecision, ): _check_if_supported() num_of_lines = len(lines) @@ -1084,11 +1084,11 @@ def taos_schemaless_insert( def taos_schemaless_insert_ttl( - connection: c_void_p, - lines: Union[List[str], Tuple[str]], - protocol: SmlProtocol, - precision: SmlPrecision, - ttl: int, + connection: c_void_p, + lines: Union[List[str], Tuple[str]], + protocol: SmlProtocol, + precision: SmlPrecision, + ttl: int, ): _check_if_supported() num_of_lines = len(lines) @@ -1135,12 +1135,12 @@ def taos_schemaless_insert_ttl( def taos_schemaless_insert_ttl_with_reqid( - connection: c_void_p, - lines: Union[List[str], Tuple[str]], - protocol: SmlProtocol, - precision: SmlPrecision, - ttl: int, - req_id: int, + connection: c_void_p, + lines: Union[List[str], Tuple[str]], + protocol: SmlProtocol, + precision: SmlPrecision, + ttl: int, + req_id: int, ): _check_if_supported() num_of_lines = len(lines) @@ -1217,10 +1217,10 @@ def taos_schemaless_insert_with_reqid(connection, lines, protocol, precision, re def taos_schemaless_insert_raw( - connection: c_void_p, - lines_raw: str, - protocol: SmlProtocol, - precision: SmlPrecision, + connection: c_void_p, + lines_raw: str, + protocol: 
SmlProtocol, + precision: SmlPrecision, ) -> int: _check_if_supported() length = len(lines_raw) @@ -1264,11 +1264,11 @@ def taos_schemaless_insert_raw( def taos_schemaless_insert_raw_with_reqid( - connection: c_void_p, - lines_raw: str, - protocol: SmlProtocol, - precision: SmlPrecision, - req_id: int, + connection: c_void_p, + lines_raw: str, + protocol: SmlProtocol, + precision: SmlPrecision, + req_id: int, ) -> int: _check_if_supported() length = len(lines_raw) @@ -1322,11 +1322,11 @@ def taos_schemaless_insert_raw_with_reqid( def taos_schemaless_insert_raw_ttl( - connection: c_void_p, - lines_raw: str, - protocol: SmlProtocol, - precision: SmlPrecision, - ttl: int, + connection: c_void_p, + lines_raw: str, + protocol: SmlProtocol, + precision: SmlPrecision, + ttl: int, ) -> int: _check_if_supported() length = len(lines_raw) @@ -1381,12 +1381,12 @@ def taos_schemaless_insert_raw_ttl( def taos_schemaless_insert_raw_ttl_with_reqid( - connection: c_void_p, - lines_raw: str, - protocol: SmlProtocol, - precision: SmlPrecision, - ttl: int, - req_id: int, + connection: c_void_p, + lines_raw: str, + protocol: SmlProtocol, + precision: SmlPrecision, + ttl: int, + req_id: int, ) -> int: _check_if_supported() length = len(lines_raw) @@ -1669,13 +1669,28 @@ def tmq_consumer_close(tmq): def tmq_commit_sync(tmq, offset): - # type: (c_void_p, c_void_p) -> None + # type: (c_void_p, c_void_p|None) -> None _check_if_supported() res = _libtaos.tmq_commit_sync(tmq, offset) if res != 0: raise TmqError(msg=f"failed on tmq_commit_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res) +try: + _libtaos.tmq_commit_offset_sync.argtypes = (c_void_p, c_char_p, c_int32, c_int64) + _libtaos.tmq_commit_offset_sync.restype = c_int32 +except Exception as err: + _UNSUPPORTED["tmq_commit_offset_sync"] = err + + +def tmq_commit_offset_sync(tmq, topic, vg_id, offset): + # type: (c_void_p, str, int, int) -> None + _check_if_supported() + res = _libtaos.tmq_commit_offset_sync(tmq, 
c_char_p(topic.encode("utf-8")), c_int32(vg_id), c_int64(offset)) + if res != 0: + raise TmqError(msg=f"failed on tmq_commit_offset_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res) + + try: _libtaos.tmq_get_topic_name.argtypes = (c_void_p,) _libtaos.tmq_get_topic_name.restype = c_char_p @@ -1777,6 +1792,18 @@ def tmq_get_res_type(message): return _libtaos.tmq_get_res_type(message) +try: + _libtaos.tmq_get_vgroup_offset.argtypes = (c_void_p,) + _libtaos.tmq_get_vgroup_offset.restype = c_int64 +except Exception as err: + _UNSUPPORTED["tmq_get_vgroup_offset"] = err + + +def tmq_get_vgroup_offset(message): + # type: (c_void_p) -> int + return _libtaos.tmq_get_vgroup_offset(message) + + class TmqTopicAssignment(Structure): _fields_ = [ ("_vg_id", c_int32), @@ -1896,6 +1923,36 @@ def tmq_offset_seek(tmq, topic_name, vgroup_id, offset): raise TmqError(msg="failed on tmq_offset_seek()", errno=code) +try: + _libtaos.tmq_committed.argtypes = (c_void_p, c_char_p, c_int32) + _libtaos.tmq_committed.restype = c_int64 +except Exception as err: + _UNSUPPORTED["tmq_committed"] = err + + +def tmq_committed(tmq, topic, vgroup_id): + # type: (c_void_p, str, int) -> int + res = _libtaos.tmq_committed(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id)) + if res < 0: + raise TmqError(msg=taos_errstr(), errno=res) + return res + + +try: + _libtaos.tmq_position.argtypes = (c_void_p, c_char_p, c_int32) + _libtaos.tmq_position.restype = c_int64 +except Exception as err: + _UNSUPPORTED["tmq_position"] = err + + +def tmq_position(tmq, topic, vgroup_id): + # type: (c_void_p, str, int) -> int + offset = _libtaos.tmq_position(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id)) + if offset < 0: + raise TmqError(msg=tmq_err2str(offset), errno=offset) + return offset + + class CTaosInterface(object): def __init__(self, config=None, tz=None): """ diff --git a/taos/tmq.py b/taos/tmq.py index 195f54a6..77e2ebee 100644 --- a/taos/tmq.py +++ b/taos/tmq.py @@ -135,6 +135,14 @@ def 
value(self): table=tmq_get_table_name(self.msg))) return message_blocks + def offset(self): + # type: () -> int + """ + :returns: message offset. + :rtype: int + """ + return tmq_get_vgroup_offset(self.msg) + def __del__(self): if not self.msg: return @@ -279,20 +287,76 @@ def close(self): tmq_consumer_close(self._tmq) self._tmq = None - def commit(self, message): - # type (Message) -> None + def commit(self, message: Message = None, offsets: [TopicPartition] = None): + # type (Message, [TopicPartition], bool) -> None """ Commit a message. - The `message` parameters are mutually exclusive. If `message` is None, the current partition assignment's - offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set to False. + The `message` and `offsets` parameters are mutually exclusive. If neither is set, the current partition + assignment's offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set + to False. - :param Message message: Commit the message's offset. + :param Message message: Commit the message's offset+1. Note: By convention, committed offsets reflect the next + message to be consumed, **not** the last message consumed. + :param list(TopicPartition) offsets: List of topic+partitions+offsets to commit. """ - if message is None and not isinstance(message, Message): - tmq_commit_sync(self._tmq, None) - else: + if message: + if not isinstance(message, Message): + raise TmqError(msg='Invalid message type') tmq_commit_sync(self._tmq, message.msg) + return + + if offsets and isinstance(offsets, list): + for offset in offsets: + if not isinstance(offset, TopicPartition): + raise TmqError(msg='Invalid offset type') + tmq_commit_offset_sync(self._tmq, offset.topic, offset.partition, offset.offset) + return + + tmq_commit_sync(self._tmq, None) + + def committed(self, partitions): + # type ([TopicPartition]) -> [TopicPartition] + """ + Retrieve committed offsets for the specified partitions. 
+ + :param list(TopicPartition) partitions: List of topic+partitions to query for stored offsets. + :returns: The same list of topic+partitions, each with its offset attribute updated in place. + :rtype: list(TopicPartition) + """ + for partition in partitions: + if not isinstance(partition, TopicPartition): + raise TmqError(msg='Invalid partition type') + offset = tmq_committed(self._tmq, partition.topic, partition.partition) + partition.offset = offset + + return partitions + + def position(self, partitions): + # type ([TopicPartition]) -> [TopicPartition] + """ + Retrieve current positions (offsets) for the specified partitions. + + :param list(TopicPartition) partitions: List of topic+partitions to return current offsets for. + :returns: The same list of topic+partitions, each with its offset attribute updated in place. + :rtype: list(TopicPartition) + """ + for partition in partitions: + if not isinstance(partition, TopicPartition): + raise TmqError(msg='Invalid partition type') + offset = tmq_position(self._tmq, partition.topic, partition.partition) + partition.offset = offset + + return partitions + + def list_topics(self) -> [str]: + # type () -> [str] + """ + Request subscription topics from the tmq. 
+ + :rtype: topics list + """ + return tmq_subscription(self._tmq) def __del__(self): self.close() diff --git a/tests/test_tmq.py b/tests/test_tmq.py index 6b7416ae..2c6469de 100644 --- a/tests/test_tmq.py +++ b/tests/test_tmq.py @@ -86,10 +86,14 @@ def pre_test_tmq(precision: str): def after_ter_tmq(): + time.sleep(20) conn = taos.connect() # drop database and topic - conn.execute("drop topic if exists topic1") - conn.execute("drop database if exists tmq_test") + try: + conn.execute("drop topic if exists topic1") + conn.execute("drop database if exists tmq_test") + except Exception: + pass def test_consumer_with_precision(): @@ -124,7 +128,7 @@ def tmq_consumer_with_precision(precision: str): print("====== finish test, start clean") print("====== unsubscribe topic") consumer.unsubscribe() - + consumer.close() after_ter_tmq() @@ -152,6 +156,10 @@ def test_tmq_assignment(): assignment = consumer.assignment() assert assignment[0].offset > 0 + consumer.unsubscribe() + consumer.close() + after_ter_tmq() + def test_tmq_seek(): if not IS_V3: @@ -177,6 +185,51 @@ def test_tmq_seek(): assert assignment[0].offset == 0 + consumer.unsubscribe() + consumer.close() + after_ter_tmq() + + +def test_tmq_list_topics(): + if not IS_V3: + return + pre_test_tmq('') + consumer = Consumer({"group.id": "1"}) + consumer.subscribe(["topic1"]) + topics = consumer.list_topics() + assert topics == ["topic1"] + consumer.unsubscribe() + consumer.close() + after_ter_tmq() + + +def test_tmq_committed_and_position(): + if not IS_V3: + return + pre_test_tmq('') + conn = taos.connect() + conn.select_db("tmq_test") + conn.execute("insert into tb1 values (now-4s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1')") + conn.execute("insert into tb1 values (now-3s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1')") + conn.execute("insert into tb1 values (now-2s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2')") + conn.execute("insert into tb1 values (now-1s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2')") + + consumer = Consumer({"group.id": "1"}) + 
consumer.subscribe(["topic1"]) + consumer.poll(1) + res = consumer.poll(1) + consumer.commit() + topic_partitions = consumer.assignment() + committees = consumer.committed(topic_partitions) + assert committees[0].offset > 0 + positions = consumer.position(topic_partitions) + assert positions[0].offset > 0 + assert positions[0].offset == committees[0].offset + + consumer.unsubscribe() + consumer.close() + after_ter_tmq() + if __name__ == "__main__": test_consumer_with_precision()