From 0513f5c83d42d34cfda01febf7b87482ec250d0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Sun, 5 Apr 2020 09:33:18 +0200 Subject: [PATCH] FIX: handle none workerstate in StealResponse --- src/protocol/protocol.rs | 28 +++++++++++++++++++----- src/protocol/workermsg.rs | 4 ++-- src/server/core.rs | 2 +- tests/data/steal-response-state-none.bin | 1 + 4 files changed, 27 insertions(+), 8 deletions(-) create mode 100644 tests/data/steal-response-state-none.bin diff --git a/src/protocol/protocol.rs b/src/protocol/protocol.rs index d9b76ee..9b5f692 100644 --- a/src/protocol/protocol.rs +++ b/src/protocol/protocol.rs @@ -575,10 +575,7 @@ mod tests { task_spec_to_memory, ClientTaskSpec, DirectTaskSpec, FromClientMessage, KeyInMemoryMsg, ToClientMessage, UpdateGraphMsg, }; - use crate::protocol::protocol::{ - asyncwrite_to_sink, serialize_single_packet, split_packet_into_parts, Batch, DaskCodec, - DaskPacket, DaskPacketPart, Frame, MessageWrapper, SerializedMemory, - }; + use crate::protocol::protocol::{asyncwrite_to_sink, serialize_single_packet, split_packet_into_parts, Batch, DaskCodec, DaskPacket, DaskPacketPart, Frame, MessageWrapper, SerializedMemory, SerializedTransport}; use crate::Result; use bytes::{Buf, BufMut, BytesMut}; use futures::SinkExt; @@ -587,7 +584,7 @@ mod tests { use crate::common::Map; use crate::protocol::key::{to_dask_key, DaskKey}; use crate::protocol::protocol::IntoInner; - use crate::protocol::workermsg::RegisterWorkerResponseMsg; + use crate::protocol::workermsg::{RegisterWorkerResponseMsg, FromWorkerMessage}; use crate::test_util::{bytes_to_msg, load_bin_test_data}; use std::collections::hash_map::DefaultHasher; use std::hash::Hasher; @@ -890,6 +887,27 @@ mod tests { Ok(()) } + #[tokio::test] + async fn parse_steal_response_state_none() -> Result<()> { + let main = load_bin_test_data("data/steal-response-state-none.bin"); + let msg: MessageWrapper> = + rmp_serde::from_slice(main.as_slice())?; + match msg { + MessageWrapper::MessageList(v) => { + assert_eq!(v.len(), 1); + match &v[0] { + FromWorkerMessage::StealResponse(msg) => { + assert!(msg.state.is_none()); + } + _ => panic!() + } + } + _ => panic!(), + } + + Ok(()) + } + #[tokio::test] async fn serialize_key_in_memory() -> Result<()> { let msg = ToClientMessage::KeyInMemory(KeyInMemoryMsg { diff --git a/src/protocol/workermsg.rs b/src/protocol/workermsg.rs index 88ed037..de83496 100644 --- a/src/protocol/workermsg.rs +++ b/src/protocol/workermsg.rs @@ -188,13 +188,13 @@ pub enum WorkerState { Error, Rescheduled, Constrained, - LongRunning, + LongRunning } #[derive(Serialize, Deserialize, Debug)] pub struct StealResponseMsg { pub key: DaskKey, - pub state: WorkerState, + pub state: Option, } #[derive(Serialize, Deserialize, Debug)] diff --git a/src/server/core.rs b/src/server/core.rs index 7350423..a3533de 100644 --- a/src/server/core.rs +++ b/src/server/core.rs @@ -345,7 +345,7 @@ impl Core { }; // This needs to correspond with behavior in worker! - let success = match msg.state { + let success = match msg.state.unwrap_or(WorkerState::Error) { WorkerState::Waiting | WorkerState::Ready => true, _ => false, }; diff --git a/tests/data/steal-response-state-none.bin b/tests/data/steal-response-state-none.bin new file mode 100644 index 0000000..fe683aa --- /dev/null +++ b/tests/data/steal-response-state-none.bin @@ -0,0 +1 @@ +‘ƒ¢op®steal-response£keyŁ<('sum-partial-4372a19b33e6131e2a9e5de4ef35499c', 0, 0, 0, 4)„stateĄ \ No newline at end of file