Skip to content

Commit

Permalink
FIX: handle none workerstate in StealResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
Kobzol committed Apr 5, 2020
1 parent 2927b96 commit 0513f5c
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 8 deletions.
28 changes: 23 additions & 5 deletions src/protocol/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,7 @@ mod tests {
task_spec_to_memory, ClientTaskSpec, DirectTaskSpec, FromClientMessage, KeyInMemoryMsg,
ToClientMessage, UpdateGraphMsg,
};
use crate::protocol::protocol::{
asyncwrite_to_sink, serialize_single_packet, split_packet_into_parts, Batch, DaskCodec,
DaskPacket, DaskPacketPart, Frame, MessageWrapper, SerializedMemory,
};
use crate::protocol::protocol::{asyncwrite_to_sink, serialize_single_packet, split_packet_into_parts, Batch, DaskCodec, DaskPacket, DaskPacketPart, Frame, MessageWrapper, SerializedMemory, SerializedTransport};
use crate::Result;
use bytes::{Buf, BufMut, BytesMut};
use futures::SinkExt;
Expand All @@ -587,7 +584,7 @@ mod tests {
use crate::common::Map;
use crate::protocol::key::{to_dask_key, DaskKey};
use crate::protocol::protocol::IntoInner;
use crate::protocol::workermsg::RegisterWorkerResponseMsg;
use crate::protocol::workermsg::{RegisterWorkerResponseMsg, FromWorkerMessage};
use crate::test_util::{bytes_to_msg, load_bin_test_data};
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
Expand Down Expand Up @@ -890,6 +887,27 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn parse_steal_response_state_none() -> Result<()> {
let main = load_bin_test_data("data/steal-response-state-none.bin");
let msg: MessageWrapper<FromWorkerMessage<SerializedTransport>> =
rmp_serde::from_slice(main.as_slice())?;
match msg {
MessageWrapper::MessageList(v) => {
assert_eq!(v.len(), 1);
match &v[0] {
FromWorkerMessage::StealResponse(msg) => {
assert!(msg.state.is_none());
}
_ => panic!()
}
}
_ => panic!(),
}

Ok(())
}

#[tokio::test]
async fn serialize_key_in_memory() -> Result<()> {
let msg = ToClientMessage::KeyInMemory(KeyInMemoryMsg {
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/workermsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,13 @@ pub enum WorkerState {
Error,
Rescheduled,
Constrained,
LongRunning,
LongRunning
}

#[derive(Serialize, Deserialize, Debug)]
pub struct StealResponseMsg {
pub key: DaskKey,
pub state: WorkerState,
pub state: Option<WorkerState>,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion src/server/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl Core {
};

// This needs to correspond with behavior in worker!
let success = match msg.state {
let success = match msg.state.unwrap_or(WorkerState::Error) {
WorkerState::Waiting | WorkerState::Ready => true,
_ => false,
};
Expand Down
1 change: 1 addition & 0 deletions tests/data/steal-response-state-none.bin
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
���op�steal-response�key�<('sum-partial-4372a19b33e6131e2a9e5de4ef35499c', 0, 0, 0, 4)�state�

0 comments on commit 0513f5c

Please sign in to comment.