Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: parse Loki labels in protobuf write path #5305

Merged
merged 8 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ humantime-serde.workspace = true
hyper = { version = "0.14", features = ["full"] }
influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
itertools.workspace = true
json5 = "0.4"
jsonb.workspace = true
lazy_static.workspace = true
log-query.workspace = true
Expand All @@ -86,6 +85,7 @@ prometheus.workspace = true
promql-parser.workspace = true
prost.workspace = true
query.workspace = true
quoted-string = "0.6"
rand.workspace = true
regex.workspace = true
reqwest.workspace = true
Expand Down Expand Up @@ -123,6 +123,7 @@ client = { workspace = true, features = ["testing"] }
common-base.workspace = true
common-test-util.workspace = true
criterion = "0.5"
json5 = "0.4"
mysql_async = { version = "0.33", default-features = false, features = [
"default-rustls",
] }
Expand All @@ -149,3 +150,7 @@ harness = false
[[bench]]
name = "to_http_output"
harness = false

[[bench]]
name = "loki_labels"
harness = false
41 changes: 41 additions & 0 deletions src/servers/benches/loki_labels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use servers::error::Result;
use servers::http::loki::parse_loki_labels;

// cargo bench loki_labels

fn json5_parse(input: &str) -> Result<BTreeMap<String, String>> {
let input = input.replace("=", ":");
let result: BTreeMap<String, String> = json5::from_str(&input).unwrap();
Ok(result)
}

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("loki_labels");
let input = r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;

group.bench_function("json5", |b| b.iter(|| json5_parse(black_box(input))));
group.bench_function("hand_parse", |b| {
b.iter(|| parse_loki_labels(black_box(input)))
});
group.finish(); // Important to call finish() on the group
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
9 changes: 4 additions & 5 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,9 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to parse payload as json5"))]
ParseJson5 {
#[snafu(source)]
error: json5::Error,
#[snafu(display("Invalid Loki labels: {}", msg))]
InvalidLokiLabels {
msg: String,
#[snafu(implicit)]
location: Location,
},
Expand Down Expand Up @@ -666,7 +665,7 @@ impl ErrorExt for Error {
| MissingQueryContext { .. }
| MysqlValueConversion { .. }
| ParseJson { .. }
| ParseJson5 { .. }
| InvalidLokiLabels { .. }
| InvalidLokiPayload { .. }
| UnsupportedContentType { .. }
| TimestampOverflow { .. }
Expand Down
162 changes: 144 additions & 18 deletions src/servers/src/http/loki.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ use axum::{Extension, TypedHeader};
use bytes::Bytes;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::{Output, OutputData};
use common_telemetry::warn;
use common_telemetry::{error, warn};
use hashbrown::HashMap;
use lazy_static::lazy_static;
use loki_api::prost_types::Timestamp;
use prost::Message;
use quoted_string::test_utils::TestSpec;
use session::context::{Channel, QueryContext};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{
DecodeOtlpRequestSnafu, InvalidLokiPayloadSnafu, ParseJson5Snafu, ParseJsonSnafu, Result,
UnsupportedContentTypeSnafu,
DecodeOtlpRequestSnafu, InvalidLokiLabelsSnafu, InvalidLokiPayloadSnafu, ParseJsonSnafu,
Result, UnsupportedContentTypeSnafu,
};
use crate::http::event::{LogState, JSON_CONTENT_TYPE, PB_CONTENT_TYPE};
use crate::http::extractor::LogTableName;
Expand Down Expand Up @@ -191,8 +192,7 @@ async fn handle_json_req(
l.iter()
.filter_map(|(k, v)| v.as_str().map(|v| (k.clone(), v.to_string())))
.collect::<BTreeMap<String, String>>()
})
.unwrap_or_default();
});

// process each line
for (line_index, line) in lines.iter().enumerate() {
Expand Down Expand Up @@ -230,7 +230,7 @@ async fn handle_json_req(
// TODO(shuiyisong): we'll ignore structured metadata for now

let mut row = init_row(schemas.len(), ts, line_text);
process_labels(&mut column_indexer, schemas, &mut row, labels.iter());
process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());

rows.push(row);
}
Expand All @@ -255,13 +255,11 @@ async fn handle_pb_req(
let mut rows = Vec::with_capacity(cnt);

for stream in req.streams {
// parse labels for each row
// encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
// use very dirty hack to parse labels
// TODO(shuiyisong): remove json5 and parse the string directly
let labels = stream.labels.replace("=", ":");
// use btreemap to keep order
let labels: BTreeMap<String, String> = json5::from_str(&labels).context(ParseJson5Snafu)?;
let labels = parse_loki_labels(&stream.labels)
.inspect_err(|e| {
error!(e; "failed to parse loki labels");
})
.ok();

// process entries
for entry in stream.entries {
Expand All @@ -273,7 +271,7 @@ async fn handle_pb_req(
let line = entry.line;

let mut row = init_row(schemas.len(), prost_ts_to_nano(&ts), line);
process_labels(&mut column_indexer, schemas, &mut row, labels.iter());
process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());

rows.push(row);
}
Expand All @@ -282,6 +280,81 @@ async fn handle_pb_req(
Ok(rows)
}

/// since we're hand-parsing the labels, if any error is encountered, we'll just skip the label
/// note: pub here for bench usage
/// ref:
/// 1. encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
/// 2. test data: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
pub fn parse_loki_labels(labels: &str) -> Result<BTreeMap<String, String>> {
let mut labels = labels.trim();
ensure!(
labels.len() >= 2,
InvalidLokiLabelsSnafu {
msg: "labels string too short"
}
);
ensure!(
labels.starts_with("{"),
InvalidLokiLabelsSnafu {
msg: "missing `{` at the beginning"
}
);
ensure!(
labels.ends_with("}"),
InvalidLokiLabelsSnafu {
msg: "missing `}` at the end"
}
);

let mut result = BTreeMap::new();
labels = &labels[1..labels.len() - 1];

while !labels.is_empty() {
// parse key
let first_index = labels.find("=").context(InvalidLokiLabelsSnafu {
msg: format!("missing `=` near: {}", labels),
})?;
let key = &labels[..first_index];
labels = &labels[first_index + 1..];

// parse value
let qs = quoted_string::parse::<TestSpec>(labels)
.map_err(|e| {
InvalidLokiLabelsSnafu {
msg: format!(
"failed to parse quoted string near: {}, reason: {}",
labels, e.1
),
}
.build()
})?
.quoted_string;

labels = &labels[qs.len()..];

let value = quoted_string::to_content::<TestSpec>(qs).map_err(|e| {
InvalidLokiLabelsSnafu {
msg: format!("failed to unquote the string: {}, reason: {}", qs, e),
}
.build()
})?;

// insert key and value
result.insert(key.to_string(), value.to_string());

if labels.is_empty() {
break;
}
ensure!(
labels.starts_with(","),
InvalidLokiLabelsSnafu { msg: "missing `,`" }
);
labels = labels[1..].trim_start();
}

Ok(result)
}

#[inline]
fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
ts.seconds * 1_000_000_000 + ts.nanos as i64
Expand All @@ -303,12 +376,16 @@ fn init_row(schema_len: usize, ts: i64, line: String) -> Vec<GreptimeValue> {
row
}

fn process_labels<'a>(
fn process_labels(
column_indexer: &mut HashMap<String, u16>,
schemas: &mut Vec<ColumnSchema>,
row: &mut Vec<GreptimeValue>,
labels: impl Iterator<Item = (&'a String, &'a String)>,
labels: Option<&BTreeMap<String, String>>,
) {
let Some(labels) = labels else {
return;
};

// insert labels
for (k, v) in labels {
if let Some(index) = column_indexer.get(k) {
Expand Down Expand Up @@ -359,9 +436,12 @@ macro_rules! unwrap_or_warn_continue {

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

use loki_api::prost_types::Timestamp;

use crate::http::loki::prost_ts_to_nano;
use crate::error::Error::InvalidLokiLabels;
use crate::http::loki::{parse_loki_labels, prost_ts_to_nano};

#[test]
fn test_ts_to_nano() {
Expand All @@ -374,4 +454,50 @@ mod tests {
};
assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
}

#[test]
fn test_parse_loki_labels() {
let mut expected = BTreeMap::new();
expected.insert("job".to_string(), "foobar".to_string());
expected.insert("cluster".to_string(), "foo-central1".to_string());
expected.insert("namespace".to_string(), "bar".to_string());
expected.insert("container_name".to_string(), "buzz".to_string());

// perfect case
let valid_labels =
r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;
let re = parse_loki_labels(valid_labels);
assert!(re.is_ok());
assert_eq!(re.unwrap(), expected);

// too short
let too_short = r#"}"#;
let re = parse_loki_labels(too_short);
assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

// missing start
let missing_start = r#"job="foobar"}"#;
let re = parse_loki_labels(missing_start);
assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

// missing start
let missing_end = r#"{job="foobar""#;
let re = parse_loki_labels(missing_end);
assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

// missing equal
let missing_equal = r#"{job"foobar"}"#;
let re = parse_loki_labels(missing_equal);
assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

// missing quote
let missing_quote = r#"{job=foobar}"#;
let re = parse_loki_labels(missing_quote);
assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

// missing comma
let missing_comma = r#"{job="foobar" cluster="foo-central1"}"#;
let re = parse_loki_labels(missing_comma);
assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
}
}
9 changes: 5 additions & 4 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
// init loki request
let req: PushRequest = PushRequest {
streams: vec![StreamAdapter {
labels: r#"{service="test",source="integration","wadaxi"="do anything"}"#.to_string(),
labels: r#"{service="test",source="integration",wadaxi="do anything"}"#.to_string(),
entries: vec![
EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
Expand Down Expand Up @@ -1953,7 +1953,8 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
"streams": [
{
"stream": {
"source": "test"
"source": "test",
"sender": "integration"
},
"values": [
[ "1735901380059465984", "this is line one" ],
Expand Down Expand Up @@ -1987,7 +1988,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());

// test schema
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_json_schema",
&client,
Expand All @@ -1997,7 +1998,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
.await;

// test content
let expected = "[[1735901380059465984,\"this is line one\",\"test\"],[1735901398478897920,\"this is line two\",\"test\"]]";
let expected = "[[1735901380059465984,\"this is line one\",\"integration\",\"test\"],[1735901398478897920,\"this is line two\",\"integration\",\"test\"]]";
validate_data(
"loki_json_content",
&client,
Expand Down
Loading