Skip to content

Commit

Permalink
[bench] Adds a simple benchmark for network serde
Browse files Browse the repository at this point in the history
This introduces a basic benchmark code that perf-tests the current multi-layer serialization used by replicated loglet. Note that on deserialization we don't deserialize the envelopes in this bench.

```

>>  cargo bench -p restate-bifrost --bench replicated_loglet_serde

replicated-loglet-serde/serialize
                        time:   [43.885 µs 43.930 µs 43.981 µs]
                        change: [-21.059% -20.563% -19.964%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 40 measurements (5.00%)
  2 (5.00%) high mild
replicated-loglet-serde/deserialize
                        time:   [2.7985 µs 2.8079 µs 2.8256 µs]
                        change: [+0.5121% +0.9651% +1.4768%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 2 outliers among 40 measurements (5.00%)
  1 (2.50%) high mild
  1 (2.50%) high severe

```
  • Loading branch information
AhmedSoliman committed Dec 18, 2024
1 parent 5b98f8e commit 5b4873e
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 1 deletion.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,29 @@ xxhash-rust = { workspace = true, features = ["xxh3"] }
restate-core = { workspace = true, features = ["test-util"] }
restate-log-server = { workspace = true }
restate-metadata-store = { workspace = true }
restate-storage-api = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
restate-wal-protocol = { workspace = true, features = ["serde"] }

bytestring = { workspace = true }
criterion = { workspace = true, features = ["async_tokio"] }
enumset = { workspace = true }
googletest = { workspace = true }
paste = { workspace = true }
pprof = { version = "0.14", features = ["criterion", "flamegraph", "frame-pointer"] }
prost = { workspace = true }
rlimit = { workspace = true }
tempfile = { workspace = true }
test-log = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
tracing-test = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dev-dependencies]
tikv-jemallocator = { workspace = true, features = ["unprefixed_malloc_on_supported_platforms", "profiling"] }


[[bench]]
name = "replicated_loglet_serde"
harness = false
199 changes: 199 additions & 0 deletions crates/bifrost/benches/replicated_loglet_serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;
use std::time::Duration;

use bytes::{Bytes, BytesMut};
use bytestring::ByteString;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use pprof::flamegraph::Options;
use prost::Message as _;
use rand::distributions::Alphanumeric;
use rand::{random, Rng};

use restate_bifrost::InputRecord;
use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber, ProducerId};
use restate_types::identifiers::{InvocationId, LeaderEpoch, PartitionProcessorRpcRequestId};
use restate_types::invocation::{
InvocationTarget, ServiceInvocation, ServiceInvocationSpanContext,
};
use restate_types::logs::{LogId, Record};
use restate_types::net::codec::{serialize_message, MessageBodyExt, WireDecode};
use restate_types::net::replicated_loglet::{Append, CommonRequestHeader};
use restate_types::protobuf::node::Message;
use restate_types::replicated_loglet::ReplicatedLogletId;
use restate_types::time::MillisSinceEpoch;
use restate_types::GenerationalNodeId;
use restate_wal_protocol::{Command, Destination, Envelope};

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

pub fn flamegraph_options<'a>() -> Options<'a> {
#[allow(unused_mut)]
let mut options = Options::default();
if cfg!(target_os = "macos") {
// Ignore different thread origins to merge traces. This seems not needed on Linux.
options.base = vec!["__pthread_joiner_wake".to_string(), "_main".to_string()];
}
options
}

fn rand_string(len: usize) -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(len)
.map(char::from)
.collect()
}

pub fn generate_envelope() -> Arc<Envelope> {
let source_partition_id = random::<u16>().into();
let partition_key = random();
let leader_epoch = LeaderEpoch::from(random::<u64>());
let node_id = GenerationalNodeId::new(random(), random());
let idempotency_key: ByteString = rand_string(15).into();

let request_id = PartitionProcessorRpcRequestId::new();
let inv_source = restate_types::invocation::Source::Ingress(request_id);
let handler: ByteString = format!("aFunction_{}", rand_string(1)).into();

let header = restate_wal_protocol::Header {
source: restate_wal_protocol::Source::Processor {
partition_id: source_partition_id,
partition_key: Some(partition_key),
leader_epoch,
node_id: node_id.as_plain(),
generational_node_id: Some(node_id),
},
dest: Destination::Processor {
partition_key,
dedup: Some(DedupInformation {
producer_id: ProducerId::self_producer(),
sequence_number: restate_storage_api::deduplication_table::DedupSequenceNumber::Esn(
EpochSequenceNumber {
leader_epoch: LeaderEpoch::from(random::<u64>()),
sequence_number: random(),
},
),
}),
},
};
let command = Command::Invoke(ServiceInvocation {
invocation_id: InvocationId::generate(
&InvocationTarget::service("MyWonderfulService", handler.clone()),
Some(&idempotency_key),
),
invocation_target: InvocationTarget::Service {
name: "AnotherService".into(),
handler,
},
argument: "DataSent".to_string().into(),
source: inv_source,
span_context: ServiceInvocationSpanContext::default(),
headers: vec![restate_types::invocation::Header::new(
"content-type",
"application/json",
)],
execution_time: Some(MillisSinceEpoch::after(Duration::from_secs(10))),
completion_retention_duration: Some(Duration::from_secs(10)),
idempotency_key: Some(idempotency_key),
response_sink: Some(
restate_types::invocation::ServiceInvocationResponseSink::Ingress { request_id },
),
submit_notification_sink: Some(
restate_types::invocation::SubmitNotificationSink::Ingress { request_id },
),
});

Envelope::new(header, command).into()
}

fn serialize_append_message(payloads: Arc<[Record]>) -> anyhow::Result<Message> {
let append_message = Append {
header: CommonRequestHeader {
log_id: LogId::from(12u16),
segment_index: 2.into(),
loglet_id: ReplicatedLogletId::new(12u16.into(), 4.into()),
},
payloads,
};

let body = serialize_message(
append_message,
restate_types::net::ProtocolVersion::Flexbuffers,
)
.unwrap();

let message = Message {
header: Some(restate_types::protobuf::node::Header {
my_nodes_config_version: Some(restate_types::protobuf::common::Version { value: 5 }),
my_logs_version: None,
my_schema_version: None,
my_partition_table_version: None,
msg_id: random(),
in_response_to: None,
span_context: None,
}),
body: Some(body),
};
Ok(message)
}

fn deserialize_append_message(serialized_message: Bytes) -> anyhow::Result<Append> {
let protocol_version = restate_types::net::ProtocolVersion::Flexbuffers;
let msg = Message::decode(serialized_message)?;
let body = msg.body.unwrap();
// we ignore non-deserializable messages (serde errors, or control signals in drain)
let mut msg_body = body.try_as_binary_body(restate_types::net::ProtocolVersion::Flexbuffers)?;
Ok(Append::decode(&mut msg_body.payload, protocol_version)?)
}

fn replicated_loglet_append_serde(c: &mut Criterion) {
let mut group = c.benchmark_group("replicated-loglet-serde");
let batch: Vec<Record> = vec![generate_envelope(); 10]
.into_iter()
.map(|r| InputRecord::from(r).into_record())
.collect();

let payloads: Arc<[Record]> = batch.into();

let mut buf = BytesMut::new();
let message = serialize_append_message(payloads.clone()).unwrap();
message.encode(&mut buf).unwrap();
let serialized = buf.freeze();

group
.sample_size(40)
.measurement_time(Duration::from_secs(20))
.bench_function("serialize", |bencher| {
bencher.iter(|| {
let mut buf = BytesMut::new();
let message = black_box(serialize_append_message(payloads.clone()).unwrap());
black_box(message.encode(&mut buf)).unwrap();
});
})
.bench_function("deserialize", |bencher| {
bencher.iter(|| {
black_box(deserialize_append_message(serialized.clone())).unwrap();
});
});
group.finish();
}

criterion_group!(
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(997, Output::Flamegraph(Some(flamegraph_options()))));
targets = replicated_loglet_append_serde
);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion crates/bifrost/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<T> Clone for InputRecord<T> {
// This is a zero-cost transformation. The type is erased at runtime, but the underlying
// layout is identical.
impl<T: StorageEncode> InputRecord<T> {
pub(crate) fn into_record(self) -> Record {
pub fn into_record(self) -> Record {
Record::from_parts(self.created_at, self.keys, PolyBytes::Typed(self.body))
}
}
Expand Down

0 comments on commit 5b4873e

Please sign in to comment.