Skip to content

Commit

Permalink
Remove flight
Browse files Browse the repository at this point in the history
  • Loading branch information
magbak committed Oct 30, 2024
1 parent 6542a51 commit 9835f7f
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 214 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ chrontext/pki-server

# These are backup files generated by rustfmt
**/*.rs.bk
skratch.py
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 6 additions & 59 deletions lib/flight/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::Ticket;
use bincode::{deserialize, serialize};
use futures::TryStreamExt;
use log::info;
use polars::io::SerReader;
use polars::prelude::{IntoLazy, IpcStreamReader, PolarsError};
use polars::prelude::{PolarsError};
use query_processing::errors::QueryProcessingError;
use query_processing::graph_patterns::union;
use representation::solution_mapping::SolutionMappings;
use representation::RDFNodeType;
use secrecy::{ExposeSecret, SecretString};
use secrecy::{SecretString};
use std::collections::HashMap;
use std::str::FromStr;
use thiserror::*;
use tonic::metadata::MetadataKey;
use tonic::transport::{Channel, Endpoint};
use tonic::{Request, Status};
use tonic::{Status};

#[derive(Error, Debug)]
pub enum ChrontextFlightClientError {
Expand All @@ -35,63 +24,21 @@ pub enum ChrontextFlightClientError {

#[derive(Clone)]
pub struct ChrontextFlightClient {
client: Option<FlightServiceClient<Channel>>,
uri: String,
}

impl ChrontextFlightClient {
pub fn new(uri: &str) -> ChrontextFlightClient {
Self {
client: None,
uri: uri.to_string(),
}
}

pub async fn query(
&mut self,
query: &str,
metadata: &HashMap<String, SecretString>,
_query: &str,
_metadata: &HashMap<String, SecretString>,
) -> Result<SolutionMappings, ChrontextFlightClientError> {
let endpoint = Endpoint::from_str(&self.uri)
.map_err(|e| ChrontextFlightClientError::ConnectError(e))?;
let mut client = if let Some(client) = self.client.take() {
client
} else {
FlightServiceClient::connect(endpoint)
.await
.map_err(|x| ChrontextFlightClientError::ConnectError(x))?
};
info!("Building request");
let mut request = Request::new(Ticket::new(serialize(query).unwrap()));
for (k, v) in metadata {
request.metadata_mut().insert(
MetadataKey::from_str(k).unwrap().to_owned(),
v.expose_secret().parse().unwrap(),
);
}
info!("Sending request");
let mut flight_data = client
.do_get(request)
.await
.map_err(|x| ChrontextFlightClientError::QueryExecutionError(x))?;
info!("Retrieving data");
let batches: Vec<_> = flight_data
.get_mut()
.try_collect()
.await
.map_err(|x| ChrontextFlightClientError::QueryExecutionError(x))?;
let mut mappings = vec![];
for b in batches {
let type_map: HashMap<String, RDFNodeType> = deserialize(&b.app_metadata)
.map_err(|x| ChrontextFlightClientError::TypesDeserializationError(x))?;
let df = IpcStreamReader::new(b.data_body.as_ref())
.finish()
.map_err(|x| ChrontextFlightClientError::PolarsDeserializationError(x))?;
mappings.push(SolutionMappings::new(df.lazy(), type_map));
}
let solution_mappings =
union(mappings, false).map_err(|x| ChrontextFlightClientError::UnionError(x))?;
self.client = Some(client);
Ok(solution_mappings)
unimplemented!("Contact Data Treehouse to try")
}
}
155 changes: 2 additions & 153 deletions lib/flight/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,7 @@
// Based on: https://github.com/apache/arrow-rs/blob/master/arrow-flight/examples/server.rs @ e7ce4bb

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::net::AddrParseError;
use futures::stream::BoxStream;
use std::pin::Pin;
use std::sync::Arc;
use tonic::{Code, Request, Response, Status, Streaming};

use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PollInfo, PutResult,
SchemaResult, Ticket,
};
use bincode::deserialize;
use bincode::serialize;
use chrontext::engine::Engine;
use futures::{stream, Stream};
use polars::io::SerWriter;
use polars::prelude::{IpcCompression, IpcStreamWriter};
use thiserror::*;
use tonic::transport::Server;
use log::info;

#[derive(Error, Debug)]
pub enum ChrontextFlightServerError {
Expand All @@ -52,111 +16,6 @@ pub struct ChrontextFlightService {
engine: Option<Arc<Engine>>,
}

#[tonic::async_trait]
impl FlightService for ChrontextFlightService {
type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;
type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;

async fn handshake(
&self,
_request: Request<Streaming<HandshakeRequest>>,
) -> Result<Response<Self::HandshakeStream>, Status> {
Err(Status::unimplemented("Implement handshake"))
}

async fn list_flights(
&self,
_request: Request<Criteria>,
) -> Result<Response<Self::ListFlightsStream>, Status> {
Err(Status::unimplemented("Implement list_flights"))
}

async fn get_flight_info(
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented("Implement get_flight_info"))
}

async fn poll_flight_info(
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<PollInfo>, Status> {
Err(Status::unimplemented("Implement poll_flight_info"))
}

async fn get_schema(
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, Status> {
Err(Status::unimplemented("Implement get_schema"))
}

async fn do_get(
&self,
request: Request<Ticket>,
) -> Result<Response<Self::DoGetStream>, Status> {
info!("Got do_get request: {:?}", request);
let query_string: String = deserialize(request.get_ref().ticket.as_ref()).unwrap();
let (mut df, map, _context) = self
.engine.as_ref().unwrap()
.clone()
.query(&query_string)
.await
.map_err(|x| Status::new(Code::Internal, x.to_string()))?;
let map_bytes = serialize(&map).unwrap();

let mut df_bytes = vec![];
let mut writer = IpcStreamWriter::new(&mut df_bytes)
.with_compression(Some(IpcCompression::LZ4))
.with_pl_flavor(true);
writer.finish(&mut df).unwrap();

let flight_data = FlightData::new()
.with_app_metadata(map_bytes)
.with_data_body(df_bytes);
// Adapted from: https://github.com/apache/arrow-rs/blob/master/arrow-flight/examples/flight_sql_server.rs @ 7781bc2
let stream: Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send>> =
Box::pin(stream::iter(vec![Ok(flight_data)]));
let resp = Response::new(stream);
info!("Finished processing request {:?}", request);
Ok(resp)
}

async fn do_put(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
Err(Status::unimplemented("Implement do_put"))
}

async fn do_action(
&self,
_request: Request<Action>,
) -> Result<Response<Self::DoActionStream>, Status> {
Err(Status::unimplemented("Implement do_action"))
}

async fn list_actions(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::ListActionsStream>, Status> {
Err(Status::unimplemented("Implement list_actions"))
}

async fn do_exchange(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, Status> {
Err(Status::unimplemented("Implement do_exchange"))
}
}

pub struct ChrontextFlightServer {
chrontext_flight_service: ChrontextFlightService,
}
Expand All @@ -168,17 +27,7 @@ impl ChrontextFlightServer {
}
}

pub async fn serve(self, addr: &str) -> Result<(), ChrontextFlightServerError> {
info!("Starting server on {}", addr);
let addr = addr.parse().map_err(|x|ChrontextFlightServerError::AddrParseError(x))?;
let svc = FlightServiceServer::new(self.chrontext_flight_service.clone());

Server::builder()
.add_service(svc)
.serve(addr)
.await
.map_err(|x| ChrontextFlightServerError::TonicTransportError(x))?;
info!("Shutdown server");
Ok(())
pub async fn serve(self, _addr: &str) -> Result<(), ChrontextFlightServerError> {
unimplemented!("Contact Data Treehouse to try")
}
}
2 changes: 1 addition & 1 deletion py_chrontext/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "py_chrontext"
version = "0.9.11"
version = "0.9.12"
edition = "2021"

[dependencies]
Expand Down

0 comments on commit 9835f7f

Please sign in to comment.