Skip to content

Commit

Permalink
Fix api for jepsen
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Apr 17, 2023
1 parent 98327f1 commit 9c639ff
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
6 changes: 6 additions & 0 deletions src/raft/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,9 @@ impl From<tokio::time::error::Elapsed> for APIError {
Self::Timeout
}
}

impl From<simd_json::Error> for APIError {
fn from(e: simd_json::Error) -> Self {
Self::Other(e.to_string())
}
}
29 changes: 20 additions & 9 deletions src/raft/api/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use crate::raft::{
api::{APIError, APIRequest, APIResult, ToAPIResult},
store::TremorSet,
};
use axum::{extract, routing::post, Router};
use axum::{extract, routing::post, Json, Router};
use http::StatusCode;
use simd_json::OwnedValue;
use tokio::time::timeout;

use super::API_WORKER_TIMEOUT;
Expand All @@ -29,14 +30,24 @@ pub(crate) fn endpoints() -> Router<APIRequest> {
.route("/consistent_read", post(consistent_read))
}

#[derive(Deserialize)]
struct KVSet {
key: String,
value: OwnedValue,
}

async fn write(
extract::State(state): extract::State<APIRequest>,
extract::OriginalUri(uri): extract::OriginalUri,
extract::Json(body): extract::Json<TremorSet>,
extract::Json(body): extract::Json<KVSet>,
) -> APIResult<Vec<u8>> {
let tremor_set = TremorSet {
key: body.key,
value: simd_json::to_vec(&body.value)?,
};
let res = state
.raft
.client_write(body.into())
.client_write(tremor_set.into())
.await
.to_api_result(&uri, &state)
.await?;
Expand All @@ -48,10 +59,10 @@ async fn write(
async fn read(
extract::State(state): extract::State<APIRequest>,
extract::Json(key): extract::Json<String>,
) -> APIResult<Vec<u8>> {
) -> APIResult<Json<OwnedValue>> {
let value = timeout(API_WORKER_TIMEOUT, state.raft_manager.kv_get_local(key)).await??;
if let Some(value) = value {
Ok(value)
if let Some(mut value) = value {
Ok(Json(simd_json::from_slice(&mut value)?))
} else {
Err(APIError::HTTP {
status: StatusCode::NOT_FOUND,
Expand All @@ -65,15 +76,15 @@ async fn consistent_read(
extract::State(state): extract::State<APIRequest>,
extract::OriginalUri(uri): extract::OriginalUri,
extract::Json(key): extract::Json<String>,
) -> APIResult<Vec<u8>> {
) -> APIResult<Json<OwnedValue>> {
// this will fail if we are not a leader
state.ensure_leader(Some(uri.clone())).await?;
// here we are safe to read
let value = timeout(API_WORKER_TIMEOUT, state.raft_manager.kv_get_local(key)).await??;
// Ensure that we are still the leader at the end of the read so we can guarantee freshness
state.ensure_leader(Some(uri)).await?;
if let Some(value) = value {
Ok(value)
if let Some(mut value) = value {
Ok(Json(simd_json::from_slice(&mut value)?))
} else {
Err(APIError::HTTP {
status: StatusCode::NOT_FOUND,
Expand Down

0 comments on commit 9c639ff

Please sign in to comment.