Skip to content

Commit

Permalink
fix: aggregation serialization (#97)
Browse files Browse the repository at this point in the history
* fix: aggregation serialization

* test: add include/exclude cases

* chore: align null fields with upstream
  • Loading branch information
sunng87 authored Oct 28, 2024
1 parent 13687fc commit 2cc1388
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 54 deletions.
106 changes: 71 additions & 35 deletions src/parser/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use std::time::{Duration, SystemTime};
///
/// if empty listed labels, meaning no grouping
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "ser", derive(serde::Serialize))]
pub enum LabelModifier {
Include(Labels),
Exclude(Labels),
Expand Down Expand Up @@ -200,6 +199,34 @@ impl BinModifier {
}
}

#[cfg(feature = "ser")]
pub(crate) fn serialize_grouping<S>(
this: &Option<LabelModifier>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(2))?;
match this {
Some(LabelModifier::Include(l)) => {
map.serialize_entry("grouping", l)?;
map.serialize_entry("without", &false)?;
}
Some(LabelModifier::Exclude(l)) => {
map.serialize_entry("grouping", l)?;
map.serialize_entry("without", &true)?;
}
None => {
map.serialize_entry("grouping", &(vec![] as Vec<String>))?;
map.serialize_entry("without", &false)?;
}
}

map.end()
}

#[cfg(feature = "ser")]
pub(crate) fn serialize_bin_modifier<S>(
this: &Option<BinModifier>,
Expand Down Expand Up @@ -257,6 +284,44 @@ where
map.end()
}

#[cfg(feature = "ser")]
pub(crate) fn serialize_at_modifier<S>(
this: &Option<AtModifier>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(2))?;
match this {
Some(AtModifier::Start) => {
map.serialize_entry("startOrEnd", &Some("start"))?;
map.serialize_entry("timestamp", &None::<u128>)?;
}
Some(AtModifier::End) => {
map.serialize_entry("startOrEnd", &Some("end"))?;
map.serialize_entry("timestamp", &None::<u128>)?;
}
Some(AtModifier::At(time)) => {
map.serialize_entry("startOrEnd", &None::<&str>)?;
map.serialize_entry(
"timestamp",
&time
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis(),
)?;
}
None => {
map.serialize_entry("startOrEnd", &None::<&str>)?;
map.serialize_entry("timestamp", &None::<u128>)?;
}
}

map.end()
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Offset {
Pos(Duration),
Expand Down Expand Up @@ -317,39 +382,6 @@ impl fmt::Display for AtModifier {
}
}

#[cfg(feature = "ser")]
impl serde::Serialize for AtModifier {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(2))?;
match self {
AtModifier::Start => {
map.serialize_entry("startOrEnd", &Some("start"))?;
map.serialize_entry("timestamp", &None::<u128>)?;
}
AtModifier::End => {
map.serialize_entry("startOrEnd", &Some("end"))?;
map.serialize_entry("timestamp", &None::<u128>)?;
}
AtModifier::At(time) => {
map.serialize_entry("startOrEnd", &None::<&str>)?;
map.serialize_entry(
"timestamp",
&time
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis(),
)?;
}
}

map.end()
}
}

impl TryFrom<TokenId> for AtModifier {
type Error = String;

Expand Down Expand Up @@ -465,6 +497,8 @@ pub struct AggregateExpr {
/// Parameter used by some aggregators.
pub param: Option<Box<Expr>>,
/// modifier is optional for some aggregation operators, like sum.
#[cfg_attr(feature = "ser", serde(flatten))]
#[cfg_attr(feature = "ser", serde(serialize_with = "serialize_grouping"))]
pub modifier: Option<LabelModifier>,
}

Expand Down Expand Up @@ -658,6 +692,7 @@ pub struct SubqueryExpr {
#[cfg_attr(feature = "ser", serde(serialize_with = "Offset::serialize_offset"))]
pub offset: Option<Offset>,
#[cfg_attr(feature = "ser", serde(flatten))]
#[cfg_attr(feature = "ser", serde(serialize_with = "serialize_at_modifier"))]
pub at: Option<AtModifier>,
#[cfg_attr(
feature = "ser",
Expand Down Expand Up @@ -797,6 +832,7 @@ pub struct VectorSelector {
#[cfg_attr(feature = "ser", serde(serialize_with = "Offset::serialize_offset"))]
pub offset: Option<Offset>,
#[cfg_attr(feature = "ser", serde(flatten))]
#[cfg_attr(feature = "ser", serde(serialize_with = "serialize_at_modifier"))]
pub at: Option<AtModifier>,
}

Expand Down Expand Up @@ -1030,7 +1066,7 @@ impl Eq for Extension {}
#[cfg_attr(feature = "ser", serde(tag = "type", rename_all = "camelCase"))]
pub enum Expr {
/// Aggregate represents an aggregation operation on a Vector.
#[cfg_attr(feature = "ser", serde(rename = "aggregateExpr"))]
#[cfg_attr(feature = "ser", serde(rename = "aggregation"))]
Aggregate(AggregateExpr),

/// Unary represents a unary operation on another expression.
Expand Down
Loading

0 comments on commit 2cc1388

Please sign in to comment.