diff --git a/core/src/search_stores/search_store.rs b/core/src/search_stores/search_store.rs index 4b360c68e1f32..7ee98b491d113 100644 --- a/core/src/search_stores/search_store.rs +++ b/core/src/search_stores/search_store.rs @@ -103,7 +103,7 @@ impl SearchStore for ElasticsearchSearchStore { let options = options.unwrap_or_default(); // then, search - match self + let response = self .client .search(SearchParts::Index(&[NODES_INDEX_NAME])) .from(options.offset.unwrap_or(0) as i64) @@ -122,9 +122,10 @@ impl SearchStore for ElasticsearchSearchStore { } })) .send() - .await - { - Ok(response) => { + .await?; + + match response.status_code().is_success() { + true => { // get nodes from elasticsearch response in hits.hits let response_body = response.json::().await?; let nodes: Vec = response_body["hits"]["hits"] @@ -135,22 +136,29 @@ impl SearchStore for ElasticsearchSearchStore { .collect(); Ok(nodes) } - Err(e) => Err(e.into()), + false => { + let error = response.json::().await?; + Err(anyhow::anyhow!( + "Failed to search nodes: {}", + serde_json::to_string_pretty(&error)? + )) + } } } async fn index_node(&self, node: Node) -> Result<()> { // todo(kw-search): fail on error let now = utils::now(); - match self + let response = self .client .index(IndexParts::IndexId(NODES_INDEX_NAME, &node.unique_id())) .timeout("200ms") .body(node.clone()) .send() - .await - { - Ok(_) => { + .await?; + + match response.status_code().is_success() { + true => { info!( duration = utils::now() - now, globally_unique_id = node.unique_id(), @@ -159,9 +167,10 @@ impl SearchStore for ElasticsearchSearchStore { ); Ok(()) } - Err(e) => { + false => { + let error = response.json::().await?; error!( - error = %e, + error = serde_json::to_string_pretty(&error)?, duration = utils::now() - now, globally_unique_id = node.unique_id(), "[ElasticsearchSearchStore] Failed to index {}", @@ -173,15 +182,27 @@ impl SearchStore for ElasticsearchSearchStore { } async fn delete_node(&self, node: Node) -> Result<()> { - self.client + let response = self + .client .delete(DeleteParts::IndexId(NODES_INDEX_NAME, &node.unique_id())) .send() .await?; + // todo(kw-search): fail on error + if !response.status_code().is_success() { + let error = response.json::().await?; + error!( + error = serde_json::to_string_pretty(&error)?, + globally_unique_id = node.unique_id(), + "[ElasticsearchSearchStore] Failed to delete {}", + node.node_type.to_string() + ); + } Ok(()) } async fn delete_data_source_nodes(&self, data_source_id: &str) -> Result<()> { - self.client + let response = self + .client .delete_by_query(DeleteByQueryParts::Index(&[NODES_INDEX_NAME])) .body(json!({ "query": { @@ -190,6 +211,15 @@ impl SearchStore for ElasticsearchSearchStore { })) .send() .await?; + // todo(kw-search): fail on error + if !response.status_code().is_success() { + let error = response.json::().await?; + error!( + error = serde_json::to_string_pretty(&error)?, + data_source_id = data_source_id, + "[ElasticsearchSearchStore] Failed to delete data source nodes" + ); + } Ok(()) }