draft: Nested document query #13

Closed
wants to merge 40 commits
Commits
40 commits
cb89fd5
Use Levenshtein distance to score documents in fuzzy term queries
neilyio Feb 1, 2024
0230acb
feat: implement `TokenFilter` for `Option<F>` (#4)
aalexandrov Sep 7, 2024
759c0eb
Fix managed paths (#5)
rebasedming Oct 24, 2024
0cc66a2
expose AddOperation and with_max_doc (#7)
rebasedming Oct 28, 2024
e9f969b
adjust `Dictionary::sorted_ords_to_term_cb()` to allow duplicates (#8)
eeeebbbbrrrr Oct 28, 2024
febfba4
chore: point tantivy-fst to paradedb fork to fix regex
neilyio Nov 19, 2024
29bf48d
nested_document_query
neilyio Nov 26, 2024
b9a412c
nested_document_query impl
neilyio Nov 26, 2024
8d80e4a
add add_documents method
neilyio Dec 3, 2024
44edcd3
test passes
neilyio Dec 4, 2024
1bb653c
test pass stable
neilyio Dec 11, 2024
a57c3d6
feat: Add block_join_query module to src/query directory
neilyio Dec 12, 2024
c5fd4d5
Based on the extensive println debugging added, I'll generate a conci…
neilyio Dec 12, 2024
082dcde
refactor: Modernize block join query test code with add_documents and…
neilyio Dec 12, 2024
dd67221
fix: Resolve BlockJoinQuery test failures and implement explain funct…
neilyio Dec 12, 2024
cc5da00
fix: Update BlockJoinQuery explain method with minor syntax changes
neilyio Dec 12, 2024
e706616
The changes look good. Let me generate a concise commit message for t…
neilyio Dec 12, 2024
b5b9ffd
refactor: Improve BlockJoinQuery explain method with document-specifi…
neilyio Dec 12, 2024
871aab7
fix: Refactor BlockJoinScorer to correctly handle document matching a…
neilyio Dec 12, 2024
8402703
fix: Correct BlockJoinQuery parameter order and child doc matching logic
neilyio Dec 13, 2024
0571075
fix: Refactor BlockJoinScorer to correctly handle child document scor…
neilyio Dec 13, 2024
d1513b4
refactor: Update parent query from "parent" to "resume" in tests
neilyio Dec 13, 2024
8a300eb
fix: Adjust BlockJoinScorer initialization to resolve failing tests
neilyio Dec 13, 2024
6a93e29
fix: Track previous parent in BlockJoinScorer to collect child docume…
neilyio Dec 13, 2024
6bf4254
fix: Use u32::MAX as sentinel value for previous_parent in BlockJoinS…
neilyio Dec 13, 2024
541b8b8
fix: Ensure child_scorer is advanced before collecting child documents
neilyio Dec 13, 2024
060e5a8
fix: Correct handling of previous_parent to avoid overflow in tests
neilyio Dec 13, 2024
9949add
fix: Change previous_parent type to Option<DocId> in BlockJoinScorer …
neilyio Dec 13, 2024
b7c433f
fix: Correct BlockJoinScorer initialization and doc() method for bloc…
neilyio Dec 13, 2024
f278893
fix: Correct BlockJoinScorer initialization and document advancement …
neilyio Dec 13, 2024
a8707c1
fix: Correct BlockJoinScorer document advancement and score collectio…
neilyio Dec 13, 2024
573948a
fix: Correctly track parent documents in BlockJoinScorer methods
neilyio Dec 13, 2024
9978e58
fix: Correct parent update order in BlockJoinScorer methods
neilyio Dec 13, 2024
51c1c6f
fix: Correct order of parent updates in block_join_query.rs
neilyio Dec 13, 2024
c44ca1d
fix: Correct parent document handling in BlockJoinScorer advance method
neilyio Dec 13, 2024
d85175e
refactor: Update return types to use crate::Result and add for_each_p…
neilyio Dec 13, 2024
9ad8a77
fix: Update test assertion to match indexed resume content
neilyio Dec 13, 2024
3ff825a
fix: Correct test data content in block join query test
neilyio Dec 13, 2024
a5fbd7c
block join collector
neilyio Dec 13, 2024
c7e58df
block join tests
neilyio Dec 13, 2024
24 changes: 12 additions & 12 deletions Cargo.toml
@@ -21,11 +21,11 @@ byteorder = "1.4.3"
crc32fast = "1.3.2"
once_cell = "1.10.0"
regex = { version = "1.5.5", default-features = false, features = [
"std",
"unicode",
"std",
"unicode",
] }
aho-corasick = "1.0"
tantivy-fst = "0.5"
tantivy-fst = { git = "https://github.com/paradedb/fst.git" }
memmap2 = { version = "0.9.0", optional = true }
lz4_flex = { version = "0.11", default-features = false, optional = true }
zstd = { version = "0.13", optional = true, default-features = false }
@@ -40,7 +40,7 @@ crossbeam-channel = "0.5.4"
rust-stemmers = "1.2.0"
downcast-rs = "1.2.1"
bitpacking = { version = "0.9.2", default-features = false, features = [
"bitpacker4x",
"bitpacker4x",
] }
census = "0.4.2"
rustc-hash = "2.0.0"
@@ -129,14 +129,14 @@ compare_hash_only = ["stacker/compare_hash_only"]

[workspace]
members = [
"query-grammar",
"bitpacker",
"common",
"ownedbytes",
"stacker",
"sstable",
"tokenizer-api",
"columnar",
"query-grammar",
"bitpacker",
"common",
"ownedbytes",
"stacker",
"sstable",
"tokenizer-api",
"columnar",
]

# Following the "fail" crate best practises, we isolate
5 changes: 5 additions & 0 deletions src/aggregation/mod.rs
@@ -160,6 +160,11 @@ use itertools::Itertools;
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize};

#[allow(unused)]
pub(crate) fn invalid_agg_request(message: String) -> crate::TantivyError {
    crate::TantivyError::AggregationError(AggregationError::InvalidRequest(message))
}

fn parse_str_into_f64<E: de::Error>(value: &str) -> Result<f64, E> {
    let parsed = value
        .parse::<f64>()
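A quick illustration (not from this diff; the call site is hypothetical) of how the new `invalid_agg_request` helper might be used inside the aggregation code:

// Hypothetical call site, for illustration only.
fn check_bucket_count(count: usize, limit: usize) -> crate::Result<()> {
    if count > limit {
        return Err(invalid_agg_request(format!(
            "too many buckets: {count} (limit {limit})"
        )));
    }
    Ok(())
}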
113 changes: 113 additions & 0 deletions src/block_join_collector.rs
@@ -0,0 +1,113 @@
use crate::collector::Collector;
use crate::query::Scorer;
use crate::DocId;
use crate::Result;
use crate::Score;
use crate::SegmentReader;
use common::BitSet;

/// A conceptual `BlockJoinCollector` that aims to mimic Lucene's BlockJoinCollector.
/// It collects parent documents and, for each one, stores which child docs matched.
/// After search, you can retrieve these "groups".
///
/// NOTE: This is a conceptual implementation. Adjust as per Tantivy's Collector API.
/// In Tantivy, you'd typically implement `Collector` and `SegmentCollector`.
pub struct BlockJoinCollector {
    // For simplicity, store doc groups in memory:
    groups: Vec<(DocId, Vec<DocId>, Vec<Score>)>,
    current_reader_base: DocId,
}

impl BlockJoinCollector {
    pub fn new() -> BlockJoinCollector {
        BlockJoinCollector {
            groups: Vec::new(),
            current_reader_base: 0,
        }
    }

    /// Retrieve the collected groups:
    pub fn get_groups(&self) -> &[(DocId, Vec<DocId>, Vec<Score>)] {
        &self.groups
    }
}

impl Collector for BlockJoinCollector {
    type Fruit = ();

    fn set_segment(
        &mut self,
        _segment_id: u32,
        reader: &SegmentReader,
    ) -> Result<Box<dyn crate::collector::SegmentCollector<Fruit = ()>>> {
        let base = self.current_reader_base;
        self.current_reader_base += reader.max_doc();
        let mut parent_bitset = BitSet::with_max_value(reader.max_doc());
        // In a real scenario, you'd identify the parent docs here using a filter.
        // For this conceptual example, we assume parents are known externally.
        // You might need to pass that information in or have a filter pre-applied.

        Ok(Box::new(BlockJoinSegmentCollector {
            parent_bitset,
            parent_groups: &mut self.groups,
            base,
        }))
    }

    fn requires_scoring(&self) -> bool {
        true
    }

    fn collect(&mut self, _doc: DocId, _score: Score) -> Result<()> {
        // This method won't be called directly if we rely on segment collectors.
        Ok(())
    }

    fn harvest(self) -> Result<Self::Fruit> {
        Ok(())
    }
}

struct BlockJoinSegmentCollector<'a> {
    parent_bitset: BitSet,
    parent_groups: &'a mut Vec<(DocId, Vec<DocId>, Vec<Score>)>,
    base: DocId,
}

impl<'a> crate::collector::SegmentCollector for BlockJoinSegmentCollector<'a> {
    type Fruit = ();

    fn collect(&mut self, doc: DocId, score: Score) {
        // In a more complete implementation, you'd need
        // logic to detect transitions from child docs to parent doc.
        //
        // This is a simplified conceptual collector. In practice:
        // 1. Identify if `doc` is a parent or child.
        // 2. If child, associate with last-seen parent.
        // 3. If parent, start a new group.

        // Without full integration it's hard to do. For now,
        // assume that the scoring and doc iteration are done by
        // BlockJoinScorer and that we only collect parents when
        // we hit them:
        if self.parent_bitset.contains(doc) {
            // It's a parent doc
            self.parent_groups
                .push((self.base + doc, Vec::new(), Vec::new()));
        } else {
            // It's a child doc - associate it with last parent
            if let Some(last) = self.parent_groups.last_mut() {
                last.1.push(self.base + doc);
                last.2.push(score);
            }
        }
    }

    fn set_scorer(&mut self, _scorer: Box<dyn Scorer>) {
        // Not implemented - you'd store the scorer if needed.
    }

    fn harvest(self) -> Result<Self::Fruit> {
        Ok(())
    }
}
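For orientation (not part of this diff, and assuming the conceptual API sketched above): each entry returned by `get_groups()` pairs a parent doc id with the child doc ids and scores collected for it, so a caller might walk the results roughly like this:

// Illustrative only: walking the collected (parent, children, scores) groups.
use tantivy::{DocId, Score};

fn print_block_join_groups(groups: &[(DocId, Vec<DocId>, Vec<Score>)]) {
    for (parent, children, scores) in groups {
        println!("parent doc {parent}: {} matching children", children.len());
        for (child, score) in children.iter().zip(scores) {
            println!("  child doc {child} scored {score:.3}");
        }
    }
}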
5 changes: 5 additions & 0 deletions src/directory/directory_lock.rs
@@ -58,3 +58,8 @@ pub static META_LOCK: Lazy<Lock> = Lazy::new(|| Lock {
    filepath: PathBuf::from(".tantivy-meta.lock"),
    is_blocking: true,
});

pub static MANAGED_LOCK: Lazy<Lock> = Lazy::new(|| Lock {
    filepath: PathBuf::from(".tantivy-managed.lock"),
    is_blocking: true,
});
77 changes: 34 additions & 43 deletions src/directory/managed_directory.rs
@@ -11,7 +11,7 @@ use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteEr
use crate::directory::footer::{Footer, FooterProxy};
use crate::directory::{
DirectoryLock, FileHandle, FileSlice, GarbageCollectionResult, Lock, WatchCallback,
WatchHandle, WritePtr, META_LOCK,
WatchHandle, WritePtr, MANAGED_LOCK, META_LOCK,
};
use crate::error::DataCorruption;
use crate::Directory;
@@ -39,7 +39,6 @@ fn is_managed(path: &Path) -> bool {
#[derive(Debug)]
pub struct ManagedDirectory {
directory: Box<dyn Directory>,
meta_informations: Arc<RwLock<MetaInformation>>,
}

#[derive(Debug, Default)]
@@ -51,9 +50,9 @@ struct MetaInformation {
/// that were created by tantivy.
fn save_managed_paths(
directory: &dyn Directory,
wlock: &RwLockWriteGuard<'_, MetaInformation>,
managed_paths: &HashSet<PathBuf>,
) -> io::Result<()> {
let mut w = serde_json::to_vec(&wlock.managed_paths)?;
let mut w = serde_json::to_vec(managed_paths)?;
writeln!(&mut w)?;
directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
Ok(())
@@ -62,7 +61,11 @@ impl ManagedDirectory {
impl ManagedDirectory {
/// Wraps a directory as managed directory.
pub fn wrap(directory: Box<dyn Directory>) -> crate::Result<ManagedDirectory> {
match directory.atomic_read(&MANAGED_FILEPATH) {
Ok(ManagedDirectory { directory })
}

pub fn get_managed_paths(&self) -> crate::Result<HashSet<PathBuf>> {
match self.directory.atomic_read(&MANAGED_FILEPATH) {
Ok(data) => {
let managed_files_json = String::from_utf8_lossy(&data);
let managed_files: HashSet<PathBuf> = serde_json::from_str(&managed_files_json)
@@ -72,17 +75,9 @@ impl ManagedDirectory {
format!("Managed file cannot be deserialized: {e:?}. "),
)
})?;
Ok(ManagedDirectory {
directory,
meta_informations: Arc::new(RwLock::new(MetaInformation {
managed_paths: managed_files,
})),
})
Ok(managed_files)
}
Err(OpenReadError::FileDoesNotExist(_)) => Ok(ManagedDirectory {
directory,
meta_informations: Arc::default(),
}),
Err(OpenReadError::FileDoesNotExist(_)) => Ok(HashSet::new()),
io_err @ Err(OpenReadError::IoError { .. }) => Err(io_err.err().unwrap().into()),
Err(OpenReadError::IncompatibleIndex(incompatibility)) => {
// For the moment, this should never happen `meta.json`
@@ -110,9 +105,11 @@ impl ManagedDirectory {
&mut self,
get_living_files: L,
) -> crate::Result<GarbageCollectionResult> {
info!("Garbage collect");
let mut files_to_delete = vec![];

// We're about to do an atomic write to managed.json, lock it down
let _lock = self.acquire_lock(&MANAGED_LOCK)?;
let managed_paths = self.get_managed_paths()?;
// It is crucial to get the living files after acquiring the
// read lock of meta information. That way, we
// avoid the following scenario.
@@ -124,11 +121,6 @@ impl ManagedDirectory {
//
// releasing the lock as .delete() will use it too.
{
let meta_informations_rlock = self
.meta_informations
.read()
.expect("Managed directory rlock poisoned in garbage collect.");

// The point of this second "file" lock is to enforce the following scenario
// 1) process B tries to load a new set of searcher.
// The list of segments is loaded
@@ -138,7 +130,7 @@ impl ManagedDirectory {
match self.acquire_lock(&META_LOCK) {
Ok(_meta_lock) => {
let living_files = get_living_files();
for managed_path in &meta_informations_rlock.managed_paths {
for managed_path in &managed_paths {
if !living_files.contains(managed_path) {
files_to_delete.push(managed_path.clone());
}
@@ -181,16 +173,12 @@ impl ManagedDirectory {
if !deleted_files.is_empty() {
// update the list of managed files by removing
// the file that were removed.
let mut meta_informations_wlock = self
.meta_informations
.write()
.expect("Managed directory wlock poisoned (2).");
let managed_paths_write = &mut meta_informations_wlock.managed_paths;
let mut managed_paths_write = managed_paths;
for delete_file in &deleted_files {
managed_paths_write.remove(delete_file);
}
self.directory.sync_directory()?;
save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?;
save_managed_paths(self.directory.as_mut(), &managed_paths_write)?;
}

Ok(GarbageCollectionResult {
@@ -215,27 +203,33 @@ impl ManagedDirectory {
if !is_managed(filepath) {
return Ok(());
}
let mut meta_wlock = self
.meta_informations
.write()
.expect("Managed file lock poisoned");
let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned());

// We're about to do an atomic write to managed.json, lock it down
let _lock = self
.acquire_lock(&MANAGED_LOCK)
.expect("must be able to acquire lock for managed.json");

let mut managed_paths = self
.get_managed_paths()
.expect("reading managed files should not fail");
let has_changed = managed_paths.insert(filepath.to_owned());
if !has_changed {
return Ok(());
}
save_managed_paths(self.directory.as_ref(), &meta_wlock)?;
save_managed_paths(self.directory.as_ref(), &managed_paths)?;
// This is not the first file we add.
// Therefore, we are sure that `.managed.json` has been already
// properly created and we do not need to sync its parent directory.
//
// (It might seem like a nicer solution to create the managed_json on the
// creation of the ManagedDirectory instance but it would actually
// prevent the use of read-only directories..)
let managed_file_definitely_already_exists = meta_wlock.managed_paths.len() > 1;
let managed_file_definitely_already_exists = managed_paths.len() > 1;
if managed_file_definitely_already_exists {
return Ok(());
}
self.directory.sync_directory()?;

Ok(())
}

@@ -258,13 +252,11 @@ impl ManagedDirectory {

/// List all managed files
pub fn list_managed_files(&self) -> HashSet<PathBuf> {
let managed_paths = self
.meta_informations
.read()
.expect("Managed directory rlock poisoned in list damaged.")
.managed_paths
.clone();
managed_paths
let _lock = self
.acquire_lock(&MANAGED_LOCK)
.expect("must be able to acquire lock for managed.json");
self.get_managed_paths()
.expect("reading managed files should not fail")
}
}

@@ -329,7 +321,6 @@ impl Clone for ManagedDirectory {
fn clone(&self) -> ManagedDirectory {
ManagedDirectory {
directory: self.directory.box_clone(),
meta_informations: Arc::clone(&self.meta_informations),
}
}
}
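To summarize the shape of this refactor (an illustrative sketch, not code from the diff): every mutation of `.managed.json` now acquires the new `MANAGED_LOCK` file lock and re-reads the on-disk set via `get_managed_paths()`, instead of consulting the old in-process `meta_informations` cache, roughly:

// Sketch of the lock-then-read-modify-write pattern adopted above; the method
// name is hypothetical, while the helpers (MANAGED_LOCK, get_managed_paths,
// save_managed_paths) are the ones introduced in this diff.
impl ManagedDirectory {
    fn add_managed_path_sketch(&self, path: &Path) -> crate::Result<()> {
        // Serialize writers (including other processes) on .managed.json.
        let _lock = self.acquire_lock(&MANAGED_LOCK)?;
        // Read the authoritative on-disk set while holding the lock.
        let mut managed_paths = self.get_managed_paths()?;
        // Persist only if the set actually changed.
        if managed_paths.insert(path.to_owned()) {
            save_managed_paths(self.directory.as_ref(), &managed_paths)?;
        }
        Ok(())
    }
}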
2 changes: 1 addition & 1 deletion src/directory/mod.rs
@@ -24,7 +24,7 @@ pub use common::{AntiCallToken, OwnedBytes, TerminatingWrite};

pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
pub use self::directory::{Directory, DirectoryClone, DirectoryLock};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, MANAGED_LOCK, META_LOCK};
pub use self::ram_directory::RamDirectory;
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};

4 changes: 3 additions & 1 deletion src/index/index.rs
@@ -12,7 +12,9 @@ use crate::core::{Executor, META_FILEPATH};
use crate::directory::error::OpenReadError;
#[cfg(feature = "mmap")]
use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::directory::{
Directory, DirectoryLock, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK,
};
use crate::error::{DataCorruption, TantivyError};
use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
2 changes: 1 addition & 1 deletion src/index/segment.rs
@@ -46,7 +46,7 @@ impl Segment {
///
/// This method is only used when updating `max_doc` from 0
/// as we finalize a fresh new segment.
pub(crate) fn with_max_doc(self, max_doc: u32) -> Segment {
pub fn with_max_doc(self, max_doc: u32) -> Segment {
Segment {
index: self.index,
meta: self.meta.with_max_doc(max_doc),