Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MergeV, MergeE, & Option Steps #214

Open
wants to merge 52 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
e2c85cf
First pass
criminosis Jul 10, 2024
8486503
Added JanusGraph in memory mode to docker compose enviornment
criminosis Jul 11, 2024
e06fe55
Added mergeV as a start step
criminosis Jul 11, 2024
b569530
Relaxed property step keys to Into<GValue> from &str
criminosis Jul 11, 2024
5ec7c71
Added JanusGraph custom vertex id tests
criminosis Jul 11, 2024
7cf15cd
Implemented From<HashMap<GKey, GValue>> for MergeVertexStep to suppor…
criminosis Jul 11, 2024
6a0f4c5
Implemented Option step for mergeV
criminosis Jul 13, 2024
0f4aee6
Added healthcheck for JG & wait with timeout for docker compose up in…
criminosis Jul 13, 2024
dddf8c0
Use Docker Compose v2 via "docker compose" vs v1's "docker-compose" i…
criminosis Jul 13, 2024
e1d7096
Combine merge v custom id test cases
criminosis Jul 13, 2024
3d3a300
Better handle tests being reran
criminosis Jul 13, 2024
6f56392
Added merge_v_tests feature for tests
criminosis Jul 13, 2024
dada863
Formatting
criminosis Jul 13, 2024
bc94862
FIxed if condition
criminosis Jul 13, 2024
80357fe
Corrected cargo.toml merge_test feature
criminosis Jul 13, 2024
18051d3
Changed GH Action if statement formatting
criminosis Jul 13, 2024
6737d91
Increased docker compose timeout time for healthy service check
criminosis Jul 13, 2024
7a8a0cd
Fixed imports for merge_test module. Moved merge tests into their own…
criminosis Jul 13, 2024
097fa04
Use drop vertices test utility function
criminosis Jul 13, 2024
aff1d27
Drop vertices for test_merge_v_options
criminosis Jul 13, 2024
44cdfd3
Added mergeV step to anonymous traversals
criminosis Jul 13, 2024
1c97441
Implemented mergeE step
criminosis Jul 13, 2024
8b28edc
Added mergeV and mergeE to Bytecode WRITE_OPERATORS
criminosis Jul 13, 2024
72615b9
Implemented travsal test based on reference doc combo mergeV and mergeE
criminosis Jul 13, 2024
edffffb
Support literal options for choose step and added test
criminosis Jul 13, 2024
7a255df
Rewrote side effect and expose via GraphTraversal
criminosis Jul 16, 2024
09d562c
Implemented support for Columns in By Step
criminosis Jul 16, 2024
97b677c
Expose properties() step in an anonymouse traversal
criminosis Jul 16, 2024
f0115d2
Also update property_many, property_with_cardinality, and property_ma…
criminosis Jul 17, 2024
f8f5fec
If a request responds with a websocket error mark the conneciton as i…
criminosis Aug 2, 2024
668c564
Map additional tungstenite errors to GremlinError::WebSocketAsync
criminosis Aug 6, 2024
cdcc212
Arc tungstenite::Error into WebSocketAsync error to maintain Async er…
criminosis Aug 22, 2024
2b72a1a
Expose healthcheck interval setting on async connection pool
criminosis Aug 27, 2024
093c04c
Formatting
criminosis Aug 27, 2024
802a20c
Map mobc pool errors to type that would invalidate connection
criminosis Aug 27, 2024
03c24fe
Exploratory logging
criminosis Aug 31, 2024
a342d8c
Added uuid to connection instance logging
criminosis Aug 31, 2024
ec1437d
Update mobc and make its idle connection behavior the same as the rd2…
criminosis Sep 1, 2024
784ca87
0.8 mobc does not treat async-std feature as mutually exclusive from …
criminosis Sep 1, 2024
c10ceaa
Include tokio/sync for mobc compilation in async-std-runtime feature
criminosis Sep 1, 2024
6a94a88
Implemented None step
criminosis Sep 17, 2024
bce3737
Implemented iterator() method on returned remote stream to consume st…
criminosis Sep 17, 2024
5db2b58
Trial connection multiplexing for non-credential configured clients
criminosis Sep 21, 2024
8082d3b
Formatting
criminosis Sep 21, 2024
7500820
Removed internal channel bounding
criminosis Sep 24, 2024
839ef5e
Revert "Removed internal channel bounding"
criminosis Sep 27, 2024
41c753d
Revert "Formatting"
criminosis Sep 27, 2024
19c9a40
Revert "Trial connection multiplexing for non-credential configured c…
criminosis Sep 27, 2024
cebe99a
Revert "Added uuid to connection instance logging"
criminosis Sep 27, 2024
3b894b4
Revert "Exploratory logging"
criminosis Sep 27, 2024
f2d4346
Formatting
criminosis Sep 27, 2024
2c5bf51
Switched command to docker compose in coverage GH Action Workflow
criminosis Sep 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- uses: actions/checkout@v2
- name: Starting Gremlin Servers
run: |
docker-compose -f ./docker-compose/docker-compose.yaml up -d
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems docker-compose (note the middle - is no longer a known command on the latest action runners, so changed it here and in the test workflow files

docker compose -f ./docker-compose/docker-compose.yaml up -d
env:
GREMLIN_SERVER: ${{ matrix.gremlin-server }}

Expand Down
17 changes: 16 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- uses: actions/checkout@v2
- name: Starting Gremlin Servers
run: |
docker-compose -f ./docker-compose/docker-compose.yaml up -d
docker compose -f ./docker-compose/docker-compose.yaml up -d --wait --wait-timeout 90
env:
GREMLIN_SERVER: ${{ matrix.gremlin-server }}

Expand All @@ -42,12 +42,27 @@ jobs:
command: fmt
args: --all -- --check
- name: Run cargo test with tokio
if: matrix.gremlin-server == '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime
- name: Run cargo test with async-std
if: matrix.gremlin-server == '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime
# MergeV as a step doesn't exist in 3.5.x, so selectively run those tests
- name: Run cargo test with tokio
if: matrix.gremlin-server != '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime,merge_tests
- name: Run cargo test with async-std
if: matrix.gremlin-server != '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime,merge_tests
13 changes: 13 additions & 0 deletions docker-compose/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,16 @@ services:
command : ["conf/gremlin-server-credentials.yaml"]
ports:
- "8183:8182"
janusgraph:
image: janusgraph/janusgraph:latest
environment:
- janusgraph.graph.set-vertex-id=true
- janusgraph.graph.allow-custom-vid-types=true
- JANUS_PROPS_TEMPLATE=inmemory
ports:
- "8184:8182"
healthcheck:
test: ["CMD", "bin/gremlin.sh", "-e", "scripts/remote-connect.groovy"]
interval: 10s
timeout: 30s
retries: 3
7 changes: 4 additions & 3 deletions gremlin-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ readme = "README.md"
[features]

default = []
merge_tests = []



async_gremlin = ["futures","mobc","async-tungstenite","async-trait","url","pin-project-lite"]

async_std = ["async-std-runtime"]
tokio-runtime = ["async_gremlin","tokio","mobc/tokio","async-tungstenite/tokio-runtime","async-tungstenite/tokio-native-tls","tokio-native-tls","tokio-stream"]
async-std-runtime = ["async_gremlin","async-std","async-tungstenite/async-std-runtime","async-tungstenite/async-tls","mobc/async-std","async-tls","rustls","webpki"]
tokio-runtime = ["async_gremlin","tokio","async-tungstenite/tokio-runtime","async-tungstenite/tokio-native-tls","tokio-native-tls","tokio-stream"]
async-std-runtime = ["async_gremlin","async-std","async-tungstenite/async-std-runtime","async-tungstenite/async-tls","tokio/sync", "mobc/async-std","async-tls","rustls","webpki"]

derive = ["gremlin-derive"]

Expand Down Expand Up @@ -57,7 +58,7 @@ thiserror = "1.0.20"



mobc = {version = "0.7", optional = true, default-features=false, features = ["unstable"] }
mobc = {version = "0.8", optional = true }
url = {version = "2.1.0", optional = true}
futures = { version = "0.3.1", optional = true}
pin-project-lite = { version = "0.2", optional = true}
Expand Down
3 changes: 3 additions & 0 deletions gremlin-client/src/aio/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use crate::GValue;
use crate::ToGValue;
use crate::{ConnectionOptions, GremlinError, GremlinResult};
use base64::encode;

Check warning on line 11 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 11 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 11 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 11 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated function `base64::encode`: Use Engine::encode
use futures::future::{BoxFuture, FutureExt};
use mobc::{Connection, Pool};
use serde::Serialize;
Expand Down Expand Up @@ -59,6 +59,9 @@
let pool = Pool::builder()
.get_timeout(opts.pool_get_connection_timeout)
.max_open(pool_size as u64)
.health_check_interval(opts.pool_healthcheck_interval)
//Makes max idle connections equal to max open, matching the behavior of the sync pool r2d2
.max_idle(0)
.build(manager);

Ok(GremlinClient {
Expand Down Expand Up @@ -179,7 +182,7 @@

args.insert(
String::from("sasl"),
GValue::String(encode(&format!("\0{}\0{}", c.username, c.password))),

Check warning on line 185 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 185 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 185 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated function `base64::encode`: Use Engine::encode

Check warning on line 185 in gremlin-client/src/aio/client.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated function `base64::encode`: Use Engine::encode
);

let args = self.options.serializer.write(&GValue::from(args))?;
Expand Down
20 changes: 18 additions & 2 deletions gremlin-client/src/aio/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{GremlinError, GremlinResult, WebSocketOptions};

Check warning on line 1 in gremlin-client/src/aio/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

unused import: `WebSocketOptions`

Check warning on line 1 in gremlin-client/src/aio/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

unused import: `WebSocketOptions`

Check warning on line 1 in gremlin-client/src/aio/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

unused import: `WebSocketOptions`

Check warning on line 1 in gremlin-client/src/aio/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

unused import: `WebSocketOptions`

use crate::connection::ConnectionOptions;

Expand All @@ -21,6 +21,7 @@
pub use tokio_native_tls::TlsStream;
}

use futures::TryFutureExt;
#[cfg(feature = "tokio-runtime")]
use tokio_use::*;

Expand Down Expand Up @@ -89,7 +90,7 @@
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::TLSError> {

Check warning on line 93 in gremlin-client/src/aio/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated type alias `rustls::TLSError`: Use Error

Check warning on line 93 in gremlin-client/src/aio/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated type alias `rustls::TLSError`: Use Error
Ok(rustls::client::ServerCertVerified::assertion())
}
}
Expand Down Expand Up @@ -144,6 +145,7 @@
tls::connector(&opts),
websocket_config,
)
.map_err(|e| Arc::new(e))
.await?
};
#[cfg(feature = "tokio-runtime")]
Expand All @@ -153,6 +155,7 @@
tls::connector(&opts),
websocket_config,
)
.map_err(|e| Arc::new(e))
.await?
};

Expand Down Expand Up @@ -190,6 +193,18 @@
.await
.expect("It should contain the response")
.map(|r| (r, receiver))
.map_err(|e| {
//If there's been an websocket layer error, mark the connection as invalid
match e {
GremlinError::WebSocket(_)
| GremlinError::WebSocketAsync(_)
| GremlinError::WebSocketPoolAsync(_) => {
self.valid = false;
}
_ => {}
}
e
})
}

pub fn is_valid(&self) -> bool {
Expand Down Expand Up @@ -222,7 +237,7 @@
if let Err(e) = sink.send(Message::Binary(msg.2)).await {
let mut sender = guard.remove(&msg.1).unwrap();
sender
.send(Err(GremlinError::from(e)))
.send(Err(GremlinError::from(Arc::new(e))))
.await
.expect("Failed to send error");
}
Expand Down Expand Up @@ -257,8 +272,9 @@
match stream.next().await {
Some(Err(error)) => {
let mut guard = requests.lock().await;
let error = Arc::new(error);
for s in guard.values_mut() {
match s.send(Err(GremlinError::from(&error))).await {
match s.send(Err(error.clone().into())).await {
Ok(_r) => {}
Err(_e) => {}
}
Expand Down
15 changes: 0 additions & 15 deletions gremlin-client/src/aio/error.rs

This file was deleted.

1 change: 0 additions & 1 deletion gremlin-client/src/aio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ pub(crate) mod connection;
pub(crate) mod pool;
mod result;

mod error;
pub(crate) mod process;
pub use client::GremlinClient;
pub use process::traversal::AsyncTerminator;
Expand Down
12 changes: 12 additions & 0 deletions gremlin-client/src/aio/process/traversal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::GremlinResult;
use core::task::Context;
use core::task::Poll;
use futures::Stream;
use futures::StreamExt;
use std::marker::PhantomData;
use std::pin::Pin;

Expand All @@ -29,6 +30,17 @@ impl<T> RemoteTraversalStream<T> {
}
}
}

impl RemoteTraversalStream<crate::structure::Null> {
pub async fn iterate(&mut self) -> GremlinResult<()> {
while let Some(response) = self.next().await {
//consume the entire stream, returning any errors
response?;
}
Ok(())
}
}

impl<T: FromGValue> Stream for RemoteTraversalStream<T> {
type Item = GremlinResult<T>;

Expand Down
17 changes: 15 additions & 2 deletions gremlin-client/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::TcpStream, time::Duration};
use std::{net::TcpStream, sync::Arc, time::Duration};

use crate::{GraphSON, GremlinError, GremlinResult};
use native_tls::TlsConnector;
Expand Down Expand Up @@ -62,12 +62,13 @@

fn send(&mut self, payload: Vec<u8>) -> GremlinResult<()> {
self.0
.write_message(Message::Binary(payload))

Check warning on line 65 in gremlin-client/src/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

use of deprecated method `tungstenite::WebSocket::<Stream>::write_message`: Use `send`

Check warning on line 65 in gremlin-client/src/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

use of deprecated method `tungstenite::WebSocket::<Stream>::write_message`: Use `send`
.map_err(|e| Arc::new(e))
.map_err(GremlinError::from)
}

fn recv(&mut self) -> GremlinResult<Vec<u8>> {
match self.0.read_message()? {
match self.0.read_message().map_err(|e| Arc::new(e))? {
Message::Binary(binary) => Ok(binary),
_ => unimplemented!(),
}
Expand Down Expand Up @@ -120,6 +121,16 @@
self
}

/// Only applicable to async client. By default a connection is checked on each return to the pool (None)
/// This allows setting an interval of how often it is checked on return.
pub fn pool_healthcheck_interval(
mut self,
pool_healthcheck_interval: Option<Duration>,
) -> Self {
self.0.pool_healthcheck_interval = pool_healthcheck_interval;
self
}

/// Both the sync and async pool providers use a default of 30 seconds,
/// Async pool interprets `None` as no timeout. Sync pool maps `None` to the default value
pub fn pool_connection_timeout(mut self, pool_connection_timeout: Option<Duration>) -> Self {
Expand Down Expand Up @@ -170,6 +181,7 @@
pub(crate) host: String,
pub(crate) port: u16,
pub(crate) pool_size: u32,
pub(crate) pool_healthcheck_interval: Option<Duration>,
pub(crate) pool_get_connection_timeout: Option<Duration>,
pub(crate) credentials: Option<Credentials>,
pub(crate) ssl: bool,
Expand Down Expand Up @@ -254,6 +266,7 @@
port: 8182,
pool_size: 10,
pool_get_connection_timeout: Some(Duration::from_secs(30)),
pool_healthcheck_interval: None,
credentials: None,
ssl: false,
tls_options: None,
Expand Down
16 changes: 15 additions & 1 deletion gremlin-client/src/conversion.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
process::traversal::Bytecode,
structure::{TextP, P as Predicate},
structure::{Null, TextP, P as Predicate},
Edge, GKey, GValue, GremlinError, GremlinResult, IntermediateRepr, List, Map, Metric, Path,
Property, Token, TraversalExplanation, TraversalMetrics, Vertex, VertexProperty, GID,
};
Expand Down Expand Up @@ -134,9 +134,23 @@ impl_from_gvalue!(IntermediateRepr, GValue::IntermediateRepr);
impl_from_gvalue!(chrono::DateTime<chrono::Utc>, GValue::Date);
impl_from_gvalue!(Traverser, GValue::Traverser);

impl FromGValue for Null {
fn from_gvalue(v: GValue) -> GremlinResult<Self> {
match v {
GValue::Null => Ok(crate::structure::Null {}),
_ => Err(GremlinError::Cast(format!(
"Cannot convert {:?} to {}",
v,
stringify!($t)
))),
}
}
}

impl FromGValue for GKey {
fn from_gvalue(v: GValue) -> GremlinResult<GKey> {
match v {
GValue::Direction(d) => Ok(GKey::Direction(d)),
GValue::String(s) => Ok(GKey::String(s)),
GValue::Token(s) => Ok(GKey::String(s.value().clone())),
GValue::Vertex(s) => Ok(GKey::Vertex(s)),
Expand Down
12 changes: 7 additions & 5 deletions gremlin-client/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::structure::GValue;

use thiserror::Error;
Expand Down Expand Up @@ -34,7 +36,10 @@ pub enum GremlinError {

#[cfg(feature = "async_gremlin")]
#[error(transparent)]
WebSocketAsync(#[from] async_tungstenite::tungstenite::Error),
WebSocketAsync(#[from] Arc<async_tungstenite::tungstenite::Error>),
#[cfg(feature = "async_gremlin")]
#[error(transparent)]
WebSocketPoolAsync(#[from] Arc<mobc::Error<GremlinError>>),
#[cfg(feature = "async_gremlin")]
#[error(transparent)]
ChannelSend(#[from] futures::channel::mpsc::SendError),
Expand All @@ -47,10 +52,7 @@ impl From<mobc::Error<GremlinError>> for GremlinError {
fn from(e: mobc::Error<GremlinError>) -> GremlinError {
match e {
mobc::Error::Inner(e) => e,
mobc::Error::BadConn => {
GremlinError::Generic(String::from("Async pool bad connection"))
}
mobc::Error::Timeout => GremlinError::Generic(String::from("Async pool timeout")),
other => GremlinError::WebSocketPoolAsync(Arc::new(other)),
}
}
}
Expand Down
35 changes: 33 additions & 2 deletions gremlin-client/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod serializer_v3;

use crate::conversion::ToGValue;
use crate::process::traversal::{Order, Scope};
use crate::structure::{Cardinality, GValue, T};
use crate::structure::{Cardinality, Direction, GValue, Merge, T};
use serde_json::{json, Map, Value};
use std::string::ToString;

Expand Down Expand Up @@ -216,7 +216,38 @@ impl GraphSON {
"@value" : v
}))
}

(_, GValue::Merge(merge)) => {
let merge_option = match merge {
Merge::OnCreate => "onCreate",
Merge::OnMatch => "onMatch",
Merge::OutV => "outV",
Merge::InV => "inV",
};
Ok(json!({
"@type" : "g:Merge",
"@value" : merge_option
}))
}
(_, GValue::Direction(direction)) => {
let direction = match direction {
Direction::Out | Direction::From => "OUT",
Direction::In | Direction::To => "IN",
};
Ok(json!({
"@type" : "g:Direction",
"@value" : direction,
}))
}
(_, GValue::Column(column)) => {
let column = match column {
crate::structure::Column::Keys => "keys",
crate::structure::Column::Values => "values",
};
Ok(json!({
"@type" : "g:Column",
"@value" : column,
}))
}
(_, _) => panic!("Type {:?} not supported.", value),
}
}
Expand Down
Loading
Loading