Skip to content

Commit

Permalink
code cleanups
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 18, 2024
1 parent 12f807a commit d387ca7
Show file tree
Hide file tree
Showing 8 changed files with 14 additions and 228 deletions.
3 changes: 0 additions & 3 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,6 @@ message UpdateNode {
repeated expr.ExprNode new_exprs = 4;

Check failure on line 178 in proto/batch_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "4" with name "new_exprs" on message "UpdateNode" changed option "json_name" from "tableVersionId" to "newExprs".

Check failure on line 178 in proto/batch_plan.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "4" with name "new_exprs" on message "UpdateNode" changed cardinality from "optional with implicit presence" to "repeated".
bool returning = 5;

// // The columns indices in the input schema, representing the columns need to send to streamDML exeuctor.
// repeated uint32 update_column_indices = 5;

// Session id is used to ensure that dml data from the same session should be sent to a fixed worker node and channel.
uint32 session_id = 6;
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub use relation::{
pub use select::{BoundDistinct, BoundSelect};
pub use set_expr::*;
pub use statement::BoundStatement;
pub use update::{BoundUpdate, BoundUpdateV2, UpdateProject};
pub use update::{BoundUpdate, UpdateProject};
pub use values::BoundValues;

use crate::catalog::catalog_service::CatalogReadGuard;
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/binder/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use risingwave_sqlparser::ast::Statement;

use super::delete::BoundDelete;
use super::fetch_cursor::BoundFetchCursor;
use super::update::BoundUpdateV2;
use super::update::BoundUpdate;
use crate::binder::create_view::BoundCreateView;
use crate::binder::{Binder, BoundInsert, BoundQuery};
use crate::error::Result;
Expand All @@ -28,7 +28,7 @@ use crate::expr::ExprRewriter;
pub enum BoundStatement {
Insert(Box<BoundInsert>),
Delete(Box<BoundDelete>),
Update(Box<BoundUpdateV2>),
Update(Box<BoundUpdate>),
Query(Box<BoundQuery>),
FetchCursor(Box<BoundFetchCursor>),
CreateView(Box<BoundCreateView>),
Expand Down Expand Up @@ -86,7 +86,7 @@ impl Binder {
selection,
returning,
} => Ok(BoundStatement::Update(
self.bind_update_v2(table_name, assignments, selection, returning)?
self.bind_update(table_name, assignments, selection, returning)?
.into(),
)),

Expand Down
211 changes: 6 additions & 205 deletions src/frontend/src/binder/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};

use fixedbitset::FixedBitSet;
Expand All @@ -26,68 +25,18 @@ use super::statement::RewriteExprsRecursive;
use super::{Binder, BoundBaseTable};
use crate::catalog::TableId;
use crate::error::{bail_bind_error, ErrorCode, Result, RwError};
use crate::expr::{Expr as _, ExprImpl, InputRef, SubqueryKind};
use crate::expr::{Expr as _, ExprImpl, SubqueryKind};
use crate::user::UserId;
use crate::TableCatalog;

#[derive(Debug, Clone)]
pub struct BoundUpdate {
/// Id of the table to perform updating.
pub table_id: TableId,

/// Version id of the table.
pub table_version_id: TableVersionId,

/// Name of the table to perform updating.
pub table_name: String,

/// Owner of the table to perform updating.
pub owner: UserId,

/// Used for scanning the records to update with the `selection`.
pub table: BoundBaseTable,

pub selection: Option<ExprImpl>,

/// Expression used to project to the updated row. The assigned columns will use the new
/// expression, and the other columns will be simply `InputRef`.
pub exprs: Vec<ExprImpl>,

// used for the 'RETURNING" keyword to indicate the returning items and schema
// if the list is empty and the schema is None, the output schema will be a INT64 as the
// affected row cnt
pub returning_list: Vec<ExprImpl>,

pub returning_schema: Option<Schema>,
}

impl RewriteExprsRecursive for BoundUpdate {
fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
self.selection =
std::mem::take(&mut self.selection).map(|expr| rewriter.rewrite_expr(expr));

let new_exprs = std::mem::take(&mut self.exprs)
.into_iter()
.map(|expr| rewriter.rewrite_expr(expr))
.collect::<Vec<_>>();
self.exprs = new_exprs;

let new_returning_list = std::mem::take(&mut self.returning_list)
.into_iter()
.map(|expr| rewriter.rewrite_expr(expr))
.collect::<Vec<_>>();
self.returning_list = new_returning_list;
}
}

#[derive(Debug, Clone, Copy)]
pub enum UpdateProject {
Expr(usize),
Composite(usize, usize),
}

#[derive(Debug, Clone)]
pub struct BoundUpdateV2 {
pub struct BoundUpdate {
/// Id of the table to perform updating.
pub table_id: TableId,

Expand Down Expand Up @@ -119,7 +68,7 @@ pub struct BoundUpdateV2 {
pub returning_schema: Option<Schema>,
}

impl RewriteExprsRecursive for BoundUpdateV2 {
impl RewriteExprsRecursive for BoundUpdate {
fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) {
self.selection =
std::mem::take(&mut self.selection).map(|expr| rewriter.rewrite_expr(expr));
Expand Down Expand Up @@ -156,161 +105,13 @@ fn get_col_referenced_by_generated_pk(table_catalog: &TableCatalog) -> Result<Fi
}

impl Binder {
// pub(super) fn bind_update(
// &mut self,
// name: ObjectName,
// assignments: Vec<Assignment>,
// selection: Option<Expr>,
// returning_items: Vec<SelectItem>,
// ) -> Result<BoundUpdate> {
// let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?;

// let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, false)?;
// let default_columns_from_catalog =
// table_catalog.default_columns().collect::<BTreeMap<_, _>>();
// if !returning_items.is_empty() && table_catalog.has_generated_column() {
// return Err(RwError::from(ErrorCode::BindError(
// "`RETURNING` clause is not supported for tables with generated columns".to_string(),
// )));
// }

// let table_id = table_catalog.id;
// let owner = table_catalog.owner;
// let table_version_id = table_catalog.version_id().expect("table must be versioned");
// let cols_refed_by_generated_pk = get_col_referenced_by_generated_pk(table_catalog)?;

// let table = self.bind_table(schema_name.as_deref(), &table_name, None)?;

// let selection = selection.map(|expr| self.bind_expr(expr)).transpose()?;

// let mut assignment_exprs = HashMap::new();
// for Assignment { id, value } in assignments {
// // FIXME: Parsing of `id` is not strict. It will even treat `a.b` as `(a, b)`.
// let assignments = match (id.as_slice(), value) {
// // _ = (subquery)
// (_ids, AssignmentValue::Expr(Expr::Subquery(_))) => {
// return Err(ErrorCode::BindError(
// "subquery on the right side of assignment is unsupported".to_owned(),
// )
// .into())
// }
// // col = expr
// ([id], value) => {
// vec![(id.clone(), value)]
// }
// // (col1, col2) = (expr1, expr2)
// // TODO: support `DEFAULT` in multiple assignments
// (ids, AssignmentValue::Expr(Expr::Row(values))) if ids.len() == values.len() => id
// .into_iter()
// .zip_eq_fast(values.into_iter().map(AssignmentValue::Expr))
// .collect(),
// // (col1, col2) = <other expr>
// _ => {
// return Err(ErrorCode::BindError(
// "number of columns does not match number of values".to_owned(),
// )
// .into())
// }
// };

// for (id, value) in assignments {
// let id_expr = self.bind_expr(Expr::Identifier(id.clone()))?;
// let id_index = if let Some(id_input_ref) = id_expr.clone().as_input_ref() {
// let id_index = id_input_ref.index;
// if table
// .table_catalog
// .pk()
// .iter()
// .any(|k| k.column_index == id_index)
// {
// return Err(ErrorCode::BindError(
// "update modifying the PK column is unsupported".to_owned(),
// )
// .into());
// }
// if table
// .table_catalog
// .generated_col_idxes()
// .contains(&id_index)
// {
// return Err(ErrorCode::BindError(
// "update modifying the generated column is unsupported".to_owned(),
// )
// .into());
// }
// if cols_refed_by_generated_pk.contains(id_index) {
// return Err(ErrorCode::BindError(
// "update modifying the column referenced by generated columns that are part of the primary key is not allowed".to_owned(),
// )
// .into());
// }
// id_index
// } else {
// unreachable!()
// };

// let value_expr = match value {
// AssignmentValue::Expr(expr) => {
// self.bind_expr(expr)?.cast_assign(id_expr.return_type())?
// }
// AssignmentValue::Default => default_columns_from_catalog
// .get(&id_index)
// .cloned()
// .unwrap_or_else(|| ExprImpl::literal_null(id_expr.return_type())),
// };

// match assignment_exprs.entry(id_expr) {
// Entry::Occupied(_) => {
// return Err(ErrorCode::BindError(
// "multiple assignments to same column".to_owned(),
// )
// .into())
// }
// Entry::Vacant(v) => {
// v.insert(value_expr);
// }
// }
// }
// }

// let exprs = table
// .table_catalog
// .columns()
// .iter()
// .enumerate()
// .filter_map(|(i, c)| {
// (!c.is_generated()).then_some(InputRef::new(i, c.data_type().clone()).into())
// })
// .map(|c| assignment_exprs.remove(&c).unwrap_or(c))
// .collect_vec();

// let (returning_list, fields) = self.bind_returning_list(returning_items)?;
// let returning = !returning_list.is_empty();

// Ok(BoundUpdate {
// table_id,
// table_version_id,
// table_name,
// owner,
// table,
// selection,
// exprs,
// returning_list,
// returning_schema: if returning {
// Some(Schema { fields })
// } else {
// None
// },
// })
// }

pub(super) fn bind_update_v2(
pub(super) fn bind_update(
&mut self,
name: ObjectName,
assignments: Vec<Assignment>,
selection: Option<Expr>,
returning_items: Vec<SelectItem>,
) -> Result<BoundUpdateV2> {
) -> Result<BoundUpdate> {
let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?;

let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, false)?;
Expand Down Expand Up @@ -441,7 +242,7 @@ impl Binder {
let (returning_list, fields) = self.bind_returning_list(returning_items)?;
let returning = !returning_list.is_empty();

Ok(BoundUpdateV2 {
Ok(BoundUpdate {
table_id,
table_version_id,
table_name,
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::UpdateNode;
Expand Down
5 changes: 1 addition & 4 deletions src/frontend/src/optimizer/plan_node/logical_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::TableVersionId;

use super::generic::GenericPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, Logical,
LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream,
};
use crate::catalog::TableId;
use crate::error::Result;
use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor};
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/planner/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl Planner {
match stmt {
BoundStatement::Insert(i) => self.plan_insert(*i),
BoundStatement::Delete(d) => self.plan_delete(*d),
BoundStatement::Update(u) => self.plan_update_v2(*u),
BoundStatement::Update(u) => self.plan_update(*u),
BoundStatement::Query(q) => self.plan_query(*q),
BoundStatement::FetchCursor(_) => unimplemented!(),
BoundStatement::CreateView(c) => self.plan_query(*c.query),
Expand Down
12 changes: 2 additions & 10 deletions src/frontend/src/planner/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use risingwave_common::types::{DataType, Scalar};
use risingwave_pb::expr::expr_node::Type;

use super::Planner;
use crate::binder::{BoundUpdateV2, UpdateProject};
use crate::binder::{BoundUpdate, UpdateProject};
use crate::error::Result;
use crate::expr::{ExprImpl, FunctionCall, InputRef, Literal};
use crate::optimizer::plan_node::generic::GenericPlanRef;
Expand All @@ -26,7 +26,7 @@ use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{PlanRef, PlanRoot};

impl Planner {
pub(super) fn plan_update_v2(&mut self, update: BoundUpdateV2) -> Result<PlanRoot> {
pub(super) fn plan_update(&mut self, update: BoundUpdate) -> Result<PlanRoot> {
let scan = self.plan_base_table(&update.table)?;
let input = if let Some(expr) = update.selection {
self.plan_where(scan, expr)?
Expand All @@ -35,14 +35,6 @@ impl Planner {
};

let returning = !update.returning_list.is_empty();
// let update_column_indices = update
// .table
// .table_catalog
// .columns()
// .iter()
// .enumerate()
// .filter_map(|(i, c)| (!c.is_generated()).then_some(i))
// .collect_vec();

let schema_len = input.schema().len();

Expand Down

0 comments on commit d387ca7

Please sign in to comment.