Skip to content

Commit

Permalink
refactor: rename ConnectorSchema/SourceSchema to FormatEncode (#19174)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Oct 30, 2024
1 parent 98baacd commit b6051d8
Show file tree
Hide file tree
Showing 18 changed files with 296 additions and 292 deletions.
6 changes: 3 additions & 3 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ impl TestCase {
columns,
constraints,
if_not_exists,
source_schema,
format_encode,
source_watermarks,
append_only,
on_conflict,
Expand All @@ -437,7 +437,7 @@ impl TestCase {
wildcard_idx,
..
} => {
let source_schema = source_schema.map(|schema| schema.into_v2_with_warning());
let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());

create_table::handle_create_table(
handler_args,
Expand All @@ -446,7 +446,7 @@ impl TestCase {
wildcard_idx,
constraints,
if_not_exists,
source_schema,
format_encode,
source_watermarks,
append_only,
on_conflict,
Expand Down
47 changes: 22 additions & 25 deletions src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::{EncodeType, FormatType};
use risingwave_sqlparser::ast::{
CompatibleSourceSchema, ConnectorSchema, CreateSourceStatement, Encode, Format, ObjectName,
CompatibleFormatEncode, CreateSourceStatement, Encode, Format, FormatEncodeOptions, ObjectName,
SqlOption, Statement,
};
use risingwave_sqlparser::parser::Parser;
Expand Down Expand Up @@ -120,7 +120,7 @@ pub fn fetch_source_catalog_with_db_schema_id(
/// and if the FORMAT and ENCODE are modified.
pub fn check_format_encode(
original_source: &SourceCatalog,
new_connector_schema: &ConnectorSchema,
new_format_encode: &FormatEncodeOptions,
) -> Result<()> {
let StreamSourceInfo {
format, row_encode, ..
Expand All @@ -137,9 +137,7 @@ pub fn check_format_encode(
.into());
};

if new_connector_schema.format != old_format
|| new_connector_schema.row_encode != old_row_encode
{
if new_format_encode.format != old_format || new_format_encode.row_encode != old_row_encode {
bail_not_implemented!(
"the original definition is FORMAT {:?} ENCODE {:?}, and altering them is not supported yet",
&old_format,
Expand All @@ -153,19 +151,18 @@ pub fn check_format_encode(
/// Refresh the source registry and get the added/dropped columns.
pub async fn refresh_sr_and_get_columns_diff(
original_source: &SourceCatalog,
connector_schema: &ConnectorSchema,
format_encode: &FormatEncodeOptions,
session: &Arc<SessionImpl>,
) -> Result<(StreamSourceInfo, Vec<ColumnCatalog>, Vec<ColumnCatalog>)> {
let mut with_properties = original_source.with_properties.clone();
validate_compatibility(connector_schema, &mut with_properties)?;
validate_compatibility(format_encode, &mut with_properties)?;

if with_properties.is_cdc_connector() {
bail_not_implemented!("altering a cdc source is not supported");
}

let (Some(columns_from_resolve_source), source_info) =
bind_columns_from_source(session, connector_schema, Either::Right(&with_properties))
.await?
bind_columns_from_source(session, format_encode, Either::Right(&with_properties)).await?
else {
// Source without schema registry is rejected.
unreachable!("source without schema registry is rejected")
Expand All @@ -189,33 +186,33 @@ pub async fn refresh_sr_and_get_columns_diff(
Ok((source_info, added_columns, dropped_columns))
}

fn get_connector_schema_from_source(source: &SourceCatalog) -> Result<ConnectorSchema> {
fn get_format_encode_from_source(source: &SourceCatalog) -> Result<FormatEncodeOptions> {
let [stmt]: [_; 1] = Parser::parse_sql(&source.definition)
.context("unable to parse original source definition")?
.try_into()
.unwrap();
let Statement::CreateSource {
stmt: CreateSourceStatement { source_schema, .. },
stmt: CreateSourceStatement { format_encode, .. },
} = stmt
else {
unreachable!()
};
Ok(source_schema.into_v2_with_warning())
Ok(format_encode.into_v2_with_warning())
}

pub async fn handler_refresh_schema(
handler_args: HandlerArgs,
name: ObjectName,
) -> Result<RwPgResponse> {
let (source, _, _) = fetch_source_catalog_with_db_schema_id(&handler_args.session, &name)?;
let connector_schema = get_connector_schema_from_source(&source)?;
handle_alter_source_with_sr(handler_args, name, connector_schema).await
let format_encode = get_format_encode_from_source(&source)?;
handle_alter_source_with_sr(handler_args, name, format_encode).await
}

pub async fn handle_alter_source_with_sr(
handler_args: HandlerArgs,
name: ObjectName,
connector_schema: ConnectorSchema,
format_encode: FormatEncodeOptions,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let (source, database_id, schema_id) = fetch_source_catalog_with_db_schema_id(&session, &name)?;
Expand All @@ -232,9 +229,9 @@ pub async fn handle_alter_source_with_sr(
bail_not_implemented!(issue = 16003, "alter shared source");
}

check_format_encode(&source, &connector_schema)?;
check_format_encode(&source, &format_encode)?;

if !schema_has_schema_registry(&connector_schema) {
if !schema_has_schema_registry(&format_encode) {
return Err(ErrorCode::NotSupported(
"altering a source without schema registry".to_string(),
"try `ALTER SOURCE .. ADD COLUMN ...` instead".to_string(),
Expand All @@ -243,7 +240,7 @@ pub async fn handle_alter_source_with_sr(
}

let (source_info, added_columns, dropped_columns) =
refresh_sr_and_get_columns_diff(&source, &connector_schema, &session).await?;
refresh_sr_and_get_columns_diff(&source, &format_encode, &session).await?;

if !dropped_columns.is_empty() {
bail_not_implemented!(
Expand All @@ -258,10 +255,10 @@ pub async fn handle_alter_source_with_sr(
source.info = source_info;
source.columns.extend(added_columns);
source.definition =
alter_definition_format_encode(&source.definition, connector_schema.row_options.clone())?;
alter_definition_format_encode(&source.definition, format_encode.row_options.clone())?;

let (format_encode_options, format_encode_secret_ref) = resolve_secret_ref_in_with_options(
WithOptions::try_from(connector_schema.row_options())?,
WithOptions::try_from(format_encode.row_options())?,
session.as_ref(),
)?
.into_parts();
Expand Down Expand Up @@ -299,19 +296,19 @@ pub fn alter_definition_format_encode(

match &mut stmt {
Statement::CreateSource {
stmt: CreateSourceStatement { source_schema, .. },
stmt: CreateSourceStatement { format_encode, .. },
}
| Statement::CreateTable {
source_schema: Some(source_schema),
format_encode: Some(format_encode),
..
} => {
match source_schema {
CompatibleSourceSchema::V2(schema) => {
match format_encode {
CompatibleFormatEncode::V2(schema) => {
schema.row_options = format_encode_options;
}
// TODO: Confirm the behavior of legacy source schema.
// Legacy source schema should be rejected by the handler and never reaches here.
CompatibleSourceSchema::RowFormat(_schema) => unreachable!(),
CompatibleFormatEncode::RowFormat(_schema) => unreachable!(),
}
}
_ => unreachable!(),
Expand Down
30 changes: 15 additions & 15 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Encode,
ObjectName, Statement, StructField,
AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Encode,
FormatEncodeOptions, ObjectName, Statement, StructField,
};
use risingwave_sqlparser::parser::Parser;

Expand All @@ -51,14 +51,14 @@ pub async fn replace_table_with_definition(
table_name: ObjectName,
definition: Statement,
original_catalog: &Arc<TableCatalog>,
source_schema: Option<ConnectorSchema>,
format_encode: Option<FormatEncodeOptions>,
) -> Result<()> {
let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
session,
table_name,
definition,
original_catalog,
source_schema,
format_encode,
None,
)
.await?;
Expand Down Expand Up @@ -86,15 +86,15 @@ pub async fn get_new_table_definition_for_cdc_table(
.unwrap();
let Statement::CreateTable {
columns: original_columns,
source_schema,
format_encode,
..
} = &mut definition
else {
panic!("unexpected statement: {:?}", definition);
};

assert!(
source_schema.is_none(),
format_encode.is_none(),
"source schema should be None for CDC table"
);

Expand Down Expand Up @@ -165,7 +165,7 @@ pub async fn get_replace_table_plan(
table_name: ObjectName,
definition: Statement,
original_catalog: &Arc<TableCatalog>,
source_schema: Option<ConnectorSchema>,
format_encode: Option<FormatEncodeOptions>,
new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
) -> Result<(
Option<Source>,
Expand Down Expand Up @@ -196,7 +196,7 @@ pub async fn get_replace_table_plan(
session,
table_name,
original_catalog,
source_schema,
format_encode,
handler_args.clone(),
col_id_gen,
columns.clone(),
Expand Down Expand Up @@ -326,19 +326,19 @@ pub async fn handle_alter_table_column(
.unwrap();
let Statement::CreateTable {
columns,
source_schema,
format_encode,
..
} = &mut definition
else {
panic!("unexpected statement: {:?}", definition);
};
let source_schema = source_schema
let format_encode = format_encode
.clone()
.map(|source_schema| source_schema.into_v2_with_warning());
.map(|format_encode| format_encode.into_v2_with_warning());

let fail_if_has_schema_registry = || {
if let Some(source_schema) = &source_schema
&& schema_has_schema_registry(source_schema)
if let Some(format_encode) = &format_encode
&& schema_has_schema_registry(format_encode)
{
Err(ErrorCode::NotSupported(
"alter table with schema registry".to_string(),
Expand Down Expand Up @@ -460,14 +460,14 @@ pub async fn handle_alter_table_column(
table_name,
definition,
&original_catalog,
source_schema,
format_encode,
)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

pub fn schema_has_schema_registry(schema: &ConnectorSchema) -> bool {
pub fn schema_has_schema_registry(schema: &FormatEncodeOptions) -> bool {
match schema.row_encode {
Encode::Avro | Encode::Protobuf => true,
Encode::Json => {
Expand Down
20 changes: 10 additions & 10 deletions src/frontend/src/handler/alter_table_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use anyhow::{anyhow, Context};
use fancy_regex::Regex;
use pgwire::pg_response::StatementType;
use risingwave_common::bail_not_implemented;
use risingwave_sqlparser::ast::{ConnectorSchema, ObjectName, Statement};
use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport;

Expand All @@ -29,15 +29,15 @@ use super::{HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result};
use crate::TableCatalog;

fn get_connector_schema_from_table(table: &TableCatalog) -> Result<Option<ConnectorSchema>> {
fn get_format_encode_from_table(table: &TableCatalog) -> Result<Option<FormatEncodeOptions>> {
let [stmt]: [_; 1] = Parser::parse_sql(&table.definition)
.context("unable to parse original table definition")?
.try_into()
.unwrap();
let Statement::CreateTable { source_schema, .. } = stmt else {
let Statement::CreateTable { format_encode, .. } = stmt else {
unreachable!()
};
Ok(source_schema.map(|schema| schema.into_v2_with_warning()))
Ok(format_encode.map(|schema| schema.into_v2_with_warning()))
}

pub async fn handle_refresh_schema(
Expand All @@ -51,9 +51,9 @@ pub async fn handle_refresh_schema(
bail_not_implemented!("alter table with incoming sinks");
}

let connector_schema = {
let connector_schema = get_connector_schema_from_table(&original_table)?;
if !connector_schema
let format_encode = {
let format_encode = get_format_encode_from_table(&original_table)?;
if !format_encode
.as_ref()
.is_some_and(schema_has_schema_registry)
{
Expand All @@ -63,12 +63,12 @@ pub async fn handle_refresh_schema(
)
.into());
}
connector_schema.unwrap()
format_encode.unwrap()
};

let definition = alter_definition_format_encode(
&original_table.definition,
connector_schema.row_options.clone(),
format_encode.row_options.clone(),
)?;

let [definition]: [_; 1] = Parser::parse_sql(&definition)
Expand All @@ -81,7 +81,7 @@ pub async fn handle_refresh_schema(
table_name,
definition,
&original_table,
Some(connector_schema),
Some(format_encode),
)
.await;

Expand Down
Loading

0 comments on commit b6051d8

Please sign in to comment.