Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enh: update native tmq api #205

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 97 additions & 39 deletions taos/cinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,10 +1036,10 @@ def taos_stmt_affected_rows(stmt):


def taos_schemaless_insert(
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
connection: c_void_p,
sangshuduo marked this conversation as resolved.
Show resolved Hide resolved
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
):
_check_if_supported()
num_of_lines = len(lines)
Expand Down Expand Up @@ -1084,11 +1084,11 @@ def taos_schemaless_insert(


def taos_schemaless_insert_ttl(
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
):
_check_if_supported()
num_of_lines = len(lines)
Expand Down Expand Up @@ -1135,12 +1135,12 @@ def taos_schemaless_insert_ttl(


def taos_schemaless_insert_ttl_with_reqid(
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
):
_check_if_supported()
num_of_lines = len(lines)
Expand Down Expand Up @@ -1217,10 +1217,10 @@ def taos_schemaless_insert_with_reqid(connection, lines, protocol, precision, re


def taos_schemaless_insert_raw(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1264,11 +1264,11 @@ def taos_schemaless_insert_raw(


def taos_schemaless_insert_raw_with_reqid(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
req_id: int,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
req_id: int,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1322,11 +1322,11 @@ def taos_schemaless_insert_raw_with_reqid(


def taos_schemaless_insert_raw_ttl(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1381,12 +1381,12 @@ def taos_schemaless_insert_raw_ttl(


def taos_schemaless_insert_raw_ttl_with_reqid(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1509,7 +1509,7 @@ def tmq_consumer_new(conf, errstrlen=0):
def tmq_err2str(errno):
# type (c_int) -> c_char_p
_check_if_supported()
return c_char_p(_libtaos.tmq_err2str(errno))
return c_char_p(_libtaos.tmq_err2str(errno)).value.decode("utf-8")


try:
Expand Down Expand Up @@ -1669,13 +1669,28 @@ def tmq_consumer_close(tmq):


def tmq_commit_sync(tmq, offset):
# type: (c_void_p, c_void_p) -> None
# type: (c_void_p, c_void_p|None) -> None
_check_if_supported()
res = _libtaos.tmq_commit_sync(tmq, offset)
if res != 0:
raise TmqError(msg=f"failed on tmq_commit_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res)


try:
_libtaos.tmq_commit_offset_sync.argtypes = (c_void_p, c_char_p, c_int32, c_int64)
_libtaos.tmq_commit_offset_sync.restype = c_int32
except Exception as err:
_UNSUPPORTED["tmq_commit_offset_sync"] = err


def tmq_commit_offset_sync(tmq, topic, vg_id, offset):
# type: (c_void_p, str, int, int) -> None
_check_if_supported()
res = _libtaos.tmq_commit_offset_sync(tmq, c_char_p(topic.encode("utf-8")), c_int32(vg_id), c_int64(offset))
if res != 0:
raise TmqError(msg=f"failed on tmq_commit_offset_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res)


try:
_libtaos.tmq_get_topic_name.argtypes = (c_void_p,)
_libtaos.tmq_get_topic_name.restype = c_char_p
Expand Down Expand Up @@ -1777,6 +1792,18 @@ def tmq_get_res_type(message):
return _libtaos.tmq_get_res_type(message)


try:
_libtaos.tmq_get_vgroup_offset.argstype = (c_void_p,)
_libtaos.tmq_get_vgroup_offset.restype = c_int64
except Exception as err:
_UNSUPPORTED["tmq_get_vgroup_offset"] = err


def tmq_get_vgroup_offset(message):
# type: (c_void_p) -> int
return _libtaos.tmq_get_vgroup_offset(message)


class TmqTopicAssignment(Structure):
_fields_ = [
("_vg_id", c_int32),
Expand Down Expand Up @@ -1871,7 +1898,8 @@ def tmq_get_topic_assignment(tmq, topic_name):
code = _libtaos.tmq_get_topic_assignment(tmq, c_char_p(topic_name.encode('utf-8')), byref(assignment),
ctypes.byref(num_of_assignment))
if code != 0:
raise TmqError(msg="failed on tmq_get_topic_assignment()", errno=code)
raise TmqError(msg=f"failed on tmq_get_topic_assignment(), errno={code:X}, errmsg={tmq_err2str(code)}",
errno=code)

tmq_assignments = TmqTopicAssignments(assignment, num_of_assignment.value)

Expand All @@ -1893,7 +1921,37 @@ def tmq_get_topic_assignment(tmq, topic_name):
def tmq_offset_seek(tmq, topic_name, vgroup_id, offset):
code = _libtaos.tmq_offset_seek(tmq, c_char_p(topic_name.encode('utf-8')), c_int32(vgroup_id), c_int64(offset))
if code != 0:
raise TmqError(msg="failed on tmq_offset_seek()", errno=code)
raise TmqError(msg=f"failed on tmq_offset_seek(), errno={code:X}, errmsg={tmq_err2str(code)}", errno=code)


try:
_libtaos.tmq_committed.argstype = (c_void_p, c_char_p, c_int32)
_libtaos.tmq_committed.restype = c_int64
except Exception as err:
_UNSUPPORTED["tmq_committed"] = err


def tmq_committed(tmq, topic, vgroup_id):
# type: (c_void_p, str, int) -> int
res = _libtaos.tmq_committed(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id))
if res < 0:
raise TmqError(msg=f"failed on tmq_committed(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res)
return res


try:
_libtaos.tmq_position.argstype = (c_void_p, c_char_p, c_int32)
_libtaos.tmq_position.restype = c_int64
except Exception as err:
_UNSUPPORTED["tmq_position"] = err


def tmq_position(tmq, topic, vgroup_id):
# type: (c_void_p, str, int) -> int
offset = _libtaos.tmq_position(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id))
if offset < 0:
raise TmqError(msg=f"failed on tmq_position(), errno={offset:X}, errmsg={tmq_err2str(offset)}", errno=offset)
return offset


class CTaosInterface(object):
Expand Down
80 changes: 72 additions & 8 deletions taos/tmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ def value(self):
table=tmq_get_table_name(self.msg)))
return message_blocks

def offset(self):
# type: () -> int
"""
:returns: message offset.
:rtype: int
"""
return tmq_get_vgroup_offset(self.msg)

def __del__(self):
if not self.msg:
return
Expand Down Expand Up @@ -279,20 +287,76 @@ def close(self):
tmq_consumer_close(self._tmq)
self._tmq = None

def commit(self, message):
# type (Message) -> None
def commit(self, message: Message = None, offsets: [TopicPartition] = None):
# type (Message, [TopicPartition], bool) -> None
"""
Commit a message.

The `message` parameters are mutually exclusive. If `message` is None, the current partition assignment's
offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set to False.
The `message` and `offsets` parameters are mutually exclusive. If neither is set, the current partition
assignment's offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set
to False.

:param Message message: Commit the message's offset.
:param Message message: Commit the message's offset+1. Note: By convention, committed offsets reflect the next
message to be consumed, **not** the last message consumed.
:param list(TopicPartition) offsets: List of topic+partitions+offsets to commit.
"""
if message is None and not isinstance(message, Message):
tmq_commit_sync(self._tmq, None)
else:
if message:
if not isinstance(message, Message):
raise TmqError(msg='Invalid message type')
tmq_commit_sync(self._tmq, message.msg)
return

if offsets and isinstance(offsets, list):
for offset in offsets:
if not isinstance(offset, TopicPartition):
raise TmqError(msg='Invalid offset type')
tmq_commit_offset_sync(self._tmq, offset.topic, offset.partition, offset.offset)
return

tmq_commit_sync(self._tmq, None)

def committed(self, partitions):
# type ([TopicPartition]) -> [TopicPartition]
"""
Retrieve committed offsets for the specified partitions.

:param list(TopicPartition) partitions: List of topic+partitions to query for stored offsets.
:returns: List of topic+partitions with offset and possibly error set.
:rtype: list(TopicPartition)
"""
for partition in partitions:
if not isinstance(partition, TopicPartition):
raise TmqError(msg='Invalid partition type')
offset = tmq_committed(self._tmq, partition.topic, partition.partition)
partition.offset = offset

return partitions

def position(self, partitions):
# type ([TopicPartition]) -> [TopicPartition]
"""
Retrieve current positions (offsets) for the specified partitions.

:param list(TopicPartition) partitions: List of topic+partitions to return current offsets for.
:returns: List of topic+partitions with offset and possibly error set.
:rtype: list(TopicPartition)
"""
for partition in partitions:
if not isinstance(partition, TopicPartition):
raise TmqError(msg='Invalid partition type')
offset = tmq_position(self._tmq, partition.topic, partition.partition)
partition.offset = offset

return partitions

def list_topics(self) -> [str]:
# type () -> [str]
"""
Request subscription topics from the tmq.

:rtype: topics list
"""
return tmq_subscription(self._tmq)

def __del__(self):
self.close()
Expand Down
Loading
Loading