Skip to content

Commit

Permalink
Merge pull request #208 from taosdata/build/sunpeng/merge-from-3.0
Browse files Browse the repository at this point in the history
build: merge from 3.0
  • Loading branch information
sunpe authored Aug 2, 2023
2 parents b0686c2 + caaf1c8 commit 9dad4b5
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 74 deletions.
136 changes: 97 additions & 39 deletions taos/cinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,10 +1036,10 @@ def taos_stmt_affected_rows(stmt):


def taos_schemaless_insert(
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
):
_check_if_supported()
num_of_lines = len(lines)
Expand Down Expand Up @@ -1084,11 +1084,11 @@ def taos_schemaless_insert(


def taos_schemaless_insert_ttl(
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
):
_check_if_supported()
num_of_lines = len(lines)
Expand Down Expand Up @@ -1135,12 +1135,12 @@ def taos_schemaless_insert_ttl(


def taos_schemaless_insert_ttl_with_reqid(
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
connection: c_void_p,
lines: Union[List[str], Tuple[str]],
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
):
_check_if_supported()
num_of_lines = len(lines)
Expand Down Expand Up @@ -1217,10 +1217,10 @@ def taos_schemaless_insert_with_reqid(connection, lines, protocol, precision, re


def taos_schemaless_insert_raw(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1264,11 +1264,11 @@ def taos_schemaless_insert_raw(


def taos_schemaless_insert_raw_with_reqid(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
req_id: int,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
req_id: int,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1322,11 +1322,11 @@ def taos_schemaless_insert_raw_with_reqid(


def taos_schemaless_insert_raw_ttl(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1381,12 +1381,12 @@ def taos_schemaless_insert_raw_ttl(


def taos_schemaless_insert_raw_ttl_with_reqid(
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
connection: c_void_p,
lines_raw: str,
protocol: SmlProtocol,
precision: SmlPrecision,
ttl: int,
req_id: int,
) -> int:
_check_if_supported()
length = len(lines_raw)
Expand Down Expand Up @@ -1509,7 +1509,7 @@ def tmq_consumer_new(conf, errstrlen=0):
def tmq_err2str(errno):
# type (c_int) -> c_char_p
_check_if_supported()
return c_char_p(_libtaos.tmq_err2str(errno))
return c_char_p(_libtaos.tmq_err2str(errno)).value.decode("utf-8")


try:
Expand Down Expand Up @@ -1669,13 +1669,28 @@ def tmq_consumer_close(tmq):


def tmq_commit_sync(tmq, offset):
# type: (c_void_p, c_void_p) -> None
# type: (c_void_p, c_void_p|None) -> None
_check_if_supported()
res = _libtaos.tmq_commit_sync(tmq, offset)
if res != 0:
raise TmqError(msg=f"failed on tmq_commit_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res)


try:
_libtaos.tmq_commit_offset_sync.argtypes = (c_void_p, c_char_p, c_int32, c_int64)
_libtaos.tmq_commit_offset_sync.restype = c_int32
except Exception as err:
_UNSUPPORTED["tmq_commit_offset_sync"] = err


def tmq_commit_offset_sync(tmq, topic, vg_id, offset):
# type: (c_void_p, str, int, int) -> None
_check_if_supported()
res = _libtaos.tmq_commit_offset_sync(tmq, c_char_p(topic.encode("utf-8")), c_int32(vg_id), c_int64(offset))
if res != 0:
raise TmqError(msg=f"failed on tmq_commit_offset_sync(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res)


try:
_libtaos.tmq_get_topic_name.argtypes = (c_void_p,)
_libtaos.tmq_get_topic_name.restype = c_char_p
Expand Down Expand Up @@ -1777,6 +1792,18 @@ def tmq_get_res_type(message):
return _libtaos.tmq_get_res_type(message)


try:
_libtaos.tmq_get_vgroup_offset.argstype = (c_void_p,)
_libtaos.tmq_get_vgroup_offset.restype = c_int64
except Exception as err:
_UNSUPPORTED["tmq_get_vgroup_offset"] = err


def tmq_get_vgroup_offset(message):
# type: (c_void_p) -> int
return _libtaos.tmq_get_vgroup_offset(message)


class TmqTopicAssignment(Structure):
_fields_ = [
("_vg_id", c_int32),
Expand Down Expand Up @@ -1871,7 +1898,8 @@ def tmq_get_topic_assignment(tmq, topic_name):
code = _libtaos.tmq_get_topic_assignment(tmq, c_char_p(topic_name.encode('utf-8')), byref(assignment),
ctypes.byref(num_of_assignment))
if code != 0:
raise TmqError(msg="failed on tmq_get_topic_assignment()", errno=code)
raise TmqError(msg=f"failed on tmq_get_topic_assignment(), errno={code:X}, errmsg={tmq_err2str(code)}",
errno=code)

tmq_assignments = TmqTopicAssignments(assignment, num_of_assignment.value)

Expand All @@ -1893,7 +1921,37 @@ def tmq_get_topic_assignment(tmq, topic_name):
def tmq_offset_seek(tmq, topic_name, vgroup_id, offset):
code = _libtaos.tmq_offset_seek(tmq, c_char_p(topic_name.encode('utf-8')), c_int32(vgroup_id), c_int64(offset))
if code != 0:
raise TmqError(msg="failed on tmq_offset_seek()", errno=code)
raise TmqError(msg=f"failed on tmq_offset_seek(), errno={code:X}, errmsg={tmq_err2str(code)}", errno=code)


try:
_libtaos.tmq_committed.argstype = (c_void_p, c_char_p, c_int32)
_libtaos.tmq_committed.restype = c_int64
except Exception as err:
_UNSUPPORTED["tmq_committed"] = err


def tmq_committed(tmq, topic, vgroup_id):
# type: (c_void_p, str, int) -> int
res = _libtaos.tmq_committed(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id))
if res < 0:
raise TmqError(msg=f"failed on tmq_committed(), errno={res:X}, errmsg={tmq_err2str(res)}", errno=res)
return res


try:
_libtaos.tmq_position.argstype = (c_void_p, c_char_p, c_int32)
_libtaos.tmq_position.restype = c_int64
except Exception as err:
_UNSUPPORTED["tmq_position"] = err


def tmq_position(tmq, topic, vgroup_id):
# type: (c_void_p, str, int) -> int
offset = _libtaos.tmq_position(tmq, c_char_p(topic.encode('utf-8')), c_int32(vgroup_id))
if offset < 0:
raise TmqError(msg=f"failed on tmq_position(), errno={offset:X}, errmsg={tmq_err2str(offset)}", errno=offset)
return offset


class CTaosInterface(object):
Expand Down
80 changes: 72 additions & 8 deletions taos/tmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ def value(self):
table=tmq_get_table_name(self.msg)))
return message_blocks

def offset(self):
# type: () -> int
"""
:returns: message offset.
:rtype: int
"""
return tmq_get_vgroup_offset(self.msg)

def __del__(self):
if not self.msg:
return
Expand Down Expand Up @@ -279,20 +287,76 @@ def close(self):
tmq_consumer_close(self._tmq)
self._tmq = None

def commit(self, message):
# type (Message) -> None
def commit(self, message: Message = None, offsets: [TopicPartition] = None):
# type (Message, [TopicPartition], bool) -> None
"""
Commit a message.
The `message` parameters are mutually exclusive. If `message` is None, the current partition assignment's
offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set to False.
The `message` and `offsets` parameters are mutually exclusive. If neither is set, the current partition
assignment's offsets are used instead. Use this method to commit offsets if you have 'enable.auto.commit' set
to False.
:param Message message: Commit the message's offset.
:param Message message: Commit the message's offset+1. Note: By convention, committed offsets reflect the next
message to be consumed, **not** the last message consumed.
:param list(TopicPartition) offsets: List of topic+partitions+offsets to commit.
"""
if message is None and not isinstance(message, Message):
tmq_commit_sync(self._tmq, None)
else:
if message:
if not isinstance(message, Message):
raise TmqError(msg='Invalid message type')
tmq_commit_sync(self._tmq, message.msg)
return

if offsets and isinstance(offsets, list):
for offset in offsets:
if not isinstance(offset, TopicPartition):
raise TmqError(msg='Invalid offset type')
tmq_commit_offset_sync(self._tmq, offset.topic, offset.partition, offset.offset)
return

tmq_commit_sync(self._tmq, None)

def committed(self, partitions):
# type ([TopicPartition]) -> [TopicPartition]
"""
Retrieve committed offsets for the specified partitions.
:param list(TopicPartition) partitions: List of topic+partitions to query for stored offsets.
:returns: List of topic+partitions with offset and possibly error set.
:rtype: list(TopicPartition)
"""
for partition in partitions:
if not isinstance(partition, TopicPartition):
raise TmqError(msg='Invalid partition type')
offset = tmq_committed(self._tmq, partition.topic, partition.partition)
partition.offset = offset

return partitions

def position(self, partitions):
# type ([TopicPartition]) -> [TopicPartition]
"""
Retrieve current positions (offsets) for the specified partitions.
:param list(TopicPartition) partitions: List of topic+partitions to return current offsets for.
:returns: List of topic+partitions with offset and possibly error set.
:rtype: list(TopicPartition)
"""
for partition in partitions:
if not isinstance(partition, TopicPartition):
raise TmqError(msg='Invalid partition type')
offset = tmq_position(self._tmq, partition.topic, partition.partition)
partition.offset = offset

return partitions

def list_topics(self) -> [str]:
# type () -> [str]
"""
Request subscription topics from the tmq.
:rtype: topics list
"""
return tmq_subscription(self._tmq)

def __del__(self):
self.close()
Expand Down
Loading

0 comments on commit 9dad4b5

Please sign in to comment.