From 1f059b06e7a35a96c8158913f60207c5f13e915b Mon Sep 17 00:00:00 2001 From: hammadb Date: Thu, 14 Dec 2023 19:29:37 -0800 Subject: [PATCH] [ENH] Add rust message id conversion for pulsar --- rust/worker/src/ingest/message_id.rs | 27 +++++++++++++++++++++++++++ rust/worker/src/ingest/mod.rs | 1 + 2 files changed, 28 insertions(+) create mode 100644 rust/worker/src/ingest/message_id.rs diff --git a/rust/worker/src/ingest/message_id.rs b/rust/worker/src/ingest/message_id.rs new file mode 100644 index 00000000000..6d0fd8441aa --- /dev/null +++ b/rust/worker/src/ingest/message_id.rs @@ -0,0 +1,27 @@ +// mirrors chromadb/utils/messageid.py +use num_bigint::BigInt; +use pulsar::proto::MessageIdData; + +use crate::types::SeqId; + +pub(crate) fn pulsar_to_int(message_id: &MessageIdData) -> SeqId { + let ledger_id = message_id.ledger_id; + let entry_id = message_id.entry_id; + let batch_index = message_id.batch_index.unwrap_or(0); + let partition = message_id.partition.unwrap_or(0); + + let mut ledger_id = BigInt::from(ledger_id); + let mut entry_id = BigInt::from(entry_id); + let mut batch_index = BigInt::from(batch_index); + let mut partition = BigInt::from(partition); + + // Convert to offset binary encoding to preserve ordering semantics when encoded + // see https://en.wikipedia.org/wiki/Offset_binary + ledger_id = ledger_id + BigInt::from(2).pow(63); + entry_id = entry_id + BigInt::from(2).pow(63); + batch_index = batch_index + BigInt::from(2).pow(31); + partition = partition + BigInt::from(2).pow(31); + + let res = ledger_id << 128 | entry_id << 96 | batch_index << 64 | partition; + res +} diff --git a/rust/worker/src/ingest/mod.rs b/rust/worker/src/ingest/mod.rs index 954234e98cb..34b20dd79f1 100644 --- a/rust/worker/src/ingest/mod.rs +++ b/rust/worker/src/ingest/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod config; mod ingest; +mod message_id; // Re-export the ingest provider for use in the worker pub(crate) use ingest::*;