Skip to content

Commit

Permalink
switch the 'bit' type to be a 'uint64' in Materialize
Browse files Browse the repository at this point in the history
  • Loading branch information
ParkMyCar committed Jan 17, 2025
1 parent 8f0ea63 commit e6b3ef2
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 41 deletions.
31 changes: 30 additions & 1 deletion src/mysql-util/src/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,35 @@ fn pack_val_as_datum(
ScalarType::Int16 => packer.push(Datum::from(from_value_opt::<i16>(value)?)),
ScalarType::UInt32 => packer.push(Datum::from(from_value_opt::<u32>(value)?)),
ScalarType::Int32 => packer.push(Datum::from(from_value_opt::<i32>(value)?)),
ScalarType::UInt64 => packer.push(Datum::from(from_value_opt::<u64>(value)?)),
ScalarType::UInt64 => {
if let Some(MySqlColumnMeta::Bit(precision)) = &col_desc.meta {
let mut value = from_value_opt::<Vec<u8>>(value)?;

// Ensure we have the correct number of bytes.
let precision_bytes = (precision + 7) / 8;
if value.len() != usize::cast_from(precision_bytes) {
return Err(anyhow::anyhow!("'bit' column out of range!"));
}
// Be defensive and mask off any high bits that come over the wire
// beyond our declared precision.
let bit_index = precision % 8;
if bit_index != 0 {
let mask = !(u8::MAX << bit_index);
if value.len() > 0 {
value[0] &= mask;
}
}

// Based on experimentation the value coming across the wire is
// encoded in big-endian.
let mut buf = [0u8; 8];
buf[(8 - value.len())..].copy_from_slice(value.as_slice());
let value = u64::from_be_bytes(buf);
packer.push(Datum::from(value))
} else {
packer.push(Datum::from(from_value_opt::<u64>(value)?))
}
}
ScalarType::Int64 => packer.push(Datum::from(from_value_opt::<i64>(value)?)),
ScalarType::Float32 => packer.push(Datum::from(from_value_opt::<f32>(value)?)),
ScalarType::Float64 => packer.push(Datum::from(from_value_opt::<f64>(value)?)),
Expand Down Expand Up @@ -198,6 +226,7 @@ fn pack_val_as_datum(
))?;
}
}
Some(MySqlColumnMeta::Bit(_)) => unreachable!("parsed as a u64"),
None => {
packer.push(Datum::String(&from_value_opt::<String>(value)?));
}
Expand Down
5 changes: 5 additions & 0 deletions src/mysql-util/src/desc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ message ProtoMySqlColumnMetaTimestamp {
uint32 precision = 1;
}

// Metadata for a MySQL `bit` column; the serialized form of
// `MySqlColumnMeta::Bit`.
message ProtoMySqlColumnMetaBit {
// Precision of the `bit` column, in bits.
uint32 precision = 1;
}

message ProtoMySqlColumnDesc {
string name = 1;
optional mz_repr.relation_and_scalar.ProtoColumnType column_type = 2;
Expand All @@ -44,6 +48,7 @@ message ProtoMySqlColumnDesc {
ProtoMySqlColumnMetaYear year = 5;
ProtoMySqlColumnMetaDate date = 6;
ProtoMySqlColumnMetaTimestamp timestamp = 7;
ProtoMySqlColumnMetaBit bit = 8;
}
}

Expand Down
9 changes: 9 additions & 0 deletions src/mysql-util/src/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ pub enum MySqlColumnMeta {
Date,
/// The described column is a timestamp value with a set precision.
Timestamp(u32),
/// The described column is a `bit` column with the given precision.
Bit(u32),
}

impl IsCompatible for Option<MySqlColumnMeta> {
Expand All @@ -195,6 +197,9 @@ impl IsCompatible for Option<MySqlColumnMeta> {
Some(MySqlColumnMeta::Timestamp(precision)),
Some(MySqlColumnMeta::Timestamp(other_precision)),
) => precision <= other_precision,
// We always cast bit columns to `u64`s and the max precision of a bit column
// is 64 bits, so any bit column is always compatible with another.
(Some(MySqlColumnMeta::Bit(_)), Some(MySqlColumnMeta::Bit(_))) => true,
_ => false,
}
}
Expand Down Expand Up @@ -226,6 +231,9 @@ impl RustType<ProtoMySqlColumnDesc> for MySqlColumnDesc {
precision: *precision,
}))
}
MySqlColumnMeta::Bit(precision) => Some(Meta::Bit(ProtoMySqlColumnMetaBit {
precision: *precision,
})),
}),
}
}
Expand All @@ -245,6 +253,7 @@ impl RustType<ProtoMySqlColumnDesc> for MySqlColumnDesc {
Meta::Year(_) => Some(Ok(MySqlColumnMeta::Year)),
Meta::Date(_) => Some(Ok(MySqlColumnMeta::Date)),
Meta::Timestamp(e) => Some(Ok(MySqlColumnMeta::Timestamp(e.precision))),
Meta::Bit(e) => Some(Ok(MySqlColumnMeta::Bit(e.precision))),
})
.transpose()?,
})
Expand Down
71 changes: 42 additions & 29 deletions src/mysql-util/src/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,13 @@ impl MySqlTableSchema {
// Collect the parsed data types or errors for later reporting.
match parse_data_type(&info, &self.schema_name, &self.name) {
Err(err) => error_cols.push(err),
Ok(scalar_type) => columns.push(MySqlColumnDesc {
Ok((scalar_type, meta)) => columns.push(MySqlColumnDesc {
name: info.column_name,
column_type: Some(ColumnType {
scalar_type,
nullable: &info.is_nullable == "YES",
}),
meta: None,
meta,
}),
}
}
Expand Down Expand Up @@ -346,35 +346,35 @@ fn parse_data_type(
info: &InfoSchema,
schema_name: &str,
table_name: &str,
) -> Result<ScalarType, UnsupportedDataType> {
) -> Result<(ScalarType, Option<MySqlColumnMeta>), UnsupportedDataType> {
let unsigned = info.column_type.contains("unsigned");

match info.data_type.as_str() {
let scalar_type = match info.data_type.as_str() {
"tinyint" | "smallint" => {
if unsigned {
Ok(ScalarType::UInt16)
ScalarType::UInt16
} else {
Ok(ScalarType::Int16)
ScalarType::Int16
}
}
"mediumint" | "int" => {
if unsigned {
Ok(ScalarType::UInt32)
ScalarType::UInt32
} else {
Ok(ScalarType::Int32)
ScalarType::Int32
}
}
"bigint" => {
if unsigned {
Ok(ScalarType::UInt64)
ScalarType::UInt64
} else {
Ok(ScalarType::Int64)
ScalarType::Int64
}
}
"float" => Ok(ScalarType::Float32),
"double" => Ok(ScalarType::Float64),
"date" => Ok(ScalarType::Date),
"datetime" | "timestamp" => Ok(ScalarType::Timestamp {
"float" => ScalarType::Float32,
"double" => ScalarType::Float64,
"date" => ScalarType::Date,
"datetime" | "timestamp" => ScalarType::Timestamp {
// both mysql and our scalar type use a max six-digit fractional-second precision
// this is bounds-checked in the TryFrom impl
precision: info
Expand All @@ -387,8 +387,8 @@ fn parse_data_type(
column_name: info.column_name.clone(),
intended_type: None,
})?,
}),
"time" => Ok(ScalarType::Time),
},
"time" => ScalarType::Time,
"decimal" | "numeric" => {
// validate the precision is within the bounds of our numeric type
// here since we don't use this precision on the ScalarType itself
Expand All @@ -401,7 +401,7 @@ fn parse_data_type(
intended_type: None,
})?
}
Ok(ScalarType::Numeric {
ScalarType::Numeric {
max_scale: info
.numeric_scale
.map(NumericMaxScale::try_from)
Expand All @@ -412,9 +412,9 @@ fn parse_data_type(
column_name: info.column_name.clone(),
intended_type: None,
})?,
})
}
}
"char" => Ok(ScalarType::Char {
"char" => ScalarType::Char {
length: info
.character_maximum_length
.and_then(|f| Some(CharLength::try_from(f)))
Expand All @@ -425,8 +425,8 @@ fn parse_data_type(
column_name: info.column_name.clone(),
intended_type: None,
})?,
}),
"varchar" => Ok(ScalarType::VarChar {
},
"varchar" => ScalarType::VarChar {
max_length: info
.character_maximum_length
.and_then(|f| Some(VarCharMaxLength::try_from(f)))
Expand All @@ -437,24 +437,37 @@ fn parse_data_type(
column_name: info.column_name.clone(),
intended_type: None,
})?,
}),
"text" | "tinytext" | "mediumtext" | "longtext" => Ok(ScalarType::String),
},
"text" | "tinytext" | "mediumtext" | "longtext" => ScalarType::String,
"binary" | "varbinary" | "tinyblob" | "blob" | "mediumblob" | "longblob" => {
Ok(ScalarType::Bytes)
ScalarType::Bytes
}
"json" => Ok(ScalarType::Jsonb),
"json" => ScalarType::Jsonb,
// TODO(mysql): Support the `bit` type natively in Materialize.
"bit" => Ok(ScalarType::Bytes),
"bit" => {
let precision = match info.numeric_precision {
Some(x @ 0..=64) => u32::try_from(x).expect("known good value"),
prec => {
mz_ore::soft_panic_or_log!(
"found invalid bit precision, {prec:?}, falling back"
);
64u32
}
};
return Ok((ScalarType::UInt64, Some(MySqlColumnMeta::Bit(precision))));
}
typ => {
tracing::warn!(?typ, "found unsupported data type");
Err(UnsupportedDataType {
return Err(UnsupportedDataType {
column_type: info.column_type.clone(),
qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
column_name: info.column_name.clone(),
intended_type: None,
})
});
}
}
};

Ok((scalar_type, None))
}

/// Parse the specified column as a TEXT COLUMN. We only support the set of types that are
Expand Down
45 changes: 34 additions & 11 deletions test/mysql-cdc/types-bit.td
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,53 @@ CREATE DATABASE public;
USE public;

# Insert data pre-snapshot
CREATE TABLE t1 (f1 BIT(4), f2 BIT(1));
CREATE TABLE t1 (f1 BIT(11), f2 BIT(1));
INSERT INTO t1 VALUES (8, 0);
INSERT INTO t1 VALUES (b'0100', b'1');
INSERT INTO t1 VALUES (13, 1);
INSERT INTO t1 VALUES (b'11100000100', b'1');
INSERT INTO t1 VALUES (b'0000', b'0');
INSERT INTO t1 VALUES (b'11111111111', b'0');

CREATE TABLE t2 (f1 BIT(64));
INSERT INTO t2 VALUES (0);
INSERT INTO t2 VALUES (1);
INSERT INTO t2 VALUES (b'11111111');
INSERT INTO t2 VALUES (b'1111111111111111111111111111111111111111111111111111111111111111');

> CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn;

> CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public.t1);

> CREATE TABLE t2 FROM SOURCE mz_source (REFERENCE public.t2);

> SELECT COUNT(*) > 0 FROM t1;
true

> SELECT COUNT(*) > 0 FROM t2;
true

# Insert the same data post-snapshot
$ mysql-execute name=mysql
INSERT INTO t1 SELECT * FROM t1;

# MySQL does not have a proper boolean type
> SELECT pg_typeof(f1), pg_typeof(f2) FROM t1 LIMIT 1;
bytea bytea

> SELECT * FROM t1;
"\\x00" "\\x00"
"\\x00" "\\x00"
"\\x04" "\\x01"
"\\x04" "\\x01"
"\\x08" "\\x00"
"\\x08" "\\x00"
uint8 uint8

> SELECT * FROM t1 ORDER BY f1 DESC;
0 0
0 0
8 0
8 0
13 1
13 1
1796 1
1796 1
2047 0
2047 0

> SELECT * FROM t2 ORDER BY f1 DESC;
0
1
255
18446744073709551615

0 comments on commit e6b3ef2

Please sign in to comment.