Skip to content

Commit

Permalink
Add metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
magbak committed Oct 26, 2024
1 parent 0ad95cd commit d11307e
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 7 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ chrono = "0.4.37"
filesize = "0.2.0"
serde = "1.0.203"
serde_json = "1.0.117"
secrecy = "=0.10.3"
backoff = "0.4.0"
gcp-bigquery-client = "0.20.0"
rayon = "1.10.0"
Expand Down
3 changes: 2 additions & 1 deletion lib/flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ futures.workspace = true
bincode.workspace = true
polars = {workspace = true, features = ["polars-io", "ipc_streaming"]}
thiserror.workspace = true
log = "0.4.22"
secrecy.workspace = true
log.workspace = true
7 changes: 6 additions & 1 deletion lib/flight/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use representation::RDFNodeType;
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
use secrecy::{ExposeSecret, SecretString};
use thiserror::*;
use tonic::transport::{Channel, Endpoint};
use tonic::{Request, Status};
Expand Down Expand Up @@ -49,6 +50,7 @@ impl ChrontextFlightClient {
pub async fn query(
&mut self,
query: &str,
metadata: &HashMap<String, SecretString>,
) -> Result<SolutionMappings, ChrontextFlightClientError> {
let endpoint = Endpoint::from_str(&self.uri)
.map_err(|e| ChrontextFlightClientError::ConnectError(e))?;
Expand All @@ -60,7 +62,10 @@ impl ChrontextFlightClient {
.map_err(|x| ChrontextFlightClientError::ConnectError(x))?
};
info!("Building request");
let request = Request::new(Ticket::new(serialize(query).unwrap()));
let mut request = Request::new(Ticket::new(serialize(query).unwrap()));
for (k,v) in metadata {
request.metadata_mut().insert(k, v.expose_secret().parse().unwrap());
}
info!("Sending request");
let mut flight_data = client
.do_get(request)
Expand Down
1 change: 1 addition & 0 deletions py_chrontext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ log.workspace = true
env_logger.workspace = true
spargebra.workspace = true
oxrdfio.workspace = true
secrecy.workspace = true

[lib]
name = "chrontext"
Expand Down
3 changes: 2 additions & 1 deletion py_chrontext/chrontext/chrontext.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,11 @@ class DataProduct:


class FlightClient:
def __init__(self, uri:str):
def __init__(self, uri:str, metadata:Dict[str, str]=None):
"""
:param uri: The URI of the Flight server (see engine.serve_flight())
:param metadata: gRPC metadata to add to each request
"""

def query(self,
Expand Down
17 changes: 13 additions & 4 deletions py_chrontext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use representation::solution_mapping::EagerSolutionMappings;
use representation::BaseRDFNodeType;
use std::collections::HashMap;
use std::sync::Arc;
use secrecy::SecretString;
use templates::python::{a, py_triple, PyArgument, PyInstance, PyParameter, PyTemplate, PyXSD};
use tokio::runtime::Builder;
use virtualization::bigquery::VirtualizedBigQueryDatabase;
Expand Down Expand Up @@ -292,14 +293,21 @@ impl PyEngine {
#[derive(Clone)]
#[pyclass(name = "FlightClient")]
pub struct PyFlightClient {
uri:String
uri:String,
metadata: HashMap<String, SecretString>
}

#[pymethods]
impl PyFlightClient {
#[new]
pub fn new(uri:String) -> PyResult<Self> {
Ok(Self {uri})
pub fn new(uri:String, metadata: Option<HashMap<String, String>>) -> PyResult<Self> {
let mut metadata_s = HashMap::new();
if let Some(metadata) = metadata {
for (k,v) in metadata {
metadata_s.insert(k, SecretString::from(v));
}
}
Ok(Self {uri, metadata:metadata_s})
}

pub fn query(
Expand All @@ -311,6 +319,7 @@ impl PyFlightClient {
) -> PyResult<PyObject> {
let sparql = sparql.to_string();
let res = py.allow_threads(move || {

let sparql = sparql;
let mut builder = Builder::new_multi_thread();
builder.enable_all();
Expand All @@ -320,7 +329,7 @@ impl PyFlightClient {
let sm = builder
.build()
.unwrap()
.block_on(client.query(&sparql))
.block_on(client.query(&sparql, &self.metadata))
.map_err(|x|PyChrontextError::FlightClientError(x))?;
Ok(sm)
});
Expand Down

0 comments on commit d11307e

Please sign in to comment.