Skip to content

Commit

Permalink
enh: update native tmq api
Browse files Browse the repository at this point in the history
  • Loading branch information
sunpe committed Jul 28, 2023
1 parent 949f3fe commit aaa33ce
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 47 deletions.
129 changes: 93 additions & 36 deletions taos/cinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,10 +1036,10 @@ def taos_stmt_affected_rows(stmt):


def taos_schemaless_insert(
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
):
_check_if_supported()
num_of_lines = len(lines)
Expand Down Expand Up @@ -1084,11 +1084,11 @@ def taos_schemaless_insert(


def taos_schemaless_insert_ttl(
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
):
_check_if_supported()
num_of_lines = len(lines)
Expand Down Expand Up @@ -1135,12 +1135,12 @@ def taos_schemaless_insert_ttl(


def taos_schemaless_insert_ttl_with_reqid(
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
):
_check_if_supported()
num_of_lines = len(lines)
Expand Down Expand Up @@ -1217,10 +1217,10 @@ def taos_schemaless_insert_with_reqid(connection, lines, protocol, precision, re


def taos_schemaless_insert_raw(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1264,11 +1264,11 @@ def taos_schemaless_insert_raw(


def taos_schemaless_insert_raw_with_reqid(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
req_id: int,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
req_id: int,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1322,11 +1322,11 @@ def taos_schemaless_insert_raw_with_reqid(


def taos_schemaless_insert_raw_ttl(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1381,12 +1381,12 @@ def taos_schemaless_insert_raw_ttl(


def taos_schemaless_insert_raw_ttl_with_reqid(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1669,13 +1669,28 @@ def tmq_consumer_close(tmq):


def tmq_commit_sync(tmq, offset):
# type: (c_void_p, c_void_p) -> None
# type: (c_void_p, c_void_p|None) -> None
_check_if_supported()
res = _libtaos.tmq_commit_sync(tmq, offset)
if res != 0:
raise TmqError(msg=f"failed on tmq_commit_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res)


try:
_libtaos.tmq_commit_offset_sync.argtypes = (c_void_p, c_char_p, c_int32, c_int64)
_libtaos.tmq_commit_offset_sync.restype = c_int32
except Exception as err:
_UNSUPPORTED["tmq_commit_offset_sync"] = err


def tmq_commit_offset_sync(tmq, topic, vg_id, offset):
# type: (c_void_p, str, int, int) -> None
_check_if_supported()
res = _libtaos.tmq_commit_offset_sync(tmq, c_char_p(topic.encode("utf-8")), c_int32(vg_id), c_int64(offset))
if res != 0:
raise TmqError(msg=f"failed on tmq_commit_offset_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res)


try:
_libtaos.tmq_get_topic_name.argtypes = (c_void_p,)
_libtaos.tmq_get_topic_name.restype = c_char_p
Expand Down Expand Up @@ -1777,6 +1792,18 @@ def tmq_get_res_type(message):
return _libtaos.tmq_get_res_type(message)


try:
_libtaos.tmq_get_vgroup_offset.argstype = (c_void_p,)
_libtaos.tmq_get_vgroup_offset.restype = c_int64
except Exception as err:
_UNSUPPORTED["tmq_get_vgroup_offset"] = err


def tmq_get_vgroup_offset(message):
# type: (c_void_p) -> int
return _libtaos.tmq_get_vgroup_offset(message)


class TmqTopicAssignment(Structure):
_fields_ = [
("_vg_id", c_int32),
Expand Down Expand Up @@ -1896,6 +1923,36 @@ def tmq_offset_seek(tmq, topic_name, vgroup_id, offset):
raise TmqError(msg="failed on tmq_offset_seek()", errno=code)


try:
_libtaos.tmq_committed.argstype = (c_void_p, c_char_p, c_int32)
_libtaos.tmq_committed.restype = c_int64
except Exception as err:
_UNSUPPORTED["tmq_committed"] = err


def tmq_committed(tmq, topic, vgroup_id):
# type: (c_void_p, str, int) -> int
res = _libtaos.tmq_committed(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id))
if res < 0:
raise TmqError(msg=taos_errstr(), errno=res)
return res


try:
_libtaos.tmq_position.argstype = (c_void_p, c_char_p, c_int32)
_libtaos.tmq_position.restype = c_int64
except Exception as err:
_UNSUPPORTED["tmq_position"] = err


def tmq_position(tmq, topic, vgroup_id):
# type: (c_void_p, str, int) -> int
offset = _libtaos.tmq_position(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id))
if offset < 0:
raise TmqError(msg=tmq_err2str(), errno=offset)
return offset


class CTaosInterface(object):
def __init__(self, config=None, tz=None):
"""
Expand Down
80 changes: 72 additions & 8 deletions taos/tmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ def value(self):
table=tmq_get_table_name(self.msg)))
return message_blocks

def offset(self):
# type: () -> int
"""
:returns: message offset.
:rtype: int
"""
return tmq_get_vgroup_offset(self.msg)

def __del__(self):
if not self.msg:
return
Expand Down Expand Up @@ -279,20 +287,76 @@ def close(self):
tmq_consumer_close(self._tmq)
self._tmq = None

def commit(self, message):
# type (Message) -> None
def commit(self, message: Message = None, offsets: [TopicPartition] = None):
# type (Message, [TopicPartition], bool) -> None
"""
Commit a message.
The `message` parameters are mutually exclusive. If `message` is None, the current partition assignment's
offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set to False.
The `message` and `offsets` parameters are mutually exclusive. If neither is set, the current partition
assignment's offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set
to False.
:param Message message: Commit the message's offset.
:param Message message: Commit the message's offset+1. Note: By convention, committed offsets reflect the next
message to be consumed, **not** the last message consumed.
:param list(TopicPartition) offsets: List of topic+partitions+offsets to commit.
"""
if message is None and not isinstance(message, Message):
tmq_commit_sync(self._tmq, None)
else:
if message:
if not isinstance(message, Message):
raise TmqError(msg='Invalid message type')
tmq_commit_sync(self._tmq, message.msg)
return

if offsets and isinstance(offsets, list):
for offset in offsets:
if not isinstance(offset, TopicPartition):
raise TmqError(msg='Invalid offset type')
tmq_commit_offset_sync(self._tmq, offset.topic, offset.partition, offset.offset)
return

tmq_commit_sync(self._tmq, None)

def committed(self, partitions):
# type ([TopicPartition]) -> [TopicPartition]
"""
Retrieve committed offsets for the specified partitions.
:param list(TopicPartition) partitions: List of topic+partitions to query for stored offsets.
:returns: List of topic+partitions with offset and possibly error set.
:rtype: list(TopicPartition)
"""
for partition in partitions:
if not isinstance(partition, TopicPartition):
raise TmqError(msg='Invalid partition type')
offset = tmq_committed(self._tmq, partition.topic, partition.partition)
partition.offset = offset

return partitions

def position(self, partitions):
# type ([TopicPartition]) -> [TopicPartition]
"""
Retrieve current positions (offsets) for the specified partitions.
:param list(TopicPartition) partitions: List of topic+partitions to return current offsets for.
:returns: List of topic+partitions with offset and possibly error set.
:rtype: list(TopicPartition)
"""
for partition in partitions:
if not isinstance(partition, TopicPartition):
raise TmqError(msg='Invalid partition type')
offset = tmq_position(self._tmq, partition.topic, partition.partition)
partition.offset = offset

return partitions

def list_topics(self) -> [str]:
# type () -> [str]
"""
Request subscription topics from the tmq.
:rtype: topics list
"""
return tmq_subscription(self._tmq)

def __del__(self):
self.close()
Expand Down
59 changes: 56 additions & 3 deletions tests/test_tmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,14 @@ def pre_test_tmq(precision: str):


def after_ter_tmq():
time.sleep(20)
conn = taos.connect()
# drop database and topic
conn.execute("drop topic if exists topic1")
conn.execute("drop database if exists tmq_test")
try:
conn.execute("drop topic if exists topic1")
conn.execute("drop database if exists tmq_test")
except Exception:
pass


def test_consumer_with_precision():
Expand Down Expand Up @@ -124,7 +128,7 @@ def tmq_consumer_with_precision(precision: str):
print("====== finish test, start clean")
print("====== unsubscribe topic")
consumer.unsubscribe()

consumer.close()
after_ter_tmq()


Expand Down Expand Up @@ -152,6 +156,10 @@ def test_tmq_assignment():
assignment = consumer.assignment()
assert assignment[0].offset > 0

consumer.unsubscribe()
consumer.close()
after_ter_tmq()


def test_tmq_seek():
if not IS_V3:
Expand All @@ -177,6 +185,51 @@ def test_tmq_seek():

assert assignment[0].offset == 0

consumer.unsubscribe()
consumer.close()
after_ter_tmq()


def test_tmq_list_topics():
if not IS_V3:
return
pre_test_tmq('')
consumer = Consumer({"group.id": "1"})
consumer.subscribe(["topic1"])
topics = consumer.list_topics()
assert topics == ["topic1"]
consumer.unsubscribe()
consumer.close()
after_ter_tmq()


def test_tmq_committed_and_position():
if not IS_V3:
return
pre_test_tmq('')
conn = taos.connect()
conn.select_db("tmq_test")
conn.execute("insert into tb1 values (now-4s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1')")
conn.execute("insert into tb1 values (now-3s, true,1,1,1,1,1,1,1,1,1,1,1,'1','1')")
conn.execute("insert into tb1 values (now-2s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2')")
conn.execute("insert into tb1 values (now-1s, true,2,2,2,2,2,2,2,2,2,2,2,'2','2')")

consumer = Consumer({"group.id": "1"})
consumer.subscribe(["topic1"])
consumer.poll(1)
res = consumer.poll(1)
consumer.commit()
topic_partitions = consumer.assignment()
committees = consumer.committed(topic_partitions)
assert committees[0].offset > 0
positions = consumer.position(topic_partitions)
assert positions[0].offset > 0
assert positions[0].offset == committees[0].offset

consumer.unsubscribe()
consumer.close()
after_ter_tmq()


if __name__ == "__main__":
test_consumer_with_precision()

0 comments on commit aaa33ce

Please sign in to comment.