Skip to content

Commit

Permalink
Fix transactions not converting values internally. (valkey-io#727)
Browse files Browse the repository at this point in the history
Renamed `send_pipeline` to `send_transaction`, in order to separate it
from a function that can handle non-transactional pipeline. Added logic
that checks the structure of the response, and converts all values to
their expected form.

Co-authored-by: nihohit <[email protected]>
  • Loading branch information
shachlanAmazon and nihohit authored Dec 27, 2023
1 parent 1128788 commit 60ac1c7
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 24 deletions.
55 changes: 44 additions & 11 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,17 @@ impl Client {
.boxed()
}

pub fn send_pipeline<'a>(
pub fn send_transaction<'a>(
&'a mut self,
pipeline: &'a redis::Pipeline,
offset: usize,
count: usize,
routing: Option<RoutingInfo>,
) -> redis::RedisFuture<'a, Vec<Value>> {
) -> redis::RedisFuture<'a, Value> {
let command_count = pipeline.cmd_iter().count();
let offset = command_count + 1;
run_with_timeout(self.request_timeout, async move {
match self.internal_client {
ClientWrapper::Standalone(ref mut client) => {
client.send_pipeline(pipeline, offset, count).await
client.send_pipeline(pipeline, offset, 1).await
}

ClientWrapper::Cluster { ref mut client } => {
Expand All @@ -208,17 +208,50 @@ impl Client {
_ => SingleNodeRoutingInfo::Random,
};

client.route_pipeline(pipeline, offset, count, route).await
client.route_pipeline(pipeline, offset, 1, route).await
}
}
.and_then(|values| {
values
.and_then(|mut values| {
assert_eq!(values.len(), 1);
let values = values.pop();
let values = match values {
Some(Value::Array(values)) => values,
Some(Value::Nil) => {
return Ok(Value::Nil);
}
Some(value) => {
if offset == 2 {
vec![value]
} else {
return Err((
ErrorKind::ResponseError,
"Received non-array response for transaction",
format!("Response: `{value:?}`"),
)
.into());
}
}
_ => {
return Err((
ErrorKind::ResponseError,
"Received empty response for transaction",
)
.into());
}
};

let results = values
.into_iter()
.zip(pipeline.cmd_iter().map(expected_type_for_cmd))
.map(|(value, expected_type)| convert_to_expected_type(value, expected_type))
.collect::<Vec<_>>()
.into_iter()
.collect()
.try_fold(
Vec::with_capacity(command_count),
|mut acc, result| -> RedisResult<_> {
acc.push(result?);
Ok(acc)
},
)?;
Ok(Value::Array(results))
})
})
.boxed()
Expand Down
4 changes: 1 addition & 3 deletions glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,16 +381,14 @@ async fn send_transaction(
routing: Option<RoutingInfo>,
) -> ClientUsageResult<Value> {
let mut pipeline = redis::Pipeline::with_capacity(request.commands.capacity());
let offset = request.commands.len() + 1;
pipeline.atomic();
for command in request.commands {
pipeline.add_command(get_redis_command(&command)?);
}

client
.send_pipeline(&pipeline, offset, 1, routing)
.send_transaction(&pipeline, routing)
.await
.map(|mut values| values.pop().unwrap_or(Value::Nil))
.map_err(|err| err.into())
}

Expand Down
51 changes: 46 additions & 5 deletions glide-core/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ mod shared_client_tests {

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_request_pipeline_timeout(#[values(false, true)] use_cluster: bool) {
fn test_request_transaction_timeout(#[values(false, true)] use_cluster: bool) {
block_on_all(async {
let mut test_basics = setup_test_basics(
use_cluster,
Expand All @@ -267,13 +267,54 @@ mod shared_client_tests {

let mut pipeline = redis::pipe();
pipeline.blpop("foo", 0.0); // 0 timeout blocks indefinitely
let result = test_basics
.client
.send_pipeline(&pipeline, 0, 1, None)
.await;
let result = test_basics.client.send_transaction(&pipeline, None).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.is_timeout(), "{err}");
});
}

#[rstest]
#[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
fn test_request_transaction_and_convert_all_values(#[values(false, true)] use_cluster: bool) {
block_on_all(async {
let mut test_basics = setup_test_basics(
use_cluster,
TestConfiguration {
shared_server: true,
protocol: glide_core::connection_request::ProtocolVersion::RESP2,
..Default::default()
},
)
.await;

let key = generate_random_string(10);
let mut pipeline = redis::pipe();
pipeline.atomic();
pipeline.hset(&key, "bar", "vaz");
pipeline.hgetall(&key);
pipeline.hexists(&key, "bar");
pipeline.del(&key);
pipeline.set(&key, "0");
pipeline.cmd("INCRBYFLOAT").arg(&key).arg("0.5");
pipeline.del(&key);

let result = test_basics.client.send_transaction(&pipeline, None).await;
assert_eq!(
result,
Ok(Value::Array(vec![
Value::Int(1),
Value::Map(vec![(
Value::BulkString(b"bar".to_vec()),
Value::BulkString(b"vaz".to_vec()),
)]),
Value::Boolean(true),
Value::Int(1),
Value::Okay,
Value::Double(0.5.into()),
Value::Int(1),
]),)
);
});
}
}
4 changes: 3 additions & 1 deletion glide-core/tests/utilities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use futures::Future;
use glide_core::{
client::{Client, StandaloneClient},
connection_request::{self, AuthenticationInfo, NodeAddress},
connection_request::{self, AuthenticationInfo, NodeAddress, ProtocolVersion},
};
use once_cell::sync::Lazy;
use rand::{distributions::Alphanumeric, Rng};
Expand Down Expand Up @@ -656,6 +656,7 @@ pub struct TestConfiguration {
pub shared_server: bool,
pub read_from: Option<connection_request::ReadFrom>,
pub database_id: u32,
pub protocol: ProtocolVersion,
}

pub(crate) async fn setup_test_basics_internal(configuration: &TestConfiguration) -> TestBasics {
Expand All @@ -680,6 +681,7 @@ pub(crate) async fn setup_test_basics_internal(configuration: &TestConfiguration
}
let mut connection_request = create_connection_request(&[connection_addr], configuration);
connection_request.cluster_mode_enabled = false;
connection_request.protocol = configuration.protocol.into();
let client = StandaloneClient::create_client(connection_request)
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export function transactionTest(
{ [field]: value },
1,
[null],
0,
false,
3,
field + "3",
2,
Expand Down
6 changes: 3 additions & 3 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,18 @@ def transaction_test(
3,
2,
0,
"0.5",
0.5,
1,
"PONG",
OK,
["timeout", "1000"],
2,
value2,
5,
"10.5",
10.5,
1,
[value, None, value2],
[key, value, key2, value2, key3, "10.5"],
{key: value, key2: value2, key3: "10.5"},
2,
None,
4,
Expand Down

0 comments on commit 60ac1c7

Please sign in to comment.