diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 8a5ece43..5ccca0ab 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -23,7 +23,7 @@ jobs: - uses: actions/checkout@v2 - name: Starting Gremlin Servers run: | - docker-compose -f ./docker-compose/docker-compose.yaml up -d + docker compose -f ./docker-compose/docker-compose.yaml up -d env: GREMLIN_SERVER: ${{ matrix.gremlin-server }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c6ee807a..c209a85c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,7 +23,7 @@ jobs: - uses: actions/checkout@v2 - name: Starting Gremlin Servers run: | - docker-compose -f ./docker-compose/docker-compose.yaml up -d + docker compose -f ./docker-compose/docker-compose.yaml up -d --wait --wait-timeout 90 env: GREMLIN_SERVER: ${{ matrix.gremlin-server }} @@ -42,12 +42,27 @@ jobs: command: fmt args: --all -- --check - name: Run cargo test with tokio + if: matrix.gremlin-server == '3.5.7' uses: actions-rs/cargo@v1 with: command: test args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime - name: Run cargo test with async-std + if: matrix.gremlin-server == '3.5.7' uses: actions-rs/cargo@v1 with: command: test args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime + # MergeV as a step doesn't exist in 3.5.x, so selectively run those tests + - name: Run cargo test with tokio + if: matrix.gremlin-server != '3.5.7' + uses: actions-rs/cargo@v1 + with: + command: test + args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime,merge_tests + - name: Run cargo test with async-std + if: matrix.gremlin-server != '3.5.7' + uses: actions-rs/cargo@v1 + with: + command: test + args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime,merge_tests diff --git a/docker-compose/docker-compose.yaml b/docker-compose/docker-compose.yaml index 8262faac..38817ac5 100644 --- a/docker-compose/docker-compose.yaml +++ b/docker-compose/docker-compose.yaml @@ -18,3 +18,16 @@ services: command : ["conf/gremlin-server-credentials.yaml"] ports: - "8183:8182" + janusgraph: + image: janusgraph/janusgraph:latest + environment: + - janusgraph.graph.set-vertex-id=true + - janusgraph.graph.allow-custom-vid-types=true + - JANUS_PROPS_TEMPLATE=inmemory + ports: + - "8184:8182" + healthcheck: + test: ["CMD", "bin/gremlin.sh", "-e", "scripts/remote-connect.groovy"] + interval: 10s + timeout: 30s + retries: 3 diff --git a/gremlin-client/Cargo.toml b/gremlin-client/Cargo.toml index 69cbd0d6..e3b6dfa4 100644 --- a/gremlin-client/Cargo.toml +++ b/gremlin-client/Cargo.toml @@ -15,14 +15,15 @@ readme = "README.md" [features] default = [] +merge_tests = [] async_gremlin = ["futures","mobc","async-tungstenite","async-trait","url","pin-project-lite"] async_std = ["async-std-runtime"] -tokio-runtime = ["async_gremlin","tokio","mobc/tokio","async-tungstenite/tokio-runtime","async-tungstenite/tokio-native-tls","tokio-native-tls","tokio-stream"] -async-std-runtime = ["async_gremlin","async-std","async-tungstenite/async-std-runtime","async-tungstenite/async-tls","mobc/async-std","async-tls","rustls","webpki"] +tokio-runtime = ["async_gremlin","tokio","async-tungstenite/tokio-runtime","async-tungstenite/tokio-native-tls","tokio-native-tls","tokio-stream"] +async-std-runtime = ["async_gremlin","async-std","async-tungstenite/async-std-runtime","async-tungstenite/async-tls","tokio/sync", "mobc/async-std","async-tls","rustls","webpki"] derive = ["gremlin-derive"] @@ -57,7 +58,7 @@ thiserror = "1.0.20" -mobc = {version = "0.7", optional = true, default-features=false, features = ["unstable"] } +mobc = {version = "0.8", optional = true } url = {version = "2.1.0", optional = true} futures = { version = "0.3.1", optional = true} pin-project-lite = { version = "0.2", optional = true} diff --git a/gremlin-client/src/aio/client.rs b/gremlin-client/src/aio/client.rs index c2f7a727..ad3c52f2 100644 --- a/gremlin-client/src/aio/client.rs +++ b/gremlin-client/src/aio/client.rs @@ -59,6 +59,9 @@ impl GremlinClient { let pool = Pool::builder() .get_timeout(opts.pool_get_connection_timeout) .max_open(pool_size as u64) + .health_check_interval(opts.pool_healthcheck_interval) + //Makes max idle connections equal to max open, matching the behavior of the sync pool r2d2 + .max_idle(0) .build(manager); Ok(GremlinClient { diff --git a/gremlin-client/src/aio/connection.rs b/gremlin-client/src/aio/connection.rs index 0e74e264..a0f311e0 100644 --- a/gremlin-client/src/aio/connection.rs +++ b/gremlin-client/src/aio/connection.rs @@ -21,6 +21,7 @@ mod tokio_use { pub use tokio_native_tls::TlsStream; } +use futures::TryFutureExt; #[cfg(feature = "tokio-runtime")] use tokio_use::*; @@ -144,6 +145,7 @@ impl Conn { tls::connector(&opts), websocket_config, ) + .map_err(|e| Arc::new(e)) .await? }; #[cfg(feature = "tokio-runtime")] @@ -153,6 +155,7 @@ impl Conn { tls::connector(&opts), websocket_config, ) + .map_err(|e| Arc::new(e)) .await? }; @@ -190,6 +193,18 @@ impl Conn { .await .expect("It should contain the response") .map(|r| (r, receiver)) + .map_err(|e| { + //If there's been an websocket layer error, mark the connection as invalid + match e { + GremlinError::WebSocket(_) + | GremlinError::WebSocketAsync(_) + | GremlinError::WebSocketPoolAsync(_) => { + self.valid = false; + } + _ => {} + } + e + }) } pub fn is_valid(&self) -> bool { @@ -222,7 +237,7 @@ fn sender_loop( if let Err(e) = sink.send(Message::Binary(msg.2)).await { let mut sender = guard.remove(&msg.1).unwrap(); sender - .send(Err(GremlinError::from(e))) + .send(Err(GremlinError::from(Arc::new(e)))) .await .expect("Failed to send error"); } @@ -257,8 +272,9 @@ fn receiver_loop( match stream.next().await { Some(Err(error)) => { let mut guard = requests.lock().await; + let error = Arc::new(error); for s in guard.values_mut() { - match s.send(Err(GremlinError::from(&error))).await { + match s.send(Err(error.clone().into())).await { Ok(_r) => {} Err(_e) => {} } diff --git a/gremlin-client/src/aio/error.rs b/gremlin-client/src/aio/error.rs deleted file mode 100644 index 5861c77b..00000000 --- a/gremlin-client/src/aio/error.rs +++ /dev/null @@ -1,15 +0,0 @@ -use crate::GremlinError; -use async_tungstenite::tungstenite; - -impl From<&tungstenite::error::Error> for GremlinError { - fn from(e: &tungstenite::error::Error) -> GremlinError { - let error = match e { - tungstenite::error::Error::AlreadyClosed => tungstenite::error::Error::AlreadyClosed, - tungstenite::error::Error::ConnectionClosed => { - tungstenite::error::Error::ConnectionClosed - } - _ => return GremlinError::Generic(format!("Error from ws {}", e)), - }; - GremlinError::WebSocketAsync(error) - } -} diff --git a/gremlin-client/src/aio/mod.rs b/gremlin-client/src/aio/mod.rs index 03047c06..6cb6a773 100644 --- a/gremlin-client/src/aio/mod.rs +++ b/gremlin-client/src/aio/mod.rs @@ -3,7 +3,6 @@ pub(crate) mod connection; pub(crate) mod pool; mod result; -mod error; pub(crate) mod process; pub use client::GremlinClient; pub use process::traversal::AsyncTerminator; diff --git a/gremlin-client/src/aio/process/traversal/mod.rs b/gremlin-client/src/aio/process/traversal/mod.rs index b6ddd1a9..cdd0a4b2 100644 --- a/gremlin-client/src/aio/process/traversal/mod.rs +++ b/gremlin-client/src/aio/process/traversal/mod.rs @@ -6,6 +6,7 @@ use crate::GremlinResult; use core::task::Context; use core::task::Poll; use futures::Stream; +use futures::StreamExt; use std::marker::PhantomData; use std::pin::Pin; @@ -29,6 +30,17 @@ impl RemoteTraversalStream { } } } + +impl RemoteTraversalStream { + pub async fn iterate(&mut self) -> GremlinResult<()> { + while let Some(response) = self.next().await { + //consume the entire stream, returning any errors + response?; + } + Ok(()) + } +} + impl Stream for RemoteTraversalStream { type Item = GremlinResult; diff --git a/gremlin-client/src/connection.rs b/gremlin-client/src/connection.rs index 16fa44a4..7b31462a 100644 --- a/gremlin-client/src/connection.rs +++ b/gremlin-client/src/connection.rs @@ -1,4 +1,4 @@ -use std::{net::TcpStream, time::Duration}; +use std::{net::TcpStream, sync::Arc, time::Duration}; use crate::{GraphSON, GremlinError, GremlinResult}; use native_tls::TlsConnector; @@ -63,11 +63,12 @@ impl ConnectionStream { fn send(&mut self, payload: Vec) -> GremlinResult<()> { self.0 .write_message(Message::Binary(payload)) + .map_err(|e| Arc::new(e)) .map_err(GremlinError::from) } fn recv(&mut self) -> GremlinResult> { - match self.0.read_message()? { + match self.0.read_message().map_err(|e| Arc::new(e))? { Message::Binary(binary) => Ok(binary), _ => unimplemented!(), } @@ -120,6 +121,16 @@ impl ConnectionOptionsBuilder { self } + /// Only applicable to async client. By default a connection is checked on each return to the pool (None) + /// This allows setting an interval of how often it is checked on return. + pub fn pool_healthcheck_interval( + mut self, + pool_healthcheck_interval: Option, + ) -> Self { + self.0.pool_healthcheck_interval = pool_healthcheck_interval; + self + } + /// Both the sync and async pool providers use a default of 30 seconds, /// Async pool interprets `None` as no timeout. Sync pool maps `None` to the default value pub fn pool_connection_timeout(mut self, pool_connection_timeout: Option) -> Self { @@ -170,6 +181,7 @@ pub struct ConnectionOptions { pub(crate) host: String, pub(crate) port: u16, pub(crate) pool_size: u32, + pub(crate) pool_healthcheck_interval: Option, pub(crate) pool_get_connection_timeout: Option, pub(crate) credentials: Option, pub(crate) ssl: bool, @@ -254,6 +266,7 @@ impl Default for ConnectionOptions { port: 8182, pool_size: 10, pool_get_connection_timeout: Some(Duration::from_secs(30)), + pool_healthcheck_interval: None, credentials: None, ssl: false, tls_options: None, diff --git a/gremlin-client/src/conversion.rs b/gremlin-client/src/conversion.rs index fbe60d25..fd2102dd 100644 --- a/gremlin-client/src/conversion.rs +++ b/gremlin-client/src/conversion.rs @@ -1,6 +1,6 @@ use crate::{ process::traversal::Bytecode, - structure::{TextP, P as Predicate}, + structure::{Null, TextP, P as Predicate}, Edge, GKey, GValue, GremlinError, GremlinResult, IntermediateRepr, List, Map, Metric, Path, Property, Token, TraversalExplanation, TraversalMetrics, Vertex, VertexProperty, GID, }; @@ -134,9 +134,23 @@ impl_from_gvalue!(IntermediateRepr, GValue::IntermediateRepr); impl_from_gvalue!(chrono::DateTime, GValue::Date); impl_from_gvalue!(Traverser, GValue::Traverser); +impl FromGValue for Null { + fn from_gvalue(v: GValue) -> GremlinResult { + match v { + GValue::Null => Ok(crate::structure::Null {}), + _ => Err(GremlinError::Cast(format!( + "Cannot convert {:?} to {}", + v, + stringify!($t) + ))), + } + } +} + impl FromGValue for GKey { fn from_gvalue(v: GValue) -> GremlinResult { match v { + GValue::Direction(d) => Ok(GKey::Direction(d)), GValue::String(s) => Ok(GKey::String(s)), GValue::Token(s) => Ok(GKey::String(s.value().clone())), GValue::Vertex(s) => Ok(GKey::Vertex(s)), diff --git a/gremlin-client/src/error.rs b/gremlin-client/src/error.rs index a803075b..58e01214 100644 --- a/gremlin-client/src/error.rs +++ b/gremlin-client/src/error.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::structure::GValue; use thiserror::Error; @@ -34,7 +36,10 @@ pub enum GremlinError { #[cfg(feature = "async_gremlin")] #[error(transparent)] - WebSocketAsync(#[from] async_tungstenite::tungstenite::Error), + WebSocketAsync(#[from] Arc), + #[cfg(feature = "async_gremlin")] + #[error(transparent)] + WebSocketPoolAsync(#[from] Arc>), #[cfg(feature = "async_gremlin")] #[error(transparent)] ChannelSend(#[from] futures::channel::mpsc::SendError), @@ -47,10 +52,7 @@ impl From> for GremlinError { fn from(e: mobc::Error) -> GremlinError { match e { mobc::Error::Inner(e) => e, - mobc::Error::BadConn => { - GremlinError::Generic(String::from("Async pool bad connection")) - } - mobc::Error::Timeout => GremlinError::Generic(String::from("Async pool timeout")), + other => GremlinError::WebSocketPoolAsync(Arc::new(other)), } } } diff --git a/gremlin-client/src/io/mod.rs b/gremlin-client/src/io/mod.rs index d5898758..214d59ef 100644 --- a/gremlin-client/src/io/mod.rs +++ b/gremlin-client/src/io/mod.rs @@ -5,7 +5,7 @@ mod serializer_v3; use crate::conversion::ToGValue; use crate::process::traversal::{Order, Scope}; -use crate::structure::{Cardinality, GValue, T}; +use crate::structure::{Cardinality, Direction, GValue, Merge, T}; use serde_json::{json, Map, Value}; use std::string::ToString; @@ -216,7 +216,38 @@ impl GraphSON { "@value" : v })) } - + (_, GValue::Merge(merge)) => { + let merge_option = match merge { + Merge::OnCreate => "onCreate", + Merge::OnMatch => "onMatch", + Merge::OutV => "outV", + Merge::InV => "inV", + }; + Ok(json!({ + "@type" : "g:Merge", + "@value" : merge_option + })) + } + (_, GValue::Direction(direction)) => { + let direction = match direction { + Direction::Out | Direction::From => "OUT", + Direction::In | Direction::To => "IN", + }; + Ok(json!({ + "@type" : "g:Direction", + "@value" : direction, + })) + } + (_, GValue::Column(column)) => { + let column = match column { + crate::structure::Column::Keys => "keys", + crate::structure::Column::Values => "values", + }; + Ok(json!({ + "@type" : "g:Column", + "@value" : column, + })) + } (_, _) => panic!("Type {:?} not supported.", value), } } diff --git a/gremlin-client/src/io/serializer_v2.rs b/gremlin-client/src/io/serializer_v2.rs index 9e494c32..a67b2fae 100644 --- a/gremlin-client/src/io/serializer_v2.rs +++ b/gremlin-client/src/io/serializer_v2.rs @@ -125,6 +125,20 @@ where Ok(GValue::Token(token)) } +pub fn deserialize_direction(_: &T, val: &Value) -> GremlinResult +where + T: Fn(&Value) -> GremlinResult, +{ + let val = get_value!(val, Value::String)?; + match val.as_str() { + "OUT" => Ok(GValue::Direction(crate::structure::Direction::Out)), + "IN" => Ok(GValue::Direction(crate::structure::Direction::In)), + other => Err(GremlinError::Cast(format!( + "Unknown direction literal {other}" + ))), + } +} + // Vertex deserializer [docs](http://tinkerpop.apache.org/docs/current/dev/io/#_vertex_3) pub fn deserialize_vertex(reader: &T, val: &Value) -> GremlinResult where @@ -360,7 +374,8 @@ g_serializer_2!(deserializer_v2, { "g:TraversalMetrics" => deserialize_metrics, "g:Metrics" => deserialize_metric, "g:TraversalExplanation" => deserialize_explain, - "g:Traverser" => deserialize_traverser + "g:Traverser" => deserialize_traverser, + "g:Direction" => deserialize_direction }); fn deserialize_vertex_properties( diff --git a/gremlin-client/src/io/serializer_v3.rs b/gremlin-client/src/io/serializer_v3.rs index 8b5ea3bb..ec96fc7b 100644 --- a/gremlin-client/src/io/serializer_v3.rs +++ b/gremlin-client/src/io/serializer_v3.rs @@ -337,6 +337,20 @@ where Ok(Property::new(label, v).into()) } +pub fn deserialize_direction(_: &T, val: &Value) -> GremlinResult +where + T: Fn(&Value) -> GremlinResult, +{ + let val = get_value!(val, Value::String)?; + match val.as_str() { + "OUT" => Ok(GValue::Direction(crate::structure::Direction::Out)), + "IN" => Ok(GValue::Direction(crate::structure::Direction::In)), + other => Err(GremlinError::Cast(format!( + "Unknown direction literal {other}" + ))), + } +} + // Traverser deserializer [docs](http://tinkerpop.apache.org/docs/3.4.1/dev/io/#_traverser_2) pub fn deserialize_traverser(reader: &T, val: &Value) -> GremlinResult where @@ -368,7 +382,8 @@ g_serializer!(deserializer_v3, { "g:TraversalMetrics" => deserialize_metrics, "g:Metrics" => deserialize_metric, "g:TraversalExplanation" => deserialize_explain, - "g:Traverser" => deserialize_traverser + "g:Traverser" => deserialize_traverser, + "g:Direction" => deserialize_direction }); fn deserialize_vertex_properties( diff --git a/gremlin-client/src/process/traversal/anonymous_traversal_source.rs b/gremlin-client/src/process/traversal/anonymous_traversal_source.rs index 63293f33..8765697a 100644 --- a/gremlin-client/src/process/traversal/anonymous_traversal_source.rs +++ b/gremlin-client/src/process/traversal/anonymous_traversal_source.rs @@ -10,6 +10,9 @@ use crate::process::traversal::TraversalBuilder; use crate::structure::{Either2, GIDs, IntoPredicate, Labels, T}; use crate::GValue; +use super::merge_edge::MergeEdgeStep; +use super::merge_vertex::MergeVertexStep; + pub struct AnonymousTraversalSource { traversal: TraversalBuilder, } @@ -39,6 +42,24 @@ impl AnonymousTraversalSource { self.traversal.clone().add_v(label) } + pub fn identity(&self) -> TraversalBuilder { + self.traversal.clone().identity() + } + + pub fn merge_v(&self, merge_v: V) -> TraversalBuilder + where + V: Into, + { + self.traversal.clone().merge_v(merge_v) + } + + pub fn merge_e(&self, merge_e: V) -> TraversalBuilder + where + V: Into, + { + self.traversal.clone().merge_e(merge_e) + } + pub fn property(&self, key: Either2<&str, T>, value: A) -> TraversalBuilder where A: Into, @@ -46,6 +67,13 @@ impl AnonymousTraversalSource { self.traversal.clone().property(key, value) } + pub fn properties(&self, labels: L) -> TraversalBuilder + where + L: Into, + { + self.traversal.clone().properties(labels) + } + pub fn v(&self, ids: T) -> TraversalBuilder where T: Into, diff --git a/gremlin-client/src/process/traversal/builder.rs b/gremlin-client/src/process/traversal/builder.rs index f2199ed5..49491aea 100644 --- a/gremlin-client/src/process/traversal/builder.rs +++ b/gremlin-client/src/process/traversal/builder.rs @@ -13,7 +13,6 @@ use crate::process::traversal::step::not::NotStep; use crate::process::traversal::step::or::OrStep; use crate::process::traversal::step::repeat::RepeatStep; use crate::process::traversal::step::select::SelectStep; -use crate::process::traversal::step::side_effect::IntoSideEffectStep; use crate::process::traversal::step::to::ToStep; use crate::process::traversal::step::until::UntilStep; use crate::process::traversal::step::where_step::WhereStep; @@ -22,6 +21,11 @@ use crate::process::traversal::{Bytecode, Scope}; use crate::structure::{Cardinality, GIDs, IntoPredicate, Labels}; use crate::GValue; +use super::merge_edge::MergeEdgeStep; +use super::merge_vertex::MergeVertexStep; +use super::option::OptionStep; +use super::side_effect::SideEffectStep; + #[derive(Clone)] pub struct TraversalBuilder { pub(crate) bytecode: Bytecode, @@ -98,9 +102,10 @@ impl TraversalBuilder { self } - pub fn property_many(mut self, values: Vec<(String, A)>) -> Self + pub fn property_many(mut self, values: Vec<(K, V)>) -> Self where - A: Into, + K: Into, + V: Into, { for property in values { self.bytecode.add_step( @@ -112,18 +117,19 @@ impl TraversalBuilder { self } - pub fn property_with_cardinality( + pub fn property_with_cardinality( mut self, cardinality: Cardinality, - key: &str, - value: A, + key: K, + value: V, ) -> Self where - A: Into, + K: Into, + V: Into, { self.bytecode.add_step( String::from("property"), - vec![cardinality.into(), String::from(key).into(), value.into()], + vec![cardinality.into(), key.into(), value.into()], ); self } @@ -139,10 +145,10 @@ impl TraversalBuilder { pub fn side_effect(mut self, step: A) -> Self where - A: IntoSideEffectStep, + A: Into, { self.bytecode - .add_step(String::from("sideEffect"), step.into_step()); + .add_step(String::from("sideEffect"), step.into().into()); self } @@ -293,6 +299,12 @@ impl TraversalBuilder { self } + pub fn none(mut self) -> Self { + self.bytecode.add_step(String::from("none"), vec![]); + + self + } + pub fn label(mut self) -> Self { self.bytecode.add_step(String::from("label"), vec![]); @@ -653,6 +665,36 @@ impl TraversalBuilder { self } + pub fn merge_v(mut self, merge_v: A) -> Self + where + A: Into, + { + self.bytecode + .add_step(String::from("mergeV"), merge_v.into().into()); + + self + } + + pub fn merge_e(mut self, merge_e: A) -> Self + where + A: Into, + { + self.bytecode + .add_step(String::from("mergeE"), merge_e.into().into()); + + self + } + + pub fn option(mut self, option: A) -> Self + where + A: Into, + { + self.bytecode + .add_step(String::from("option"), option.into().into()); + + self + } + pub fn identity(mut self) -> Self { self.bytecode.add_step(String::from("identity"), vec![]); self diff --git a/gremlin-client/src/process/traversal/bytecode.rs b/gremlin-client/src/process/traversal/bytecode.rs index c9c23cec..31c99ed6 100644 --- a/gremlin-client/src/process/traversal/bytecode.rs +++ b/gremlin-client/src/process/traversal/bytecode.rs @@ -39,7 +39,7 @@ impl Bytecode { lazy_static! { pub static ref WRITE_OPERATORS: Vec<&'static str> = - vec!["addV", "property", "addE", "from", "to", "drop"]; + vec!["addV", "property", "addE", "from", "to", "drop", "mergeV", "mergeE"]; } #[derive(Debug, PartialEq, Clone)] diff --git a/gremlin-client/src/process/traversal/graph_traversal.rs b/gremlin-client/src/process/traversal/graph_traversal.rs index 6542adec..be734946 100644 --- a/gremlin-client/src/process/traversal/graph_traversal.rs +++ b/gremlin-client/src/process/traversal/graph_traversal.rs @@ -22,13 +22,18 @@ use crate::process::traversal::strategies::{ RemoteStrategy, TraversalStrategies, TraversalStrategy, }; use crate::process::traversal::{Bytecode, Scope, TraversalBuilder, WRITE_OPERATORS}; -use crate::structure::{Cardinality, Labels}; +use crate::structure::{Cardinality, Labels, Null}; use crate::{ structure::GIDs, structure::GProperty, structure::IntoPredicate, Edge, GValue, GremlinClient, List, Map, Path, Vertex, }; use std::marker::PhantomData; +use super::merge_edge::MergeEdgeStep; +use super::merge_vertex::MergeVertexStep; +use super::option::OptionStep; +use super::side_effect::SideEffectStep; + #[derive(Clone)] pub struct GraphTraversal> { start: PhantomData, @@ -89,22 +94,24 @@ impl> GraphTraversal { GraphTraversal::new(self.terminator, self.builder) } - pub fn property(mut self, key: &str, value: A) -> Self + pub fn property(mut self, key: K, value: V) -> Self where - A: Into, + K: Into, + V: Into, { self.builder = self.builder.property(key, value); self } - pub fn property_with_cardinality( + pub fn property_with_cardinality( mut self, cardinality: Cardinality, - key: &str, - value: A, + key: K, + value: V, ) -> Self where - A: Into, + K: Into, + V: Into, { self.builder = self .builder @@ -112,30 +119,27 @@ impl> GraphTraversal { self } - pub fn property_many(mut self, values: Vec<(String, A)>) -> Self + pub fn property_many(mut self, values: Vec<(K, V)>) -> Self where - A: Into, + K: Into, + V: Into, { for property in values { - self.builder = self - .builder - .property::<&str, A>(property.0.as_ref(), property.1) + self.builder = self.builder.property(property.0, property.1) } self } - pub fn property_many_with_cardinality( - mut self, - values: Vec<(Cardinality, String, A)>, - ) -> Self + pub fn property_many_with_cardinality(mut self, values: Vec<(Cardinality, K, V)>) -> Self where - A: Into, + K: Into, + V: Into, { for property in values { - self.builder = - self.builder - .property_with_cardinality(property.0, property.1.as_ref(), property.2); + self.builder = self + .builder + .property_with_cardinality(property.0, property.1, property.2); } self @@ -184,6 +188,15 @@ impl> GraphTraversal { self } + pub fn side_effect(mut self, step: A) -> Self + where + A: Into, + { + self.builder = self.builder.side_effect(step); + + self + } + pub fn add_e(mut self, label: A) -> GraphTraversal where A: Into, @@ -289,6 +302,16 @@ impl> GraphTraversal { GraphTraversal::new(self.terminator, self.builder) } + ///Filters all objects from the traversal stream. Generally only useful for applying traversal sideffects and avoiding unwanted response I/O + pub fn none(mut self) -> GraphTraversal + where + T: Terminator, + { + self.builder = self.builder.none(); + + GraphTraversal::new(self.terminator, self.builder) + } + pub fn label(mut self) -> GraphTraversal where T: Terminator, @@ -669,6 +692,24 @@ impl> GraphTraversal { GraphTraversal::new(self.terminator, self.builder) } + pub fn merge_v(mut self, merge_v: A) -> GraphTraversal + where + A: Into, + T: Terminator, + { + self.builder = self.builder.merge_v(merge_v); + GraphTraversal::new(self.terminator, self.builder) + } + + pub fn merge_e(mut self, merge_e: A) -> GraphTraversal + where + A: Into, + T: Terminator, + { + self.builder = self.builder.merge_e(merge_e); + GraphTraversal::new(self.terminator, self.builder) + } + pub fn identity(mut self) -> Self { self.builder = self.builder.identity(); self @@ -689,6 +730,14 @@ impl> GraphTraversal { self } + pub fn option(mut self, step: A) -> Self + where + A: Into, + { + self.builder = self.builder.option(step); + self + } + pub fn optional(mut self, step: TraversalBuilder) -> Self { self.builder = self.builder.optional(step); self diff --git a/gremlin-client/src/process/traversal/graph_traversal_source.rs b/gremlin-client/src/process/traversal/graph_traversal_source.rs index 4de6df13..ac3a3199 100644 --- a/gremlin-client/src/process/traversal/graph_traversal_source.rs +++ b/gremlin-client/src/process/traversal/graph_traversal_source.rs @@ -15,6 +15,9 @@ use crate::structure::Labels; use crate::structure::{Edge, GValue, Vertex}; use crate::GremlinClient; +use super::merge_edge::MergeEdgeStep; +use super::merge_vertex::MergeVertexStep; + #[derive(Clone)] pub struct GraphTraversalSource> { term: A, @@ -122,6 +125,41 @@ impl> GraphTraversalSource { ); GraphTraversal::new(self.term.clone(), TraversalBuilder::new(code)) } + + pub fn inject(&self, injection: T) -> GraphTraversal + where + T: Into + FromGValue, + A: Terminator, + { + let mut code = Bytecode::new(); + + code.add_step(String::from("inject"), vec![injection.into()]); + GraphTraversal::new(self.term.clone(), TraversalBuilder::new(code)) + } + + pub fn merge_v(&self, merge_v: V) -> GraphTraversal + where + V: Into, + A: Terminator, + { + let mut code = Bytecode::new(); + + code.add_step(String::from("mergeV"), merge_v.into().into()); + + GraphTraversal::new(self.term.clone(), TraversalBuilder::new(code)) + } + + pub fn merge_e(&self, merge_e: V) -> GraphTraversal + where + V: Into, + A: Terminator, + { + let mut code = Bytecode::new(); + + code.add_step(String::from("mergeE"), merge_e.into().into()); + + GraphTraversal::new(self.term.clone(), TraversalBuilder::new(code)) + } } // TESTS @@ -982,5 +1020,25 @@ mod tests { ); } - // g.V().hasLabel('person').coalesce(values('nickname'), values('name')) + #[test] + fn inject_test() { + let g = empty(); + + let mut code = Bytecode::new(); + + code.add_step( + String::from("inject"), + vec![GValue::List(vec!["foo".into(), "bar".into()].into())], + ); + code.add_step(String::from("unfold"), vec![]); + + assert_eq!( + &code, + g.inject(vec!["foo".into(), "bar".into()]) + .unfold() + .bytecode() + ); + } + + //TODO add tests for mergeV, etc } diff --git a/gremlin-client/src/process/traversal/mod.rs b/gremlin-client/src/process/traversal/mod.rs index 2826bd2e..4001c2d2 100644 --- a/gremlin-client/src/process/traversal/mod.rs +++ b/gremlin-client/src/process/traversal/mod.rs @@ -47,6 +47,16 @@ impl RemoteTraversalIterator { } } +impl RemoteTraversalIterator { + pub fn iterate(&mut self) -> GremlinResult<()> { + while let Some(response) = self.next() { + //consume the entire iterator, returning any errors + response?; + } + Ok(()) + } +} + impl Iterator for RemoteTraversalIterator { type Item = GremlinResult; diff --git a/gremlin-client/src/process/traversal/step/by.rs b/gremlin-client/src/process/traversal/step/by.rs index bc12d1e9..3c377581 100644 --- a/gremlin-client/src/process/traversal/step/by.rs +++ b/gremlin-client/src/process/traversal/step/by.rs @@ -1,5 +1,5 @@ use crate::process::traversal::{Order, TraversalBuilder}; -use crate::structure::{GValue, T}; +use crate::structure::{Column, GValue, T}; pub struct ByStep { params: Vec, @@ -35,6 +35,12 @@ impl From for ByStep { } } +impl From for ByStep { + fn from(value: Column) -> Self { + ByStep::new(vec![value.into()]) + } +} + impl From for ByStep { fn from(param: T) -> Self { ByStep::new(vec![param.into()]) diff --git a/gremlin-client/src/process/traversal/step/merge_edge.rs b/gremlin-client/src/process/traversal/step/merge_edge.rs new file mode 100644 index 00000000..7cc73c09 --- /dev/null +++ b/gremlin-client/src/process/traversal/step/merge_edge.rs @@ -0,0 +1,33 @@ +use std::collections::HashMap; + +use crate::process::traversal::TraversalBuilder; +use crate::structure::GValue; +use crate::GKey; + +pub struct MergeEdgeStep { + params: Vec, +} + +impl MergeEdgeStep { + fn new(params: Vec) -> Self { + MergeEdgeStep { params } + } +} + +impl From for Vec { + fn from(step: MergeEdgeStep) -> Self { + step.params + } +} + +impl From for MergeEdgeStep { + fn from(param: TraversalBuilder) -> Self { + MergeEdgeStep::new(vec![param.bytecode.into()]) + } +} + +impl From> for MergeEdgeStep { + fn from(value: HashMap) -> Self { + MergeEdgeStep::new(vec![value.into()]) + } +} diff --git a/gremlin-client/src/process/traversal/step/merge_vertex.rs b/gremlin-client/src/process/traversal/step/merge_vertex.rs new file mode 100644 index 00000000..6458a41b --- /dev/null +++ b/gremlin-client/src/process/traversal/step/merge_vertex.rs @@ -0,0 +1,33 @@ +use std::collections::HashMap; + +use crate::process::traversal::TraversalBuilder; +use crate::structure::GValue; +use crate::GKey; + +pub struct MergeVertexStep { + params: Vec, +} + +impl MergeVertexStep { + fn new(params: Vec) -> Self { + MergeVertexStep { params } + } +} + +impl From for Vec { + fn from(step: MergeVertexStep) -> Self { + step.params + } +} + +impl From for MergeVertexStep { + fn from(param: TraversalBuilder) -> Self { + MergeVertexStep::new(vec![param.bytecode.into()]) + } +} + +impl From> for MergeVertexStep { + fn from(value: HashMap) -> Self { + MergeVertexStep::new(vec![value.into()]) + } +} diff --git a/gremlin-client/src/process/traversal/step/mod.rs b/gremlin-client/src/process/traversal/step/mod.rs index fb2f91c5..eecf9934 100644 --- a/gremlin-client/src/process/traversal/step/mod.rs +++ b/gremlin-client/src/process/traversal/step/mod.rs @@ -8,7 +8,10 @@ pub mod limit; pub mod local; pub mod loops; pub mod match_step; +pub mod merge_edge; +pub mod merge_vertex; pub mod not; +pub mod option; pub mod or; pub mod repeat; pub mod select; diff --git a/gremlin-client/src/process/traversal/step/option.rs b/gremlin-client/src/process/traversal/step/option.rs new file mode 100644 index 00000000..f50c413a --- /dev/null +++ b/gremlin-client/src/process/traversal/step/option.rs @@ -0,0 +1,39 @@ +use std::collections::HashMap; + +use crate::process::traversal::TraversalBuilder; +use crate::structure::{GValue, Merge}; +use crate::GKey; + +pub struct OptionStep { + params: Vec, +} + +impl OptionStep { + fn new(params: Vec) -> Self { + OptionStep { params } + } +} + +impl From for Vec { + fn from(step: OptionStep) -> Self { + step.params + } +} + +impl From<(GValue, TraversalBuilder)> for OptionStep { + fn from(value: (GValue, TraversalBuilder)) -> Self { + OptionStep::new(vec![value.0.into(), value.1.into()]) + } +} + +impl From<(Merge, TraversalBuilder)> for OptionStep { + fn from(value: (Merge, TraversalBuilder)) -> Self { + OptionStep::new(vec![value.0.into(), value.1.into()]) + } +} + +impl From<(Merge, HashMap)> for OptionStep { + fn from(value: (Merge, HashMap)) -> Self { + OptionStep::new(vec![value.0.into(), value.1.into()]) + } +} diff --git a/gremlin-client/src/process/traversal/step/side_effect.rs b/gremlin-client/src/process/traversal/step/side_effect.rs index 8fb00636..6fa6b487 100644 --- a/gremlin-client/src/process/traversal/step/side_effect.rs +++ b/gremlin-client/src/process/traversal/step/side_effect.rs @@ -1,11 +1,23 @@ use crate::{process::traversal::TraversalBuilder, GValue}; -pub trait IntoSideEffectStep { - fn into_step(self) -> Vec; +pub struct SideEffectStep { + params: Vec, } -impl IntoSideEffectStep for TraversalBuilder { - fn into_step(self) -> Vec { - vec![self.bytecode.into()] +impl SideEffectStep { + fn new(params: Vec) -> Self { + SideEffectStep { params } + } +} + +impl From for Vec { + fn from(step: SideEffectStep) -> Self { + step.params + } +} + +impl From for SideEffectStep { + fn from(param: TraversalBuilder) -> Self { + SideEffectStep::new(vec![param.bytecode.into()]) } } diff --git a/gremlin-client/src/structure/column.rs b/gremlin-client/src/structure/column.rs new file mode 100644 index 00000000..99ba0755 --- /dev/null +++ b/gremlin-client/src/structure/column.rs @@ -0,0 +1,5 @@ +#[derive(Debug, PartialEq, Clone, Eq, Hash)] +pub enum Column { + Keys, + Values, +} diff --git a/gremlin-client/src/structure/direction.rs b/gremlin-client/src/structure/direction.rs new file mode 100644 index 00000000..bce2beef --- /dev/null +++ b/gremlin-client/src/structure/direction.rs @@ -0,0 +1,7 @@ +#[derive(Debug, PartialEq, Clone, Eq, Hash)] +pub enum Direction { + Out, + In, + From, + To, +} diff --git a/gremlin-client/src/structure/map.rs b/gremlin-client/src/structure/map.rs index 3a1c8a08..10e491e0 100644 --- a/gremlin-client/src/structure/map.rs +++ b/gremlin-client/src/structure/map.rs @@ -6,6 +6,8 @@ use std::collections::hash_map::IntoIter; use std::collections::{BTreeMap, HashMap}; use std::convert::{TryFrom, TryInto}; +use super::{Direction, T}; + /// Represent a Map<[GKey](struct.GKey),[GValue](struct.GValue)> which has ability to allow for non-String keys. /// TinkerPop type [here](http://tinkerpop.apache.org/docs/current/dev/io/#_map) #[derive(Debug, PartialEq, Clone)] @@ -137,10 +139,24 @@ impl std::iter::FromIterator<(String, GValue)> for Map { #[allow(clippy::large_enum_variant)] #[derive(Debug, PartialEq, Clone, Eq, Hash)] pub enum GKey { + T(T), String(String), Token(Token), Vertex(Vertex), Edge(Edge), + Direction(Direction), +} + +impl From for GKey { + fn from(val: T) -> Self { + GKey::T(val) + } +} + +impl From for GKey { + fn from(value: Direction) -> Self { + GKey::Direction(value) + } } impl From<&str> for GKey { diff --git a/gremlin-client/src/structure/merge.rs b/gremlin-client/src/structure/merge.rs new file mode 100644 index 00000000..df406e91 --- /dev/null +++ b/gremlin-client/src/structure/merge.rs @@ -0,0 +1,7 @@ +#[derive(Debug, PartialEq, Clone)] +pub enum Merge { + OnCreate, + OnMatch, + OutV, + InV, +} diff --git a/gremlin-client/src/structure/mod.rs b/gremlin-client/src/structure/mod.rs index 1e6ab0ec..523adbc1 100644 --- a/gremlin-client/src/structure/mod.rs +++ b/gremlin-client/src/structure/mod.rs @@ -1,4 +1,6 @@ mod cardinality; +mod column; +mod direction; mod edge; mod either; mod gid; @@ -6,7 +8,9 @@ mod label; mod list; mod macros; mod map; +mod merge; mod metrics; +mod null; mod p; mod path; mod pop; @@ -25,6 +29,7 @@ pub use self::edge::Edge; pub use self::gid::{GIDs, GID}; pub use self::list::List; pub use self::metrics::{IntermediateRepr, Metric, TraversalExplanation, TraversalMetrics}; +pub use self::null::Null; pub use self::path::Path; pub use self::property::Property; pub use self::result::GResultSet; @@ -34,9 +39,12 @@ pub use self::value::GValue; pub use self::vertex::Vertex; pub use self::vertex_property::{GProperty, VertexProperty}; pub use cardinality::Cardinality; +pub use column::Column; +pub use direction::Direction; pub use either::*; pub use label::Labels; pub use map::{GKey, Map}; +pub use merge::Merge; pub use p::{IntoPredicate, P}; pub use pop::Pop; pub use t::T; diff --git a/gremlin-client/src/structure/null.rs b/gremlin-client/src/structure/null.rs new file mode 100644 index 00000000..76dbac88 --- /dev/null +++ b/gremlin-client/src/structure/null.rs @@ -0,0 +1,2 @@ +#[derive(Debug, Clone)] +pub struct Null {} diff --git a/gremlin-client/src/structure/t.rs b/gremlin-client/src/structure/t.rs index db79a6ce..2a62492c 100644 --- a/gremlin-client/src/structure/t.rs +++ b/gremlin-client/src/structure/t.rs @@ -1,4 +1,4 @@ -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Clone, Eq, Hash)] pub enum T { Id, Key, diff --git a/gremlin-client/src/structure/value.rs b/gremlin-client/src/structure/value.rs index c4d955f9..5ad9b4d5 100644 --- a/gremlin-client/src/structure/value.rs +++ b/gremlin-client/src/structure/value.rs @@ -1,16 +1,18 @@ use crate::conversion::{BorrowFromGValue, FromGValue}; -use crate::process::traversal::{Bytecode, Order, Scope}; +use crate::process::traversal::{Bytecode, Order, Scope, TraversalBuilder}; use crate::structure::traverser::Traverser; use crate::structure::{ label::LabelType, Cardinality, Edge, GKey, IntermediateRepr, List, Map, Metric, Path, Property, Set, Token, TraversalExplanation, TraversalMetrics, Vertex, VertexProperty, }; use crate::structure::{Pop, TextP, P, T}; -use crate::{GremlinError, GremlinResult}; +use crate::{GremlinError, GremlinResult, ToGValue, GID}; use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; pub type Date = chrono::DateTime; use std::convert::TryInto; use std::hash::Hash; + +use super::{Column, Direction, Merge}; /// Represent possible values coming from the [Gremlin Server](http://tinkerpop.apache.org/docs/3.4.0/dev/io/) #[allow(clippy::large_enum_variant)] #[derive(Debug, PartialEq, Clone)] @@ -46,6 +48,9 @@ pub enum GValue { TextP(TextP), Pop(Pop), Cardinality(Cardinality), + Merge(Merge), + Direction(Direction), + Column(Column), } impl GValue { @@ -99,6 +104,13 @@ impl From for GValue { GValue::Float(val) } } + +impl From<&GID> for GValue { + fn from(value: &GID) -> Self { + value.to_gvalue() + } +} + impl From for GValue { fn from(val: f64) -> Self { GValue::Double(val) @@ -180,6 +192,25 @@ impl From for GValue { GValue::Order(val) } } + +impl From for GValue { + fn from(value: Merge) -> Self { + GValue::Merge(value) + } +} + +impl From for GValue { + fn from(value: Direction) -> Self { + GValue::Direction(value) + } +} + +impl From for GValue { + fn from(value: Column) -> Self { + GValue::Column(value) + } +} + impl From for GValue { fn from(val: Token) -> Self { GValue::Token(val) @@ -229,6 +260,8 @@ impl From for VecDeque { impl From for GValue { fn from(val: GKey) -> Self { match val { + GKey::Direction(d) => GValue::Direction(d), + GKey::T(t) => GValue::T(t), GKey::String(s) => GValue::String(s), GKey::Token(s) => GValue::String(s.value().clone()), GKey::Vertex(v) => GValue::Vertex(v), @@ -289,6 +322,12 @@ impl From for GValue { } } +impl From for GValue { + fn from(value: TraversalBuilder) -> Self { + value.bytecode.into() + } +} + impl std::convert::TryFrom for String { type Error = crate::GremlinError; diff --git a/gremlin-client/tests/common.rs b/gremlin-client/tests/common.rs index 6fec8241..af3a5b44 100644 --- a/gremlin-client/tests/common.rs +++ b/gremlin-client/tests/common.rs @@ -1,3 +1,14 @@ +use gremlin_client::Map; + +pub fn assert_map_property(element_map: &Map, expected_key: &str, expected_value: &str) { + let actual_prop_value: &String = element_map + .get(expected_key) + .unwrap_or_else(|| panic!("Didn't have expected key {}", expected_key)) + .get() + .expect("Should be String"); + assert_eq!(expected_value, actual_prop_value); +} + #[allow(dead_code)] pub mod io { use gremlin_client::{ConnectionOptions, Edge, GraphSON, GremlinClient, GremlinResult, Vertex}; @@ -6,6 +17,10 @@ pub mod io { GremlinClient::connect(("localhost", 8182)) } + fn connect_janusgraph_client() -> GremlinResult { + GremlinClient::connect(("localhost", 8184)) + } + pub fn connect_serializer(serializer: GraphSON) -> GremlinResult { let port = match serializer { GraphSON::V2 => 8182, @@ -25,6 +40,10 @@ pub mod io { connect().expect("It should connect") } + pub fn expect_janusgraph_client() -> GremlinClient { + connect_janusgraph_client().expect("It should connect") + } + pub fn expect_client_serializer(serializer: GraphSON) -> GremlinClient { connect_serializer(serializer).expect("It should connect") } diff --git a/gremlin-client/tests/custom_vertex_ids.rs b/gremlin-client/tests/custom_vertex_ids.rs new file mode 100644 index 00000000..97b74f96 --- /dev/null +++ b/gremlin-client/tests/custom_vertex_ids.rs @@ -0,0 +1,95 @@ +use std::collections::HashMap; + +use common::io::{drop_vertices, expect_janusgraph_client}; +use gremlin_client::{ + process::traversal::{traversal, __}, + structure::T, + GKey, GValue, +}; + +mod common; + +//Custom vertex ids are a feature offered by JanusGraph +//https://docs.janusgraph.org/advanced-topics/custom-vertex-id/ + +#[test] +fn test_merge_v_custom_id() { + let client = expect_janusgraph_client(); + let expected_label = "test_merge_v_custom_id"; + drop_vertices(&client, expected_label).expect("Failed to drop vertices"); + let g = traversal().with_remote(client); + let expected_id = "test_merge_v_custom_id"; + let mut start_step_map: HashMap = HashMap::new(); + start_step_map.insert(T::Id.into(), expected_id.into()); + start_step_map.insert(T::Label.into(), expected_label.into()); + let actual_vertex = g + .merge_v(start_step_map) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + match actual_vertex.id() { + gremlin_client::GID::String(actual) => assert_eq!(expected_id, actual), + other => panic!("Didn't get expected id type {:?}", other), + } + + assert_eq!(expected_label, actual_vertex.label()); + + //Now try it as a mid-traversal step (inject is the start step) + let expected_id = "foo"; + let expected_property = "propValue"; + + let mut map_to_inject: HashMap = HashMap::new(); + let mut lookup_map: HashMap = HashMap::new(); + lookup_map.insert(T::Id.into(), expected_id.into()); + lookup_map.insert(T::Label.into(), "myvertexlabel".into()); + let mut property_map: HashMap = HashMap::new(); + property_map.insert("propertyKey".into(), expected_property.into()); + map_to_inject.insert("lookup".into(), lookup_map.into()); + map_to_inject.insert("properties".into(), property_map.into()); + + let actual_vertex = g + .inject(vec![map_to_inject.into()]) + .unfold() + .as_("payload") + .merge_v(__.select("lookup")) + .property( + "propertyKey", + __.select("payload") + .select("properties") + .select("propertyKey"), + ) + .next() + .expect("Should get response") + .expect("Should have returned a vertex"); + + match actual_vertex.id() { + gremlin_client::GID::String(actual) => assert_eq!(expected_id, actual), + other => panic!("Didn't get expected id type {:?}", other), + } + + let actual_property: &String = actual_vertex + .property("propertyKey") + .expect("Should have property") + .get() + .unwrap(); + assert_eq!(expected_property, actual_property); +} + +#[test] +fn test_add_v_custom_id() { + let client = expect_janusgraph_client(); + let expected_id = "test_add_v_custom_id"; + let test_vertex_label = "test_add_v_custom_id"; + drop_vertices(&client, test_vertex_label).expect("Failed to drop vertices"); + let g = traversal().with_remote(client); + let actual_vertex = g + .add_v(test_vertex_label) + .property(T::Id, expected_id) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + match actual_vertex.id() { + gremlin_client::GID::String(actual) => assert_eq!(expected_id, actual), + other => panic!("Didn't get expected id type {:?}", other), + } +} diff --git a/gremlin-client/tests/integration_traversal.rs b/gremlin-client/tests/integration_traversal.rs index fb3faf7b..78c4725a 100644 --- a/gremlin-client/tests/integration_traversal.rs +++ b/gremlin-client/tests/integration_traversal.rs @@ -1,6 +1,13 @@ +use std::collections::HashMap; +use std::convert::TryInto; + +use common::assert_map_property; use gremlin_client::process::traversal::{traversal, Order, __}; -use gremlin_client::structure::{Cardinality, List, Map, Pop, TextP, Vertex, VertexProperty, P, T}; -use gremlin_client::utils; +use gremlin_client::structure::{ + Cardinality, Column, List, Map, Pop, TextP, Vertex, VertexProperty, P, T, +}; + +use gremlin_client::{utils, GKey, GValue}; mod common; @@ -8,6 +15,463 @@ use common::io::{ create_edge, create_vertex, create_vertex_with_label, drop_edges, drop_vertices, graph, }; +#[cfg(feature = "merge_tests")] +mod merge_tests { + use super::*; + use gremlin_client::{ + process::traversal::{GraphTraversalSource, SyncTerminator}, + structure::{Direction, Merge}, + Edge, GValue, ToGValue, + }; + use std::collections::HashMap; + + #[test] + fn test_merge_v_no_options() { + let client = graph(); + let test_vertex_label = "test_merge_v_no_options"; + drop_vertices(&client, test_vertex_label) + .expect("Failed to drop vertices in case of rerun"); + let g = traversal().with_remote(client); + + let mut injection_map: HashMap = HashMap::new(); + let mut lookup_map: HashMap = HashMap::new(); + lookup_map.insert(T::Label.into(), test_vertex_label.into()); + let mut property_map: HashMap = HashMap::new(); + property_map.insert("propertyKey".into(), "propertyValue".into()); + injection_map.insert("lookup".into(), lookup_map.into()); + injection_map.insert("properties".into(), property_map.into()); + + let vertex_properties = g + .inject(vec![injection_map.into()]) + .unfold() + .as_("payload") + .merge_v(__.select("lookup")) + .property( + "propertyKey", + __.select("payload") + .select("properties") + .select("propertyKey"), + ) + .element_map(()) + .next() + .expect("Should get response") + .expect("Should have returned a vertex"); + + assert_map_property(&vertex_properties, "propertyKey", "propertyValue"); + } + + #[test] + fn test_merge_v_options() { + let client = graph(); + let expected_label = "test_merge_v_options"; + drop_vertices(&client, expected_label).expect("Failed to drop vertices"); + let g = traversal().with_remote(client); + let mut start_step_map: HashMap = HashMap::new(); + start_step_map.insert(T::Label.into(), expected_label.into()); + start_step_map.insert("identifing_prop".into(), "some_Value".into()); + + let prop_key = "some_prop"; + let mut on_create_map: HashMap = HashMap::new(); + let expected_on_create_prop_value = "on_create_value"; + on_create_map.insert(prop_key.into(), expected_on_create_prop_value.into()); + + let mut on_match_map: HashMap = HashMap::new(); + let expected_on_match_prop_value = "on_match_value"; + on_match_map.insert(prop_key.into(), expected_on_match_prop_value.into()); + + let on_create_vertex_map = g + .merge_v(start_step_map.clone()) + .option((Merge::OnCreate, on_create_map.clone())) + .option((Merge::OnMatch, on_match_map.clone())) + .element_map(()) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + + assert_map_property(&on_create_vertex_map, "label", expected_label); + + assert_map_property( + &on_create_vertex_map, + prop_key, + expected_on_create_prop_value, + ); + + //Now run the traversal again, and confirm the OnMatch applied this time + let on_match_vertex_map = g + .merge_v(start_step_map) + .option((Merge::OnCreate, on_create_map.clone())) + .option((Merge::OnMatch, on_match_map.clone())) + .element_map(()) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + + assert_map_property(&on_match_vertex_map, "label", expected_label); + + assert_map_property(&on_match_vertex_map, prop_key, expected_on_match_prop_value); + } + + #[test] + fn test_merge_v_start_step() { + let client = graph(); + let expected_label = "test_merge_v_start_step"; + drop_vertices(&client, &expected_label).expect("Failed to drop vertiecs"); + let g = traversal().with_remote(client); + let mut start_step_map: HashMap = HashMap::new(); + start_step_map.insert(T::Label.into(), expected_label.into()); + let actual_vertex = g + .merge_v(start_step_map) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + + assert_eq!(expected_label, actual_vertex.label()) + } + + #[test] + fn test_merge_v_anonymous_traversal() { + let client = graph(); + let expected_label = "test_merge_v_anonymous_traversal"; + drop_vertices(&client, &expected_label).expect("Failed to drop vertiecs"); + let g = traversal().with_remote(client); + let mut start_step_map: HashMap = HashMap::new(); + start_step_map.insert(T::Label.into(), expected_label.into()); + let actual_vertex = g + .inject(1) + .unfold() + .coalesce::([__.merge_v(start_step_map)]) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + assert_eq!(expected_label, actual_vertex.label()) + } + + #[test] + fn test_merge_e_start_step() { + let client = graph(); + let expected_vertex_label = "test_merge_e_start_step_vertex"; + let expected_edge_label = "test_merge_e_start_step_edge"; + let expected_edge_property_key = "test_merge_e_start_step_edge_prop"; + let expected_edge_property_value = "test_merge_e_start_step_edge_value"; + drop_vertices(&client, &expected_vertex_label).expect("Failed to drop vertiecs"); + let g = traversal().with_remote(client); + + let vertex_a = g + .add_v(expected_vertex_label) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + + let vertex_b = g + .add_v(expected_vertex_label) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + + let mut start_step_map: HashMap = HashMap::new(); + start_step_map.insert(Direction::In.into(), vertex_a.id().into()); + start_step_map.insert(Direction::Out.into(), vertex_b.id().into()); + start_step_map.insert(T::Label.into(), expected_edge_label.into()); + start_step_map.insert( + expected_edge_property_key.into(), + expected_edge_property_value.into(), + ); + let merged_edge_properties = g + .merge_e(start_step_map) + .element_map(()) + .next() + .expect("Should get a response") + .expect("Should return a edge properties"); + + assert_map_property(&merged_edge_properties, "label", expected_edge_label); + + assert_map_property( + &merged_edge_properties, + expected_edge_property_key, + expected_edge_property_value, + ); + + let incoming_vertex: &Map = merged_edge_properties + .get(Direction::In) + .expect("Should have returned incoming vertex info") + .get() + .unwrap(); + + let incoming_vertex_id = incoming_vertex + .get("id") + .expect("Should have returned vertex id"); + assert_eq!(incoming_vertex_id, &vertex_a.id().to_gvalue()); + + let outgoing_vertex: &Map = merged_edge_properties + .get(Direction::Out) + .expect("Should have returned outgoing vertex info") + .get() + .unwrap(); + let outgoing_vertex_id = outgoing_vertex + .get("id") + .expect("Should have returned vertex id"); + assert_eq!(outgoing_vertex_id, &vertex_b.id().to_gvalue()); + } + + #[test] + fn test_merge_e_no_options() { + let client = graph(); + let expected_vertex_label = "test_merge_e_no_options_vertex"; + let expected_edge_label = "test_merge_e_no_options_edge"; + let expected_edge_property_key = "test_merge_e_no_options_edge_prop"; + let expected_edge_property_value = "test_merge_e_no_options_edge_value"; + drop_vertices(&client, &expected_vertex_label).expect("Failed to drop vertiecs"); + let g = traversal().with_remote(client); + + let vertex_a = g + .add_v(expected_vertex_label) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + + let vertex_b = g + .add_v(expected_vertex_label) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + + let mut assignment_map: HashMap = HashMap::new(); + assignment_map.insert(Direction::In.into(), vertex_a.id().into()); + assignment_map.insert(Direction::Out.into(), vertex_b.id().into()); + assignment_map.insert(T::Label.into(), expected_edge_label.into()); + assignment_map.insert( + expected_edge_property_key.into(), + expected_edge_property_value.into(), + ); + + let merged_edge_properties = g + .inject(vec![assignment_map.into()]) + .unfold() + .as_("payload") + .merge_e(__.select("payload")) + .element_map(()) + .next() + .expect("Should get a response") + .expect("Should return edge properties"); + + assert_map_property(&merged_edge_properties, "label", expected_edge_label); + assert_map_property( + &merged_edge_properties, + expected_edge_property_key, + expected_edge_property_value, + ); + + let incoming_vertex: &Map = merged_edge_properties + .get(Direction::In) + .expect("Should have returned incoming vertex info") + .get() + .unwrap(); + let incoming_vertex_id = incoming_vertex + .get("id") + .expect("Should have returned vertex id"); + assert_eq!(incoming_vertex_id, &vertex_a.id().to_gvalue()); + + let outgoing_vertex: &Map = merged_edge_properties + .get(Direction::Out) + .expect("Should have returned outgoing vertex info") + .get() + .unwrap(); + let outgoing_vertex_id = outgoing_vertex + .get("id") + .expect("Should have returned vertex id"); + assert_eq!(outgoing_vertex_id, &vertex_b.id().to_gvalue()); + } + + #[test] + fn test_merge_e_options() { + let client = graph(); + let expected_vertex_label = "test_merge_e_options_vertex"; + let expected_edge_label = "test_merge_e_options_edge"; + let expected_edge_property_key = "test_merge_e_options_edge_prop"; + drop_vertices(&client, &expected_vertex_label).expect("Failed to drop vertiecs"); + let g = traversal().with_remote(client); + + let vertex_a = g + .add_v(expected_vertex_label) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + + let vertex_b = g + .add_v(expected_vertex_label) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + + let mut assignment_map: HashMap = HashMap::new(); + assignment_map.insert(Direction::In.into(), vertex_a.id().into()); + assignment_map.insert(Direction::Out.into(), vertex_b.id().into()); + assignment_map.insert(T::Label.into(), expected_edge_label.into()); + + let mut on_create_map: HashMap = HashMap::new(); + on_create_map.insert(expected_edge_property_key.into(), "on_create".into()); + + let mut on_match_map: HashMap = HashMap::new(); + on_match_map.insert(expected_edge_property_key.into(), "on_match".into()); + + let mut injection_map: HashMap = HashMap::new(); + injection_map.insert("merge_params".into(), assignment_map.into()); + injection_map.insert("create_params".into(), on_create_map.into()); + injection_map.insert("match_params".into(), on_match_map.into()); + + let do_merge_edge = |g: GraphTraversalSource| -> Map { + g.inject(vec![injection_map.clone().into()]) + .unfold() + .as_("payload") + .merge_e(__.select("payload").select("merge_params")) + .option(( + Merge::OnCreate, + __.select("payload").select("create_params"), + )) + .option((Merge::OnMatch, __.select("payload").select("match_params"))) + .element_map(()) + .next() + .expect("Should get a response") + .expect("Should return a edge properties") + }; + + let on_create_edge_properties = do_merge_edge(g.clone()); + + //Initially the edge should be the on create value + assert_map_property( + &on_create_edge_properties, + expected_edge_property_key, + "on_create", + ); + + let on_match_edge_properties = do_merge_edge(g); + assert_map_property( + &on_match_edge_properties, + expected_edge_property_key, + "on_match", + ); + } + + #[test] + fn test_merge_e_anonymous_traversal() { + let client = graph(); + let expected_vertex_label = "test_merge_e_options_vertex"; + let expected_edge_label = "test_merge_e_options_edge"; + drop_vertices(&client, &expected_vertex_label).expect("Failed to drop vertiecs"); + let g = traversal().with_remote(client); + + let vertex_a = g + .add_v(expected_vertex_label) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + + let vertex_b = g + .add_v(expected_vertex_label) + .next() + .expect("Should get a response") + .expect("Should return a vertex"); + + let mut assignment_map: HashMap = HashMap::new(); + assignment_map.insert(Direction::In.into(), vertex_a.id().into()); + assignment_map.insert(Direction::Out.into(), vertex_b.id().into()); + assignment_map.insert(T::Label.into(), expected_edge_label.into()); + + let anonymous_merge_e_properties = g + .inject(1) + .unfold() + .coalesce::([__.merge_e(assignment_map)]) + .element_map(()) + .next() + .expect("Should get a response") + .expect("Should return a edge properties"); + + let incoming_vertex: &Map = anonymous_merge_e_properties + .get(Direction::In) + .expect("Should have returned incoming vertex info") + .get() + .unwrap(); + let incoming_vertex_id = incoming_vertex + .get("id") + .expect("Should have returned vertex id"); + assert_eq!(incoming_vertex_id, &vertex_a.id().to_gvalue()); + + let outgoing_vertex: &Map = anonymous_merge_e_properties + .get(Direction::Out) + .expect("Should have returned outgoing vertex info") + .get() + .unwrap(); + let outgoing_vertex_id = outgoing_vertex + .get("id") + .expect("Should have returned vertex id"); + assert_eq!(outgoing_vertex_id, &vertex_b.id().to_gvalue()); + } + + #[test] + fn test_merge_v_into_merge_e() { + //Based on the reference doc's combo example + let client = graph(); + let expected_vertex_label = "test_merge_v_into_merge_e_vertex"; + let expected_edge_label = "test_merge_v_into_merge_e_edge"; + drop_vertices(&client, &expected_vertex_label).expect("Failed to drop vertiecs"); + let g = traversal().with_remote(client); + + let expected_toby_id = 100_001i64; + let expected_brandy_id = 200_001i64; + + let mut vertex_a_map: HashMap = HashMap::new(); + vertex_a_map.insert(T::Label.into(), expected_vertex_label.into()); + vertex_a_map.insert(T::Id.into(), expected_toby_id.into()); + vertex_a_map.insert("name".into(), "Toby".into()); + + let mut vertex_b_map: HashMap = HashMap::new(); + vertex_b_map.insert(T::Label.into(), expected_vertex_label.into()); + vertex_b_map.insert(T::Id.into(), expected_brandy_id.into()); + vertex_b_map.insert("name".into(), "Brandy".into()); + + let mut edge_map: HashMap = HashMap::new(); + edge_map.insert(T::Label.into(), expected_edge_label.into()); + edge_map.insert("some_key".into(), "some_value".into()); + edge_map.insert(Direction::From.into(), Merge::OutV.into()); + edge_map.insert(Direction::To.into(), Merge::InV.into()); + + let combo_merge_edge_properties = g + .merge_v(vertex_a_map) + .as_("Toby") + .merge_v(vertex_b_map) + .as_("Brandy") + .merge_e(edge_map) + .option((Merge::OutV, __.select("Toby"))) + .option((Merge::InV, __.select("Brandy"))) + .element_map(()) + .next() + .expect("Should get a response") + .expect("Should return a edge properties"); + + let brandy_vertex: &Map = combo_merge_edge_properties + .get(Direction::In) + .expect("Should have returned incoming vertex info") + .get() + .unwrap(); + let brandy_vertex_id = brandy_vertex + .get("id") + .expect("Should have returned vertex id"); + assert_eq!(*brandy_vertex_id, GValue::Int64(expected_brandy_id)); + + let toby_vertex: &Map = combo_merge_edge_properties + .get(Direction::Out) + .expect("Should have returned outgoing vertex info") + .get() + .unwrap(); + let toby_vertex_id = toby_vertex + .get("id") + .expect("Should have returned vertex id"); + assert_eq!(*toby_vertex_id, GValue::Int64(expected_toby_id)); + + assert_map_property(&combo_merge_edge_properties, "label", expected_edge_label); + } +} + #[test] fn test_simple_vertex_traversal() { let g = traversal().with_remote(graph()); @@ -17,6 +481,20 @@ fn test_simple_vertex_traversal() { assert!(results.len() > 0); } +#[test] +fn test_inject() { + let g = traversal().with_remote(graph()); + let expected_value = "foo"; + let response: String = g + .inject(vec![expected_value.into()]) + .next() + .expect("Should get response") + .expect("Should have gotten a Some") + .try_into() + .expect("Should be parsable into a String"); + assert_eq!(expected_value, response); +} + #[test] fn test_simple_vertex_traversal_with_id() { let client = graph(); @@ -1791,6 +2269,146 @@ fn test_local() { assert_eq!(results.len(), 2); } +#[test] +fn test_side_effect() { + let client = graph(); + let test_vertex_label = "test_side_effect"; + let expected_side_effect_key = "prop_key"; + let expected_side_effect_value = "prop_val"; + + drop_vertices(&client, &test_vertex_label).unwrap(); + + let g = traversal().with_remote(client); + + let element_map = g + .add_v(test_vertex_label) + .side_effect(__.property( + gremlin_client::structure::Either2::A(expected_side_effect_key), + expected_side_effect_value, + )) + .element_map(()) + .next() + .expect("Should get response") + .expect("Should have returned an element map"); + + assert_map_property( + &element_map, + expected_side_effect_key, + expected_side_effect_value, + ); +} + +#[test] +fn test_anonymous_traversal_properties_drop() { + let client = graph(); + let test_vertex_label = "test_anonymous_traversal_properties_drop"; + let pre_drop_prop_key = "pre_drop_prop_key"; + let expected_prop_value = "prop_val"; + + drop_vertices(&client, &test_vertex_label).unwrap(); + + let g = traversal().with_remote(client); + + let element_map = g + .add_v(test_vertex_label) + .side_effect(__.property( + gremlin_client::structure::Either2::A(pre_drop_prop_key), + expected_prop_value, + )) + .element_map(()) + .next() + .expect("Should get response") + .expect("Should have returned an element map"); + + //Make sure the property was assigned + assert_map_property(&element_map, pre_drop_prop_key, expected_prop_value); + + let created_vertex_id = element_map.get("id").expect("Should have id property"); + let GValue::Int64(id) = created_vertex_id else { + panic!("Not expected id type"); + }; + + let post_drop_prop_key = "post_drop_prop_key"; + //Operate on the same vertex via id + let post_drop_map = g + .v(*id) + //Drop all properties first + .side_effect(__.properties(()).drop()) + //Then add a different property + .side_effect(__.property( + gremlin_client::structure::Either2::A(pre_drop_prop_key), + expected_prop_value, + )) + .element_map(()) + .next() + .expect("Should get response") + .expect("Should have returned an element map"); + + assert_map_property(&post_drop_map, pre_drop_prop_key, expected_prop_value); + + //Now make sure the pre drop property key is no longer present + assert!( + post_drop_map.get(post_drop_prop_key).is_none(), + "Pre drop key should have been dropped" + ); +} + +#[test] +fn test_by_columns() { + let client = graph(); + let test_vertex_label = "test_by_columns"; + let expected_side_effect_key_a = "prop_key_a"; + let expected_side_effect_value_a = "prop_val_a"; + let expected_side_effect_key_b = "prop_key_b"; + let expected_side_effect_value_b = "prop_val_b"; + + drop_vertices(&client, &test_vertex_label).unwrap(); + + let g = traversal().with_remote(client); + let mut property_map: HashMap = HashMap::new(); + property_map.insert( + expected_side_effect_key_a.into(), + expected_side_effect_value_a.into(), + ); + property_map.insert( + expected_side_effect_key_b.into(), + expected_side_effect_value_b.into(), + ); + + let element_map = g + .inject(vec![property_map.into()]) + .unfold() + .as_("properties") + .add_v(test_vertex_label) + .as_("v") + .side_effect( + __.select("properties") + .unfold() + .as_("kv_pair") + .select("v") + .property( + __.select("kv_pair").by(Column::Keys), + __.select("kv_pair").by(Column::Values), + ), + ) + .element_map(()) + .next() + .expect("Should get response") + .expect("Should have returned an element map"); + + assert_map_property( + &element_map, + expected_side_effect_key_a, + expected_side_effect_value_a, + ); + + assert_map_property( + &element_map, + expected_side_effect_key_b, + expected_side_effect_value_b, + ); +} + #[test] fn test_property_cardinality() { let client = graph(); @@ -1864,6 +2482,34 @@ fn test_choose() { assert_eq!(success_vertices.is_some(), true); } +#[test] +fn test_choose_by_literal_options() { + let client = graph(); + let g = traversal().with_remote(client); + + let choosen_literal_a = g + .inject(1) + .unfold() + .choose(__.identity()) + .option((GValue::Int64(1), __.constant("option-a"))) + .option((GValue::Int64(2), __.constant("option-b"))) + .next() + .unwrap(); + + assert_eq!(choosen_literal_a, Some("option-a".into())); + + let choosen_literal_b = g + .inject(2) + .unfold() + .choose(__.identity()) + .option((GValue::Int64(1), __.constant("option-a"))) + .option((GValue::Int64(2), __.constant("option-b"))) + .next() + .unwrap(); + + assert_eq!(choosen_literal_b, Some("option-b".into())); +} + #[test] fn test_coalesce() { let client = graph(); @@ -1957,6 +2603,34 @@ fn test_coalesce_unfold() { ); } +#[test] +fn test_none_step() { + let client = graph(); + + drop_vertices(&client, "test_none_step").unwrap(); + + let g = traversal().with_remote(client); + + //The addition of a None step however should not IO a vertex back + g.add_v("test_none_step") + .none() + .iter() + .expect("Should get a iter back") + .iterate() + .expect("Shouldn't error consuming iterator"); + + //Make sure the vertex is present in the graph + let vertex_count = g + .v(()) + .has_label("test_none_step") + .count() + .next() + .ok() + .flatten() + .expect("Should have gotten a response"); + assert_eq!(1, vertex_count); +} + #[test] #[cfg(feature = "derive")] fn test_traversal_vertex_mapping() {