diff --git a/taos/cinterface.py b/taos/cinterface.py index 0d797c92..af377ef7 100644 --- a/taos/cinterface.py +++ b/taos/cinterface.py @@ -1036,10 +1036,10 @@ def taos_stmt_affected_rows(stmt): def taos_schemaless_insert( - connection: c_void_p, - lines: Union[List[str], Tuple[str]], - protocol: SmlProtocol, - precision: SmlPrecision, + connection: c_void_p, + lines: Union[List[str], Tuple[str]], + protocol: SmlProtocol, + precision: SmlPrecision, ): _check_if_supported() num_of_lines = len(lines) @@ -1084,11 +1084,11 @@ def taos_schemaless_insert( def taos_schemaless_insert_ttl( - connection: c_void_p, - lines: Union[List[str], Tuple[str]], - protocol: SmlProtocol, - precision: SmlPrecision, - ttl: int, + connection: c_void_p, + lines: Union[List[str], Tuple[str]], + protocol: SmlProtocol, + precision: SmlPrecision, + ttl: int, ): _check_if_supported() num_of_lines = len(lines) @@ -1135,12 +1135,12 @@ def taos_schemaless_insert_ttl( def taos_schemaless_insert_ttl_with_reqid( - connection: c_void_p, - lines: Union[List[str], Tuple[str]], - protocol: SmlProtocol, - precision: SmlPrecision, - ttl: int, - req_id: int, + connection: c_void_p, + lines: Union[List[str], Tuple[str]], + protocol: SmlProtocol, + precision: SmlPrecision, + ttl: int, + req_id: int, ): _check_if_supported() num_of_lines = len(lines) @@ -1217,10 +1217,10 @@ def taos_schemaless_insert_with_reqid(connection, lines, protocol, precision, re def taos_schemaless_insert_raw( - connection: c_void_p, - lines_raw: str, - protocol: SmlProtocol, - precision: SmlPrecision, + connection: c_void_p, + lines_raw: str, + protocol: SmlProtocol, + precision: SmlPrecision, ) -> int: _check_if_supported() length = len(lines_raw) @@ -1264,11 +1264,11 @@ def taos_schemaless_insert_raw( def taos_schemaless_insert_raw_with_reqid( - connection: c_void_p, - lines_raw: str, - protocol: SmlProtocol, - precision: SmlPrecision, - req_id: int, + connection: c_void_p, + lines_raw: str, + protocol: SmlProtocol, + precision: SmlPrecision, + req_id: int, ) -> int: _check_if_supported() length = len(lines_raw) @@ -1322,11 +1322,11 @@ def taos_schemaless_insert_raw_with_reqid( def taos_schemaless_insert_raw_ttl( - connection: c_void_p, - lines_raw: str, - protocol: SmlProtocol, - precision: SmlPrecision, - ttl: int, + connection: c_void_p, + lines_raw: str, + protocol: SmlProtocol, + precision: SmlPrecision, + ttl: int, ) -> int: _check_if_supported() length = len(lines_raw) @@ -1381,12 +1381,12 @@ def taos_schemaless_insert_raw_ttl( def taos_schemaless_insert_raw_ttl_with_reqid( - connection: c_void_p, - lines_raw: str, - protocol: SmlProtocol, - precision: SmlPrecision, - ttl: int, - req_id: int, + connection: c_void_p, + lines_raw: str, + protocol: SmlProtocol, + precision: SmlPrecision, + ttl: int, + req_id: int, ) -> int: _check_if_supported() length = len(lines_raw) @@ -1509,7 +1509,7 @@ def tmq_consumer_new(conf, errstrlen=0): def tmq_err2str(errno): # type (c_int) -> c_char_p _check_if_supported() - return c_char_p(_libtaos.tmq_err2str(errno)) + return c_char_p(_libtaos.tmq_err2str(errno)).value.decode("utf-8") try: @@ -1669,13 +1669,28 @@ def tmq_consumer_close(tmq): def tmq_commit_sync(tmq, offset): - # type: (c_void_p, c_void_p) -> None + # type: (c_void_p, c_void_p|None) -> None _check_if_supported() res = _libtaos.tmq_commit_sync(tmq, offset) if res != 0: raise TmqError(msg=f"failed on tmq_commit_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res) +try: + _libtaos.tmq_commit_offset_sync.argtypes = (c_void_p, c_char_p, c_int32, c_int64) + _libtaos.tmq_commit_offset_sync.restype = c_int32 +except Exception as err: + _UNSUPPORTED["tmq_commit_offset_sync"] = err + + +def tmq_commit_offset_sync(tmq, topic, vg_id, offset): + # type: (c_void_p, str, int, int) -> None + _check_if_supported() + res = _libtaos.tmq_commit_offset_sync(tmq, c_char_p(topic.encode("utf-8")), c_int32(vg_id), c_int64(offset)) + if res != 0: + raise TmqError(msg=f"failed on tmq_commit_offset_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res) + + try: _libtaos.tmq_get_topic_name.argtypes = (c_void_p,) _libtaos.tmq_get_topic_name.restype = c_char_p @@ -1777,6 +1792,18 @@ def tmq_get_res_type(message): return _libtaos.tmq_get_res_type(message) +try: + _libtaos.tmq_get_vgroup_offset.argstype = (c_void_p,) + _libtaos.tmq_get_vgroup_offset.restype = c_int64 +except Exception as err: + _UNSUPPORTED["tmq_get_vgroup_offset"] = err + + +def tmq_get_vgroup_offset(message): + # type: (c_void_p) -> int + return _libtaos.tmq_get_vgroup_offset(message) + + class TmqTopicAssignment(Structure): _fields_ = [ ("_vg_id", c_int32), @@ -1871,7 +1898,8 @@ def tmq_get_topic_assignment(tmq, topic_name): code = _libtaos.tmq_get_topic_assignment(tmq, c_char_p(topic_name.encode('utf-8')), byref(assignment), ctypes.byref(num_of_assignment)) if code != 0: - raise TmqError(msg="failed on tmq_get_topic_assignment()", errno=code) + raise TmqError(msg=f"failed on tmq_get_topic_assignment(), errno={code:X}, errmsg={tmq_err2str(code)}", + errno=code) tmq_assignments = TmqTopicAssignments(assignment, num_of_assignment.value) @@ -1893,7 +1921,37 @@ def tmq_get_topic_assignment(tmq, topic_name): def tmq_offset_seek(tmq, topic_name, vgroup_id, offset): code = _libtaos.tmq_offset_seek(tmq, c_char_p(topic_name.encode('utf-8')), c_int32(vgroup_id), c_int64(offset)) if code != 0: - raise TmqError(msg="failed on tmq_offset_seek()", errno=code) + raise TmqError(msg=f"failed on tmq_offset_seek(), errno={code:X}, errmsg={tmq_err2str(code)}", errno=code) + + +try: + _libtaos.tmq_committed.argstype = (c_void_p, c_char_p, c_int32) + _libtaos.tmq_committed.restype = c_int64 +except Exception as err: + _UNSUPPORTED["tmq_committed"] = err + + +def tmq_committed(tmq, topic, vgroup_id): + # type: (c_void_p, str, int) -> int + res = _libtaos.tmq_committed(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id)) + if res < 0: + raise TmqError(msg=f"failed on tmq_committed(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res) + return res + + +try: + _libtaos.tmq_position.argstype = (c_void_p, c_char_p, c_int32) + _libtaos.tmq_position.restype = c_int64 +except Exception as err: + _UNSUPPORTED["tmq_position"] = err + + +def tmq_position(tmq, topic, vgroup_id): + # type: (c_void_p, str, int) -> int + offset = _libtaos.tmq_position(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id)) + if offset < 0: + raise TmqError(msg=f"failed on tmq_position(), errno={offset:X}, errmsg={tmq_err2str(offset)}", errno=offset) + return offset class CTaosInterface(object): diff --git a/taos/tmq.py b/taos/tmq.py index 195f54a6..77e2ebee 100644 --- a/taos/tmq.py +++ b/taos/tmq.py @@ -135,6 +135,14 @@ def value(self): table=tmq_get_table_name(self.msg))) return message_blocks + def offset(self): + # type: () -> int + """ + :returns: message offset. + :rtype: int + """ + return tmq_get_vgroup_offset(self.msg) + def __del__(self): if not self.msg: return @@ -279,20 +287,76 @@ def close(self): tmq_consumer_close(self._tmq) self._tmq = None - def commit(self, message): - # type (Message) -> None + def commit(self, message: Message = None, offsets: [TopicPartition] = None): + # type (Message, [TopicPartition], bool) -> None """ Commit a message. - The `message` parameters are mutually exclusive. If `message` is None, the current partition assignment's - offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set to False. + The `message` and `offsets` parameters are mutually exclusive. If neither is set, the current partition + assignment's offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set + to False. - :param Message message: Commit the message's offset. + :param Message message: Commit the message's offset+1. Note: By convention, committed offsets reflect the next + message to be consumed, **not** the last message consumed. + :param list(TopicPartition) offsets: List of topic+partitions+offsets to commit. """ - if message is None and not isinstance(message, Message): - tmq_commit_sync(self._tmq, None) - else: + if message: + if not isinstance(message, Message): + raise TmqError(msg='Invalid message type') tmq_commit_sync(self._tmq, message.msg) + return + + if offsets and isinstance(offsets, list): + for offset in offsets: + if not isinstance(offset, TopicPartition): + raise TmqError(msg='Invalid offset type') + tmq_commit_offset_sync(self._tmq, offset.topic, offset.partition, offset.offset) + return + + tmq_commit_sync(self._tmq, None) + + def committed(self, partitions): + # type ([TopicPartition]) -> [TopicPartition] + """ + Retrieve committed offsets for the specified partitions. + + :param list(TopicPartition) partitions: List of topic+partitions to query for stored offsets. + :returns: List of topic+partitions with offset and possibly error set. + :rtype: list(TopicPartition) + """ + for partition in partitions: + if not isinstance(partition, TopicPartition): + raise TmqError(msg='Invalid partition type') + offset = tmq_committed(self._tmq, partition.topic, partition.partition) + partition.offset = offset + + return partitions + + def position(self, partitions): + # type ([TopicPartition]) -> [TopicPartition] + """ + Retrieve current positions (offsets) for the specified partitions. + + :param list(TopicPartition) partitions: List of topic+partitions to return current offsets for. + :returns: List of topic+partitions with offset and possibly error set. + :rtype: list(TopicPartition) + """ + for partition in partitions: + if not isinstance(partition, TopicPartition): + raise TmqError(msg='Invalid partition type') + offset = tmq_position(self._tmq, partition.topic, partition.partition) + partition.offset = offset + + return partitions + + def list_topics(self) -> [str]: + # type () -> [str] + """ + Request subscription topics from the tmq. + + :rtype: topics list + """ + return tmq_subscription(self._tmq) def __del__(self): self.close() diff --git a/tests/test_tmq.py b/tests/test_tmq.py index 6b7416ae..87b2ef76 100644 --- a/tests/test_tmq.py +++ b/tests/test_tmq.py @@ -88,8 +88,11 @@ def pre_test_tmq(precision: str): def after_ter_tmq(): conn = taos.connect() # drop database and topic - conn.execute("drop topic if exists topic1") - conn.execute("drop database if exists tmq_test") + try: + conn.execute("drop topic if exists topic1") + conn.execute("drop database if exists tmq_test") + except Exception: + pass def test_consumer_with_precision(): @@ -110,22 +113,23 @@ def tmq_consumer_with_precision(precision: str): "msg.with.table.name": "true" }) consumer.subscribe(["topic1"]) + try: + sThread = ConsumerThread(consumer) + iThread = insertThread() - sThread = ConsumerThread(consumer) - iThread = insertThread() - - sThread.start() - sleep(2) - iThread.start() - - iThread.join() - sThread.join() + sThread.start() + sleep(2) + iThread.start() - print("====== finish test, start clean") - print("====== unsubscribe topic") - consumer.unsubscribe() + iThread.join() + sThread.join() - after_ter_tmq() + print("====== finish test, start clean") + print("====== unsubscribe topic") + finally: + consumer.unsubscribe() + consumer.close() + after_ter_tmq() def test_tmq_assignment(): @@ -142,15 +146,20 @@ def test_tmq_assignment(): consumer = Consumer({"group.id": "1"}) consumer.subscribe(["topic1"]) - assignment = consumer.assignment() + try: + assignment = consumer.assignment() - assert assignment[0].offset == 0 + assert assignment[0].offset == 0 - consumer.poll(1) - consumer.poll(1) + consumer.poll(1) + consumer.poll(1) - assignment = consumer.assignment() - assert assignment[0].offset > 0 + assignment = consumer.assignment() + assert assignment[0].offset > 0 + finally: + consumer.unsubscribe() + consumer.close() + after_ter_tmq() def test_tmq_seek(): @@ -166,16 +175,65 @@ def test_tmq_seek(): consumer = Consumer({"group.id": "1"}) consumer.subscribe(["topic1"]) + try: + assignment = consumer.assignment() + consumer.poll(1) - assignment = consumer.assignment() + for partition in assignment: + consumer.seek(partition) - consumer.poll(1) + assignment = consumer.assignment() + assert assignment[0].offset == 0 + finally: + consumer.unsubscribe() + consumer.close() + after_ter_tmq() - for partition in assignment: - consumer.seek(partition) - assignment = consumer.assignment() - assert assignment[0].offset == 0 +def test_tmq_list_topics(): + if not IS_V3: + return + pre_test_tmq('') + consumer = Consumer({"group.id": "1"}) + consumer.subscribe(["topic1"]) + try: + topics = consumer.list_topics() + assert topics == ["topic1"] + finally: + consumer.unsubscribe() + consumer.close() + after_ter_tmq() + + +def test_tmq_committed_and_position(): + if not IS_V3: + return + pre_test_tmq('') + + conn = taos.connect() + conn.select_db("tmq_test") + conn.execute("insert into tb1 values (now-4s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1')") + conn.execute("insert into tb1 values (now-3s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1')") + conn.execute("insert into tb1 values (now-2s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2')") + conn.execute("insert into tb1 values (now-1s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2')") + + consumer = Consumer({"group.id": "1"}) + consumer.subscribe(["topic1"]) + + try: + consumer.poll(1) + res = consumer.poll(1) + consumer.commit() + topic_partitions = consumer.assignment() + committees = consumer.committed(topic_partitions) + assert committees[0].offset > 0 + positions = consumer.position(topic_partitions) + assert positions[0].offset > 0 + assert positions[0].offset == committees[0].offset + finally: + consumer.unsubscribe() + consumer.close() + after_ter_tmq() if __name__ == "__main__":