Skip to content

Commit

Permalink
second pass to get the nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
philipperolet committed Jan 14, 2025
1 parent 75ce71a commit 0bb2570
Showing 1 changed file with 136 additions and 6 deletions.
142 changes: 136 additions & 6 deletions core/src/search_stores/search_store.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use anyhow::Result;
use async_trait::async_trait;
use elasticsearch::{
Expand Down Expand Up @@ -127,23 +129,24 @@ impl SearchStore for ElasticsearchSearchStore {
.send()
.await?;

match response.status_code().is_success() {
let nodes: Vec<Node> = match response.status_code().is_success() {
true => {
// get nodes from elasticsearch response in hits.hits
let response_body = response.json::<serde_json::Value>().await?;
let nodes: Vec<Node> = response_body["hits"]["hits"]
response_body["hits"]["hits"]
.as_array()
.unwrap()
.iter()
.map(|h| Node::from(h.get("_source").unwrap().clone()))
.collect();
Ok(nodes)
.collect()
}
false => {
let error = response.json::<serde_json::Value>().await?;
Err(anyhow::anyhow!("Failed to search nodes: {}", error))
return Err(anyhow::anyhow!("Failed to search nodes: {}", error));
}
}
};

self.compute_core_content_nodes(nodes).await
}

async fn index_node(&self, node: Node) -> Result<()> {
Expand Down Expand Up @@ -232,3 +235,130 @@ impl SearchStore for ElasticsearchSearchStore {
Box::new(self.clone())
}
}

impl ElasticsearchSearchStore {
/// Compute core content nodes from a list of nodes.
///
/// This function performs two queries to Elasticsearch:
/// 1. Get has_children information for each node.
/// 2. Get parent titles for each node.
///
/// It then creates CoreContentNodes from the nodes, using the results of these queries
/// to populate the `has_children` and `parent_title` fields
async fn compute_core_content_nodes(&self, nodes: Vec<Node>) -> Result<Vec<CoreContentNode>> {
// Prepare the has_children query future
let has_children_future = self
.client
.search(SearchParts::Index(&[NODES_INDEX_NAME]))
.body(json!({
"size": 0,
"query": {
"terms": {
"parent_id": nodes.iter().map(|n| &n.node_id).collect::<Vec<_>>()
}
},
"aggs": {
"parent_nodes": {
"terms": {
"field": "parent_id",
"size": 10000
}
}
}
}))
.send();

// Prepare the parent titles query future
let parent_ids: Vec<_> = nodes.iter().filter_map(|n| n.parent_id.as_ref()).collect();
let parent_titles_future = self
.client
.search(SearchParts::Index(&[NODES_INDEX_NAME]))
.body(json!({
"size": 10000,
"query": {
"terms": {
"node_id": parent_ids
}
},
"_source": ["node_id", "title"]
}))
.send();

// Execute both futures concurrently
let (has_children_response, parent_titles_response) =
tokio::join!(has_children_future, parent_titles_future);

// Process responses (rest of the function remains the same)
let has_children_response = has_children_response?;
let parent_titles_response = parent_titles_response?;

let has_children_map = if has_children_response.status_code().is_success() {
let response_body = has_children_response.json::<serde_json::Value>().await?;
let mut map = HashMap::new();

if let Some(aggs) = response_body["aggregations"]["parent_nodes"]["buckets"].as_array()
{
for bucket in aggs {
if let (Some(parent_id), Some(doc_count)) =
(bucket["key"].as_str(), bucket["doc_count"].as_u64())
{
map.insert(parent_id.to_string(), doc_count > 0);
}
}
}
map
} else {
let error = has_children_response.json::<serde_json::Value>().await?;
return Err(anyhow::anyhow!(
"Failed to fetch has_children data: {}",
error
));
};

// Process parent titles results
let parent_titles_map = if parent_titles_response.status_code().is_success() {
let response_body = parent_titles_response.json::<serde_json::Value>().await?;
let mut map = HashMap::new();

if let Some(hits) = response_body["hits"]["hits"].as_array() {
for hit in hits {
if let (Some(node_id), Some(title)) = (
hit["_source"]["node_id"].as_str(),
hit["_source"]["title"].as_str(),
) {
map.insert(node_id.to_string(), title.to_string());
}
}
}
map
} else {
let error = parent_titles_response.json::<serde_json::Value>().await?;
return Err(anyhow::anyhow!("Failed to fetch parent titles: {}", error));
};

// Create CoreContentNodes, consuming the nodes
let core_content_nodes = nodes
.into_iter()
.map(|node| {
let has_children = has_children_map
.get(&node.node_id)
.copied()
.unwrap_or(false);
let parent_title = node
.parent_id
.as_ref()
.and_then(|pid| parent_titles_map.get(pid))
.cloned()
.unwrap_or_default();

CoreContentNode::new(
node, // Node is moved here
has_children,
parent_title,
)
})
.collect();

Ok(core_content_nodes)
}
}

0 comments on commit 0bb2570

Please sign in to comment.